In [None]:
import glob
import json
import xml.etree.ElementTree as xml
import xml.etree.cElementTree as ET
import numpy as np
import os
import string
from bs4 import BeautifulSoup

import nltk
# NLTK resource downloads, left disabled; presumably needed once on a fresh
# environment before the corpora below can be loaded.
# nltk.download("punkt")
# nltk.download('stopwords')
# nltk.download('wordnet')
from nltk.corpus import stopwords 
# English stop words kept in a set; wordprocess() filters tokens against it.
stop_words = set(stopwords.words('english')) 

from nltk.stem import PorterStemmer
# Module-level stemmer instance shared by wordprocess().
porter = PorterStemmer()

from nltk.tokenize import word_tokenize 

from nltk.stem import WordNetLemmatizer   
# Module-level lemmatizer instance shared by wordprocess().
lemmatizer = WordNetLemmatizer() 

import re

In [None]:
import pickle

def save_data(data, filename):
    """Pickle ``data`` into the file at ``filename``.

    The file is opened in binary write mode, so any existing content is
    replaced.
    """
    with open(filename, mode="wb") as sink:
        pickle.dump(data, sink)
        
def load_data(filename):
    """Unpickle and return the object stored in ``filename``.

    Any exception raised while opening or unpickling is printed, followed by
    "No file found!", and an empty list is returned instead.

    NOTE: pickle can execute arbitrary code while loading; only use this on
    files produced by this notebook.
    """
    try:
        with open(filename, mode="rb") as source:
            return pickle.load(source)
    except Exception as err:
        print(err)
        print("No file found!")
    return []

In [None]:
def remove_tags(textbody):
    """Strip markup, numbers and punctuation from ``textbody`` and tokenize it.

    Steps, in order:
      1. parse ``textbody`` as HTML and keep only its text content,
      2. delete every run of digits (with an optional leading sign),
      3. delete all characters in ``string.punctuation``,
      4. hand the result to ``wordprocess``.

    Returns whatever ``wordprocess`` returns for the cleaned text (a list of
    normalized tokens).
    """
    # Name the parser explicitly. Without it BeautifulSoup picks the "best"
    # parser installed, so output can differ between machines and a
    # GuessedAtParserWarning is emitted on every call.
    text = BeautifulSoup(textbody, "html.parser").text
    text = re.sub(r'[-+]?\d+', '', text)
    text = text.translate(str.maketrans('', '', string.punctuation))
    return wordprocess(text)

def wordprocess(text):
    """Tokenize ``text`` and normalize every token that is not a stop word.

    Each surviving token is lemmatized with the module-level ``lemmatizer``
    and the lemma is then stemmed with the module-level ``porter`` stemmer.
    Returns the normalized tokens as a list, in their original order.
    """
    # References:
    # https://www.geeksforgeeks.org/removing-stop-words-nltk-python/
    # https://www.datacamp.com/community/tutorials/stemming-lemmatization-python
    # https://www.geeksforgeeks.org/python-lemmatization-with-nltk
    normalized = []
    for token in nltk.word_tokenize(text):
        if token in stop_words:
            continue
        lemma = lemmatizer.lemmatize(token)
        normalized.append(porter.stem(lemma))
    return normalized


# Parse XML with ElementTree
def parseXML(file_name, max_rows=500):
    """Collect the distinct tokens found in the first rows of an XML file.

    Parameters
    ----------
    file_name : path of the XML file; every child of the root element must
        carry a ``Body`` attribute.
    max_rows : number of leading rows to read (default 500, the value that
        was previously hard-coded).

    Returns
    -------
    list
        The unique tokens produced by ``remove_tags`` over the lower-cased
        ``Body`` of each row read. A list is returned even when the file has
        fewer than ``max_rows`` rows (the earlier version fell off the end of
        the loop and returned ``None`` in that case).

    Side effect: prints the number of unique tokens.
    """
    # list(root) replaces Element.getchildren(), which was removed in
    # Python 3.9.
    rows = list(ET.ElementTree(file=file_name).getroot())

    unique_tokens = set()
    for row in rows[:max_rows]:
        unique_tokens.update(remove_tags(row.attrib['Body'].lower()))

    large_vocab = list(unique_tokens)
    print(len(large_vocab))
    return large_vocab

