In [None]:
import json
import numpy as np
import pandas as pd
import itertools
from sklearn.metrics import accuracy_score
import time
from collections import defaultdict

In [None]:
def get_vocabulary(D):
    """
    Build the vocabulary for a collection of tokenized documents.

    D is a list of documents, where each document is a list of tokens.
    A token belongs to the vocabulary when it appears more than once in
    the entire collection; every token that appears exactly once is
    folded into the "<unk>" entry instead.

    Returns a dictionary that maps each vocabulary token to its frequency
    in the collection. The "<unk>" key is always present and holds the
    number of distinct tokens that occurred exactly once.
    """
    # Corpus-wide frequency of every token, in order of first appearance.
    token_counts = {}
    for token in itertools.chain.from_iterable(D):
        token_counts[token] = token_counts.get(token, 0) + 1

    vocab = {'<unk>': 0}
    for token, count in token_counts.items():
        if count > 1:
            # Store this token's own frequency.
            vocab[token] = count
        else:
            # Singleton tokens are represented by "<unk>".
            vocab['<unk>'] += 1

    return vocab

In [None]:
class BBoWFeaturizer(object):
    """Binary bag-of-words featurizer: a feature is 1 when its token occurs in the document."""

    def convert_document_to_feature_dictionary(self, doc, vocab):
        """
        Compute the binary bag-of-words representation of one document.

        doc is a list of tokens. vocab is the vocabulary; only membership
        tests are performed on it, so a set of tokens or a dictionary keyed
        by token both work.

        Returns a dictionary that maps each feature name to 1. Tokens that
        are in the vocabulary are their own feature; all other tokens share
        the "<unk>" feature. Features that do not occur in the document are
        omitted.
        """
        features = {}
        for token in doc:
            # Out-of-vocabulary tokens collapse onto the shared "<unk>" feature.
            features[token if token in vocab else '<unk>'] = 1
        return features

In [None]:
class CBoWFeaturizer(object):
    """Count bag-of-words featurizer: a feature's value is how often it occurs in the document."""

    def convert_document_to_feature_dictionary(self, doc, vocab):
        """
        Compute the count bag-of-words representation of one document.

        doc is a list of tokens. vocab is the vocabulary; only membership
        tests are performed on it, so a set of tokens or a dictionary keyed
        by token both work.

        Returns a dictionary that maps each feature name to the number of
        times it occurs in the document. Tokens outside the vocabulary are
        counted together under "<unk>". Features that do not occur in the
        document are omitted.
        """
        features = {}
        # Single pass: every occurrence adds exactly one to its feature, so a
        # repeated out-of-vocabulary token is not counted more than once per
        # occurrence.
        for token in doc:
            name = token if token in vocab else '<unk>'
            features[name] = features.get(name, 0) + 1
        return features

In [None]:
def compute_idf(D, vocab):
    """
    Compute inverse document frequencies over a document collection.

    D is a list of documents, where each document is a list of tokens.
    vocab is the vocabulary; only membership tests are performed on it.
    A token that is not in the vocabulary is treated as "<unk>", and a
    document counts once towards "<unk>" no matter how many such tokens
    it contains.

    Returns a dictionary that maps each feature to log(N / df), where N
    is the number of documents and df is the number of documents that
    contain the feature. Features that occur in no document are omitted,
    because their IDF would require dividing by zero.
    """
    num_docs = len(D)

    # Document frequency of every feature that occurs at least once.
    doc_freq = {}
    for doc in D:
        has_unknown = False
        for token in set(doc):
            if token in vocab:
                doc_freq[token] = doc_freq.get(token, 0) + 1
            else:
                has_unknown = True
        if has_unknown:
            doc_freq['<unk>'] = doc_freq.get('<unk>', 0) + 1

    # Build a new dictionary rather than deleting entries from the one being
    # iterated; zero-frequency features never entered doc_freq at all.
    return {token: np.log(num_docs / df) for token, df in doc_freq.items()}
    
class TFIDFFeaturizer(object):
    """TF-IDF featurizer: term count in the document multiplied by the feature's IDF score."""

    def __init__(self, idf):
        """Store the idf scores (feature -> IDF value) computed via `compute_idf`."""
        self.idf = idf

    def convert_document_to_feature_dictionary(self, doc, vocab):
        """
        Compute the TF-IDF representation of one document.

        doc is a list of tokens. vocab is the vocabulary; only membership
        tests are performed on it, so a set of tokens or a dictionary keyed
        by token both work. Tokens outside the vocabulary are counted
        together under "<unk>".

        Returns a dictionary that maps each feature occurring in the
        document to (count in the document) * (IDF of the feature).

        Raises KeyError when a feature of the document has no entry in
        the idf scores given to the constructor.
        """
        # Term frequencies: each occurrence adds exactly one, so repeated
        # out-of-vocabulary tokens are not over-counted.
        term_counts = {}
        for token in doc:
            name = token if token in vocab else '<unk>'
            term_counts[name] = term_counts.get(name, 0) + 1

        return {name: count * self.idf[name] for name, count in term_counts.items()}

