### Map Reduce Paradigm

In [1]:
import nltk
nltk.download('gutenberg')
from nltk.corpus import gutenberg

[nltk_data] Downloading package gutenberg to
[nltk_data]     C:\Users\the-e\AppData\Roaming\nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!


In [3]:
from nltk.tokenize import word_tokenize
import time
import numpy as np
from timeit import default_timer as timer
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from nltk.corpus import brown
import scipy.stats as stats
import math
import random

In [4]:
from collections import defaultdict
#import operator
from multiprocessing import Pool
from nltk.probability import FreqDist

In [5]:
def freqdot(docA,docB): #method to obtain the dotproduct from frequency distribution dictionary
    product = 0
    for word, count in docA.items():
        product += count * docB.get(word,0) #the 0 in .get will use 0 if there is no word in the docB
    return product


In [6]:
def all_pairs(documents): #creating a list of all documents which dictionaries are input
    similarity=[]
    temp=[]
    for i in range(len(documents)):
        temp=[]
        for j in range(i+1,len(documents)):
            temp.append((i,j))
        similarity.append(temp)
    return similarity

In [7]:
fileids= gutenberg.fileids() #Getting all the different book inside the gutenburg corpus in terms of ids
docsA= [list(gutenberg.words(i)) for i in fileids] #gutenberg.words alows us to extract all sentences from the gutenberg corpus
wordsA = []
for d in docsA:
    for i in d:
        wordsA.append(i)

In [8]:
# just for checking
docpairs = all_pairs(docsA)
print(docpairs)

