In [1]:
import findspark
findspark.init()
import sys, getopt

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, DataFrameWriter
from pyspark.sql import functions as F
from pyspark.sql.functions import *

import math
import re
import nltk
from nltk.corpus import stopwords

In [2]:
#Ido Hayun
#Tim Buchbinder 
#Lidor Tevet

In [3]:
c = SparkConf().setAppName("TF-IDF ").set("spark.dynamicAllocation.enabled", "true") # Set Spark configuration
try:
    sc = SparkContext(conf=c)
except:
    sc.stop()
    sc = SparkContext(conf=c)
sc.setLogLevel("ERROR")
sql = SQLContext(sc)

In [4]:
docs_path = "C:/stories/test"
textFiles = sc.wholeTextFiles(docs_path) #(path_doc_name, content)
num_docs = textFiles.count() 
#textFiles.take(3)

In [5]:
try:
    stops = set(stopwords.words('english'))
except:
    nltk.download('popular')
    stops = set(stopwords.words('english'))

def deleteStopWord(word:str):
    global stops
    word = word.replace("\n", "")\
    .replace("\r", "")\
    .lower()
    if word in stops:
        return ""
    else:
        return word
    
def listWithOutStopWord(line):
    fileName = line[0]
    words = line[1]
    withOutStopWords = []
    for word in words:
        if deleteStopWord(word) != "":
            withOutStopWords.append(word)
    return (fileName, withOutStopWords)

In [6]:
def fixText(text):
    regex = re.compile('[^a-zA-Z ]')
    text = text.replace("\t", " ")\
                .replace("\n", " ")\
                .replace("\r", " ")\
                .lower()
    return regex.sub("",text)

In [7]:
# RDD contains name of file and content (as a list of words) (fileName, list(word)) and filtring stopwords
text_files = textFiles.map(lambda docs:(docs[0].split("/")[-1],fixText(docs[1]).split(" "))).map(listWithOutStopWord).cache() # we can take the path as name

# RDD with all the words (every row is a list of words from that story)(list(word))
bag_of_words = text_files.map(lambda docs: docs[1])

# RDD with all the words (every line is one word)
bag_of_words = bag_of_words.flatMap(lambda x:x).cache().filter(lambda x: x !='').distinct() 

numOfWords = bag_of_words.count() #amount of words in all files
bag_of_words.take(50)

['b',
 'mercedes',
 'fight',
 'gp',
 'setback',
 'home',
 'support',
 'funded',
 'readers',
 'subscribe',
 'search',
 'sign',
 'account',
 'overview',
 'profile',
 'emails',
 'settings',
 'help',
 'comments',
 'replies',
 'switch',
 'edition',
 'us',
 'australia',
 'current',
 'news',
 'opinion',
 'lifestyle',
 'world',
 'science',
 'global',
 'development',
 'football',
 'business',
 'cartoons',
 'letters',
 'cricket',
 'rugby',
 'cycling',
 'sports',
 'books',
 'music',
 'design',
 'games',
 'classical',
 'stage',
 'recipes',
 'love',
 'garden',
 'women']

In [8]:
#Inverted index
# every row will be word and the files it appears in
text_files_exploded =text_files.flatMapValues(lambda x:x).map(lambda x: (x[1], x[0]))#(word, file)
# we apply distinct so if word appears in one file more than once it will count only 1 time
# then we group by word and count the number of files it appears in
inverted_index = text_files_exploded.distinct().groupByKey().map(lambda x:( x[0], list(x[1]))).cache()#(word, list(file))
inverted_index.take(50)