In [None]:
def load_dataset(file_path):
    """
    Load a JSON-lines dataset.

    Every non-blank line of the file at file_path must be a JSON object
    with a 'document' entry and a 'label' entry.

    Returns two parallel lists (D, y): the 'document' values and the
    'label' values, in file order.
    """
    D = []
    y = []
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            instance = json.loads(line)
            D.append(instance['document'])
            y.append(instance['label'])
    return D, y

def convert_to_features(D, featurizer, vocab):
    """
    Featurize every document in D.

    Calls featurizer.convert_document_to_feature_dictionary(doc, vocab) on
    each document, in order, and returns the results as a list.
    """
    return [
        featurizer.convert_document_to_feature_dictionary(document, vocab)
        for document in D
    ]

In [None]:
def _estimate_feature_probabilities(X_class, k, vocab):
    """Return the add-k smoothed p(v|y) table for the feature dictionaries of one class."""
    # Total feature mass observed in this class.
    totals = {}
    for features in X_class:
        for name, value in features.items():
            totals[name] = totals.get(name, 0) + value

    # Every vocabulary entry gets a probability, even if unseen in this class.
    for name in vocab:
        totals.setdefault(name, 0)

    denominator = sum(totals.values()) + k * len(totals)
    return {name: (k + total) / denominator for name, total in totals.items()}


def train_naive_bayes(X, y, k, vocab):
    """
    Computes the statistics for the Naive Bayes classifier.
    X is a list of feature representations, where each representation
    is a dictionary that maps from the feature name to the value.
    y is a list of integer labels, parallel to X. Label 0 is class "0";
    any other label is treated as class "1", and the prior is computed
    as sum(y) / len(y), so labels are expected to be 0 or 1.
    k is a float which is the smoothing parameter.
    vocab is the vocabulary; it is only iterated over, so a set of
    tokens or a dictionary keyed by token both work.

    Returns two values:
        p_y: A dictionary from the label key ("0" or "1", as strings) to
            the corresponding p(y) score.
        p_v_y: A nested dictionary where the outer dictionary's key is
            the label key and the inner dictionary maps from a feature
            to the probability p(v|y). For example, `p_v_y["1"]["hello"]`
            is p(v="hello"|y=1).

    Raises ValueError if y is empty or if X and y differ in length.
    A class with no feature mass combined with k == 0 still divides by zero.
    """
    if len(y) == 0:
        raise ValueError("train_naive_bayes requires at least one training instance")
    if len(X) != len(y):
        raise ValueError("X and y must have the same length")

    # Class priors are estimated from the labels passed in, not from a global.
    p_y1 = sum(y) / len(y)
    p_y0 = 1 - p_y1
    p_y = {"0": p_y0, "1": p_y1}

    # Split the feature dictionaries by class.
    class0 = [features for features, label in zip(X, y) if label == 0]
    class1 = [features for features, label in zip(X, y) if label != 0]

    p_v_y = {
        "0": _estimate_feature_probabilities(class0, k, vocab),
        "1": _estimate_feature_probabilities(class1, k, vocab),
    }

    return p_y, p_v_y

In [None]:
def predict_naive_bayes(D, p_y, p_v_y, return_confidences=False):
    """
    Runs the prediction rule for Naive Bayes. D is a list of documents,
    where each document is a list of tokens.
    p_y and p_v_y are output from `train_naive_bayes` (keyed by the
    strings "0" and "1").

    Any token which is not in a class's p_v_y table is mapped to "<unk>"
    for that class. The input dictionaries hold probabilities; they are
    converted to log-probabilities so that the per-document score is a
    sum, which prevents underflow errors.

    Returns:
        predictions: A list of integer labels (0 or 1), one for each
            document. Ties are resolved in favour of label 1.
        If return_confidences is True, the tuple (predictions, confidences)
        is returned instead, where confidences is a list of floats, one
        for each document, holding p(y|d) for the predicted label.
    """
    labels = ("0", "1")

    log_p_y = {label: np.log(p_y[label]) for label in labels}
    log_p_v_y = {
        label: {name: np.log(prob) for name, prob in p_v_y[label].items()}
        for label in labels
    }

    predictions = []
    confidences = []
    for doc in D:
        # Log joint score log p(y) + sum_v log p(v|y) for each class.
        scores = {}
        for label in labels:
            table = log_p_v_y[label]
            total = 0
            for token in doc:
                total += table[token] if token in table else table["<unk>"]
            scores[label] = total + log_p_y[label]

        if scores["0"] > scores["1"]:
            predictions.append(0)
            margin = scores["1"] - scores["0"]
        else:
            predictions.append(1)
            margin = scores["0"] - scores["1"]

        if return_confidences:
            # p(y_hat|d) = 1 / (1 + exp(score_other - score_hat)); the
            # exponent is never positive, so exp cannot overflow.
            confidences.append(float(1.0 / (1.0 + np.exp(margin))))

    if return_confidences:
        return predictions, confidences
    return predictions

