In [1]:
import math, random
from collections import defaultdict
import os

In [2]:
################################################################################
# Part 0: Utility Functions
################################################################################

COUNTRY_CODES = ['af', 'cn', 'de', 'fi', 'fr', 'in', 'ir', 'pk', 'za']

def start_pad(n):
    ''' Returns a padding string of length n to append to the front of text
        as a pre-processing step to building n-grams '''
    return '~' * n

def ngrams(n, text):
    ''' Returns the ngrams of the text as tuples where the first element is
        the length-n context and the second is the character '''
    text = start_pad(n) + text
    grams = []
    for j in range(len(text)-n):
        context = text[j:j+n]
        char = text[j+n]
        grams.append((context, char))
    return grams

def create_ngram_model(model_class, path, n = 2, k = 0):
    ''' Creates and returns a new n-gram model trained on the city names
        found in the path file '''
    model = model_class(n, k)
    with open(path, encoding = 'utf-8', errors = 'ignore') as f:
        model.update(f.read())
    return model

def create_ngram_model_lines(model_class, path, n = 2, k = 0):
    ''' Creates and returns a new n-gram model trained on the city names
        found in the path file '''
    model = model_class(n, k)
    with open(path, encoding = 'utf-8', errors = 'ignore') as f:
        for line in f:
            model.update(line.strip())
    return model

In [3]:
################################################################################
# Part 1: Basic N-Gram Model
################################################################################

class NgramModel(object):
    ''' A basic n-gram model using add-k smoothing '''

    def __init__(self, n, k):
        self.n = n # order of n-gram model
        self.vocab = set() # initialize vocabulary
        self.context_counts = defaultdict(lambda:0) # frequency of contexts
        self.sequence_counts = defaultdict(lambda:0) # frequency of (context, char) sequences
        self.k = k # smoothing parameter

    def get_vocab(self):
        ''' Returns the set of characters in the vocab '''
        return self.vocab

    def update(self, text):
        ''' Updates the model n-grams based on text '''
        all_ngrams = ngrams(self.n, text)
        for (context, char) in all_ngrams:
            self.vocab.add(char)
            self.context_counts[context] += 1 # increment the context count
            self.sequence_counts[(context, char)] += 1 # increment the (context, character) sequence count

    def prob(self, context, char):
        ''' Returns the probability of char appearing after context '''
        # print(self.n)
        if context in self.context_counts.keys():
            denominator = self.context_counts[context] # frequency of context followed by any token
            numerator = self.sequence_counts[(context, char)] # frequency of exact (context, character) sequence
            prob = (numerator + self.k)/(denominator + (self.k * len(self.vocab)))
            return prob
        else:
            return 1/len(self.vocab)
                
    def random_char(self, context):
        ''' Returns a random character based on the given context and the 
            n-grams learned by this model '''
        r = random.random()
        pre_sum = 0
        for i, char in enumerate(sorted(self.vocab)):
            # pre_sum is sum of probabilities up to, but excluding, the current token
            post_sum = pre_sum + self.prob(context, char)
            # post_sum also includes the probability of the current token
            if pre_sum <= r < post_sum:
                return char
            pre_sum = post_sum
                
    def random_text(self, length):
        output_text = ''
        all_context = start_pad(self.n) # keep a running context list initialized with '~'s
        for i in range(length):
            curr_context = all_context[len(all_context)-self.n:] # extract context from running context list
            next_char = self.random_char(curr_context)
            output_text += next_char
            all_context += next_char
        return output_text

    def perplexity(self, text):
        ''' Returns the perplexity of text based on the n-grams learned by
            this model '''
        m = len(text)
        all_ngrams = ngrams(self.n, text)
        log_prob_sum = 0
        for (context, char) in all_ngrams:
            prob = self.prob(context, char)
            if prob == 0:
                return float('inf')
            log_prob_sum += math.log(prob)
        perplexity = math.exp(-1/(m) * log_prob_sum)
        return perplexity

In [4]:
def uniform_lambdas(n):
    lambdas = {}
    for order in range(n + 1):
        lambdas[order] = 1/(n + 1)
    return lambdas

import numpy as np

def increasing_lambdas(n):
    lambdas = {}
    lambdas_array = np.linspace(0, 1, n + 1)
    lambdas_array = lambdas_array/sum(lambdas_array) # ensure sum is 1
    for order, weight in enumerate(lambdas_array):
        lambdas[order] = weight
    return lambdas

