In [22]:
import re 
import math
import os
import sys

In [17]:
class Word_Bigram_Model():
    def __init__(self):
        self.tokens = [] #the preprocssed/tokenized/UNK treated training set  
        self.bigram_freq = {}
        self.unigram_freq = {}
        self.bigram_prob = {}
    
    def load_input(self, file):
        f = open(file, "r")
        text = f.read()
        f.close()
        return text 
    
    # helper function: called in preprocess()
    # takes in a string & returns if tokenized by character
    # adds beginning/end sentence markers 
    def tokenize(self, string):
        tokens_list = [] 
        lines = str.splitlines(string)
        for line in lines:
            tokens_list.append("<s>")
            tokens_list = tokens_list + line.split() #split line by whitespaces 
            tokens_list.append("</s>")
        return tokens_list 
    
    # helper function: called in UNK_treated_input()
    # replaces a given item in tokens_list with UNK based on frequency 
    def insert_UNK(self, item_to_replace, tokens_list):
        for index, token in enumerate(tokens_list):
            if token == item_to_replace: tokens_list[index] = "<UNK>"
        return tokens_list
       

    # helper function: called in preprocess()
    # returns UNK modified training set --> all rare tokens are replaced with <UNK>
    def UNK_treated_input(self, tokens_list, UNK_threshold = 5):
        freq_dict = self.calc_ngram_freq(tokens_list, keys = tokens_list)
        for key, value in freq_dict.items(): #replace rare terms in training set based on frequency
            if value <= UNK_threshold: tokens_list = self.insert_UNK(key, tokens_list)
        return tokens_list
       
    # will perform various regex sub steps (ex. remove all numeric characters)
    # will tokenize text & add beginning & end sentence markers 
    # will replace low frequency words with UNK token 
    def preprocess_training(self, string, UNK_threshold = 5):
        string = self.clean_string(string)
        tokens_list = self.tokenize(string)
        tokens_list = self.UNK_treated_input(tokens_list, UNK_threshold)
        return tokens_list
    
    # called in preprocess_training()
    # performs a series of regex substitutions to clean up raw text string 
    def clean_string(self, text_string):
        text_string = text_string.lower()
        text_string = re.sub(r"\d+", "", text_string) #remove numerical characters
        text_string = re.sub(r"[$&%#-\*]", "", text_string)
        text_string = re.sub(' +', ' ', text_string) #remove multiple spaces
        return text_string
       
    def generate_ngrams(self, n, tokens):
        ngrams = zip(*[tokens[i:] for i in range(n)])
        ngrams = list(ngrams)
        for ngram in ngrams: ngram = ''.join(ngram)
        #print(ngrams)
        return ngrams 
    
    # given UNK_treated vocabulary, will generate all possible bigrams & return as list 
    def generate_bigram_matrix(self):
        bigram_matrix = []
        vocabulary = self.get_vocabulary()
        for token_1 in vocabulary:
            for token_2 in vocabulary:
                bigram = (token_1, token_2)
                bigram_matrix.append(bigram)
        return bigram_matrix
    
    def get_vocabulary(self):
        return list(self.unigram_freq.keys())
    
    # helper function: called in train() & UNK_treated_input()
    # creates a freq dict from a list of ngrams (keys) and a list of ngrams occurring in the text (ngram_list)
    # called in 
    def calc_ngram_freq(self, ngram_list, keys = []):
        freq_dict = {}
        for key in keys:
            freq_dict[key] = 0 # initialize dict & set all values to 0
        for item in ngram_list:
            if item in freq_dict: freq_dict[item] = freq_dict[item] + 1
            else: freq_dict[item] = 1 # create key if not in list [error catching]
        return freq_dict
    
    # helper function: called in calc_bigram_prob()
    def calc_mle(self, num, denom):
        if denom == 0:
            print("DNE, returning 0")
            return 0
        return num/denom
        
    # used to calculate perplexity
    # returns the number of tokens in a given list, exluding beginning of sentence markers 
    def token_count(self, tokens_list):
        count = 0 
        for token in tokens_list:
            if token == "<s>": continue #do not count 
            else: count = count + 1 #increment count for all other tokens 
        return count 
        
    def calc_perplexity(self, sum_log_prob, N):
        #sum_log_prob = sum of all ngrams prob (in log) 
        #N = self.token_count # N = total number of tokens in input 
        return math.exp(-1 * sum_log_prob / N)       
    
    # Good-Turing Smoothing implemented here 
    # N = total # number of bigrams in input 
    def calc_bigram_prob(self, N):
        # 1. make a frequency of frequency of bigrams table
        freq_of_freq = {} 
        for bigram, count in self.bigram_freq.items():
            if count in freq_of_freq: freq_of_freq[count] = freq_of_freq[count] + 1
            else: freq_of_freq[count] = 1 
        max_freq_count = max(list(freq_of_freq.keys()))
        # 2. calculate GT probability of each bigram 
        bigram_prob_dict = {} 
        prob = 0 
        prob_sum = 0 # to be used for renormalization 
        for bigram in self.bigram_freq.keys():
            bigram_count = self.bigram_freq[bigram]
            if bigram_count <= 1: #if not observed 
                prob = freq_of_freq[1] / N
            elif 1 < bigram_count <= 10: #if observed between 1 and r times 
                prob = (bigram_count + 1)*(freq_of_freq[bigram_count +1] / freq_of_freq[bigram_count])
            elif 10 < bigram_count <= max_freq_count: #if bigram count is the max freq count --> calc mle 
                unigram = bigram[0]
                prob = self.calc_mle(bigram_count, self.unigram_freq[unigram]) 
                
            prob_sum = prob_sum + prob + bigram_count 
            bigram_prob_dict[bigram] = prob 
        
        # 3. renormalize probabilities to add to 1 
        for bigram, prob in bigram_prob_dict.items():
            bigram_prob_dict[bigram] = prob / prob_sum 
    
        return bigram_prob_dict 
         
    def train(self, input_file, UNK_threshold = 5):
        print("----Training Model----")
        input_text = self.load_input(input_file)
        self.tokens = self.preprocess_training(input_text, UNK_threshold)
        self.unigram_freq = self.calc_ngram_freq(self.tokens, keys = self.tokens)
        bigram_matrix = self.generate_bigram_matrix() # generates all possible bigrams given the vocabulary 
        bigrams = self.generate_ngrams(2, self.tokens) # generates all bigrams that occur in training text
        self.bigram_freq = self.calc_ngram_freq(bigrams, bigram_matrix)
        self.bigram_prob = self.calc_bigram_prob(N = len(bigrams))
        return 
        
    def test(self, test_file):
        print("----Testing Model----\n")
        test = self.load_input(test_file)
        test = self.clean_string(test)
        test_lines = str.splitlines(test)
        perplexity_list = []
        for line in test_lines:
            test_tokens = self.tokenize(line)
            for index, token in enumerate(test_tokens):
                if token in self.unigram_freq: continue
                else: test_tokens[index] = "<UNK>"
            sum_log_prob = 0
            test_bigrams = self.generate_ngrams(2, tokens = test_tokens)
            
            for test_bigram in test_bigrams: 
                unigram_prob = self.unigram_freq[test_bigram[0]] / len(self.tokens)
                bigram_prob = self.bigram_prob[test_bigram] / unigram_prob #conditional probability of each bigram
                sum_log_prob = sum_log_prob + math.log(bigram_prob)
                
            perplexity = self.calc_perplexity(sum_log_prob, self.token_count(test_tokens))
            perplexity_list.append(perplexity)
        return perplexity_list
        

