Simple social network task. Consider a simple social network dataset, where key = person and value = some friend of that person. Describe a MapReduce algorithm to count he number of friends each person has. Data was collected from here: http://snap.stanford.edu/data/egonets-Facebook.html

In [46]:
%matplotlib inline
import matplotlib
import seaborn as sns
matplotlib.rcParams['savefig.dpi'] = 144

In [47]:
%%writefile my_mr.py
from mrjob.job import MRJob
import heapq as hp
import re

class FirstStep(MRJob):
    def mapper(self,_,line):
        lis=line.split(' ')
        for num in lis:
            yield (int(num),1)
    def reducer(self,word,values):
        yield ('red',(sum(values),word))
            
class SecondStep(MRJob):
    def reducer(self,key,records):
        self.heap1=[] 
        for item in records:
            hp.heappush(self.heap1,(int(item[0]),int(item[1])))
        #print hp.nlargest(100,self.heap1)
        #yield ('key',(hp.nlargest(100,self.heap1)))
        yield ('key',self.heap1)

class ThirdStep(MRJob):
    def reducer(self,key,items):
        self.com_heap=[]
        for item in items:
            hp.heappush(self.com_heap,item)
        yield ('key2',self.com_heap)
    
class SteppedJob(MRJob):
    def steps(self):
        return FirstStep().steps()+SecondStep().steps()+ThirdStep().steps()
   
if __name__ == '__main__':
    SteppedJob.run()       

Overwriting my_mr.py


In [48]:
!python my_mr.py -r inline 'facebook/*.edges' > out1.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 3...
Creating temp directory /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr.home.20180110.184711.346196
Running step 2 of 3...
Running step 3 of 3...
Streaming final output from /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr.home.20180110.184711.346196/output...
Removing temp directory /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr.home.20180110.184711.346196...


In [49]:
import re
with open('out1.txt','r') as f:
    fout=f.read()
f.close()
#print fout
values=fout.split('\t')
ls=values[1].split(',')
tup=[]
sum_=0
for i in range(0,len(ls),2):
    cnt= ls[i].replace('[','')
    word1= ls[i+1].replace('"','')
    word=word1.replace(']','')
    tup.append((int(word),int(cnt)))
    sum_+=int(cnt)

each tuple has two entries, one for the node number and the other for the number of connections

In [50]:
print sum_
print len(tup)
print "**************"
print tup

340348
3959
**************
[(33, 2), (35, 2), (70, 2), (145, 2), (42, 2), (90, 2), (138, 2), (234, 2), (183, 2), (256, 2), (47, 2), (154, 2), (153, 2), (282, 2), (216, 2), (267, 2), (286, 2), (241, 2), (244, 2), (377, 2), (1967, 2), (255, 2), (52, 2), (160, 2), (205, 2), (841, 2), (2788, 2), (1065, 2), (1176, 2), (305, 2), (2814, 2), (3268, 2), (336, 4), (3407, 2), (179, 4), (1854, 2), (2382, 2), (288, 6), (3688, 2), (379, 2), (3807, 2), (3854, 2), (2541, 2), (1430, 2), (401, 2), (468, 2), (233, 2), (581, 2), (279, 2), (744, 2), (775, 2), (1690, 2), (859, 2), (97, 4), (0, 4), (1755, 2), (112, 4), (1224, 2), (1233, 2), (1326, 2), (316, 2), (321, 4), (2691, 4), (228, 4), (3282, 2), (1776, 8), (3375, 4), (340, 10), (3408, 2), (182, 4), (3494, 6), (2740, 2), (2380, 4), (356, 4), (2857, 2), (3619, 6), (3653, 6), (247, 4), (3704, 2), (2483, 4), (3746, 2), (3808, 2), (3846, 2), (3875, 2), (3885, 2), (3922, 2), (3942, 2), (3987, 2), (3983, 2), (4034, 2), (443, 2), (2922, 4), (499, 4), (530, 6)

Now lets get the top 100 nodes with the most connections

In [51]:
%%writefile my_mr1.py
from mrjob.job import MRJob
import heapq as hp
import re

class FirstStep(MRJob):
    def mapper(self,_,line):
        lis=line.split(' ')
        for num in lis:
            yield (int(num),1)
    def reducer(self,word,values):
        yield ('red',(sum(values),word))
            
class SecondStep(MRJob):
    def reducer(self,key,records):
        self.heap1=[] 
        for item in records:
            hp.heappush(self.heap1,(int(item[0]),int(item[1])))
        #print hp.nlargest(100,self.heap1)
        yield ('key',(hp.nlargest(100,self.heap1)))
        #yield ('key',self.heap1)
        
class ThirdStep(MRJob):
    def reducer(self,key,items):
        self.com_heap=[]
        for item in items:
            hp.heappush(self.com_heap,item)
        yield ('key2',hp.nlargest(100,self.com_heap))        

class SteppedJob(MRJob):
    def steps(self):
        return FirstStep().steps()+SecondStep().steps()+ThirdStep().steps()

if __name__ == '__main__':
    SteppedJob.run()

Overwriting my_mr1.py


In [52]:
!python my_mr1.py -r inline 'facebook/*.edges' > out2.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 3...
Creating temp directory /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr1.home.20180110.184803.451576
Running step 2 of 3...
Running step 3 of 3...
Streaming final output from /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr1.home.20180110.184803.451576/output...
Removing temp directory /var/folders/vh/j55dyd2x3sz4gybmf6v7g9380000gn/T/my_mr1.home.20180110.184803.451576...


In [53]:
import re
with open('out2.txt','r') as f:
    fout=f.read()
f.close()
#print fout
values=fout.split('\t')
ls=values[1].split(',')
tup=[]
sum_=0
for i in range(0,len(ls),2):
    cnt= ls[i].replace('[','')
    word1= ls[i+1].replace('"','')
    word=word1.replace(']','')
    tup.append((int(word),int(cnt)))
    sum_+=int(cnt)

In [54]:
print sum_
print len(tup)
print "**************"
print tup

39878
100
**************
[(2543, 586), (2347, 580), (483, 568), (1888, 506), (1800, 488), (1663, 468), (2266, 466), (1352, 466), (1730, 450), (1985, 446), (1941, 444), (2233, 442), (2142, 440), (1431, 438), (1199, 432), (1584, 420), (2206, 418), (1768, 416), (2611, 412), (2410, 412), (2229, 412), (2218, 408), (2047, 408), (1589, 408), (1086, 408), (2078, 406), (2123, 404), (1993, 404), (2464, 402), (1746, 402), (2560, 400), (2507, 400), (2240, 400), (1827, 400), (2244, 398), (2309, 396), (1983, 396), (2602, 394), (2340, 394), (2131, 394), (2088, 394), (1126, 394), (2590, 392), (2369, 392), (2324, 392), (2604, 390), (2542, 390), (2607, 388), (2220, 388), (2073, 388), (1804, 388), (2188, 386), (2172, 384), (2059, 384), (1390, 384), (2150, 382), (1943, 382), (2526, 380), (2428, 380), (1946, 380), (1833, 380), (2601, 378), (2331, 378), (2201, 378), (1917, 378), (1612, 378), (1377, 378), (2090, 376), (1938, 376), (1621, 376), (376, 376), (2624, 374), (2564, 374), (2384, 374), (2118, 374), (