def decreasing_lambdas(n):
    lambdas = {}
    lambdas_array = np.linspace(0, 1, n + 1)
    lambdas_array = np.flip(lambdas_array/sum(lambdas_array)) # ensure sum is 1
    for order, weight in enumerate(lambdas_array):
        lambdas[order] = weight
    return lambdas

In [16]:
################################################################################
# Part 2: N-Gram Model with Interpolation
################################################################################

class NgramModelWithInterpolation(NgramModel):
    ''' An n-gram model with interpolation '''

    def __init__(self, n, k):
        self.n = n # highest order n-gram model
        self.models = {} # initialize empty dictionary for NgramModels
        # self.weights = uniform_lambdas(n) # lambdas corresponding to each NgramModel
        # self.weights = increasing_lambdas(n) # lambdas corresponding to each NgramModel
        self.weights = decreasing_lambdas(n) # lambdas corresponding to each NgramModel
        print(self.weights)
        for order in range(n + 1): # extra model accounts for zeroth order
            self.models[order] = NgramModel(order, k)
        self.k = k # smoothing parameter
            
    def get_vocab(self):
        vocab = set()
        for order in range(self.n + 1):
            model = self.models[order]
            vocab = vocab.union(model.get_vocab()) # merge vocabularies 
        return vocab

    def update(self, text):
        for order in range(self.n + 1):
            model = self.models[order]
            model.update(text)

    def prob(self, context, char):
        prob = 0
        for order in range(self.n + 1):
            model = self.models[order]
            weight = self.weights[order]
            if model.n == 0:
                sliced_context = ''
            else:
                sliced_context = context[-model.n:]
            prob += weight * model.prob(sliced_context, char)
        return prob

In [17]:
n = 3
k = 1
m = create_ngram_model(NgramModelWithInterpolation, 'Data/shakespeare_input.txt', n, k)
path = '/Users/sppatankar/Desktop/CIS-530/Homework 3/Data/nytimes_article.txt'
with open(path, encoding = 'utf-8', errors = 'ignore') as f:
        text = f.read()
m.perplexity(text)

{0: 0.5, 1: 0.3333333333333333, 2: 0.16666666666666666, 3: 0.0}


15.023420948467447

In [None]:
################################################################################
# Part 3: Your N-Gram Model Experimentation
################################################################################

def accuracy(all_pred_labels, all_true_labels):
    num_correct = 0
    for i, pred in enumerate(all_pred_labels):
        if pred == all_true_labels[i]:
            num_correct += 1
    accuracy = num_correct/len(all_true_labels)
    return accuracy

if __name__ == '__main__':
    
    # training data
    all_training_data = '/Users/sppatankar/Desktop/CIS-530/Homework 3/Data/train'
    models = {}
    n = 4
    k = 1 
    for filename in os.listdir(all_training_data):
        filepath = os.path.join(all_training_data, filename) # of the form 'country_code.txt'
        country = filename.split('.')[0] # split on the period to isolate country code 
        models[country] = create_ngram_model_lines(NgramModelWithInterpolation, filepath, n, k)
    
    # validation data
    all_validation_data = '/Users/sppatankar/Desktop/CIS-530/Homework 3/Data/val'
    all_true_labels = []
    all_pred_labels = []
    for filename in os.listdir(all_validation_data):
        filepath = os.path.join(all_validation_data, filename) 
        true_country = filename.split('.')[0]
        with open(filepath, encoding = 'utf-8', errors = 'ignore') as f:
            for line in f:
                perplexities = {}
                for country in models.keys(): # loop over the models for all countries
                    perplexities[country] = models[country].perplexity(line)
                # find the model that has the lowest perplexity and assign prediction
                # print(line, perplexities)
                pred_country = min(perplexities, key = perplexities.get)
                all_true_labels.append(true_country)
                all_pred_labels.append(pred_country)
                
    print('Development Accuracy = %f\n' % accuracy(all_pred_labels, all_true_labels))
                
    test_file = '/Users/sppatankar/Desktop/CIS-530/Homework 3/cities_test.txt'
    all_pred_labels_test = [] 
    with open(test_file, encoding = 'utf-8', errors = 'ignore') as f:
        for line in f:
            perplexities = {}
            for country in models.keys(): 
                perplexities[country] = models[country].perplexity(line)
            pred_country = min(perplexities, key = perplexities.get)
            all_pred_labels_test.append(pred_country)
    with open('test_labels.txt', 'w') as f:
        for label in all_pred_labels_test:
            f.write('%s\n' % label)