In [None]:
def vocab_creation():
    """Build the global vocabulary from every training file and pickle it.

    Calls ``parseXML`` on each ``./Data/Data/Training/*.xml`` file, merges
    the tokens, prints the vocabulary size and saves the vocabulary to
    ``vocab.pt``.

    The vocabulary is stored sorted. Token order defines the column order of
    every vector derived from it, and the iteration order of a set of
    strings changes between interpreter runs. Sorting means re-running this
    function on unchanged data reproduces the same ``vocab.pt``, so cached
    files built from it stay valid.
    """
    all_files = glob.glob('./Data/Data/Training/*.xml')

    # Accumulate directly into a set instead of rebuilding list(set(...))
    # after every file.
    vocabulary = set()
    for each_file in all_files:
        file_tokens = parseXML(each_file)
        if file_tokens:
            vocabulary.update(file_tokens)

    final_vocab = sorted(vocabulary)
    print(len(final_vocab))
    save_data(final_vocab, "vocab.pt")
    
# vocab_creation()

In [None]:
            
# Parse XML with ElementTree
def idf_distance_measure(file_name, start_no, end_no, final_vocab):
    """Count, per vocabulary token, how many rows of a file contain it.

    Parameters
    ----------
    file_name : path of the XML file; rows are the children of its root and
        must carry a ``Body`` attribute.
    start_no, end_no : half-open row index range ``[start_no, end_no)``.
    final_vocab : sequence of tokens; position ``i`` of the result belongs to
        ``final_vocab[i]``.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``len(final_vocab)`` with the document
        frequency of each token over the selected rows. Rows that yield no
        tokens are skipped.

    Raises
    ------
    IndexError
        If the range reaches past the last row of the file.
    """
    # list(root) replaces Element.getchildren(), removed in Python 3.9.
    rows = list(ET.ElementTree(file=file_name).getroot())

    df_vector = np.zeros(len(final_vocab))

    for row_index in range(start_no, end_no):
        textbody = rows[row_index].attrib['Body']
        # A set gives O(1) membership tests in the vocabulary scan below;
        # the earlier list made every test linear in the document size.
        doc_tokens = set(remove_tags(textbody.lower()))

        if not doc_tokens:
            continue

        # A document contributes at most 1 to each token's count.
        for vocab_iter, each_tok in enumerate(final_vocab):
            if each_tok in doc_tokens:
                df_vector[vocab_iter] += 1

    return df_vector

def idf_measure():
    """Compute the IDF vector over the training rows and pickle it.

    For every ``./Data/Data/Training/*.xml`` file the document frequencies
    of rows 0..499 are summed per vocabulary token (vocabulary read from
    ``vocab.pt``). The IDF of a token is ``log(N / df)`` with
    ``N = number_of_files * 500``. The summed frequencies and the IDF vector
    are printed, and the IDF vector is saved to ``idf.pt``.

    Tokens with a document frequency of 0 get an IDF of 0 instead of
    ``inf``. An infinite weight turns into NaN as soon as it is multiplied
    by a zero term frequency, which would poison every tf-idf vector.
    """
    all_files = glob.glob('./Data/Data/Training/*.xml')
    total_doc_count = len(all_files) * 500
    final_vocab = load_data("vocab.pt")

    final_df_vector = np.zeros(len(final_vocab))
    for each_file in all_files:
        final_df_vector += idf_distance_measure(each_file, 0, 500, final_vocab)

    print(final_df_vector)

    idf_vector = np.zeros(len(final_vocab))
    seen = final_df_vector > 0
    idf_vector[seen] = np.log(total_doc_count / final_df_vector[seen])

    print(idf_vector)
    save_data(idf_vector, "idf.pt")
    
# idf_measure()