[('b',
  ['F11.txt',
   'F13.txt',
   'F14.txt',
   'football2.txt',
   'football3.txt',
   'football4.txt',
   'tennis1.txt',
   'F12.txt',
   'F15.txt',
   'football1.txt',
   'football5.txt',
   'football6.txt',
   'NBA2.txt',
   'tennis2.txt',
   'tennis3.txt',
   'tennis4.txt',
   'tennis5.txt',
   'tennis6.txt']),
 ('mercedes', ['F11.txt', 'F13.txt', 'F14.txt', 'F12.txt']),
 ('fight', ['F11.txt', 'F13.txt', 'F14.txt', 'football6.txt']),
 ('gp', ['F11.txt', 'F13.txt', 'F14.txt', 'F15.txt']),
 ('setback', ['F11.txt', 'F13.txt', 'football5.txt']),
 ('home',
  ['F11.txt',
   'F13.txt',
   'F14.txt',
   'football2.txt',
   'football4.txt',
   'tennis1.txt',
   'F15.txt',
   'football1.txt',
   'football5.txt',
   'football6.txt',
   'tennis2.txt',
   'tennis3.txt',
   'tennis4.txt',
   'tennis5.txt',
   'tennis6.txt']),
 ('support',
  ['F11.txt',
   'F13.txt',
   'F14.txt',
   'football2.txt',
   'football3.txt',
   'football4.txt',
   'NBA1.txt',
   'tennis1.txt',
   'F15.txt',
   'f

In [9]:
def calculate_TF_For_Word(tf):
    if not tf == 0:
        tf = 1.0 + math.log10(tf)
    return tf

In [10]:
#Convert bagOfWords to broadcast variable to be more efficient
bag_of_words_readOnly = sc.broadcast(bag_of_words.collect())

def countWord(words, text_file_list, fileName:str):
    wordsCount = []
    for word in words:
        wordsCount.append( (word, (fileName , calculate_TF_For_Word(text_file_list.count(word)))) )
    return wordsCount

def calculateTF(textFile, words):
    return countWord(words, textFile[1], textFile[0])

#We could make it a table but this way it was easier to create the tfidf table later on
TFVectors = text_files.map(lambda x: calculateTF(x, bag_of_words_readOnly.value)).flatMap(lambda x: x) #(word, (file, TF))
TFVectors.take(50)

[('b', ('F11.txt', 1.0)),
 ('mercedes', ('F11.txt', 2.113943352306837)),
 ('fight', ('F11.txt', 1.4771212547196624)),
 ('gp', ('F11.txt', 1.6989700043360187)),
 ('setback', ('F11.txt', 1.3010299956639813)),
 ('home', ('F11.txt', 1.4771212547196624)),
 ('support', ('F11.txt', 1.3010299956639813)),
 ('funded', ('F11.txt', 1.3010299956639813)),
 ('readers', ('F11.txt', 1.3010299956639813)),
 ('subscribe', ('F11.txt', 1.4771212547196624)),
 ('search', ('F11.txt', 1.845098040014257)),
 ('sign', ('F11.txt', 1.3010299956639813)),
 ('account', ('F11.txt', 1.3010299956639813)),
 ('overview', ('F11.txt', 1.0)),
 ('profile', ('F11.txt', 1.0)),
 ('emails', ('F11.txt', 1.0)),
 ('settings', ('F11.txt', 1.0)),
 ('help', ('F11.txt', 1.3010299956639813)),
 ('comments', ('F11.txt', 1.0)),
 ('replies', ('F11.txt', 1.0)),
 ('switch', ('F11.txt', 1.845098040014257)),
 ('edition', ('F11.txt', 2.0)),
 ('us', ('F11.txt', 1.9030899869919435)),
 ('australia', ('F11.txt', 1.3010299956639813)),
 ('current', ('F11

In [11]:
def count_files(line):
    return line[0], len(line[1])

def calculate_DF(line):
    global num_docs
    count = (float(num_docs)/float(line[1]))
    df = math.log10(count)
    return (line[0],df)

In [12]:
#Counting the files that each word appers in and calculate DF fo this value
DF = inverted_index.map(count_files).map(calculate_DF).cache()
DF.take(100)

[('b', 0.0871501757189002),
 ('mercedes', 0.7403626894942439),
 ('fight', 0.7403626894942439),
 ('gp', 0.7403626894942439),
 ('setback', 0.8653014261025438),
 ('home', 0.16633142176652496),
 ('support', 0.0636690798693773),
 ('funded', 0.16633142176652496),
 ('readers', 0.16633142176652496),
 ('subscribe', 0.16633142176652496),
 ('search', 0.16633142176652496),
 ('sign', 0.0871501757189002),
 ('account', 0.13830269816628146),
 ('overview', 0.16633142176652496),
 ('profile', 0.16633142176652496),
 ('emails', 0.16633142176652496),
 ('settings', 0.16633142176652496),
 ('help', 0.11197375944393233),
 ('comments', 0.16633142176652496),
 ('replies', 0.16633142176652496),
 ('switch', 0.16633142176652496),
 ('edition', 0.16633142176652496),
 ('us', 0.13830269816628146),
 ('australia', 0.16633142176652496),
 ('current', 0.13830269816628146),
 ('news', 0.0),
 ('opinion', 0.16633142176652496),
 ('lifestyle', 0.16633142176652496),
 ('world', 0.0871501757189002),
 ('science', 0.16633142176652496),


In [13]:
DFjoinTF = DF.join(TFVectors)#(word,(df,(fileName, tf)) )
DFjoinTF.take(50)

[('b', (0.0871501757189002, ('F11.txt', 1.0))),
 ('b', (0.0871501757189002, ('F12.txt', 1.0))),
 ('b', (0.0871501757189002, ('F13.txt', 1.0))),
 ('b', (0.0871501757189002, ('F14.txt', 1.0))),
 ('b', (0.0871501757189002, ('F15.txt', 1.0))),
 ('b', (0.0871501757189002, ('football1.txt', 1.0))),
 ('b', (0.0871501757189002, ('football2.txt', 1.0))),
 ('b', (0.0871501757189002, ('football3.txt', 1.0))),
 ('b', (0.0871501757189002, ('football4.txt', 1.0))),
 ('b', (0.0871501757189002, ('football5.txt', 1.0))),
 ('b', (0.0871501757189002, ('football6.txt', 1.0))),
 ('b', (0.0871501757189002, ('NBA1.txt', 0))),
 ('b', (0.0871501757189002, ('NBA2.txt', 1.3010299956639813))),
 ('b', (0.0871501757189002, ('NBA3.txt', 0))),
 ('b', (0.0871501757189002, ('NBA4.txt', 0))),
 ('b', (0.0871501757189002, ('NBA5.txt', 0))),
 ('b', (0.0871501757189002, ('tennis1.txt', 1.0))),
 ('b', (0.0871501757189002, ('tennis2.txt', 1.0))),
 ('b', (0.0871501757189002, ('tennis3.txt', 1.0))),
 ('b', (0.0871501757189002, 

In [14]:
def calculateTFIDF(line):
    df = (float)(line[1][0])
    tf = (float)(line[1][1][1])
    tfIdf = df*tf
    return (line[1][1][0], (line[0], tfIdf))

In [15]:
Pre_TFIDF = DFjoinTF.map(calculateTFIDF) #(file, (word, tfidf))
Pre_TFIDF.take(10)

[('F11.txt', ('b', 0.0871501757189002)),
 ('F12.txt', ('b', 0.0871501757189002)),
 ('F13.txt', ('b', 0.0871501757189002)),
 ('F14.txt', ('b', 0.0871501757189002)),
 ('F15.txt', ('b', 0.0871501757189002)),
 ('football1.txt', ('b', 0.0871501757189002)),
 ('football2.txt', ('b', 0.0871501757189002)),
 ('football3.txt', ('b', 0.0871501757189002)),
 ('football4.txt', ('b', 0.0871501757189002)),
 ('football5.txt', ('b', 0.0871501757189002))]

In [16]:
#We group by the file and get the tfidf table
tf_idf = Pre_TFIDF.map(lambda x: (x[0], x[1][1])).groupByKey().map(lambda x : (x[0], list(x[1])))#(file list(tfidf))
tf_idf.take(50)

[('football2.txt',
  [0.0871501757189002,
   0.0,
   0.0,
   0.24569167841907472,
   0.12873137690694092,
   0.16633142176652496,
   0.16633142176652496,
   0.2664729161128467,
   0.16633142176652496,
   0.3068977802941999,
   0.24592311567580505,
   0.17993595879559407,
   0.2825920963598891,
   0.16633142176652496,
   0.13830269816628146,
   0.16633142176652496,
   0.16633142176652496,
   0.16633142176652496,
   0.17993595879559407,
   0.16633142176652496,
   0.11197375944393233,
   0.13830269816628146,
   0.24569167841907472,
   0.16633142176652496,
   0.24592311567580505,
   0.0,
   0.0,
   0.0,
   0.0,
   0.11338499273767592,
   0.0,
   0.0,
   0.0,
   0.28995099252980916,
   0.0,
   0.0,
   0.0,
   0.0,
   0.8653014261025438,
   0.0,
   0.39164905395343774,
   0.0,
   0.17993595879559407,
   0.3010299956639812,
   0.4973246408079494,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.3424226808222063,
   0.0,
   0.2164021689396858,
   0.0,
   0.0,
   0.0,
   0.0,
   0.497324640

In [17]:
def normalize (line):
    doc_name = line[0]
    vector = line[1]
    sum = 0.0
    for i in range (len(vector)):
        sum = sum + vector[i]*vector[i]
    normal = math.sqrt(sum)
    for i in range (len(vector)):
        vector[i] = vector[i]/normal
    return (doc_name,vector)

In [18]:
#Normalize the vector
tf_idf_normalized = tf_idf.map(normalize).cache()
tf_idf_normalized.take(100)

[('football2.txt',
  [0.003680420028211566,
   0.0,
   0.0,
   0.010375751586951467,
   0.0054364266499672435,
   0.007024305928710398,
   0.007024305928710398,
   0.011253359495234738,
   0.007024305928710398,
   0.01296053310152408,
   0.010385525363170715,
   0.007598836159353101,
   0.011934085074158628,
   0.007024305928710398,
   0.0058406310267082135,
   0.007024305928710398,
   0.007024305928710398,
   0.007024305928710398,
   0.007598836159353101,
   0.007024305928710398,
   0.004728739368476321,
   0.0058406310267082135,
   0.010375751586951467,
   0.007024305928710398,
   0.010385525363170715,
   0.0,
   0.0,
   0.0,
   0.0,
   0.004788336853345723,
   0.0,
   0.0,
   0.0,
   0.012244857010369762,
   0.0,
   0.0,
   0.0,
   0.0,
   0.03654235545479428,
   0.0,
   0.0165396456210213,
   0.0,
   0.007598836159353101,
   0.012712731970933755,
   0.021002408239043292,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.0,
   0.014460777413425604,
   0.0,
   0.009138832711972567,
   0.0

In [19]:
broadcat_tf_idf = sc.broadcast(tf_idf_normalized.collect())

def calculate_distance(vector1, vector2):
    distance = 0.0
    for i in range (len(vector2)):
        distance += vector1[i]* vector2[i]
                
    return distance

def calculate_distance_best(line):
    res = []
    
    for (key, value) in broadcat_tf_idf.value:
        if key != line[0]:
            res.append(   (key, calculate_distance(line[1], value)) )
            
    res = sorted(res, key=lambda x: -x[1])
    
    return line[0], list(res)[:5]

In [20]:
best5ForEachFile = tf_idf_normalized.map(calculate_distance_best)
best5ForEachFile.take(23)
#As expected same brances of sport will provide the same similarity

[('football2.txt',
  [('football5.txt', 0.09224583736819295),
   ('football1.txt', 0.07791103213105953),
   ('football6.txt', 0.0665641105527657),
   ('football4.txt', 0.0657783632334112),
   ('football3.txt', 0.05897752200705769)]),
 ('football4.txt',
  [('football5.txt', 0.1177844945427229),
   ('football1.txt', 0.0729595493700162),
   ('football2.txt', 0.0657783632334112),
   ('football6.txt', 0.05911338987926454),
   ('tennis5.txt', 0.047338003022169055)]),
 ('NBA1.txt',
  [('NBA5.txt', 0.19316389235930298),
   ('NBA4.txt', 0.1849138572942812),
   ('NBA3.txt', 0.17310990925311256),
   ('NBA2.txt', 0.13942052085306314),
   ('tennis2.txt', 0.04822880422235758)]),
 ('NBA3.txt',
  [('NBA4.txt', 0.2086460913474355),
   ('NBA5.txt', 0.20729067668568849),
   ('NBA1.txt', 0.17310990925311256),
   ('NBA2.txt', 0.09353416069852619),
   ('F14.txt', 0.031923602588212883)]),
 ('F15.txt',
  [('F14.txt', 0.09478637391335704),
   ('F13.txt', 0.08736528572886137),
   ('F11.txt', 0.08682269881398508

In [21]:
def cleanDup(line):
    name1 = line[0]
    name2 = line[1][0]
    return name1<name2

In [22]:
best5 = best5ForEachFile.flatMapValues(lambda x:x).filter(cleanDup).sortBy(lambda x: -x[1][1])
best5.take(5)

[('NBA4.txt', ('NBA5.txt', 0.2848691614193588)),
 ('NBA3.txt', ('NBA4.txt', 0.2086460913474355)),
 ('NBA3.txt', ('NBA5.txt', 0.20729067668568849)),
 ('NBA1.txt', ('NBA5.txt', 0.19316389235930298)),
 ('F11.txt', ('F13.txt', 0.18812274053366645))]

In [23]:
def fixQuery(query):
    words = fixText(query).split(" ")
    listWithOutStopWord = []
    for word in words:
        if deleteStopWord(word) != "":
            listWithOutStopWord.append(word)
    return listWithOutStopWord

In [24]:
def calculate_distance(vector1, vector2):
    distance = 0.0
    for i in range (len(vector2)):
        distance += vector1[i]* vector2[i]
                
    return distance

def calculate_distance_best_query(line):
    global tf_idf_normalized
    res = []
    
    for (key, value) in broadcat_tf_idf.value:
        if key != line[0]:
            res.append(   (key, calculate_distance(line[1], value)) )
            
    res = sorted(res, key=lambda x: -x[1])
    
    return line[0], list(res)[:10]

In [25]:
def search(query):
    global closest_files
    global tf_idf_normalized
    global bag_of_words_readOnly
    global DF
    QueryWords = fixQuery(query) #Delete all chars that are not a-z OR A-Z and stopwords
    tfRDD = sc.parallelize(countWord(bag_of_words_readOnly.value,QueryWords,"query")) #RDD - (word, ('query' , TF)))
    DFjoinTFQuery = DF.join(tfRDD)#(word, (DF,('query' , TF)))
    Pre_TFIDFQuery = DFjoinTFQuery.map(calculateTFIDF)#('query', (word, tfidf))
    tf_idfQuery = Pre_TFIDFQuery.map(lambda x: (x[0], x[1][1])).groupByKey().map(lambda x : (x[0], list(x[1])))
    #('query', list(tfidf))
    tf_idfQuery_normalized = tf_idfQuery.map(normalize)#('query', normalize list(tfidf))
    closest_files = tf_idfQuery_normalized.map(calculate_distance_best_query).sortByKey(False)#((file,vec),('query',vec))
    return closest_files

In [26]:
def getSearch(query, tries):
    try:
        if tries < 10:
            return search(query)
    except:
        tries = tries + 1
        return getSearch(query, tries)

In [27]:
def getRDDRes(rdd, tries):
    try:
        if tries < 10:
            return rdd.take(10)
    except:
        tries = tries + 1
        return getRDDRes(rdd, tries)

In [28]:
test = getSearch("who is the best football player", 0)#1

In [29]:
print(getRDDRes(test, 0))
#We can assume that the word football will be related with the branch football, 
#and as expected we see matches with football6,4
#as well best player can be related to other branches 

[('query', [('football6.txt', 0.03885786872336027), ('tennis5.txt', 0.019910011963091917), ('football4.txt', 0.01109749072651651), ('tennis6.txt', 0.008658317482965576), ('tennis2.txt', 0.007822019223646753), ('F11.txt', 0.007546096304512636), ('tennis4.txt', 0.00721411494904064), ('football1.txt', 0.007087425313067309), ('football5.txt', 0.00700620285678186), ('F13.txt', 0.006970093602453385)])]


In [30]:
test = getSearch("How to get to the NBA", 0)#2

In [31]:
print(getRDDRes(test, 0))
#We can assume that the word "NBA" will appear only in NBA text where the other words are common words 
#(maybe even stop words) which appear in every text so they don't affect the result.

[('query', [('NBA1.txt', 0.05693855505125285), ('NBA4.txt', 0.03973952961639638), ('NBA3.txt', 0.03886988724065243), ('NBA5.txt', 0.034648744974219624), ('NBA2.txt', 0.017892674241314157), ('football2.txt', 0.0), ('football4.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('tennis2.txt', 0.0)])]


In [32]:
test = getSearch("how to kick a ball", 0)#3

In [33]:
print(getRDDRes(test, 0))

[('query', [('football5.txt', 0.11558275962456205), ('football2.txt', 0.0), ('football4.txt', 0.0), ('NBA1.txt', 0.0), ('NBA3.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA2.txt', 0.0), ('NBA4.txt', 0.0), ('NBA5.txt', 0.0)])]


In [34]:
test = getSearch("best tennis player", 0)#4

In [35]:
print(getRDDRes(test, 0))

[('query', [('football6.txt', 0.03836381932667615), ('football4.txt', 0.010956394243917941), ('tennis6.txt', 0.008548233305183917), ('NBA2.txt', 0.008542985029975587), ('tennis2.txt', 0.007722567966919153), ('F11.txt', 0.00745015320600905), ('tennis4.txt', 0.007122392750801785), ('football1.txt', 0.00699731388094289), ('football5.txt', 0.006917124108817519), ('F13.txt', 0.006881473957262866)])]


In [36]:
test = getSearch("why the sun rise up in the morning",0)#5

In [37]:
print(getRDDRes(test, 0))

[('query', [('NBA3.txt', 0.1025383266064706), ('NBA2.txt', 0.018207384147729173), ('football2.txt', 0.0), ('football4.txt', 0.0), ('NBA1.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA4.txt', 0.0), ('NBA5.txt', 0.0), ('tennis2.txt', 0.0)])]


In [38]:
test = getSearch("who is the fast man", 0)#6

In [39]:
print(getRDDRes(test, 0))

[('query', [('NBA4.txt', 0.04292312299813206), ('NBA2.txt', 0.03512112891902071), ('F14.txt', 0.03495377607907297), ('NBA1.txt', 0.026253356035104026), ('football2.txt', 0.0), ('football4.txt', 0.0), ('NBA3.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA5.txt', 0.0)])]


In [40]:
test = getSearch("why can't I finish my My CS degree", 0)#7

In [41]:
print(getRDDRes(test, 0))
#Most of the words not related to sport and therefore most of that don't appear in any text, and that is the reason 
#most result are zero.

[('query', [('NBA4.txt', 0.049565517454536814), ('NBA1.txt', 0.03648549673005891), ('NBA2.txt', 0.024517351767110177), ('football2.txt', 0.0), ('football4.txt', 0.0), ('NBA3.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA5.txt', 0.0), ('tennis2.txt', 0.0)])]


In [42]:
test = getSearch("only bibi can get us to peace", 0)#8

In [43]:
print(getRDDRes(test, 0))

[('query', [('NBA1.txt', 0.09435524226739739), ('NBA2.txt', 0.016118235206643555), ('football2.txt', 0.0), ('football4.txt', 0.0), ('NBA3.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA4.txt', 0.0), ('NBA5.txt', 0.0), ('tennis2.txt', 0.0)])]


In [44]:
test = getSearch("TV BED GO ROOM FLOOR", 0)#9

In [45]:
print(getRDDRes(test, 0))

[('query', [('football1.txt', 0.07951053805786), ('football2.txt', 0.0339559125860953), ('NBA3.txt', 0.01762340591103905), ('NBA2.txt', 0.0140578634667113), ('F12.txt', 0.006495247087962533), ('football4.txt', 0.0), ('NBA1.txt', 0.0), ('F15.txt', 0.0), ('football6.txt', 0.0), ('NBA4.txt', 0.0)])]


In [46]:
test = getSearch("recing cars people and helmet", 0)#10

In [47]:
print(getRDDRes(test, 0))

[('query', [('football2.txt', 0.037686672800145586), ('F15.txt', 0.02846612360207252), ('tennis6.txt', 0.010051537817136661), ('football4.txt', 0.009902309262067225), ('tennis2.txt', 0.009080669793818405), ('tennis4.txt', 0.008374946907423448), ('football1.txt', 0.00822787148895669), ('football5.txt', 0.008133579429031637), ('F13.txt', 0.00809165979093274), ('tennis3.txt', 0.007808300536822047)])]


In [48]:
#Sheif 6!

In [49]:
vectors = tf_idf_normalized.map(lambda x: x[1])#(list(normalized vector))
vectors.take(10)

[[0.003680420028211566,
  0.0,
  0.0,
  0.010375751586951467,
  0.0054364266499672435,
  0.007024305928710398,
  0.007024305928710398,
  0.011253359495234738,
  0.007024305928710398,
  0.01296053310152408,
  0.010385525363170715,
  0.007598836159353101,
  0.011934085074158628,
  0.007024305928710398,
  0.0058406310267082135,
  0.007024305928710398,
  0.007024305928710398,
  0.007024305928710398,
  0.007598836159353101,
  0.007024305928710398,
  0.004728739368476321,
  0.0058406310267082135,
  0.010375751586951467,
  0.007024305928710398,
  0.010385525363170715,
  0.0,
  0.0,
  0.0,
  0.0,
  0.004788336853345723,
  0.0,
  0.0,
  0.0,
  0.012244857010369762,
  0.0,
  0.0,
  0.0,
  0.0,
  0.03654235545479428,
  0.0,
  0.0165396456210213,
  0.0,
  0.007598836159353101,
  0.012712731970933755,
  0.021002408239043292,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.014460777413425604,
  0.0,
  0.009138832711972567,
  0.0,
  0.0,
  0.0,
  0.0,
  0.021002408239043292,
  0.0,
  0.0210024082390432

In [50]:
numOfCenters = 4
converge = 0.0001
tempDist = 1.0

In [51]:
def normalizevec (vector):
    
    sum = 0.0
    for i in range (len(vector)):
        sum = sum + vector[i]*vector[i]
    normal = math.sqrt(sum)
    for i in range (len(vector)):
        vector[i] = vector[i]/normal
    return (vector)

In [52]:
import random
#Generating centers
centers = []
for i in range(numOfCenters):
    centers.append(normalizevec([random.random() for _ in range(numOfWords)]))
print(centers)

[[0.002303787007615448, 0.01857166179047093, 0.012395584522801636, 0.007208483628248475, 0.000958607429931439, 0.018142829685182892, 0.0008144465766868676, 0.0015804764515621088, 0.0006766579424070685, 0.004355128596387772, 0.014830394174702758, 0.01227736250562627, 0.0220254667955749, 0.008822288517494097, 0.021306159230421732, 0.011201278127745034, 0.022100319095876127, 0.009678139985090688, 0.00344790996160087, 0.0055748003112055826, 0.004391260028332985, 0.022695768367809236, 0.004268544257824513, 0.01820102894245054, 0.0034897455620061116, 0.021055695315172773, 0.0189602982053169, 0.003964108552605381, 0.019919924363623977, 0.010006980761382052, 0.011200365857105439, 0.01716369788464912, 0.024766052833329978, 0.023045574343384514, 2.2784763139495923e-05, 0.010294898959803192, 7.052674484312054e-05, 0.020875948438233736, 0.015155019037788754, 0.005130792680425885, 0.016930719189963483, 0.010328337849176233, 0.010607510274185123, 0.007944368848382777, 0.0074388311010248455, 0.023330

In [53]:
def calculateVectorsDistance(p, center):
    sumVector = 0.0
    for i in range(len(p)):
        sumVector = sumVector + (p[i] - center[i])**2
    return sumVector

In [54]:
def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist1 = calculateVectorsDistance(p, centers[i])
        if tempDist1 < closest:
            closest = tempDist1
            bestIndex = i
    return bestIndex

In [55]:
def sumVectorsAndCount(x, y):
    return([m + n for m,n in zip(x[0], y[0])], x[1] + y[1]) 

In [56]:
def calculateNewCenter(s):
    return(s[0], [(float)(m/s[1][1]) for m in s[1][0]])

In [57]:
def calculateDistanceBetweenCenters(newPoints, centers):
    res = 0
    for (centerNumber, centerVector) in newPoints:
        res = res + calculateVectorsDistance(centers[centerNumber], centerVector)
    return res

In [58]:
vectors.take(22)

[[0.003680420028211566,
  0.0,
  0.0,
  0.010375751586951467,
  0.0054364266499672435,
  0.007024305928710398,
  0.007024305928710398,
  0.011253359495234738,
  0.007024305928710398,
  0.01296053310152408,
  0.010385525363170715,
  0.007598836159353101,
  0.011934085074158628,
  0.007024305928710398,
  0.0058406310267082135,
  0.007024305928710398,
  0.007024305928710398,
  0.007024305928710398,
  0.007598836159353101,
  0.007024305928710398,
  0.004728739368476321,
  0.0058406310267082135,
  0.010375751586951467,
  0.007024305928710398,
  0.010385525363170715,
  0.0,
  0.0,
  0.0,
  0.0,
  0.004788336853345723,
  0.0,
  0.0,
  0.0,
  0.012244857010369762,
  0.0,
  0.0,
  0.0,
  0.0,
  0.03654235545479428,
  0.0,
  0.0165396456210213,
  0.0,
  0.007598836159353101,
  0.012712731970933755,
  0.021002408239043292,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.014460777413425604,
  0.0,
  0.009138832711972567,
  0.0,
  0.0,
  0.0,
  0.0,
  0.021002408239043292,
  0.0,
  0.0210024082390432

In [60]:
while tempDist > converge:
        closest = vectors.map(lambda x: ( closestPoint(x, centers), (x, 1) ))#(Index of closeset center, (vector, 1))
        pointStats = closest.reduceByKey(sumVectorsAndCount)
        #(Index of center, (sum of all vector related to center, num of vectors we sum))
        newPoints = pointStats.map(calculateNewCenter).collect()
        #[(Index of center, vector center)]
        tempDist = calculateDistanceBetweenCenters(newPoints,centers)
        #num - The Distance Between new and old Centers
        
        #Replace Old centers
        for (centerNumber, centerVector) in newPoints:
            centers[centerNumber] = centerVector

In [61]:
print(centers)

[[0.004717275101272617, 0.01448541361266909, 0.0, 0.01329882846059287, 0.006258997457254638, 0.009003207027250318, 0.009003207027250318, 0.009360810460308311, 0.009003207027250318, 0.016611799639822147, 0.015162811560389775, 0.007486064934658914, 0.01471107183575223, 0.009003207027250318, 0.00830661847867143, 0.009003207027250318, 0.012732040429066803, 0.009003207027250318, 0.00795734277991441, 0.009003207027250318, 0.006060929000584892, 0.008053552620986169, 0.01329882846059287, 0.009003207027250318, 0.012393990940385527, 0.0, 0.0, 0.0, 0.01290094543727909, 0.005531114104889129, 0.0, 0.0, 0.0, 0.011331329266575629, 0.0, 0.015106253863578606, 0.0, 0.0, 0.0, 0.0, 0.007316958885164891, 0.0, 0.011118228756667589, 0.015707599459592794, 0.009360342164619133, 0.0, 0.0061799509309122674, 0.0, 0.008283185849542457, 0.0, 0.00870407165880868, 0.0059779445510059715, 0.0, 0.00919297632132979, 0.008926302932666339, 0.0, 0.0, 0.0, 0.0035518742766552113, 0.0033954160786329097, 0.007478922694810007, 0

In [62]:
def findClosestFile(line, centers):
    bestIndex = 0
    p = line[1]
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist1 = calculateVectorsDistance(p, centers[i])
        if tempDist1 < closest:
            closest = tempDist1
            bestIndex = i
    return bestIndex

In [63]:
#tf_idf_normalized #(File, Vector)
#tf_idf_normalized.take(10)
#NOT OF THE ALGO JUST TO SEE THE RESULT
closestFile = tf_idf_normalized.map(lambda x: ( findClosestFile(x, centers), x[0] ) )#(Index of closeset center, fileName)
closestFileByIndex = closestFile.groupByKey().map(lambda x: (x[0], list(x[1])))
closestFileByIndex.take(4)

[(0,
  ['F15.txt',
   'football6.txt',
   'tennis6.txt',
   'F14.txt',
   'tennis3.txt',
   'tennis5.txt']),
 (1, ['NBA2.txt', 'football3.txt', 'F12.txt']),
 (2, ['tennis2.txt', 'tennis4.txt', 'tennis1.txt', 'football5.txt']),
 (3,
  ['football2.txt',
   'football4.txt',
   'NBA1.txt',
   'NBA3.txt',
   'NBA4.txt',
   'NBA5.txt',
   'F11.txt',
   'F13.txt',
   'football1.txt'])]

In [64]:
#We can see that closest vectors related to the same center. For Example: 
#tennis2, tennis4, tennis1
#NBA1, NBA3, NBA4, NBA5
#In a prfect world we would like to see similar group of vectors (NBA or football) related to same centers.
#More over, we deliberately took 4 branches from sport to exmaine the coraltion 
#between those barnches that are from the same domain.
#and that is why we see mix between the branches.