## Python code for k Nearest Neighbors Classifier 

This project implements a k-nearest neighbors (kNN) classifier that finds the top-k nearest objects to a specific query object. Here it is applied to text documents: given a search text such as "The Bicycle Thief", the algorithm searches the document corpus and returns the top k most similar documents.

The dataset is a Wikipedia data set. Each Wikipedia page is a document with a unique document ID and a specific URL. A sample of the dataset is available at
https://s3.amazonaws.com/metcs777/WikipediaPagesOneDocPerLine1000LinesSmall.txt
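
As a rough illustration of how a document ID and its text can be pulled out of one line of the dataset, here is a minimal sketch. The line layout (`<doc id="..." url="..." title="...">text</doc>`) is an assumption inferred from the string slicing used in the Task 1 code; `parse_line`, `sample_line`, and the `example.org` URL are hypothetical names used only for this example.

```python
def parse_line(line):
    """Split one dataset line into (docID, text).

    Assumes the line looks like <doc id="..." url="..." ...>text</doc>.
    """
    # The ID sits between 'id="' and '" url='
    doc_id = line[line.index('id="') + 4 : line.index('" url=')]
    # The text starts after the first '">' and ends before the closing '</doc>' (6 characters)
    text = line[line.index('">') + 2:][:-6]
    return doc_id, text


# Hypothetical sample line, not taken from the real dataset
sample_line = '<doc id="42" url="https://example.org/wiki?curid=42" title="Sample">Some page text.</doc>'
parsed = parse_line(sample_line)
print(parsed)
```

If the real lines carry trailing whitespace or a different closing tag, the `[:-6]` slice would need adjusting.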

### Task 1: Generating the Top 20K Dictionary Words

The code below collects the 20,000 most frequent words in the corpus into a local array, sorted by frequency. These top 20K words act as the dictionary.

At the end, we create an RDD that has the docID as key and a numpy array holding the dictionary position of each of the document's words that appear in the top 20K dictionary.
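
The dictionary construction described above can be sketched without Spark on a toy corpus. This is only an illustration under stated assumptions: `build_dictionary`, `positions`, `toy_docs`, and the dictionary size of 2 are hypothetical, and the real pipeline uses RDD operations and a 20,000-word dictionary.

```python
from collections import Counter
import re

WORD_SPLITTER = re.compile('[^a-zA-Z]')


def tokenize(text):
    """Replace non-letters with spaces, lowercase, and split into words."""
    return WORD_SPLITTER.sub(' ', text).lower().split()


def build_dictionary(docs, size):
    """Map each of the `size` most frequent words to its rank (0 = most frequent)."""
    counts = Counter()
    for text in docs.values():
        counts.update(tokenize(text))
    return {word: pos for pos, (word, _) in enumerate(counts.most_common(size))}


def positions(text, dictionary):
    """Dictionary position of every word occurrence that is in the dictionary."""
    return [dictionary[w] for w in tokenize(text) if w in dictionary]


# Hypothetical toy corpus keyed by docID
toy_docs = {
    "1": "The cat sat on the mat",
    "2": "The dog chased the cat",
}
toy_dictionary = build_dictionary(toy_docs, 2)
toy_positions = {doc_id: positions(text, toy_dictionary)
                 for doc_id, text in toy_docs.items()}
print(toy_dictionary)
print(toy_positions)
```

Words outside the dictionary are simply dropped, which mirrors what the join against the dictionary RDD does in the Spark code.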

In [None]:
from __future__ import print_function
import sys
import re
import numpy as np 
from numpy import dot
from numpy.linalg import norm
from pyspark import SparkContext
import math
from math import sqrt
from math import isnan
from operator import add


def stringVector(x): 
    returnVal= str (x[0]) 
    for j in x[1]:
        returnVal += ','+ str(j) 
    return returnVal

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file> ", file=sys.stderr)
        exit(-1)
        
    sc = SparkContext(appName="Assig2_CS777")
    corpus = sc.textFile(sys.argv[1], 1)
    numberOfDocs = corpus.count()
    validLines = corpus.filter(lambda x : 'id=' in x and 'url=' in x)
    numberOfValidLines = validLines.count()
    keyAndText = validLines.map(lambda x : (x[x.index('id="') + 4 : x.index('" url=')], x[x.index('">') + 2:][:-6]))
    regex = re.compile('[^a-zA-Z]')

    keyAndListOfWords = keyAndText.map(lambda x: (str(x[0]),regex.sub(' ', x[1]).lower().split()))

    # Emit a (word, 1) pair for every word occurrence in the corpus
    allWords = keyAndListOfWords.flatMap(lambda x: [(data, 1) for data in x[1]])

    # Sum the counts per word, then sort by descending frequency as (count, word)
    allCounts = allWords.reduceByKey(add).map(lambda x: (x[1], x[0])).sortByKey(False)

    topWords = allCounts.values().take(20000)

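    # Index positions for the dictionary; len(topWords) avoids an IndexError
    # if the corpus has fewer than 20,000 distinct words
    twentyK = sc.parallelize(range(len(topWords)))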

    dictionary = twentyK.map(lambda x: (topWords[x], x))

    allWords = keyAndListOfWords.flatMap(lambda x: ((j, x[0]) for j in x[1]))

    allDictionaryWords = dictionary.join(allWords)

    justDocAndPos = allDictionaryWords.map( lambda x: ( x [1][1], x[1][0] ))

    allDictionaryWordsInEachDoc = justDocAndPos.groupByKey()

    new = allDictionaryWordsInEachDoc.map(lambda x : (x[0], list(x[1])))

    forCSV = new.map(lambda x: ( x[0], np.array(x[1])))

    forCSV = forCSV.map(lambda x: stringVector(x))

### Task 2: Create the TF-IDF Array

With the top 20K words in hand, we build a large array whose columns are the dictionary words in order and whose rows are the documents.

The inverse document frequency is a measure of how much information the word provides, that is, whether the term is common or rare across all documents. It is the logarithmically scaled inverse fraction of the documents that contain the word, obtained by dividing the total number of documents by the number of documents containing the term, and then taking the logarithm of that quotient.
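
To make the definition concrete, here is a small plain-Python sketch of term frequency and inverse document frequency on a toy example. The function names, the toy position lists, and the dictionary size of 3 are hypothetical; the zero guard for words that appear in no document is an added assumption that the Spark code does not include.

```python
import math


def term_frequencies(positions, dict_size):
    """Count occurrences per dictionary position, normalized by the total count."""
    tf = [0.0] * dict_size
    for pos in positions:
        tf[pos] += 1.0
    total = sum(tf)
    return [value / total for value in tf] if total else tf


def inverse_document_frequencies(tf_vectors, dict_size):
    """idf[k] = log(number of docs / number of docs containing word k)."""
    num_docs = len(tf_vectors)
    idf = []
    for k in range(dict_size):
        df = sum(1 for tf in tf_vectors if tf[k] > 0)
        # Assumption: a word found in no document gets idf 0 instead of dividing by zero
        idf.append(math.log(num_docs / df) if df else 0.0)
    return idf


# Two toy documents given as lists of dictionary positions
toy_tf = [term_frequencies([0, 0, 1], 3), term_frequencies([0, 2], 3)]
toy_idf = inverse_document_frequencies(toy_tf, 3)
toy_tfidf = [[t * i for t, i in zip(tf, toy_idf)] for tf in toy_tf]
print(toy_idf)
print(toy_tfidf)
```

Word 0 occurs in both toy documents, so its IDF is log(2/2) = 0 and it contributes nothing to either TF-IDF vector; words 1 and 2 each occur in one document and get log(2).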


In [None]:
    
    def buildArray(listOfIndices):
        returnVal = np.zeros(20000)
        for index in listOfIndices:
            returnVal[index] = returnVal[index] + 1
        mysum = np.sum(returnVal)
        returnVal = np.divide(returnVal, mysum)
        return returnVal

    allDocsAsNumpyArrays = allDictionaryWordsInEachDoc.map(lambda x: ( x[0], buildArray(x[1])))

    def helper(arr):
        # 1.0 where the word occurs in the document, 0.0 elsewhere
        return np.where(arr == 0, 0.0, 1.0)

Now we create a version of allDocsAsNumpyArrays in which every entry is either zero or one. A one indicates that the word occurs in the document and a zero indicates that it does not. Summing these arrays over all documents gives the document frequency of each word.

In [None]:
    zeroOrOne = allDocsAsNumpyArrays.map(lambda x: (x[0], helper(x[1])))

    dfArray = zeroOrOne.reduce(lambda x1, x2:(( "", np.add(x1[1], x2[1]))))
    dfArray = dfArray[1]

    multiplier = np.full(20000, numberOfDocs)

    # idf[k] = log(numberOfDocs / df[k]), computed for all dictionary words at once
    idfArray = np.log(np.divide(multiplier, dfArray))

    allDocsAsNumpyArraysTFidf = allDocsAsNumpyArrays.map (lambda x: (x[0], np.multiply (x[1], idfArray)))

### Task 3 - Implementing the getPrediction function

Finally, we implement the function getPrediction (textInput, k). This function finds the k documents in the corpus that are most similar to the input text string and returns the ID of the document that best matches it.

In [None]:
# Finally, we have a function that returns the prediction for the label of a string, using a kNN algorithm
def getPrediction (textInput, k):
    
    # Create an RDD that holds the input text as its only element
    myDoc = sc.parallelize([textInput])
    
    # Flat map the text to (word, 1) pair for each word in the doc
    wordsInThatDoc = myDoc.flatMap (lambda x : ((j, 1) for j in regex.sub(' ', x).lower().split()))

    # Join with the dictionary to get (word, (dictionaryPos, 1)) pairs, then regroup them into a single (1, [dictionaryPos, ...]) pair
    allDictionaryWordsInThatDoc = dictionary.join (wordsInThatDoc).map (lambda x: (x[1][1], x[1][0])).groupByKey()

    # Get tf array for the input string
    myArray = buildArray(allDictionaryWordsInThatDoc.top(1)[0][1])

    # Get the tf * idf array for the input string
    myArray = np.multiply(myArray, idfArray)

    # Score every database document against the input text with the dot product of their TF-IDF vectors
    # (an unnormalized cosine similarity, so a larger value means a closer match)
    distances = allDocsAsNumpyArraysTFidf.map(lambda x : (x[0], np.dot(x[1], myArray)))

    # get the top k distances
    topK = distances.top(k, lambda x : x[1])

    # and transform the top k distances into a set of (docID, 1) pairs
    docIDRepresented = sc.parallelize(topK).map (lambda x : (x[0], 1))

    # now, for each docID, get the count of the number of times this document ID appeared in the top k
    numTimes = docIDRepresented.reduceByKey(add)

    # Return the top 1 of them.  
    return numTimes.top(1, lambda x: x[1])
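
The function above ranks documents by a raw dot product. As a point of comparison, the following plain-Python sketch shows the same top-k lookup with either the dot product or a length-normalized cosine similarity. It is an illustration only: `dot`, `cosine`, `top_k`, and the toy vectors are hypothetical, and switching the Spark code to cosine similarity could change which documents it returns.

```python
import heapq
import math


def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine(a, b):
    """Dot product divided by both vector lengths; 0.0 if either vector is all zeros."""
    denom = math.sqrt(dot(a, a)) * math.sqrt(dot(b, b))
    return dot(a, b) / denom if denom else 0.0


def top_k(query, docs, k, score=dot):
    """Return the k (docID, score) pairs with the highest score."""
    scored = ((doc_id, score(vec, query)) for doc_id, vec in docs.items())
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])


# Toy TF-IDF-like vectors: "long" has large entries, "short" matches the query exactly
toy_vectors = {
    "long": [4.0, 4.0, 0.0],
    "short": [0.0, 1.0, 0.0],
}
toy_query = [0.0, 1.0, 0.0]
by_dot = top_k(toy_query, toy_vectors, 1)
by_cosine = top_k(toy_query, toy_vectors, 1, score=cosine)
print(by_dot)
print(by_cosine)
```

In this toy case the dot product favors the vector with larger entries, while cosine similarity favors the vector that points in the same direction as the query.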

### Task 4: Prediction of the Input Text 

After deploying the above code on Google Cloud Platform, we use the text queries below to test the classifier.

In [None]:
print('https://en.wikipedia.org/wiki?curid=' + str(getPrediction('God and Religion', 1)[0][0]))
print('https://en.wikipedia.org/wiki?curid=' + str(getPrediction('Sport Basketball Volleyball Soccer', 1)[0][0]))

sc.stop()

When tested on a small dataset of 1000 documents, the above queries return https://en.wikipedia.org/wiki?curid=433978 and https://en.wikipedia.org/wiki?curid=418388 respectively.