In [None]:
def vector_representation(file_name, no, final_vocab, type=None):
    """Turn one row of an XML file into a vector over ``final_vocab``.

    Parameters
    ----------
    file_name : path of the XML file; rows are the children of its root and
        must carry a ``Body`` attribute.
    no : index of the row to vectorize.
    final_vocab : sequence of tokens defining the vector's columns.
    type : representation to build:
        ``"hamming"``   - 1.0 where the token occurs in the row, else 0.0;
        ``"euclidean"`` - raw occurrence counts;
        ``"tf"``        - (count / number of row tokens) multiplied
                          element-wise by the vector stored in ``idf.pt``.

    Returns
    -------
    numpy.ndarray or None
        Float vector of length ``len(final_vocab)``. ``None`` when the row
        yields no tokens or ``type`` is not one of the values above.

    Raises
    ------
    IndexError
        If ``no`` is past the last row of the file.
    """
    from collections import Counter

    # list(root) replaces Element.getchildren(), removed in Python 3.9.
    rows = list(ET.ElementTree(file=file_name).getroot())
    textbody = rows[no].attrib['Body']
    doc_tokens = remove_tags(textbody.lower())

    if len(doc_tokens) == 0:
        return None
    if type not in ("hamming", "euclidean", "tf"):
        return None

    # One pass over the document instead of one list.count() scan per
    # vocabulary token.
    occurrences = Counter(doc_tokens)
    counts = np.array([occurrences.get(tok, 0) for tok in final_vocab], dtype=float)

    if type == "hamming":
        return (counts > 0).astype(float)

    if type == "euclidean":
        return counts

    # tf-idf: term frequency is normalized by the number of tokens in the
    # document, then weighted by the precomputed IDF vector.
    tf_vector = counts / len(doc_tokens)
    idf_vector = np.asarray(load_data("idf.pt"))
    return idf_vector * tf_vector

In [None]:
def prepare_train_test_val(type_name):
    """Vectorize every training file into train / validation / test splits.

    Each ``./Data/Data/Training/*.xml`` file is one class; its label is its
    position in the glob result. Rows 0-499 go to the training split, rows
    500-699 to validation and rows 700-1199 to test. Rows for which
    ``vector_representation`` returns ``None`` are left out.

    Returns ``(X_Train, Y_Train, X_val, Y_val, X_Test, Y_Test)`` as lists.
    """
    all_files = glob.glob('./Data/Data/Training/*.xml')
    final_vocab = load_data("vocab.pt")

    X_Train, Y_Train = [], []
    X_val, Y_val = [], []
    X_Test, Y_Test = [], []

    # (row numbers, feature bucket, label bucket), visited in this order for
    # every file.
    partitions = (
        (range(0, 500), X_Train, Y_Train),
        (range(500, 700), X_val, Y_val),
        (range(700, 1200), X_Test, Y_Test),
    )

    for label_iter, each_file in enumerate(all_files):
        for row_numbers, features, labels in partitions:
            for row_no in row_numbers:
                instance = vector_representation(each_file, row_no, final_vocab, type=type_name)
                if instance is None:
                    continue
                features.append(np.copy(instance))
                labels.append(label_iter)

    return X_Train, Y_Train, X_val, Y_val, X_Test, Y_Test

def save_dictionary(type):
    """Build the six split lists for ``type`` and pickle each to its own file.

    Files written, in order: ``X_train_<type>.pt``, ``Y_train_<type>.pt``,
    ``X_val_<type>.pt``, ``Y_val_<type>.pt``, ``X_test_<type>.pt``,
    ``Y_test_<type>.pt``.
    """
    X_train, Y_train, X_val, Y_val, X_test, Y_test = prepare_train_test_val(type)
    outputs = (
        ("X_train_", X_train),
        ("Y_train_", Y_train),
        ("X_val_", X_val),
        ("Y_val_", Y_val),
        ("X_test_", X_test),
        ("Y_test_", Y_test),
    )
    for stem, payload in outputs:
        save_data(payload, stem + type + ".pt")
    
