In [1]:
import numpy as np
import random
import matplotlib.pyplot as plt



In [2]:
def get_labels(corpus): # to get the list of distinct labels
    """Return the set of distinct labels found in ``corpus``.

    Parameters: corpus: iterable of sentences, each sentence being an iterable
                of entries whose element at index 1 is the label.
    Returns: set containing every label that occurs at least once.
    """
    return {entry[1] for sent in corpus for entry in sent}


def get_tokens(corpus): # to get the list of distinct tokens (i.e. words)
    """Return the set of distinct tokens (words) found in ``corpus``.

    Parameters: corpus: iterable of sentences, each sentence being an iterable
                of entries whose element at index 0 is the token.
    Returns: set containing every token that occurs at least once.
    """
    vocabulary = set()
    for sent in corpus:
        # Fold the whole sentence in at once instead of adding word by word.
        vocabulary.update(entry[0] for entry in sent)
    return vocabulary

In [60]:
def import_corpus(path_to_file):
    """Read a tagged corpus from a text file.

    The file is expected to hold one token per line, with whitespace-separated
    columns; the first column is taken as the word and the last column as its
    label. Blank (or whitespace-only) lines separate sentences.

    Parameters: path_to_file: path of the corpus file (read as UTF-8).
    Returns: list of sentences; each sentence is a non-empty list of
             (word, label) tuples in file order.
    """
    sentences = []
    sentence = []

    # Explicit encoding so the result does not depend on the platform default.
    with open(path_to_file, encoding='utf-8') as f:
        for line in f:
            # split() without arguments handles spaces and tabs alike and
            # yields an empty list for blank / whitespace-only lines.
            fields = line.split()

            if not fields:
                # Only close a sentence that actually has content, so that
                # leading or consecutive blank lines never produce empty
                # sentences (callers sample words from each sentence).
                if sentence:
                    sentences.append(sentence)
                    sentence = []
                continue

            sentence.append((fields[0], fields[-1]))

    # The file may end without a trailing blank line.
    if sentence:
        sentences.append(sentence)

    return sentences




