# Cloudfare - Inverted Index Solution by Vamshi Krishna Terala

#  Steps:
    1. Create a document dictionary that assigns a unique id to each document, then broadcast it.
        (filename, file_index)
    2. Create a word dictionary that lists every word and assigns each one a unique id, then broadcast it.
        (word, word_index)
    3. Use the two dictionaries to build an RDD of the form
        (word_index, file_index)
    4. ReduceByKey, then sort by word id and by document id, to get the inverted index
        (word_index, [list of file_indexes])
    5. Save the output to disk
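
To make the five steps concrete before the Spark version, here is a minimal plain-Python sketch of the same pipeline on a tiny in-memory corpus. The function name `build_inverted_index` and the sample `docs` are hypothetical and not part of the notebook, and unlike the notebook this sketch assigns word ids in sorted word order.

```python
from collections import defaultdict

def build_inverted_index(docs):
    """docs: dict mapping a document name to its text (hypothetical in-memory corpus)."""
    # Step 1: document dictionary (name -> document id)
    doc_ids = {name: i for i, name in enumerate(sorted(docs))}
    # Step 2: word dictionary (word -> word id), ids assigned in sorted word order
    vocabulary = sorted({w for text in docs.values() for w in text.split()})
    word_ids = {w: i for i, w in enumerate(vocabulary)}
    # Steps 3 and 4: emit (word id, document id) pairs and group them by word id
    postings = defaultdict(set)
    for name, text in docs.items():
        for w in text.split():
            postings[word_ids[w]].add(doc_ids[name])
    # Sort by word id, then by document id inside each list
    return [(wid, sorted(ids)) for wid, ids in sorted(postings.items())]

docs = {"a.txt": "red fish blue fish", "b.txt": "blue sky"}
index = build_inverted_index(docs)
print(index)
```

Each output entry has the same shape as the notebook's final result: a word id followed by the list of document ids that contain that word.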

In [4]:
import findspark
findspark.init()
import pyspark

In [5]:
from pyspark import SparkContext, SparkConf
sparkconf = SparkConf().setAppName("InvIndex")
sc = SparkContext.getOrCreate(conf=sparkconf)

List all files in the directory

In [6]:
import glob, os
base_files = glob.glob("C:/Vamshi/Cloudfare/test/data/*.txt")
base_files

['C:/Vamshi/Cloudfare/test/data\\file0.txt',
 'C:/Vamshi/Cloudfare/test/data\\file1.txt',
 'C:/Vamshi/Cloudfare/test/data\\file10.txt',
 'C:/Vamshi/Cloudfare/test/data\\file11.txt',
 'C:/Vamshi/Cloudfare/test/data\\file12.txt',
 'C:/Vamshi/Cloudfare/test/data\\file13.txt',
 'C:/Vamshi/Cloudfare/test/data\\file14.txt',
 'C:/Vamshi/Cloudfare/test/data\\file15.txt',
 'C:/Vamshi/Cloudfare/test/data\\file16.txt',
 'C:/Vamshi/Cloudfare/test/data\\file17.txt',
 'C:/Vamshi/Cloudfare/test/data\\file18.txt',
 'C:/Vamshi/Cloudfare/test/data\\file19.txt',
 'C:/Vamshi/Cloudfare/test/data\\file2.txt',
 'C:/Vamshi/Cloudfare/test/data\\file20.txt',
 'C:/Vamshi/Cloudfare/test/data\\file21.txt',
 'C:/Vamshi/Cloudfare/test/data\\file22.txt',
 'C:/Vamshi/Cloudfare/test/data\\file23.txt',
 'C:/Vamshi/Cloudfare/test/data\\file24.txt',
 'C:/Vamshi/Cloudfare/test/data\\file3.txt',
 'C:/Vamshi/Cloudfare/test/data\\file4.txt',
 'C:/Vamshi/Cloudfare/test/data\\file5.txt',
 'C:/Vamshi/Cloudfare/test/data\\file6.t

# 1. Create a document dictionary

    zip pairs each document with an index.
    Broadcast this dictionary to the executors: it is small, so every task can look up file ids locally in the later map steps instead of joining against another RDD.

In [7]:
document_indexes = list(range(len(base_files)))
document_dict = dict(zip(base_files, document_indexes))

In [8]:
document_dict_broadcast = sc.broadcast(document_dict)
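
Note that the file listing above is in lexicographic order (file10 comes before file2), so document ids follow that order rather than the numbers in the file names. If ids that match the numeric order are wanted, one option is a natural sort before assigning ids. The sketch below is an assumption about what might be desirable, not something the notebook does; `natural_key` and `sample_files` are hypothetical names.

```python
import re

def natural_key(path):
    # Split into digit and non-digit runs so "file10" sorts after "file2"
    return [int(part) if part.isdigit() else part
            for part in re.split(r"(\d+)", path)]

sample_files = ["file0.txt", "file1.txt", "file10.txt", "file2.txt"]
ordered = sorted(sample_files, key=natural_key)
document_ids = {name: i for i, name in enumerate(ordered)}
print(document_ids)
```

Either ordering gives a valid inverted index; the choice only affects which id each file receives.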

### Word Generator Function

    Create a list of words in the file.
    Map each word to the source file name.
    Final Output: (word, filename)

In [9]:
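import re
def word_generator(filename):
    # Return the set of unique (word, filename) pairs for one file
    words = []
    with open(filename, 'r', encoding="UTF-8") as f:
        for line in f:
            # Strip punctuation: keep only word characters and whitespace
            line = re.sub(r'[^\w\s]', '', line)
            words.extend(line.split())
    # A set, so each word appears at most once per file
    return {(s, filename) for s in words}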
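
To see what the regular expression in the cell above does to a line of text, here is a small standalone sketch. The helper name `tokenize` and the sample sentence are hypothetical; the cleaning rule is the one used in word_generator.

```python
import re

def tokenize(line):
    # Same cleaning as word_generator: drop every character that is neither
    # a word character (letter, digit, underscore) nor whitespace, then split
    return re.sub(r'[^\w\s]', '', line).strip().split()

tokens = tokenize("Don't panic, said Ford; it's 42-proof!")
print(tokens)
```

Apostrophes and hyphens are removed rather than replaced by a space, so "Don't" becomes "Dont" and "42-proof" becomes "42proof". Case is preserved, so "The" and "the" count as different words.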

##### Parallelize the file list and call the word generator function on each file

In [10]:
words_rdd = sc.parallelize(base_files).flatMap(word_generator)
words_rdd.take(5)

[('sense', 'C:/Vamshi/Cloudfare/test/data\\file0.txt'),
 ('three', 'C:/Vamshi/Cloudfare/test/data\\file0.txt'),
 ('draught', 'C:/Vamshi/Cloudfare/test/data\\file0.txt'),
 ('endeavours', 'C:/Vamshi/Cloudfare/test/data\\file0.txt'),
 ('irrevocably', 'C:/Vamshi/Cloudfare/test/data\\file0.txt')]

# 2. Create a word dictionary

    From the above RDD, take only the words (the keys).
    zipWithIndex gives each word an index, counting from the first word in the first partition to the last word in the last partition.

In [11]:
indexed_word_dictionary = words_rdd\
    .map(lambda x: x[0])\
    .zipWithIndex()

In [12]:
indexed_word_dictionary.take(5)

[('sense', 0),
 ('three', 1),
 ('draught', 2),
 ('endeavours', 3),
 ('irrevocably', 4)]

###### Remove duplicates

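    The above indexed dictionary contains duplicates, because a word appears once for every file that contains it.
    We can use distinct to remove duplicates but it will trigger a shuffle.
    Instead, use collectAsMap, which returns the pairs to the driver as a dictionary. A dictionary holds one entry per key, so each word keeps just one of its indexes; the resulting ids are unique but not contiguous.
    collectAsMap does not trigger a shuffle, but the whole dictionary must fit in driver memory.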

In [14]:
dedup_word_dictionary = indexed_word_dictionary.collectAsMap()
word_dictionary_broadcast = sc.broadcast(dedup_word_dictionary)
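
The effect of this deduplication can be sketched in plain Python, assuming collectAsMap behaves like calling dict() on the collected (word, index) pairs. The sample `words` list is hypothetical. The last line shows one alternative that yields contiguous ids, which the notebook does not use.

```python
words = ["sense", "three", "sense", "draught", "three"]

# What zipWithIndex yields: every occurrence gets its own index
indexed = list(zip(words, range(len(words))))

# Building a dict keeps one entry per word; later pairs overwrite earlier ones
as_map = dict(indexed)
print(as_map)

# Alternative (assumption, not in the notebook): contiguous ids over distinct words
dense = {w: i for i, w in enumerate(sorted(set(words)))}
print(dense)
```

Gaps in the ids are harmless for an inverted index, since an id only has to identify its word uniquely.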

# 3. Map document dictionary and word dictionary

    RDD: (word, filename)
    Document dictionary (broadcast): (filename, file_index)
    Word dictionary (broadcast): (word, word_index)
    
    1. Map the initial RDD through the document dictionary to create an RDD of the form (word, file_index)
    2. Map that RDD through the word dictionary to create an RDD of the form (word_index, file_index)

In [15]:
def get_document_id(w):
    return (w[0],document_dict_broadcast.value[w[1]])

In [16]:
words_rdd_document = words_rdd.map(get_document_id)

In [17]:
words_rdd_document.take(5)

[('sense', 0),
 ('three', 0),
 ('draught', 0),
 ('endeavours', 0),
 ('irrevocably', 0)]

In [18]:
wordid_documentid = words_rdd_document.map(lambda w: (word_dictionary_broadcast.value[w[0]], w[1]))

In [19]:
wordid_documentid.take(10)

[(24735, 0),
 (22805, 0),
 (17802, 0),
 (21724, 0),
 (3371, 0),
 (25419, 0),
 (14720, 0),
 (19825, 0),
 (23010, 0),
 (19706, 0)]

# 4. Create Inverted index and Sort it

    1. reduceByKey to get an RDD of the form (word_index, [list of file_indexes]).
    2. mapValues(sorted) to sort the file_indexes within each list.
    3. sortByKey to sort the inverted index by word_index.

In [20]:
inverted_index = wordid_documentid\
    .map(lambda w: (w[0], [w[1]]))\
    .reduceByKey(lambda x, y: x + y)\
    .mapValues(sorted)\
    .sortByKey()

In [21]:
inverted_index.take(10)

[(10, [0]),
 (19, [0]),
 (20, [0]),
 (30, [0]),
 (39, [0]),
 (41, [0]),
 (49, [0]),
 (51, [0]),
 (65, [0]),
 (67, [0])]
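
The reduce step can be imitated in plain Python to show why the document lists need their own sort. This is a sketch only: `reduce_by_key` is a hypothetical single-machine stand-in for reduceByKey, and the sample `pairs` are made up.

```python
def reduce_by_key(pairs, func):
    # Merge values that share a key, in the order the pairs arrive
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# (word_index, file_index) pairs arriving in arbitrary order
pairs = [(7, 2), (3, 1), (7, 0)]

# Wrap each file index in a list, then concatenate lists per word
lists = reduce_by_key([(k, [v]) for k, v in pairs], lambda x, y: x + y)
print(lists)

# Sort by word index, and sort the file indexes inside each list
inverted = sorted((k, sorted(v)) for k, v in lists.items())
print(inverted)
```

List concatenation keeps values in arrival order, so word 7 first maps to [2, 0]; only the explicit sort turns that into [0, 2].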

# 5. Save the output to disk

In [22]:
# saveAsTextFile writes a directory of part files with this name, not a single text file
inverted_index.saveAsTextFile("cloudfare_inverted_index_result.txt")
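
To read the saved index back, the lines can be parsed with the standard library, assuming each line is the string form of one tuple, as in the output shown in step 4. The helper `parse_index_lines` and the sample lines are hypothetical.

```python
import ast

def parse_index_lines(lines):
    # Turn lines such as "(10, [0])" back into {word_index: [file_indexes]}
    index = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        word_id, doc_ids = ast.literal_eval(line)
        index[word_id] = doc_ids
    return index

sample_lines = ["(10, [0])", "(19, [0, 3])", ""]
loaded = parse_index_lines(sample_lines)
print(loaded)
```

ast.literal_eval only accepts Python literals, so it is a safer choice than eval for parsing text read from disk.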