def load_dictionary(type):
    """Load the six pickled split lists written by ``save_dictionary``.

    Returns ``(X_train, Y_train, X_val, Y_val, X_test, Y_test)``. Each entry
    is whatever ``load_data`` returns for the matching ``*_<type>.pt`` file.
    """
    loaded = {}
    # Files are read as train, test, then val.
    for split in ("train", "test", "val"):
        for axis in ("X", "Y"):
            loaded[(axis, split)] = load_data(axis + "_" + split + "_" + type + ".pt")

    return (
        loaded[("X", "train")], loaded[("Y", "train")],
        loaded[("X", "val")], loaded[("Y", "val")],
        loaded[("X", "test")], loaded[("Y", "test")],
    )

# save_dictionary("hamming")
# save_dictionary("euclidean")
# save_dictionary("tf")


In [None]:
#Make prediction of the test points using training points
class K_Nearest_Neighbor:
    """k-nearest-neighbour classifier over pre-computed document vectors.

    ``type`` selects both the cached splits that are loaded
    (``X_train_<type>.pt`` etc., via ``load_dictionary``) and the measure
    used to rank neighbours: ``"hamming"`` -> Hamming distance,
    ``"euclidean"`` -> Euclidean distance, anything else (normally
    ``"tf"``) -> cosine similarity.
    """

    def __init__(self, type):
        """Remember ``type`` and load the matching cached splits."""
        self.change_type(type)

    def hamming_distance(self, instance1, instance2):
        """Return the number of positions where the two vectors differ."""
        return np.sum(instance1 != instance2)

    def euclidean_distance(self, instance1, instance2):
        """Return the L2 norm of ``instance1 - instance2``."""
        return np.linalg.norm(instance1 - instance2)

    def cosine_sim(self, instance1, instance2):
        """Return the cosine similarity of the two vectors.

        If either vector has zero norm the similarity is defined as 0.0.
        Dividing by zero would yield NaN, and NaN values make the neighbour
        sort order meaningless.
        """
        norm_product = np.linalg.norm(instance1) * np.linalg.norm(instance2)
        if norm_product == 0:
            return 0.0
        return np.dot(instance1, instance2.T) / norm_product

    def KNN(self, X_train, Y_train, X_test, n_neighbors=3):
        """Classify one vector by majority vote among its nearest neighbours.

        Parameters
        ----------
        X_train, Y_train : training vectors and their non-negative integer
            labels (same length).
        X_test : the single vector to classify.
        n_neighbors : number of neighbours that vote; clamped to the number
            of training instances.

        Returns
        -------
        (predictedOutput, neighbors)
            The winning label (ties go to the smallest label) and the list
            of neighbour vectors that voted, best first.

        Raises
        ------
        ValueError
            If there are no training instances.
        """
        if len(Y_train) == 0:
            raise ValueError("KNN needs at least one training instance")

        if self.type == "hamming":
            measure = self.hamming_distance
        elif self.type == "euclidean":
            measure = self.euclidean_distance
        else:
            measure = self.cosine_sim
        # Cosine is a similarity: larger means closer, so best-first is the
        # reverse of the ascending sort.
        larger_is_closer = self.type not in ("hamming", "euclidean")

        allDistances = [
            (trainInput, trainActualOutput, measure(X_test, trainInput))
            for trainInput, trainActualOutput in zip(X_train, Y_train)
        ]

        allDistances.sort(key=lambda entry: entry[2])
        if larger_is_closer:
            allDistances.reverse()

        # Size the tally by the largest label, not by the number of distinct
        # labels. Otherwise a label set such as {0, 2} indexes out of range.
        voteCount = np.zeros(max(int(label) for label in Y_train) + 1)
        neighbors = []
        for trainInput, trainActualOutput, _ in allDistances[:min(n_neighbors, len(allDistances))]:
            neighbors.append(trainInput)
            voteCount[int(trainActualOutput)] += 1

        # Majority vote with equal weights.
        predictedOutput = np.argmax(voteCount)

        return predictedOutput, neighbors

    def performanceEvaluation(self, X_test, Y_test, n_neighbors=3):
        """Classify every vector in ``X_test`` and return the accuracy in percent.

        Predictions use ``self.X_train`` / ``self.Y_train``. A summary line
        is printed. An empty evaluation set yields an accuracy of 0.0
        instead of a ZeroDivisionError.
        """
        totalCount = 0
        correctCount = 0

        for testInput, testActualOutput in zip(X_test, Y_test):
            predictedOutput, _ = self.KNN(self.X_train, self.Y_train, testInput, n_neighbors)
            if int(predictedOutput) == int(testActualOutput):
                correctCount += 1
            totalCount += 1

        accuracy = (correctCount * 100) / totalCount if totalCount else 0.0
        print("Total Correct Count: ",correctCount," Total Wrong Count: ",totalCount-correctCount," Accuracy: ",accuracy)
        return accuracy

    def KNN_runner(self):
        """Run validation for every representation and K in {1, 3, 5}.

        Switches ``self.type`` (and the loaded splits) through "hamming",
        "euclidean" and "tf"; the instance is left on "tf" afterwards.
        Results are written to ``knn_val_result.json`` as a list of
        ``[type, K, accuracy]`` entries.
        """
        val_Acc_all = []
        for candidate_type in ["hamming", "euclidean", "tf"]:
            self.change_type(candidate_type)
            for k in range(1, 6, 2):
                print("Running val for type: ", self.type, " and K = ", k)
                val_acc = self.performanceEvaluation(self.X_val, self.Y_val, k)
                val_Acc_all.append([self.type, k, val_acc])

        json_object = json.dumps(val_Acc_all, indent = 4)
        with open("knn_val_result.json", "w") as outfile:
            outfile.write(json_object)

    def change_type(self, type):
        """Switch representation and reload the six cached splits for it."""
        self.type = type
        (self.X_train, self.Y_train,
         self.X_val, self.Y_val,
         self.X_test, self.Y_test) = load_dictionary(self.type)

    def test_runner(self, best_n):
        """Evaluate the test split in consecutive folds with K = ``best_n``.

        The test vectors are cut into consecutive slices of
        ``number_of_training_files * 10`` instances. The accuracy of each
        slice is collected and the list is written to
        ``knn_best_test.json``.

        NOTE(review): slices follow the stored order of ``self.X_test``. If
        that order is file-by-file, a slice will not hold the same documents
        as a per-file row window - confirm before pairing these accuracies
        with results computed elsewhere.
        """
        test_Acc_all = []

        fold_size = len(glob.glob('./Data/Data/Training/*.xml')) * 10

        print("Running test for type: ", self.type, " and K = ", best_n)
        for start in range(0, len(self.X_test), fold_size):
            stop = start + fold_size
            test_acc = self.performanceEvaluation(self.X_test[start:stop], self.Y_test[start:stop], best_n)
            test_Acc_all.append(test_acc)

        json_object = json.dumps(test_Acc_all, indent = 4)
        with open("knn_best_test.json", "w") as outfile:
            outfile.write(json_object)