class MaxEntModel(object):
    """Maximum-entropy tagger over (word, label) and (previous label, label) features.

    Every feature is a binary indicator. For a word with candidate label ``y``
    and previous label ``y_prev`` at most two features are active:

    * ``('word', word, y)``            -- the word occurs with label ``y``
    * ``('prev_label', y_prev, y)``    -- label ``y`` follows label ``y_prev``

    The first word of a sentence uses ``START_LABEL`` as its previous label.
    The model defines ``p(y | word, y_prev) = exp(theta . f) / Z`` where ``Z``
    sums ``exp(theta . f)`` over all labels.
    """

    # Pseudo label used as the "previous label" of the first word of a sentence.
    START_LABEL = 'START'

    def __init__(self, path):
        """Load the corpus stored at ``path``; call ``initialize`` before training."""
        # training corpus: list of sentences, each a list of (word, label) tuples
        self.corpus = import_corpus(path)

        # (numpy) array containing the parameters of the model;
        # set by the method 'initialize'
        self.theta = None

        # dictionary mapping every feature key (see class docstring) to its
        # index in 'theta'; set by the method 'initialize'
        self.feature_indices = None

        # set of possible labels; set by the method 'initialize'
        self.labels = None

        # set of distinct words; set by the method 'initialize'
        self.tokens = None

        # train/test partitions of the corpus; set by the method 'initialize'
        self.train_sentences = []
        self.test_sentences = []

        # number of words in the training sentences
        self.train_size = 0

        # cumulative number of training words consumed after each parameter
        # update (one entry per update); used as the x axis in 'evaluate'
        self.words_used = []

    def _build_feature_indices(self, tokens, labels):
        """Assign a unique index to every word/label and prev-label/label feature.

        Keys are tagged with the feature kind so that a token that happens to
        be spelled like a label (e.g. '.' tagged '.') can never collide with a
        transition feature. Tokens and labels are sorted so the numbering does
        not depend on set iteration order.
        """
        ordered_labels = sorted(labels)
        previous_labels = sorted(set(labels) | {self.START_LABEL})

        keys = [('word', token, label)
                for token in sorted(tokens)
                for label in ordered_labels]
        keys += [('prev_label', previous, label)
                 for previous in previous_labels
                 for label in ordered_labels]

        return {key: index for index, key in enumerate(keys)}

    def initialize(self, train_test_split=0.9):
        '''
        Initialize the maximum entropy model, i.e., split the corpus, build the set of all
        features and the set of all labels, and create an initial array 'theta' (all ones)
        for the parameters of the model. Calling it again resets the model.
        Parameters: train_test_split: float; fraction of the sentences (taken from the start
                    of the corpus) used for training, the remainder is kept for testing
        '''
        # create train/test split
        num_train_sentences = round(len(self.corpus) * train_test_split)
        self.train_sentences = self.corpus[:num_train_sentences]
        self.test_sentences = self.corpus[num_train_sentences:]

        # Features are built over the whole corpus so that words which only
        # occur in the test part still have an index.
        self.tokens = get_tokens(self.corpus)
        self.labels = get_labels(self.corpus)

        self.feature_indices = self._build_feature_indices(self.tokens, self.labels)
        self.theta = np.ones(len(self.feature_indices))

        # Recomputed (not accumulated) so repeated calls stay consistent.
        self.train_size = sum(len(sentence) for sentence in self.train_sentences)
        self.words_used = []

    def _active_indices(self, word, label, prev_label):
        """Return the indices of the (at most two) features active for the given triple.

        Combinations that are not part of the feature set (e.g. an unseen
        word) simply contribute no index.
        """
        candidates = (('word', word, label), ('prev_label', prev_label, label))
        found = (self.feature_indices.get(key) for key in candidates)
        return [index for index in found if index is not None]

    def _score(self, word, label, prev_label):
        """Return theta . f(word, label, prev_label) using only the active indices."""
        return float(sum(self.theta[index]
                         for index in self._active_indices(word, label, prev_label)))

    def _log_partition(self, word, prev_label):
        """Return log Z = log(sum over labels of exp(score)).

        The maximum score is subtracted before exponentiating so that large
        parameters do not overflow.
        Raises: ValueError if the model has no labels (not initialized).
        """
        if not self.labels:
            raise ValueError('the model has no labels; call initialize() first')

        scores = np.array([self._score(word, label, prev_label) for label in self.labels])
        peak = scores.max()
        return float(peak + np.log(np.exp(scores - peak).sum()))

    def _record_words_used(self, count):
        """Append the running total of consumed training words after an update."""
        consumed_so_far = self.words_used[-1] if self.words_used else 0
        self.words_used.append(consumed_so_far + count)

    def get_active_features(self, word, label, prev_label):
        '''
        Compute the vector of active features.
        Parameters: word: string; a word at some position i of a given sentence
                    label: string; a label assigned to the given word
                    prev_label: string; the label of the word at position i-1
        Returns: (numpy) array containing only zeros and ones.
        '''
        active_features = np.zeros(len(self.feature_indices))

        # Direct dictionary lookup: the indicator depends only on the triple
        # itself, not on how often it occurs in the corpus.
        for index in self._active_indices(word, label, prev_label):
            active_features[index] = 1

        return active_features

    def cond_normalization_factor(self, word, prev_label):
        '''
        Compute the normalization factor 1/Z(x_i), where Z sums exp(theta . f) over all labels.
        Parameters: word: string; a word x_i at some position i of a given sentence
                    prev_label: string; the label of the word at position i-1
        Returns: float
        '''
        return float(np.exp(-self._log_partition(word, prev_label)))

    # Exercise 2 b) ###################################################################
    def conditional_probability(self, word, label, prev_label):
        '''
        Compute the conditional probability of a label given a word x_i.
        Parameters: word: string; a word x_i some position i of a given sentence
                    label: string; we are interested in the conditional probability of this label
                    prev_label: string; the label of the word at position i-1
        Returns: float
        '''
        log_probability = (self._score(word, label, prev_label)
                           - self._log_partition(word, prev_label))
        return float(np.exp(log_probability))

    def empirical_feature_count(self, word, label, prev_label):
        '''
        Compute the empirical feature count given a word, the actual label of this word and the
        label of the previous word. For a single observation this is exactly the vector of
        features that are active for the observed triple.
        Parameters: word: string; a word x_i some position i of a given sentence
                    label: string; the actual label of the given word
                    prev_label: string; the label of the word at position i-1
        Returns: (numpy) array containing the empirical feature count
        '''
        return self.get_active_features(word, label, prev_label)

    # Exercise 3 b) ###################################################################
    def expected_feature_count(self, word, prev_label):
        '''
        Compute the expected feature count given a word, the label of the previous word and the
        parameters of the current model (see variable theta), i.e. the sum over all labels of
        p(label | word, prev_label) * f(word, label, prev_label).
        Parameters: word: string; a word x_i some position i of a given sentence
                    prev_label: string; the label of the word at position i-1
        Returns: (numpy) array containing the expected feature count
        '''
        exp_feat_count = np.zeros(len(self.feature_indices))

        # Z is the same for every label, so compute it once.
        log_z = self._log_partition(word, prev_label)

        for label in self.labels:
            indices = self._active_indices(word, label, prev_label)
            score = float(sum(self.theta[index] for index in indices))
            probability = np.exp(score - log_z)
            for index in indices:
                exp_feat_count[index] += probability

        return exp_feat_count

    def parameter_update(self, word, label, prev_label, learning_rate=0.1):
        '''
        Do one learning step (gradient ascent on the log-likelihood of one observation).
        Parameters: word: string; a randomly selected word x_i at some position i of a given sentence
                    label: string; the actual label of the selected word
                    prev_label: string; the label of the word at position i-1
                    learning_rate: float
        '''
        gradient = (self.empirical_feature_count(word, label, prev_label)
                    - self.expected_feature_count(word, prev_label))

        self.theta = self.theta + learning_rate * gradient

    def train(self, number_iterations, learning_rate=0.1):
        '''
        Implement the training procedure: every iteration picks one random word of one random
        training sentence and performs one parameter update with it.
        Parameters: number_iterations: int; number of parameter updates to do
                    learning_rate: float
        '''
        # Empty sentences carry no word to sample from.
        candidates = [sentence for sentence in self.train_sentences if sentence]

        for _ in range(number_iterations):
            sentence = random.choice(candidates)
            position = random.randrange(len(sentence))

            word, label = sentence[position]
            prev_label = self.START_LABEL if position == 0 else sentence[position - 1][1]

            self.parameter_update(word, label, prev_label, learning_rate)
            # Each update consumes exactly one training word.
            self._record_words_used(1)

    # Exercise 4 c) ###################################################################
    def predict(self, word, prev_label):
        '''
        Predict the most probable label of the word referenced by 'word'
        Parameters: word: string; a word x_i at some position i of a given sentence
                    prev_label: string; the label of the word at position i-1
        Returns: string; most probable label
        '''
        # The normalization factor is shared by all labels, so the label with
        # the highest score is also the one with the highest probability.
        # Sorting makes tie-breaking independent of set iteration order.
        return max(sorted(self.labels),
                   key=lambda label: self._score(word, label, prev_label))

    def empirical_feature_count_batch(self, sentences):
        '''
        Compute the empirical feature count for a set of sentences (sum over all their words).
        Parameters: sentences: list; a list of sentences; should be a sublist of the list returned by 'import_corpus'
        Returns: (numpy) array containing the empirical feature count
        '''
        emp_feat_count = np.zeros(len(self.feature_indices))

        for sentence in sentences:
            for i, (word, label) in enumerate(sentence):
                prev_label = self.START_LABEL if i == 0 else sentence[i - 1][1]
                emp_feat_count += self.empirical_feature_count(word, label, prev_label)

        return emp_feat_count

    # Exercise 5 a) ###################################################################
    def expected_feature_count_batch(self, sentences):
        '''
        Compute the expected feature count for a set of sentences (sum over all their words).
        Parameters: sentences: list; a list of sentences; should be a sublist of the list returned by 'import_corpus'
        Returns: (numpy) array containing the expected feature count
        '''
        exp_feat_count = np.zeros(len(self.feature_indices))

        for sentence in sentences:
            for i, (word, _label) in enumerate(sentence):
                prev_label = self.START_LABEL if i == 0 else sentence[i - 1][1]
                exp_feat_count += self.expected_feature_count(word, prev_label)

        return exp_feat_count

    def train_batch(self, number_iterations, batch_size, learning_rate=0.1):
        '''
        Implement the training procedure which uses 'batch_size' sentences from the training corpus
        to compute the gradient. A fresh batch is drawn (with replacement) for every iteration and
        the parameters are updated after every iteration.
        Parameters: number_iterations: int; number of parameter updates to do
                    batch_size: int; number of sentences to use in each iteration
                    learning_rate: float
        '''
        candidates = [sentence for sentence in self.train_sentences if sentence]

        for _ in range(number_iterations):
            batch = random.choices(candidates, k=batch_size)

            gradient = (self.empirical_feature_count_batch(batch)
                        - self.expected_feature_count_batch(batch))
            self.theta = self.theta + learning_rate * gradient

            self._record_words_used(sum(len(sentence) for sentence in batch))

    def accuracy(self, test):
        '''
        Compute the fraction of words in 'test' whose predicted label equals the true label.
        The true label of the preceding word is used as previous label.
        Parameters: test: list of sentences (lists of (word, label) tuples)
        Returns: float in [0, 1]; 0.0 if 'test' contains no words
        '''
        correct = 0
        total = 0

        for sentence in test:
            for i, (word, true) in enumerate(sentence):
                prev_label = self.START_LABEL if i == 0 else sentence[i - 1][1]
                total += 1
                if self.predict(word, prev_label) == true:
                    correct += 1

        return correct / total if total else 0.0

    def evaluate(self, test, num_iter, model_type):
        '''
        Train for 'num_iter' single updates, measuring the accuracy on 'test' after each one,
        and plot accuracy against the cumulative number of training words used.
        Parameters: test: list of sentences used to measure accuracy
                    num_iter: int; number of parameter updates to perform
                    model_type: 'Normal' (one word per update, see 'train') or
                                'Batch' (one sentence per update, see 'train_batch')
        Returns: list of float; the accuracy after each update
        Raises: ValueError if 'model_type' is neither 'Normal' nor 'Batch'
        '''
        if model_type not in ('Normal', 'Batch'):
            raise ValueError("model_type must be 'Normal' or 'Batch', got %r" % (model_type,))

        words_seen = []
        accuracies = []

        for _ in range(num_iter):
            if model_type == 'Normal':
                # Train using train method
                self.train(1)
            else:
                # Train using train_batch method with one sentence at a time
                self.train_batch(1, 1)

            words_seen.append(self.words_used[-1])
            accuracies.append(self.accuracy(test))

        plt.plot(words_seen, accuracies)
        plt.xlabel('N of words used')
        plt.ylabel('Accuracy')
        plt.show()

        return accuracies

        





        


    
    

In [61]:
# Seed the RNG so the randomly drawn training examples -- and therefore the
# learning curves plotted in the following cells -- are reproducible after a
# kernel restart.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Single place to change the corpus location for both models.
CORPUS_PATH = 'corpus_pos.txt'

# Two independent models over the same corpus: one is trained word by word,
# the other sentence by sentence.
model_train = MaxEntModel(CORPUS_PATH)
model_train.initialize()

model_batch = MaxEntModel(CORPUS_PATH)
model_batch.initialize()

In [62]:
# Learning curve of the word-by-word trained model on its held-out sentences.
model_train.evaluate(
    test=model_train.test_sentences,
    num_iter=100,
    model_type='Normal',
)

KeyboardInterrupt: 

In [None]:
# Learning curve of the sentence-by-sentence trained model. It is evaluated on
# its own held-out sentences rather than on those of another model instance.
model_batch.evaluate(model_batch.test_sentences, 100, 'Batch')