[[(0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0, 10), (0, 11), (0, 12), (0, 13), (0, 14), (0, 15), (0, 16), (0, 17)], [(1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), (1, 8), (1, 9), (1, 10), (1, 11), (1, 12), (1, 13), (1, 14), (1, 15), (1, 16), (1, 17)], [(2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8), (2, 9), (2, 10), (2, 11), (2, 12), (2, 13), (2, 14), (2, 15), (2, 16), (2, 17)], [(3, 4), (3, 5), (3, 6), (3, 7), (3, 8), (3, 9), (3, 10), (3, 11), (3, 12), (3, 13), (3, 14), (3, 15), (3, 16), (3, 17)], [(4, 5), (4, 6), (4, 7), (4, 8), (4, 9), (4, 10), (4, 11), (4, 12), (4, 13), (4, 14), (4, 15), (4, 16), (4, 17)], [(5, 6), (5, 7), (5, 8), (5, 9), (5, 10), (5, 11), (5, 12), (5, 13), (5, 14), (5, 15), (5, 16), (5, 17)], [(6, 7), (6, 8), (6, 9), (6, 10), (6, 11), (6, 12), (6, 13), (6, 14), (6, 15), (6, 16), (6, 17)], [(7, 8), (7, 9), (7, 10), (7, 11), (7, 12), (7, 13), (7, 14), (7, 15), (7, 16), (7, 17)], [(8, 9), (8, 10), (8, 11), (8, 12), (8, 13), (8, 14), (

In [9]:
def mapper(documents): #mapping functions
    output=[]
    for pair in documents:
        output.append((pair[0],pair)) #the first element is used to split the computation and second is which computation it will complete
    return output

In [21]:
def jac_reduce(items):
    output=[]
    key,value = items #items here is the value obtained from mapper which has (i,[A,B])
    for documents in value:
        
        A=FreqDist(docsA[documents[0]]) #selecting item 1 from the mapped name document
        B=FreqDist(docsA[documents[1]]) #selecting item 2 from the mapped name document
        #ordinary jaccard similarity
        U = A.keys() | B.keys()
        I = A.keys() & B.keys()
        Jacc = len(I) / len(U)
        
        output.append(((documents[0],documents[1]),Jacc))
    return output

In [11]:
def cos_reduce(items):
    output=[]
    key,value = items
    for documents in value:
        A=FreqDist(docsA[documents[0]]) #selecting item 1 from the mapped name document
        B=FreqDist(docsA[documents[1]]) #selecting item 2 from the mapped name document
        # ordinary cosine similarity
        num = freqdot(A,B) 
        denom = np.power((freqdot(A,A))*(freqdot(B,B)),0.5)
        Cos = num/denom
        output.append(((documents[0],documents[1]),Cos))
    return output

In [12]:

def map_reduce_parallel(docum,mapper,reducer,mapprocesses=3,reduceprocesses=2):
    output = []
    collector=defaultdict(list)  #this dictionary is where we will store intermediate results
                                 #it will map keys to lists of values (default value of a list is [])
                                 #in a real system, this would be stored in individual files at the map nodes
                                 #and then transferred to the reduce nodes
    doc1=docum
    #map stage
    mappool = Pool(processes=mapprocesses)
    map_out = mappool.map(mapper,doc1)
    mappool.close()
    #print map_out
    
    for mapresult in map_out:
        for (key, value) in mapresult:     #pass each input to the mapper function and receive back each key,value pair yielded
            collector[key].append(value)     #append the value to the list for that key in the intermediate store
            
    #reduce stage 
    reducepool = Pool(processes=reduceprocesses)
    #print(type(reducer))
    #print(type(collector.items()))
    reduceresults=reducepool.map(reducer,collector.items())
    reducepool.close()
    for reduceresult in reduceresults:
        output+=reduceresult
   
    return output


In [56]:
#checking all documents for the run time
def time_parallel(docum,mapper,reducer,style,**args):
    #current= 0
    times=[]
    for i in range(2):
        print('Running loop',i) #helps find error
        start = timer()
        map_reduce_parallel(docum,mapper,reducer,**args)
        end = timer()
        #current = (end-start)
        times.append(end-start)
    avg_time = np.mean(times)
    print("All pair similarity for all gutenberg corpus with {} had average run time {}".format(style,avg_time))
    return avg_time    
    

In [None]:
from nltk.probability import FreqDist

In [151]:
def processfinder(method,i,j): #used to enumerate multiple processes
    lst1=[]
    lst2=[]
    lst3=[]
    for a in range(i):
        for b in range(j):
            lst1.append(a)
            lst2.append(b)
            lst3.append(float(time_parallel(docpairs,mapper, method,"Jaccard",mapprocesses=i,reduceprocesses=j)))
      
    return lst1,lst2,lst3

In [152]:
a,b,c = processfinder(jac_reduce,4,4)

Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.639299690499683
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.78159794400017
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.57187984150096
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.67204374799894
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.766731173001062
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.758039680000365
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 30.609995562999757
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run tim

In [159]:
df1 = pd.DataFrame()
df1["Map Process count"] = [i for i in a]
df1["Reduce Process Count"] = [i for i in b]
df1["Average Run Time"] = [i for i in c]
df1.sort_values(by = "Average Run Time").head(3) # Only interested in the best performing model

Unnamed: 0,Map Process count,Reduce Process Count,Average Run Time
14,3,2,30.488052
13,3,1,30.508794
2,0,2,30.57188


After testing all possible cases the fastest calculation of all Jaccard similarity is when I utilise 3 map processes and 2 reduce processes.

In [161]:
d,e,f = processfinder(cos_reduce,4,4)

Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.208363962000476
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.220804969500932
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.235590779500853
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.0801687485
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.118708703000266
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.116786405500534
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 31.126346098500107
Running loop 0
Running loop 1
All pair similarity for all gutenberg corpus with Jaccard had average run time 

In [160]:
df1 = pd.DataFrame()
df1["Map Process count"] = [i for i in d]
df1["Reduce Process Count"] = [i for i in e]
df1["Average Run Time"] = [i for i in f]
df1.sort_values(by = "Average Run Time").head(3) # Only interested in the best performing model

Unnamed: 0,Map Process count,Reduce Process Count,Average Run Time
9,2,1,30.710155
7,1,3,30.744845
13,3,1,30.777759


After testing all possible cases the fastest calculation of all Jaccard similarity is when I utilise 2 map processes and 1 reduce processes.