# Build the classifier on the "tf" splits (the constructor loads the cached
# X_*/Y_* pickles for that type from disk).
KNN = K_Nearest_Neighbor(type="tf")
# Validation sweep over all types and K values, currently disabled:
# KNN.KNN_runner()
# Evaluate the test split with K = 5; writes knn_best_test.json.
KNN.test_runner(5)

In [None]:
class Naive_Bayes:
    """Multinomial Naive Bayes over the training XML files.

    Every file matched by ``./Data/Data/Training/*.xml`` is one class; the
    class label is the file's position in ``self.all_files``. Per-class word
    probabilities (with additive smoothing ``alpha``) are pickled to
    ``topic_wise_prob_<alpha>.pt`` and read back for prediction.

    NOTE(review): labels depend on the order returned by ``glob.glob``. A
    probability file written in an earlier session is only valid if the glob
    order is the same now - confirm before reusing old files.
    """

    def __init__(self, alpha):
        """Store the smoothing factor and discover the training files."""
        print("initialized")
        self.alpha = alpha
        self.all_files = glob.glob('./Data/Data/Training/*.xml')
        self.len_final_vocab = len(load_data("vocab.pt"))

    def change_alpha(self, alpha):
        """Switch smoothing factor, building its probability file if missing."""
        self.alpha = alpha
        if not os.path.isfile("topic_wise_prob_"+str(alpha)+".pt"):
            self.word_probab_creation()

    def _rows(self, file_name):
        """Return the row elements of ``file_name``, re-parsing only on a file change."""
        # One-slot cache: rows are requested many times in a row for the same
        # file, and parsing the XML for every single row dominated run time.
        cached = getattr(self, "_row_cache", None)
        if cached is None or cached[0] != file_name:
            # list(root) replaces Element.getchildren(), removed in Python 3.9.
            cached = (file_name, list(ET.ElementTree(file=file_name).getroot()))
            self._row_cache = cached
        return cached[1]

    def _topic_probabilities(self):
        """Return the per-class probability tables for the current alpha."""
        cached = getattr(self, "_prob_cache", None)
        if cached is not None and cached[0] == self.alpha:
            return cached[1]
        final_prob = load_data("topic_wise_prob_"+str(self.alpha)+".pt")
        # Do not cache the empty fallback of a failed load, so a file that
        # appears later is picked up.
        if len(final_prob) > 0:
            self._prob_cache = (self.alpha, final_prob)
        return final_prob

    def parseXML(self, file_name, max_rows=500):
        """Return every token (duplicates kept) of the first ``max_rows`` rows.

        A list is returned even when the file has fewer than ``max_rows``
        rows; the earlier version returned ``None`` in that case.
        """
        large_vocab = []
        for row in self._rows(file_name)[:max_rows]:
            large_vocab.extend(remove_tags(row.attrib['Body'].lower()))
        return large_vocab

    def word_probab_creation(self):
        """Estimate smoothed word probabilities per class and pickle them.

        For class ``c`` and vocabulary token ``w``:
        ``P(w|c) = (count(w in c) + alpha) / (tokens in c + alpha * |V|)``.
        The saved object is a list with one ``[dict]`` entry per file, in
        ``self.all_files`` order.
        """
        from collections import Counter

        final_vocab = load_data("vocab.pt")
        len_final_vocab = len(final_vocab)
        final_prob = []

        for each_file in self.all_files:
            topic_wise_vocab = self.parseXML(each_file)
            # Count all tokens once instead of list.count() per vocabulary token.
            occurrences = Counter(topic_wise_vocab)
            denominator = len(topic_wise_vocab) + (self.alpha * len_final_vocab)

            topic_wise_prob = {
                each_tok: (occurrences[each_tok] + self.alpha) / denominator
                for each_tok in final_vocab
            }
            # The one-element list wrapper is the stored format read back as
            # each_topic_prob[0] in get_result.
            final_prob.append([topic_wise_prob])

        save_data(final_prob, "topic_wise_prob_"+str(self.alpha)+".pt")
        self._prob_cache = (self.alpha, final_prob)

    def single_row(self, file_name, row_num):
        """Return the normalized tokens of row ``row_num`` of ``file_name``."""
        row = self._rows(file_name)[row_num]
        return remove_tags(row.attrib['Body'].lower())

    def get_result(self, file_name, row_num, len_topic_wise_vocab=None):
        """Classify one row and return ``(log_score, predicted_label)``.

        The score of a class is ``log P(c)`` (uniform prior) plus the sum of
        ``log(eps + P(w|c))`` over the row's tokens; tokens missing from the
        class table are skipped. ``len_topic_wise_vocab`` is accepted for
        backward compatibility and is not used.

        Returns ``(None, None)`` when the row has no tokens. Otherwise the
        returned score is the score of the predicted class; the earlier
        version always returned the score of class 0.
        """
        words = self.single_row(file_name, row_num)

        if len(words) == 0:
            return None, None

        final_prob = self._topic_probabilities()
        log_prior = np.log(1 / len(self.all_files))  # P(c)
        eps = np.finfo(float).eps

        allacc = np.zeros(len(self.all_files))

        for label_iter, each_topic_prob in enumerate(final_prob):
            word_prob = each_topic_prob[0]
            current_acc = log_prior
            for each_word in words:
                # Only a missing token is skipped; any other error surfaces.
                if each_word in word_prob:
                    current_acc = current_acc + np.log(eps + word_prob[each_word])
            allacc[label_iter] += current_acc

        best_class = np.argmax(allacc)
        return allacc[best_class], best_class

    def performanceEvaluation(self, start_iter, end_iter):
        """Return the accuracy (percent) over rows ``[start_iter, end_iter)`` of every file.

        Rows without tokens are not counted. A summary line is printed. If
        no row is counted the accuracy is 0.0 instead of a
        ZeroDivisionError.
        """
        totalCount = 0
        correctCount = 0

        # Use the same file list that defines the class labels. The earlier
        # version also tokenized 500 rows per file here only to compute a
        # length that get_result never used.
        for label_iter, each_file in enumerate(self.all_files):
            for row_num in range(start_iter, end_iter):
                _, best_class = self.get_result(each_file, row_num)
                if best_class is None:
                    continue
                totalCount += 1
                if best_class == label_iter:
                    correctCount += 1

        accuracy = (correctCount * 100) / totalCount if totalCount else 0.0
        print("Smoothing Factor",self.alpha,"Total Correct Count: ",correctCount," Total Wrong Count: ",totalCount-correctCount," Accuracy: ",accuracy)
        return accuracy

    def test_runner(self):
        """Evaluate rows 700-1199 in windows of 10 rows and save the accuracies.

        Each window covers the same row numbers of every file. The list of
        window accuracies is written to ``nb_best_test.json``.
        """
        print("Running test for Smoothing Factor = ", self.alpha)
        test_Acc_all = []
        for window_start in range(700, 1200, 10):
            test_Acc_all.append(self.performanceEvaluation(window_start, window_start + 10))

        json_object = json.dumps(test_Acc_all, indent = 4)
        with open("nb_best_test.json", "w") as outfile:
            outfile.write(json_object)
        