In [23]:
def get_rel_path(file_name):
    absolutepath = os.path.abspath('')
    #print(absolutepath)
    fileDirectory = os.path.dirname(absolutepath)
    file_path = os.path.join(fileDirectory, 'Data/' + str(file_name))   
    #print(file_path)
    return file_path

In [24]:
english_train = get_rel_path("Input/LangId.train.English")
french_train = get_rel_path("Input/LangId.train.French")
italian_train = get_rel_path("Input/LangId.train.Italian")
training_files = [english_train, french_train, italian_train]

test_file = get_rel_path("Validation/LangId.test")
validation_file = get_rel_path("Validation/labels.sol")
f = open(validation_file, "r")
validation = f.read()
f.close()
validation = str.splitlines(validation)
expected_results = []
for line in validation:
    line = line.split()
    expected_results.append(line[-1])

def train_and_evaluate(training_files, test_file, expected_results, UNK_threshold = 5):
    # 1. train 3 models on each language 
    models_perplexities = [] 
    for i in range(len(training_files)):
        model = Word_Bigram_Model()
        model.train(training_files[i], UNK_threshold = UNK_threshold) 
        perplexities = model.test(test_file) # 2. test model & obtain perplexities for each sentence 
        models_perplexities.append(perplexities)
        
    
    # 3. Compare perplexities of each sentence/model & assign a language 
    test_results = [] 
    for i in range(len(models_perplexities[0])):
        e = models_perplexities[0][i] #pull perplexities of each models for specific sentence
        f = models_perplexities[1][i]
        i = models_perplexities[2][i]
        sort = sorted([e, f, i])[:3] #sorts from least to greatest ; lower perplexity --> higher probability 
        if sort[0] == e: test_results.append("English")
        if sort[0] == f: test_results.append("French")
        if sort[0] == i: test_results.append("Italian")
    
    # 4. Compare observed results with expected results & return % accurate 
    correct_count = 0 
    for i in range(len(expected_results)):
        if expected_results[i] == test_results[i]: 
            correct_count = correct_count + 1
    accuracy = (correct_count / len(expected_results)) * 100
    print("Accuracy of Word Bigram Model: ", accuracy, "%")
    return test_results 

In [18]:
#test_results = train_and_evaluate(training_files, test_file, expected_results)

----Training Model----
----Testing Model----

----Training Model----
----Testing Model----

----Training Model----
----Testing Model----

Accuracy of Word Bigram Model:  98.33333333333333 %


In [25]:
test_results = train_and_evaluate(training_files, test_file, expected_results, UNK_threshold = 2)

----Training Model----
----Testing Model----

----Training Model----
----Testing Model----

----Training Model----
----Testing Model----

Accuracy of Word Bigram Model:  99.0 %


In [26]:
# write results to output file 
output_file = get_rel_path("Output/wordLangId2.out")
f = open(output_file, "w")
for index, result in enumerate(test_results):
    f.write(str(index + 1) + " " + str(result) + "\n")
f.close()