# Unit 7 lab: Map/Reduce



The following code loads the text of H. G. Wells's novel *The Time Machine* and turns it into a list of lines, with each line stored as a list of lower-cased tokens.  We will think of and refer to each line as a *document*.  Run the code and print out the first five lines.

In [1]:
from nltk.tokenize import word_tokenize

document1="timemachine.txt"

lines=[]
with open(document1) as instream:
    for line in instream:        
        tokens=[token.lower() for token in word_tokenize(line.rstrip())]  #strip excess white space, tokenize, lower-case
        lines.append(tokens)
        
        

In [2]:
print(lines[:5])

[['\ufeffproject', 'gutenberg', "'s", 'the', 'time', 'machine', ',', 'by', 'h.', 'g.', '(', 'herbert', 'george', ')', 'wells'], [], ['this', 'ebook', 'is', 'for', 'the', 'use', 'of', 'anyone', 'anywhere', 'at', 'no', 'cost', 'and', 'with'], ['almost', 'no', 'restrictions', 'whatsoever', '.', 'you', 'may', 'copy', 'it', ',', 'give', 'it', 'away', 'or'], ['re-use', 'it', 'under', 'the', 'terms', 'of', 'the', 'project', 'gutenberg', 'license', 'included']]
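The first token above, `'\ufeffproject'`, begins with a byte-order mark (BOM) carried over from the start of the file. Here is a minimal sketch of where it comes from, assuming the file is UTF-8 with a BOM. The bytes below are made up for illustration and are not read from timemachine.txt.

```python
# Hypothetical first bytes of a UTF-8 file that starts with a byte-order mark.
raw = b"\xef\xbb\xbfProject Gutenberg's The Time Machine"

plain = raw.decode("utf-8")       # the BOM survives as the character U+FEFF
clean = raw.decode("utf-8-sig")   # the BOM is stripped if it is present

print(repr(plain[:8]))
print(repr(clean[:7]))
```

Under the same assumption, passing `encoding="utf-8-sig"` to `open()` should keep the BOM out of the first token.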


### Exercise 1
Write a simple function (i.e., without using map/reduce) to count the number of times each word occurs and store it in a dictionary.  Use it to find out how many times the word 'time' occurs in the novel.


In [3]:
def countline(aline,adict):
    for token in aline:
        adict[token]=adict.get(token,0)+1
    

def countdoc(doclines):
    countdict={}
    for line in doclines:
        countline(line,countdict)
        
    return countdict

mydict=countdoc(lines)
print(mydict.get("time",0))

201


In [4]:
#alternative
from collections import Counter
def word_count(documents):
    return Counter(word for document in documents for word in document)

counts=word_count(lines)
counts["time"]

201

Now let's see how we can rewrite this in the Map/Reduce paradigm.  Below is a map_reduce function which takes a list of inputs, a mapper function and a reducer function.

In [5]:
from collections import defaultdict

def map_reduce(inputs,mapper,reducer):
    
    collector=defaultdict(list)  #this dictionary is where we will store intermediate results
                                 #it will map keys to lists of values (default value of a list is [])
                                 #in a real system, this would be stored in individual files at the map nodes
                                 #and then transferred to the reduce nodes
    
    #map stage
    for inp in inputs:
        for key, value in mapper(inp):     #pass each input to the mapper function and receive back each key,value pair yielded
            collector[key].append(value)     #append the value to the list for that key in the intermediate store
            
    #reduce stage 
    outputs=[]
    for key,values in collector.items():  #take each pair of key, value-lists and pass it to the reduce function
        for res in reducer(key,values):       #take each pair yielded by the reducer and add it to the outputs list
            outputs.append(res)
    
    return outputs


Now we define a mapper function and a reducer function particular to our word-counting task.

In [6]:
def wc_mapper(document):
    for word in document:
        yield(word,1)
        
def wc_reducer(word,counts):
    yield(word,sum(counts))
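To make the two stages concrete, here is a small trace of the same mapper and reducer on a toy input. The two "documents" are made up for illustration, and the grouping loop is written out inline rather than calling map_reduce, so that the intermediate store can be printed.

```python
from collections import defaultdict

def wc_mapper(document):
    # emit (word, 1) for every token in the document
    for word in document:
        yield (word, 1)

def wc_reducer(word, counts):
    # collapse the list of 1s collected for a word into a single total
    yield (word, sum(counts))

# Two made-up documents (token lists), for illustration only.
toy_docs = [["the", "time", "machine"], ["the", "time", "traveller"]]

# Map stage: group the emitted values by key.
collector = defaultdict(list)
for doc in toy_docs:
    for key, value in wc_mapper(doc):
        collector[key].append(value)
print(dict(collector))
# {'the': [1, 1], 'time': [1, 1], 'machine': [1], 'traveller': [1]}

# Reduce stage: one reducer call per key.
toy_results = [res for key, values in collector.items()
               for res in wc_reducer(key, values)]
print(toy_results)
# [('the', 2), ('time', 2), ('machine', 1), ('traveller', 1)]
```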
        

Let's run this and display the results.


In [7]:
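#display() prints the (word,count) pairs using tab separation
#ipl is the desired number of columns per line (with ipl=6, three pairs per line)
#note: the pair that arrives when a line is already full triggers the print but is not itself added,
#so every fourth pair (e.g. 'the') is missing from the listing below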
def display(results,ipl=6):
    line=""
    itemcount=1
    for res in results:
        if itemcount<ipl:
            for thing in res:
                line+=str(thing)+"\t"
                itemcount+=1
        else:
            print(line)
            itemcount=1
            line=""

results = map_reduce(lines,wc_mapper,wc_reducer)
display(results)

﻿project	1	gutenberg	30	's	77	
time	201	machine	91	,	2372	
h.	4	g.	4	(	32	
george	3	)	32	wells	12	
ebook	9	is	131	for	243	
of	1268	anyone	5	anywhere	3	
no	99	cost	3	and	1303	
almost	25	restrictions	2	whatsoever	2	
you	196	may	48	copy	12	
give	8	away	39	or	162	
under	38	terms	23	project	86	
included	3	online	4	www.gutenberg.net	3	
:	59	author	1	release	1	
october	2	2	2	2004	1	
#	1	35	1	]	4	
updated	2	3	6	2014	1	
english	3	***	6	start	6	
i	1266	traveller	61	so	106	
be	112	convenient	5	to	760	
him	39	was	551	expounding	2	
recondite	1	matter	6	us	36	
grey	10	eyes	35	shone	8	
usually	3	pale	9	face	38	
animated	3	fire	29	burned	6	
soft	15	radiance	1	incandescent	1	
in	594	lilies	1	silver	6	
bubbles	1	that	447	flashed	4	
our	59	glasses	1	chairs	2	
patents	1	embraced	1	caressed	2	
than	36	submitted	1	sat	22	
there	122	luxurious	1	after-dinner	1	
when	55	thought	57	roams	1	
free	9	trammels	1	precision	1	
put	34	way	40	--	180	
points	3	lean	2	forefinger	2	
we	90	lazily	1	admired	1	
over	52	new	3

### Exercise 2
Turn the output of map_reduce (i.e., results) into a pandas DataFrame or a dictionary.  Then:
1. look up the number of times 'time' occurs.
2. find all of the words which occur more than 200 times

In [8]:
import pandas as pd

df =pd.DataFrame(results,columns=["word","count"])
df[df["word"]=="time"]

   word  count
4  time    201


In [9]:
df[df["count"]>200]

    word  count
3    the   2419
4   time    201
6      ,   2372
18   for    243
20    of   1268
23    at    253
26   and   1303
27  with    262
31     .   1882
35    it    432


In [10]:
#alternative without pandas

def list_to_dict(alist):
    adict={}
    for (k,v) in alist:
        adict[k]=v
    return adict

resultsdict=list_to_dict(results)
print("time: "+str(resultsdict['time']))


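def morethan(adict,threshold):
    #return the (key,value) pairs of adict whose value exceeds threshold
    alist=[]

    for (key,value) in adict.items():   #use the argument rather than the global resultsdict
        if value>threshold:
            alist.append((key,value))
    return alist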

print(morethan(resultsdict,200))
    

time: 201
[('the', 2419), ('time', 201), (',', 2372), ('for', 243), ('of', 1268), ('at', 253), ('and', 1303), ('with', 262), ('.', 1882), ('it', 432), ('i', 1266), ('to', 760), ('was', 551), ('a', 872), ('in', 594), ('that', 447), ('as', 271), ('me', 281), ("'", 248), ('my', 437), ('had', 355)]


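Here is some code which uses a pool of processes to do the map tasks and reduce tasks.  The map and reduce functions need a slight modification: results are passed between processes by pickling, and generators cannot be pickled, so the functions must return lists instead.  The reducer also takes a single (key, values) tuple rather than two arguments, because Pool.map passes one item to each call.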
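The pickling restriction can be checked directly with the standard `pickle` module, without starting any processes. This is a minimal sketch; `wc_mapper_list` is a hypothetical list-returning variant written here for illustration.

```python
import pickle

def wc_mapper(document):
    # generator version, as used by the serial map_reduce
    for word in document:
        yield (word, 1)

def wc_mapper_list(document):
    # hypothetical list-returning version, suitable for sending between processes
    return [(word, 1) for word in document]

gen = wc_mapper(["time", "machine"])
try:
    pickle.dumps(gen)
    generator_pickled = True
except TypeError as err:
    generator_pickled = False
    print("generator could not be pickled:", err)

# A plain list of tuples survives a pickle round trip unchanged.
restored = pickle.loads(pickle.dumps(wc_mapper_list(["time", "machine"])))
print(restored)
```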


In [11]:
from collections import defaultdict
from multiprocessing import Pool

def map_reduce_parallel(inputs,mapper,reducer,mapprocesses=3,reduceprocesses=2):
    
    collector=defaultdict(list)  #this dictionary is where we will store intermediate results
                                 #it will map keys to lists of values (default value of a list is [])
                                 #in a real system, this would be stored in individual files at the map nodes
                                 #and then transferred to the reduce nodes
    
    #map stage: the worker processes apply mapper to the inputs
    mappool = Pool(processes=mapprocesses)
    mapresults=mappool.map(mapper,inputs)   #one list of (key,value) pairs per input
    mappool.close()
    
    #group the mapped values by key (done here in the parent process)
    for mapresult in mapresults:
        for (key, value) in mapresult:
            collector[key].append(value)     #append the value to the list for that key in the intermediate store
            
    #reduce stage: the worker processes apply reducer to the (key, value-list) items
    outputs=[]
    reducepool = Pool(processes=reduceprocesses)
    reduceresults=reducepool.map(reducer,collector.items())
    reducepool.close()
    for reduceresult in reduceresults:
        outputs+=reduceresult                #each reducer call returns a list of results
   
    return outputs


In [12]:
import defs  # auxiliary python program with the map and reduce functions: defs.py 
             
# Here are these functions, commented out, for your information:

# def wc_mapper_parallel(document):
#     output=[]
#     for word in document:
#         output.append((word,1))
        
#     return output

# def wc_reducer_parallel(item):
#     output=[]
#     (word,counts)=item
#     output.append((word,sum(counts)))
#     return output
        
results=map_reduce_parallel(lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel)
#results
df=pd.DataFrame(results,columns=["word","count"])
df[df["word"]=="time"]

   word  count
4  time    201


### Exercise 3
Time the performance of the parallel version of map_reduce with different numbers of map and reduce processes.  
* What is the best number of processes for each stage on the computer you are working on? 
* How fast is the original non-parallel version of map_reduce? 
* Explain why this is significantly different to the parallel version (with 1 process at each stage).

In [13]:
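import numpy as np
import time

def timeit(somefunc,*args,**kwargs):
    #run somefunc(*args,**kwargs) repeatedly and return
    #(last result, mean time in seconds, standard error of the mean)
    times=[]
    repeats=100
    for i in range(repeats):
        starttime=time.perf_counter()     #high-resolution timer intended for measuring intervals
        ans=somefunc(*args,**kwargs)
        endtime=time.perf_counter()
        timetaken=endtime-starttime
        times.append(timetaken)
    
    mean=np.mean(times)
    stdev=np.std(times)
    error=stdev/(repeats**0.5)    #standard error of the mean (the cells below bind it to the name sd)
 
    return (ans,mean,error)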
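The third value returned by timeit is the standard error of the mean, i.e. the standard deviation of the timings divided by the square root of the number of repeats. The arithmetic can be sketched on four made-up timings using only the standard library. `statistics.pstdev` is used because `np.std` computes the population standard deviation by default.

```python
import statistics

# Made-up timings in seconds, for illustration only.
times = [1.0, 2.0, 3.0, 4.0]

mean = statistics.mean(times)
stdev = statistics.pstdev(times)      # population standard deviation
error = stdev / len(times) ** 0.5     # standard error of the mean

print(mean, round(stdev, 4), round(error, 4))
# 2.5 1.118 0.559
```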

In [14]:
ans,mean,sd=timeit(map_reduce,lines,wc_mapper,wc_reducer)
print(mean,sd)

0.011469218730926514 0.0007166078252461874


In [15]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=1,reduceprocesses=1)
print(mean,sd)

0.1261836338043213 0.008532408635731558


In [16]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=2,reduceprocesses=1)
print(mean,sd)

0.11331103563308716 0.007615309586743195


In [17]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=1,reduceprocesses=2)
print(mean,sd)

0.11699332475662232 0.007705948549100209


In [18]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=3,reduceprocesses=1)
print(mean,sd)

0.11308610439300537 0.007416591036939156


In [19]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=2,reduceprocesses=2)
print(mean,sd)

0.09655795812606811 0.0054101573922414395


In [20]:
ans,mean,sd=timeit(map_reduce_parallel,lines,defs.wc_mapper_parallel,defs.wc_reducer_parallel,mapprocesses=3,reduceprocesses=2)
print(mean,sd)

0.09799331188201904 0.005302156288125585


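On my laptop, the multiprocessing version is roughly ten times slower than the serial one (about 0.10 to 0.13 s per run against about 0.011 s), which shows how large the overhead of setting up a parallel computing environment can be: every call creates two process pools and pickles the inputs and results to pass them between processes. Within the parallel version, however, using more than one map or reduce process does speed things up a little (about 0.097 s with 2 map and 2 reduce processes against about 0.126 s with 1 of each). For heavier tasks than this one, there will be more benefit to parallel computing.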
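The comparison can be made explicit by taking ratios of the mean timings reported above. The numbers below are copied from those outputs and will differ on other machines.

```python
# Mean times in seconds, copied from the outputs above.
serial = 0.011469218730926514         # map_reduce
parallel_1_1 = 0.1261836338043213     # map_reduce_parallel, 1 map and 1 reduce process
parallel_2_2 = 0.09655795812606811    # map_reduce_parallel, 2 map and 2 reduce processes

overhead_factor = parallel_1_1 / serial    # cost of going parallel with no extra workers
speedup = parallel_1_1 / parallel_2_2      # gain from the extra worker processes

print(round(overhead_factor, 1), round(speedup, 2))
# 11.0 1.31
```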

### Exercise 4
Write some map and reduce functions of your own.  Maybe ...
1. the frequency of each length of word
2. the mean length of a word beginning with each letter
3. matrix-vector multiplication
4. matrix-matrix multiplication

**In the cells below, I put the map and reduce functions into this notebook. However, there is a chance that this won't work with everyone's Python set-up. If that is the case for you, put the map and reduce functions into an auxiliary .py file, as was done above for the word counting.**

In [21]:
#frequency of each length of word

def length_mapper_parallel(document):
    output=[]
    for word in document:
        output.append((len(word),1))
        
    return output

def length_reducer_parallel(item):
    output=[]
    (length,counts)=item    #the key is a word length, not a word
    output.append((length,sum(counts)))
    return output

        

results=map_reduce_parallel(lines,length_mapper_parallel,length_reducer_parallel)
length_df=pd.DataFrame(results,columns=["length","count"])
length_df.head(10)

   length  count
0       8   1553
1       9   1192
2       2   6100
3       3   7932
4       4   6199
5       7   2478
6       1   7081
7       6   2779
8       5   3855
9      12    302


In [22]:
#mean length of the words beginning with each character

def mean_mapper_parallel(document):
    output=[]
    for word in document:
        output.append((word[0],len(word)))
        
    return output

def mean_reducer_parallel(item):
    output=[]
    (character,lengths)=item
    output.append((character,np.mean(lengths)))
    return output

        

results=map_reduce_parallel(lines,mean_mapper_parallel,mean_reducer_parallel)
mean_df=pd.DataFrame(results,columns=["initial","mean"])
mean_df.head(10)

  initial      mean
0  \ufeff  8.000000
1       g  6.212538
2       '  2.824850
3       t  3.820648
4       m  4.222620
5       ,  1.000000
6       b  4.737043
7       h  4.121680
8       (  1.000000
9       )  1.000000


In [23]:
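#matrix-vector multiplication: reference answer computed with numpy
v=[1,2,3,4]
m=[[1,2,3,4],[2,3,4,5],[3,4,5,6],[4,5,6,7]]

v_array=np.array(v)
m_array=np.array(m)

m_array.dot(v_array)   #transposing a 1-D array has no effect, so .T is not needed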

array([30, 40, 50, 60])

In [24]:
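def matrixvector_mapper(inp):
    #inp is one matrix element (i,j,mij) together with the whole vector v
    #emit its contribution mij*v[j] to output element i
    (i,j,mij,v)=inp
    return([(i,mij*v[j])])

def matrixvector_reducer(item):
    #sum the contributions to output element key
    (key,values)=item
    output=[(key,sum(values))]
    return output

def mapreduce_matrixvector(m,v,mp=3,rp=2):
    #one input per matrix element; v is sent with every element so that each map task is self-contained
    items=[]
    for i,row in enumerate(m):
        for j,mij in enumerate(row):
            items.append((i,j,mij,v))
    return map_reduce_parallel(items,matrixvector_mapper,matrixvector_reducer,mapprocesses=mp,reduceprocesses=rp)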

mvresults=mapreduce_matrixvector(m,v)
print(mvresults)
    

[(0, 30), (1, 40), (2, 50), (3, 60)]
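To see what reaches the reducer, the same mapper and reducer can be run in a single process. This is a sketch only: `map_reduce_serial` is a hypothetical stand-in for map_reduce_parallel that also returns the grouped intermediate values so they can be inspected.

```python
from collections import defaultdict

def map_reduce_serial(inputs, mapper, reducer):
    # Hypothetical single-process stand-in for map_reduce_parallel:
    # mapper returns a list of (key, value) pairs,
    # reducer takes one (key, values) item and returns a list of results.
    collector = defaultdict(list)
    for inp in inputs:
        for key, value in mapper(inp):
            collector[key].append(value)
    outputs = []
    for item in collector.items():
        outputs += reducer(item)
    return outputs, dict(collector)

def matrixvector_mapper(inp):
    i, j, mij, v = inp
    return [(i, mij * v[j])]

def matrixvector_reducer(item):
    key, values = item
    return [(key, sum(values))]

v = [1, 2, 3, 4]
m = [[1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7]]
items = [(i, j, mij, v) for i, row in enumerate(m) for j, mij in enumerate(row)]

mv_outputs, mv_grouped = map_reduce_serial(items, matrixvector_mapper, matrixvector_reducer)
print(mv_grouped[0])   # products m[0][j]*v[j] collected for row 0: [1, 4, 9, 16]
print(mv_outputs)      # [(0, 30), (1, 40), (2, 50), (3, 60)]
```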


In [25]:
ans,mean,sd=timeit(mapreduce_matrixvector,m,v,mp=2,rp=1)
print(mean,sd)

0.018314309120178222 0.00017813431201346285


In [26]:
ans,mean,sd=timeit(mapreduce_matrixvector,m,v,mp=1,rp=1)
print(mean,sd)

0.016373491287231444 8.877683017237075e-05


In [27]:
v_array=np.array(v)
m_array=np.array(m)
m_array.dot(m_array)

array([[ 30,  40,  50,  60],
       [ 40,  54,  68,  82],
       [ 50,  68,  86, 104],
       [ 60,  82, 104, 126]])

In [28]:
#matrix multiplication P=MN in two map/reduce passes
#pass 1 groups the elements by the shared index j and forms the products m_ij*n_jk
#pass 2 groups the products by output position (i,k) and sums them

def mm_mapper1(item):
    (mat,i,j,mij)=item
    
    if mat=='M':
        output=[(j,('M',i,mij))]    #element of M: key on its column index
    else:
        output=[(i,('N',j,mij))]    #element of N: key on its row index
        
    return output
        
def mm_reducer1(item):
    (key,values)=item
    Ms=[]
    Ns=[]
    for value in values:
        if value[0]=='M':
            Ms.append((value[1],value[2]))
        else:
            Ns.append((value[1],value[2]))
    output=[]        
    for (i,m) in Ms:
        for (k,n) in Ns:
            output.append(((i,k),m*n))    #one term of the sum for output element (i,k)
    return output

def mm_mapper2(item):
    return [item]    #identity: the pairs are already keyed by (i,k)

def mm_reducer2(item):
    
    (key,values)=item
    output=[(key,sum(values))]
    return output


def mapreduce_matrixmult(m,n,mp=3,rp=2):
    items=[]
    for i,row in enumerate(m):
        for j,mij in enumerate(row):
            items.append(('M',i,j,mij))
            
    for i,row in enumerate(n):
        for j,nij in enumerate(row):
            items.append(('N',i,j,nij))
    pass1=map_reduce_parallel(items,mm_mapper1,mm_reducer1,mapprocesses=mp,reduceprocesses=rp)
    return map_reduce_parallel(pass1,mm_mapper2,mm_reducer2,mapprocesses=mp,reduceprocesses=rp)

In [29]:
mapreduce_matrixmult(m,m)

[((0, 0), 30),
 ((0, 1), 40),
 ((0, 2), 50),
 ((0, 3), 60),
 ((1, 0), 40),
 ((1, 1), 54),
 ((1, 2), 68),
 ((1, 3), 82),
 ((2, 0), 50),
 ((2, 1), 68),
 ((2, 2), 86),
 ((2, 3), 104),
 ((3, 0), 60),
 ((3, 1), 82),
 ((3, 2), 104),
 ((3, 3), 126)]
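The two passes are easier to follow on a 2x2 example run in a single process. The sketch below repeats the mappers and reducers from the cell above in compact form and uses a hypothetical helper, `map_reduce_serial`, in place of map_reduce_parallel. The matrices `M` and `N` are made up for illustration.

```python
from collections import defaultdict

def map_reduce_serial(inputs, mapper, reducer):
    # Hypothetical single-process stand-in for map_reduce_parallel.
    collector = defaultdict(list)
    for inp in inputs:
        for key, value in mapper(inp):
            collector[key].append(value)
    outputs = []
    for item in collector.items():
        outputs += reducer(item)
    return outputs

def mm_mapper1(item):
    mat, i, j, value = item
    # key M elements on their column index and N elements on their row index
    return [(j, ("M", i, value))] if mat == "M" else [(i, ("N", j, value))]

def mm_reducer1(item):
    key, values = item
    Ms = [(idx, val) for (mat, idx, val) in values if mat == "M"]
    Ns = [(idx, val) for (mat, idx, val) in values if mat == "N"]
    # every M element in column key meets every N element in row key
    return [((i, k), mval * nval) for (i, mval) in Ms for (k, nval) in Ns]

def mm_mapper2(item):
    return [item]

def mm_reducer2(item):
    key, values = item
    return [(key, sum(values))]

M = [[1, 2], [3, 4]]
N = [[5, 6], [7, 8]]
items = [("M", i, j, x) for i, row in enumerate(M) for j, x in enumerate(row)]
items += [("N", i, j, x) for i, row in enumerate(N) for j, x in enumerate(row)]

pass1 = map_reduce_serial(items, mm_mapper1, mm_reducer1)
print(pass1[:4])   # terms from shared index 0: [((0, 0), 5), ((0, 1), 6), ((1, 0), 15), ((1, 1), 18)]

product = map_reduce_serial(pass1, mm_mapper2, mm_reducer2)
print(product)     # [((0, 0), 19), ((0, 1), 22), ((1, 0), 43), ((1, 1), 50)]
```

Pass 1 produces one term per (i, j, k) triple, so its output has n³ entries for n×n matrices, and all of these are passed into the second pass.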

In [30]:
ans,mean,sd=timeit(mapreduce_matrixmult,m,m,mp=1,rp=1)
print(mean,sd)

0.032625267505645754 0.0001310789039645563


In [31]:
ans,mean,sd=timeit(mapreduce_matrixmult,m,m,mp=2,rp=1)
print(mean,sd)

0.037695558071136476 0.00016486395583180095


In [32]:
ans,mean,sd=timeit(mapreduce_matrixmult,m,m,mp=1,rp=2)
print(mean,sd)

0.03778847455978394 0.00013824512402255397


In [33]:
ans,mean,sd=timeit(mapreduce_matrixmult,m,m,mp=2,rp=2)
print(mean,sd)

0.04131392002105713 0.0001857586071032738


In [34]:
big=np.random.rand(32,32)
print(big)

[[0.22903855 0.34596606 0.75622172 ... 0.83020132 0.77643067 0.60894094]
 [0.46123371 0.38585371 0.09273281 ... 0.64226986 0.59715403 0.37192755]
 [0.86670735 0.81820758 0.08647737 ... 0.04689848 0.9085142  0.22603577]
 ...
 [0.09627361 0.5255098  0.67160285 ... 0.03533762 0.72662724 0.45179589]
 [0.35019386 0.38877569 0.30133888 ... 0.25772366 0.88522437 0.62101558]
 [0.76386968 0.52234017 0.13222605 ... 0.22483833 0.5157115  0.63151478]]


In [35]:
ans,mean,sd=timeit(mapreduce_matrixmult,big,big,mp=1,rp=1)
print(mean,sd)

0.6257751202583313 0.011350108948889232


In [36]:
ans,mean,sd=timeit(mapreduce_matrixmult,big,big,mp=2,rp=2)
print(mean,sd)

0.49178553581237794 0.008141246160734025