In [None]:
def NB_Runner(type):
    """Drive the Naive Bayes experiment.

    ``type == "val"``: evaluate rows 500-699 for each smoothing factor from
    0.1 to 1.0 and write ``[[alpha, accuracy], ...]`` to
    ``nb_val_result.json``.
    Any other value: run the test evaluation with smoothing factor 0.1.
    """
    NB = Naive_Bayes(alpha=0.1)

    if type != "val":
        NB.change_alpha(0.1)
        NB.test_runner()
        return

    val_Acc_all = []
    for smooth_factor in (0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0):
        NB.change_alpha(smooth_factor)
        val_Acc_all.append([smooth_factor, NB.performanceEvaluation(500, 700)])

    # Serialize before opening so a serialization error leaves no file behind.
    serialized = json.dumps(val_Acc_all, indent = 4)
    with open("nb_val_result.json", "w") as outfile:
        outfile.write(serialized)


In [None]:
# Validation sweep over the smoothing factors, currently disabled:
# NB_Runner(type="val")
# Any type other than "val" takes NB_Runner's test branch (smoothing factor 0.1).
NB_Runner(type="test")

In [None]:
# NB_Runner(type="val")

In [None]:
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ttest_ind_from_stats.html
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ttest_rel.html
def load_json(filename):
    """Read the file at ``filename`` and return its decoded JSON content."""
    with open(filename) as source:
        return json.loads(source.read())
    
# Per-fold test accuracies written by the two test_runner methods.
KNN_test_results = load_json('knn_best_test.json')
NB_test_results = load_json('nb_best_test.json')

from scipy import stats
# Related-samples t-test on the two accuracy lists (see the ttest_rel link
# above). NOTE(review): this presumably needs both lists to have the same
# length and to be paired fold-by-fold - confirm the folds line up.
print(stats.ttest_rel(NB_test_results,KNN_test_results))
# Same test with the operands swapped.
print(stats.ttest_rel(KNN_test_results, NB_test_results))