## MapReduce code for joining two tables

There are two kinds of input tables. Files named "foodcalorie*.txt" store (food,calorie) pairs.

chicken,108<br />
beef,140<br />
chicken,127<br />
shrimp,65<br />
shrimp,89<br />
pork,98<br />
turkey,45<br />
asparagus,30<br />
asparagus,56<br />
broccoli,20<br />
broccoli,18<br />
carrot,22<br />
...<br />

Files named "foodtype*.txt" store (food,type) pairs.

chicken,meat<br />
beef,meat<br />
egg,meat<br />
turkey,meat<br />
asparagus,vegetable<br />
broccoli,vegetable<br />
carrot,vegetable<br />
...<br />

We want to join the two kinds of tables on food and get <food, {calorie,type}> in each row.
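
To make the target of the join concrete, here is a minimal in-memory sketch using a few rows from the samples above. It is only an illustration and not part of the MapReduce job; the names `calories`, `types` and `join_rows` are hypothetical. It assumes an inner join, so a food without a known type is dropped, whereas the reducer in this document would print such a row with an empty type.

```python
# Hypothetical in-memory illustration of the join; not part of the MapReduce job.
calories = [("chicken", 108), ("beef", 140), ("chicken", 127), ("turkey", 45)]
types = {"chicken": "meat", "beef": "meat", "turkey": "meat"}

def join_rows(calories, types):
    """Return one (food, calorie, type) tuple per calorie record with a known type."""
    return [(food, cal, types[food]) for food, cal in calories if food in types]

# Print rows in the same "food<TAB>calorie type" layout the job produces.
for food, cal, kind in sorted(join_rows(calories, types)):
    print("%s\t%s %s" % (food, cal, kind))
```

Each calorie record yields its own output row, which is why a food such as chicken appears twice in the result.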


#!/usr/bin/env python
# join1_mapper.py
# The mapper uses food as the key. The value is either a calorie count or a type.
import sys

for line in sys.stdin:
	line = line.strip()
	key_value = line.split(",")
	if len(key_value) < 2:
		continue  # skip blank or malformed lines

	food = key_value[0]
	value = key_value[1]
	# Calorie and type records are emitted in the same way; the reducer
	# tells them apart by checking whether the value is numeric.
	print('%s\t%s' % (food, value))
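
The mapper logic can be tried on single lines without Hadoop. The following is a minimal sketch, assuming the same comma-separated input as above; `map_line` and `is_calorie` are hypothetical helper names introduced here for illustration.

```python
def map_line(line):
    """Return a 'food<TAB>value' string for one input line, or None to skip it."""
    fields = line.strip().split(",")
    if len(fields) < 2:
        return None  # blank or malformed line
    return "%s\t%s" % (fields[0], fields[1])

def is_calorie(value):
    """The two tables are told apart by whether the value is all digits."""
    return value.isdigit()

# A calorie record, a type record, and a blank line.
for sample in ["chicken,108", "chicken,meat", ""]:
    print(repr(map_line(sample)))
```

Because both kinds of record share the food as key, Hadoop's shuffle step delivers all values for one food to the same reducer.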

#!/usr/bin/env python
# join1_reducer.py
# The reducer generates <food, {calorie, type}> rows.
import sys

prev_word = ""          # previous key; stays empty until the first line is read
calorie_to_output = []  # calories collected for the current food
type_to_output = ""     # type of the current food
curr_word = ""
for line in sys.stdin:
	line = line.strip()
	key_value = line.split('\t')
	if len(key_value) < 2:
		continue  # skip blank or malformed lines
	curr_word = key_value[0]
	value_in = key_value[1]
	if curr_word != prev_word:
		if prev_word != "":
			# the key changed: emit one row per calorie of the previous food
			for calorie in calorie_to_output:
				print('{0}\t{1} {2}'.format(prev_word, calorie, type_to_output))
			calorie_to_output = []
			type_to_output = ""
		prev_word = curr_word  # set up previous word for the next set of input lines
	if value_in.isdigit():
		calorie_to_output.append(value_in)
	else:
		type_to_output = value_in  # a non-numeric value is the food type
# emit the rows for the last food
if curr_word == prev_word and prev_word != "":
	for calorie in calorie_to_output:
		print('{0}\t{1} {2}'.format(prev_word, calorie, type_to_output))
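
The same reduce step can also be written with `itertools.groupby`, which groups adjacent lines that share a key. This is an alternative sketch rather than the reducer used above; `reduce_sorted` is a hypothetical name, and like the reducer it assumes the input is already sorted by food.

```python
from itertools import groupby

def reduce_sorted(lines):
    """Yield 'food<TAB>calorie type' rows from sorted 'food<TAB>value' lines."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if "\t" in line)
    for food, group in groupby(pairs, key=lambda kv: kv[0]):
        calories, food_type = [], ""
        for _, value in group:
            if value.isdigit():
                calories.append(value)   # calorie record
            else:
                food_type = value        # type record
        for calorie in calories:
            yield "%s\t%s %s" % (food, calorie, food_type)

# sorted() stands in for the shuffle/sort phase between mapper and reducer.
sample = sorted(["chicken\t108", "beef\t140", "chicken\tmeat", "beef\tmeat", "chicken\t127"])
for row in reduce_sorted(sample):
    print(row)
```

Collecting all values of a group before printing means the type record may arrive before or after the calorie records for the same food.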



## To debug locally:

cat food*.txt | python join1_mapper.py | sort | python join1_reducer.py

## To run with Hadoop on Cloudera:
    
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \<br />
   -input /user/cloudera/input \<br />
   -output /user/cloudera/output_new \<br />
   -mapper /home/cloudera/join1_mapper.py \<br />
   -reducer /home/cloudera/join1_reducer.py<br />

## Result:

asparagus	30 vegetable<br />
asparagus	56 vegetable<br />
beef	140 meat<br />
broccoli	18 vegetable<br />
broccoli	20 vegetable<br />
carrot	22 vegetable<br />
carrot	35 vegetable<br />
cauliflower	36 vegetable<br />
chicken	108 meat<br />
chicken	127 meat<br />
cucumber	29 vegetable<br />
egg	49 meat<br />
fish	70 meat<br />
fish	99 meat<br />
onion	19 vegetable<br />
onion	19 vegetable<br />
pork	98 meat<br />
shrimp	65 meat<br />
shrimp	89 meat<br />
squash	32 vegetable<br />
turkey	45 meat<br />