### MapReduce Advanced - Count the number of friends per person

In [None]:
import sys
from collections import OrderedDict
class MapReduce:
    """Minimal single-process MapReduce driver for the friend-count job.

    The mapper emits (key, value) pairs through ``emitIntermediate``; values
    are grouped per key, with keys kept in first-emitted order. The reducer
    is then called once per key and records its output through ``emit``.
    """

    def __init__(self):
        # key -> list of values, keys ordered by first emission.
        self.intermediate = OrderedDict()
        # Everything passed to emit(); sorted and printed by execute().
        self.result = []

    def emitIntermediate(self, key, value):
        """Append ``value`` to the list grouped under ``key``."""
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        """Record one reducer output."""
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        """Run the map and reduce phases over ``data`` and print the results.

        Each record of ``data`` is handed to ``mapper`` on its own rather than
        processing the whole input in one go -- this is the split/map step.
        ``reducer`` is then called once per key with that key's value list.
        Finally the emitted results are sorted and printed one per line as
        ``{"key":"<item[0]>","value":"<item[1]>"}``.

        NOTE(review): ``item[0]`` is concatenated verbatim, so a key that
        contains ``"`` or ``\\`` produces invalid JSON. It must also be a str.
        """
        for record in data:
            mapper(record)

        for key in self.intermediate:
            reducer(key, self.intermediate[key])

        self.result.sort()
        for item in self.result:
            # Single-argument print(...) produces the same output on Python 2
            # and Python 3; a bare ``print "..."`` statement is a SyntaxError
            # on Python 3.
            print("{\"key\":\"" + item[0] + "\",\"value\":\"" + str(item[1]) + "\"}")

# Shared driver instance: mapper() and reducer() below emit into it.
mapReducer = MapReduce()

def mapper(record):
    """Map one friendship line to two intermediate (person, friend) pairs.

    ``record`` is expected to hold exactly two whitespace-separated names.
    The relation is treated as symmetric, so each name is emitted as a
    friend of the other. Blank lines (for example a trailing empty line on
    stdin) are skipped instead of raising ``ValueError`` on unpacking.
    """
    names = record.split()
    if not names:
        return
    v1, v2 = names
    mapReducer.emitIntermediate(v1, v2)
    mapReducer.emitIntermediate(v2, v1)

def reducer(key, list_of_values):
    """Emit ``[person, number_of_friends]`` for a single person.

    ``list_of_values`` holds every name the mapper paired with ``key``, so
    its length is the friend count. A pair that appears more than once in
    the input is counted each time; duplicates are not removed.
    """
    friend_count = len(list_of_values)
    mapReducer.emit([key, friend_count])

if __name__ == '__main__':
    # Read all of standard input up front (one record per line), then run
    # the job over it.
    inputData = list(sys.stdin)
    mapReducer.execute(inputData, mapper, reducer)

In [2]:
from collections import OrderedDict
# Demo grouping table: key -> list of values, keys in first-emitted order.
intermediate = OrderedDict()

def emitIntermediate(key, value):
    """Append ``value`` to the list stored under ``key`` in ``intermediate``."""
    intermediate.setdefault(key, []).append(value)

# Run the map phase by hand on a small friendship list and show how the
# values end up grouped per person.
data = [['Joe', 'Sue'], ['Sue', 'Phi'], ['Phi', 'Joe'], ['Phi', 'Alice']]

for record in data:
    v1, v2 = record
    # Friendship is symmetric, so each person is recorded under the other.
    emitIntermediate(v1, v2)
    emitIntermediate(v2, v1)

print(intermediate)

for key in intermediate:
    items = intermediate[key]
    print(key, items)

OrderedDict([('Joe', ['Sue', 'Phi']), ('Sue', ['Joe', 'Phi']), ('Phi', ['Sue', 'Joe', 'Alice']), ('Alice', ['Phi'])])
Joe ['Sue', 'Phi']
Sue ['Joe', 'Phi']
Phi ['Sue', 'Joe', 'Alice']
Alice ['Phi']


### MapReduce Advanced - Relational Join

In [None]:
import sys
from collections import OrderedDict
class MapReduce:
    """Minimal single-process MapReduce driver for the relational-join job.

    The mapper emits (key, value) pairs through ``emitIntermediate``; values
    are grouped per key, with keys kept in first-emitted order. The reducer
    is then called once per key and records its output through ``emit``.
    """

    def __init__(self):
        # key -> list of values, keys ordered by first emission.
        self.intermediate = OrderedDict()
        # Everything passed to emit(); sorted and printed by execute().
        self.result = []

    def emitIntermediate(self, key, value):
        """Append ``value`` to the list grouped under ``key``."""
        self.intermediate.setdefault(key, []).append(value)

    def emit(self, value):
        """Record one reducer output."""
        self.result.append(value)

    def execute(self, data, mapper, reducer):
        """Run the map and reduce phases over ``data`` and print the results.

        Each record of ``data`` is passed to ``mapper`` individually, then
        ``reducer`` is called once per key with that key's value list.
        The emitted results are sorted and each one is printed on its own
        line.
        """
        for record in data:
            mapper(record)

        for key in self.intermediate:
            reducer(key, self.intermediate[key])

        self.result.sort()
        for item in self.result:
            # print(item) with one argument prints the same on Python 2 and
            # Python 3; the bare ``print item`` statement fails to compile on 3.
            print(item)

# Shared driver instance: mapper() and reducer() below emit into it.
mapReducer = MapReduce()

def mapper(record):
    """Tag one comma-separated row with its source table and key it for the join.

    - A ``Department`` row is keyed on field 1 and carries field 2 tagged ``'D'``.
    - An ``Employee`` row is keyed on field 2 and carries field 1 tagged ``'E'``.
    - Rows with any other first field (including blank lines) are ignored.

    Trailing whitespace and the newline are stripped before splitting.
    """
    fields = record.rstrip().split(',')
    table = fields[0]
    if table == 'Employee':
        mapReducer.emitIntermediate(fields[2], ('E', fields[1]))
    elif table == 'Department':
        mapReducer.emitIntermediate(fields[1], ('D', fields[2]))

def reducer(key, list_of_values):
    """Inner-join the ``'E'`` and ``'D'`` values that share ``key``.

    ``list_of_values`` is a list of ``(tag, value)`` tuples produced by the
    mapper: ``'E'`` for Employee rows and ``'D'`` for Department rows. One
    ``(key, e_value, d_value)`` tuple is emitted for every E/D pairing.
    A key with no ``'E'`` rows, or no ``'D'`` rows, emits nothing.
    """
    e_values = [value for tag, value in list_of_values if tag == 'E']
    d_values = [value for tag, value in list_of_values if tag == 'D']
    # Loop over every 'E' value instead of assuming exactly one. A key that
    # only has Department rows must produce no output (not an IndexError),
    # and extra Employee rows must not be silently dropped.
    for name in e_values:
        for dept in d_values:
            mapReducer.emit((key, name, dept))

if __name__ == '__main__':
    # Read all of standard input up front (one row per line), then run the
    # join over it.
    inputData = list(sys.stdin)
    mapReducer.execute(inputData, mapper, reducer)