In [None]:
# Load the train / validation / test splits, then build the vocabulary from
# the training documents only. The cell displays the vocabulary size.
(D_train, y_train), (D_valid, y_valid), (D_test, y_test) = map(
    load_dataset,
    ('data/imdb/train.jsonl', 'data/imdb/valid.jsonl', 'data/imdb/test.jsonl'),
)
vocab = get_vocabulary(D_train)
len(vocab)

In [None]:
# Naive Bayes with binary bag-of-words (BBoW) features.
# The training documents are featurized once with the vocabulary built in the
# data-loading cell; the classifier is then re-fit for each smoothing value k
# and scored on the validation and test splits. The sweep covers the same
# k values as the CBoW and TF-IDF experiments, including k=0.001.
start_time = time.time()

featurizer = BBoWFeaturizer()
X_train = convert_to_features(D_train, featurizer, vocab)

for k in (0.001, 0.01, 0.1, 1, 10):
    print("k=" + str(k))
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k, vocab)
    y_pred_valid = predict_naive_bayes(D_valid, p_y, p_v_y)
    print("Validation accuracy: ", accuracy_score(y_valid, y_pred_valid))
    y_pred_test = predict_naive_bayes(D_test, p_y, p_v_y)
    print("Test accuracy: ", accuracy_score(y_test, y_pred_test))

# Wall-clock cost of the whole sweep.
elapsed_seconds = time.time() - start_time
print("Elapsed minutes: ", elapsed_seconds / 60)
print("Elapsed seconds: ", elapsed_seconds)

In [None]:
# Naive Bayes with count bag-of-words (CBoW) features.
# The training documents are featurized once with the vocabulary built in the
# data-loading cell; the classifier is then re-fit for each smoothing value k
# and scored on the validation and test splits.
start_time = time.time()

featurizer = CBoWFeaturizer()
X_train = convert_to_features(D_train, featurizer, vocab)

for k in (0.001, 0.01, 0.1, 1, 10):
    print("k=" + str(k))
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k, vocab)
    y_pred_valid = predict_naive_bayes(D_valid, p_y, p_v_y)
    print("Validation accuracy: ", accuracy_score(y_valid, y_pred_valid))
    y_pred_test = predict_naive_bayes(D_test, p_y, p_v_y)
    print("Test accuracy: ", accuracy_score(y_test, y_pred_test))

# Wall-clock cost of the whole sweep.
elapsed_seconds = time.time() - start_time
print("Elapsed minutes: ", elapsed_seconds / 60)
print("Elapsed seconds: ", elapsed_seconds)

In [None]:
# Naive Bayes with TF-IDF features.
# IDF scores are computed from the training documents, the training documents
# are featurized once, and the classifier is then re-fit for each smoothing
# value k and scored on the validation and test splits.
start_time = time.time()

idf = compute_idf(D_train, vocab)
featurizer = TFIDFFeaturizer(idf)
X_train = convert_to_features(D_train, featurizer, vocab)

for k in (0.001, 0.01, 0.1, 1, 10):
    print("k=" + str(k))
    p_y, p_v_y = train_naive_bayes(X_train, y_train, k, vocab)
    y_pred_valid = predict_naive_bayes(D_valid, p_y, p_v_y)
    print("Validation accuracy: ", accuracy_score(y_valid, y_pred_valid))
    y_pred_test = predict_naive_bayes(D_test, p_y, p_v_y)
    print("Test accuracy: ", accuracy_score(y_test, y_pred_test))

# Wall-clock cost of the whole sweep.
elapsed_seconds = time.time() - start_time
print("Elapsed minutes: ", elapsed_seconds / 60)
print("Elapsed seconds: ", elapsed_seconds)