# Hadoop Introduction

Hadoop 1.0 consists of two components:

**1. Hadoop Distributed File System (HDFS):**
* Distributes data across different nodes.
* Replicates data to increase fault tolerance and reduce data loss. The default replication factor is 3.

**2. MapReduce (MR):**
* Distributed computing framework.
* A program consists of a series of _Mapper_ and _Reducer_ phases.
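To make the Mapper/Reducer flow concrete, here is a minimal single-process sketch of one MapReduce job. It is not Hadoop's API: `run_mapreduce`, `rating_mapper` and `mean_reducer` are hypothetical names, and the three input records are made up in the tab-separated `user, movie, rating, timestamp` layout of the MovieLens `u.data` file used below. The mapper emits `(key, value)` pairs, the shuffle groups values by key, and the reducer is called once per key.

```python
from collections import defaultdict


def run_mapreduce(records, mapper, reducer):
    """Toy single-process model of one MapReduce job (hypothetical helper, not a Hadoop API)."""
    groups = defaultdict(list)
    # Map phase: every record may emit any number of (key, value) pairs
    for record in records:
        for key, value in mapper(record):
            # Shuffle phase: collect all values that share a key
            groups[key].append(value)
    # Reduce phase: one reducer call per key, visiting keys in sorted order
    return [reducer(key, groups[key]) for key in sorted(groups)]


def rating_mapper(line):
    """Emit (movie_id, rating) from a tab-separated u.data-style line."""
    user, movie, rating, timestamp = line.split("\t")
    yield int(movie), float(rating)


def mean_reducer(movie, ratings):
    """Average all ratings received for one movie."""
    return movie, sum(ratings) / len(ratings)


# Three made-up records: user<TAB>movie<TAB>rating<TAB>timestamp
sample = ["1\t10\t4\t0", "2\t10\t2\t0", "1\t20\t5\t0"]
print(run_mapreduce(sample, rating_mapper, mean_reducer))
```

In Hadoop the three phases run on different machines, and the shuffle moves data over the network; the rest of this tutorial simulates that split with separate scripts and shell pipes.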
    

    



## MapReduce Tutorial

* Introduce Mapper/Reducer Phase
* Introduce Combiner
* Introduce Join Optimization

## Download MovieLens Dataset
Download the MovieLens dataset from [here](http://grouplens.org/datasets/movielens/). It contains user information, movie information, and the ratings that users gave to movies. As a motivating example, consider the problem of ranking movies by their average rating.

In [2]:
%%bash
# Download dataset:
wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
unzip ml-100k.zip


Archive:  ml-100k.zip
   creating: ml-100k/
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base         
  inflating: ml-100k/u3.test         
  inflating: ml-100k/u4.base         
  inflating: ml-100k/u4.test         
  inflating: ml-100k/u5.base         
  inflating: ml-100k/u5.test         
  inflating: ml-100k/ua.base         
  inflating: ml-100k/ua.test         
  inflating: ml-100k/ub.base         
  inflating: ml-100k/ub.test         


--2016-03-22 21:10:10--  http://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org... 128.101.34.146
Connecting to files.grouplens.org|128.101.34.146|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: “ml-100k.zip”


## Traditional Approach -- Attempt 1
1. Read the data into a HashMap where the key is the movie id and the value is a list of all of that movie's scores.
2. Compute the average score for each movie id.
3. Sort by average score in descending order and list the top 10 movies.

In [82]:
# Step 1: Read data and store it in HashMap
from collections import defaultdict
data = defaultdict(list) 
with open("ml-100k/u.data", "r") as fp:
    for line in fp:
        (user, movie, rating, ts) = line.strip().split("\t")
        data[int(movie)].append(float(rating))

# Step 2: Compute Mean
means = []
for key, values in data.items():
    mean = sum(values)/len(values)
    means.append((key, mean))


# Step 3: Sort movies by mean score and print top 10 movies
result = sorted(means, key=lambda x: x[1], reverse=True)

for idx in xrange(10):
    print idx, result[idx]

0 (814, 5.0)
1 (1122, 5.0)
2 (1189, 5.0)
3 (1201, 5.0)
4 (1293, 5.0)
5 (1467, 5.0)
6 (1500, 5.0)
7 (1536, 5.0)
8 (1599, 5.0)
9 (1653, 5.0)


## Traditional Approach -- Attempt 2

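Instead of storing every score, keep only a running sum and count for each movie. Memory then grows with the number of movies rather than with the number of ratings.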

In [10]:
def defaultRecord():
    """Returns initial value for the hashmap"""
    return (0.0, 0.0) # (sum of ratings, number of ratings)

# Step 1: Read data and store sum and length
from collections import defaultdict
data = defaultdict(defaultRecord)

with open("ml-100k/u.data", "r") as fp:
    for line in fp:
        (user, movie, rating, ts) = line.strip().split("\t")
        movie = int(movie)
        
        # Compute number of users and total rating
        prev = data[movie]
        total = prev[0] + float(rating)
        cnt = prev[1] + 1
        
        # Add Updated Record
        data[movie] = (total, cnt)

# Step 2: Compute Mean
means = []
for movie, scores in data.items():
    mean =  scores[0]/scores[1] # total/cnt
    means.append((movie, mean))
    
# Step 3: Sort movies by mean score and print top 10 movies
result = sorted(means, key=lambda x: x[1], reverse=True)
for idx in xrange(10):
    print result[idx]


(814, 5.0)
(1122, 5.0)
(1189, 5.0)
(1201, 5.0)
(1293, 5.0)
(1467, 5.0)
(1500, 5.0)
(1536, 5.0)
(1599, 5.0)
(1653, 5.0)


## Map Reduce -- Attempt 1
Constraint: assume there is not enough memory on one machine to hold all the ratings.

In [11]:
%%writefile mapper1.py

# Mapper Program
# Extract movie and rating for each movie
import sys
for line in sys.stdin:
    (user, movie, rating, ts) = line.strip().split("\t")
    print "{0}\t{1}".format(movie, rating)
    

Writing mapper1.py


In [1]:
%%writefile reducer1.py
import sys

cur_scores = 0.0 # total 
cur_cnt = 0.0 # number of elements observed
cur_movie = 0 # current movie 

for line in sys.stdin:

    movie, rating = line.strip().split("\t")
    movie = int(movie)
    
    if cur_movie == movie: 
        cur_cnt += 1
        cur_scores += float(rating)
    else:
        if cur_movie != 0:
            print "{0}\t{1}".format(cur_movie, cur_scores/cur_cnt)
        
        cur_movie = movie
        cur_cnt = 1.0
        cur_scores = float(rating)

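# Flush the last movie (skipped if the input was empty, to avoid dividing by zero)
if cur_movie != 0:
    print "{0}\t{1}".format(cur_movie, cur_scores/cur_cnt)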

Writing reducer1.py
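The reducer above tracks key changes by hand. Because its input arrives sorted by movie id (here via `sort`, in Hadoop Streaming via the framework), the same grouping can also be sketched with `itertools.groupby`. This is an alternative formulation, not the notebook's code; `parse`, `reduce_means` and `main` are hypothetical names, and the reducer takes any iterable of lines so it can be tried without stdin.

```python
import itertools


def parse(line):
    """Split one 'movie<TAB>rating' line, as emitted by mapper1.py."""
    movie, rating = line.rstrip("\n").split("\t")
    return movie, float(rating)


def reduce_means(lines):
    """Yield (movie, mean rating) from lines already sorted by movie id."""
    records = (parse(line) for line in lines)
    # groupby merges only *adjacent* equal keys, which is why the input must be sorted
    for movie, group in itertools.groupby(records, key=lambda rec: rec[0]):
        total, count = 0.0, 0
        for _, rating in group:
            total += rating
            count += 1
        yield movie, total / count


def main(stdin, stdout):
    """Streaming entry point: read sorted mapper output, write 'movie<TAB>mean' lines."""
    for movie, mean in reduce_means(stdin):
        stdout.write("{0}\t{1}\n".format(movie, mean))
```

To use it as a streaming reducer script, add `import sys` and call `main(sys.stdin, sys.stdout)` at the end. An empty input simply produces no output.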


In [9]:
%%bash 
cat ml-100k/u.data | python mapper1.py | sort -t$'\t' -k1,1 | python reducer1.py | sort -t$'\t' -k2,2 -nr | head -n 10

814	5.0
1653	5.0
1599	5.0
1536	5.0
1500	5.0
1467	5.0
1293	5.0
1201	5.0
1189	5.0
1122	5.0


sort: write failed: standard output: Broken pipe
sort: write error


In [10]:
%%writefile hash.py
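# Simulates the shuffle: routes every mapper output line to one of 5 reducer
# partitions by movie id % 5, appending it to the file r<partition>_<split name>
import sys

NUM_REDUCERS = 5
split_name = sys.argv[1]

for line in sys.stdin:
    tokens = line.strip().split("\t")
    partition = int(tokens[0]) % NUM_REDUCERS
    with open("r{0}_{1}".format(partition, split_name), "a") as out:
        out.write(line)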
        

Overwriting hash.py
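`hash.py` plays the role of the partitioner: whichever mapper produces a key, `id % 5` sends it to the same reducer, so each reducer sees every rating of the movies it owns. The in-memory sketch below models that routing; `partition` and `shuffle` are hypothetical names, and the mapper outputs are made up. Hadoop's default `HashPartitioner` follows the same pattern, taking the key's hash code modulo the number of reduce tasks.

```python
from collections import defaultdict

NUM_REDUCERS = 5  # the reducer count hash.py hard-codes


def partition(movie_id, num_reducers=NUM_REDUCERS):
    """Choose a reducer for a key with the same int(id) % 5 rule as hash.py."""
    return int(movie_id) % num_reducers


def shuffle(mapper_outputs, num_reducers=NUM_REDUCERS):
    """Route (movie_id, rating) pairs from several mappers into per-reducer buckets."""
    buckets = defaultdict(list)
    for output in mapper_outputs:  # one list of pairs per input split / mapper
        for movie_id, rating in output:
            buckets[partition(movie_id, num_reducers)].append((movie_id, rating))
    # Each reducer sorts its own bucket, as `sort -k1,1` does in the bash cell
    return dict((r, sorted(pairs)) for r, pairs in buckets.items())


# Two made-up mapper outputs; both ratings of movie "7" end up in the same bucket
outputs = [[("7", 4.0), ("11", 3.0)], [("7", 2.0), ("3", 5.0)]]
print(shuffle(outputs))
```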


In [11]:
%%bash
mkdir -p tmp
rm -rf tmp/*
shuf ml-100k/u.data  > tmp/shuffle.tab
cd tmp

split -l 20000 shuffle.tab 

# Mapper Phase #
cat xaa | python ../mapper1.py | python ../hash.py xaa
cat xab | python ../mapper1.py | python ../hash.py xab
cat xac | python ../mapper1.py | python ../hash.py xac
cat xad | python ../mapper1.py | python ../hash.py xad
cat xae | python ../mapper1.py | python ../hash.py xae

# Reducer Phase: each reducer sorts and reduces the partition that hash.py routed to it
cat r0_* | sort -t$'\t' -k1,1 | python ../reducer1.py > o
cat r1_* | sort -t$'\t' -k1,1 | python ../reducer1.py >> o
cat r2_* | sort -t$'\t' -k1,1 | python ../reducer1.py >> o
cat r3_* | sort -t$'\t' -k1,1 | python ../reducer1.py >> o
cat r4_* | sort -t$'\t' -k1,1 | python ../reducer1.py >> o

# Sort result
cat o | sort -t$'\t' -k2,2 -nr | head -n 10




814	5.0
1653	5.0
1599	5.0
1536	5.0
1500	5.0
1467	5.0
1293	5.0
1201	5.0
1189	5.0
1122	5.0


sort: write failed: standard output: Broken pipe
sort: write error


## Map Reduce -- Attempt 2 
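Using Combiners: a combiner pre-aggregates each mapper's output into per-movie (sum, count) records before the shuffle, so far fewer records travel to the reducers.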
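A combiner is only safe when partial results can be merged without changing the answer. The short sketch below, using made-up ratings for one movie spread over two splits, shows why the combiner forwards `(sum, count)` pairs rather than per-split means: merging the pairs reproduces the exact mean, while averaging the per-split means does not when the splits have different sizes. It also counts the records that would cross the shuffle with and without a combiner. `merge` is a hypothetical name.

```python
def merge(pairs):
    """Merge partial (sum, count) pairs; associative and commutative, so safe in a combiner."""
    total = sum(s for s, _ in pairs)
    count = sum(c for _, c in pairs)
    return total, count


# Made-up ratings for one movie, landing in two different input splits
split_a = [5.0, 5.0, 5.0]
split_b = [1.0]

# What the combiner forwards: one (sum, count) record per split
partials = [(sum(split_a), len(split_a)), (sum(split_b), len(split_b))]

total, count = merge(partials)
exact_mean = total / float(count)  # (15 + 1) / 4 = 4.0

# Averaging per-split means instead weights both splits equally
mean_of_means = sum(s / float(c) for s, c in partials) / len(partials)  # (5 + 1) / 2 = 3.0

shuffled_without_combiner = len(split_a) + len(split_b)  # 4 records
shuffled_with_combiner = len(partials)                   # 2 records
```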

In [16]:
%%writefile mapper2.py

# Mapper Program
# Emit movie, rating and a count of 1.0, so the combiner and reducer can sum both columns
import sys
for line in sys.stdin:
    (user, movie, rating, ts) = line.strip().split("\t")
    print "{0}\t{1}\t1.0".format(movie, rating)

Writing mapper2.py


In [14]:
%%writefile reducer2.py

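# Acts as a combiner or as a reducer; the mode is chosen by the first argument.
# "combiner" emits partial "movie<TAB>sum<TAB>count" records, "reducer" emits "movie<TAB>mean".

import sys
mode = sys.argv[1] if len(sys.argv) > 1 else "reducer"

cur_scores = 0.0 # running sum of scores for cur_movie
cur_cnt = 0.0 # running count for cur_movie
cur_movie = 0 # movie currently being aggregated (0 = none yet)

def emit(movie, scores, cnt):
    if mode == "combiner":
        print "{0}\t{1}\t{2}".format(movie, scores, cnt)
    else:
        print "{0}\t{1}".format(movie, scores/cnt)

for line in sys.stdin:
    movie, score, cnt = line.strip().split("\t")
    movie = int(movie)

    if cur_movie == movie:
        cur_cnt += float(cnt)
        cur_scores += float(score)
    else:
        if cur_movie != 0:
            emit(cur_movie, cur_scores, cur_cnt)
        cur_movie = movie
        cur_cnt = float(cnt)
        cur_scores = float(score)

# Flush the last movie, which the loop never emits
if cur_movie != 0:
    emit(cur_movie, cur_scores, cur_cnt)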

Overwriting reducer2.py


In [17]:
%%bash
mkdir -p tmp
rm -rf tmp/*
shuf ml-100k/u.data  > tmp/shuffle.tab
cd tmp

split -l 20000 shuffle.tab 

# Mapper + Combiner Phase: each mapper's output is sorted and combined locally,
# then hash.py routes the partial (sum, count) records to a reducer partition
cat xaa | python ../mapper2.py | sort -t$'\t' -k1,1 | python ../reducer2.py combiner | python ../hash.py xaa
cat xab | python ../mapper2.py | sort -t$'\t' -k1,1 | python ../reducer2.py combiner | python ../hash.py xab
cat xac | python ../mapper2.py | sort -t$'\t' -k1,1 | python ../reducer2.py combiner | python ../hash.py xac
cat xad | python ../mapper2.py | sort -t$'\t' -k1,1 | python ../reducer2.py combiner | python ../hash.py xad
cat xae | python ../mapper2.py | sort -t$'\t' -k1,1 | python ../reducer2.py combiner | python ../hash.py xae

# Reducer Phase: each reducer merges the partial records of its partition into means
cat r0_* | sort -t$'\t' -k1,1 | python ../reducer2.py reducer > o
cat r1_* | sort -t$'\t' -k1,1 | python ../reducer2.py reducer >> o
cat r2_* | sort -t$'\t' -k1,1 | python ../reducer2.py reducer >> o
cat r3_* | sort -t$'\t' -k1,1 | python ../reducer2.py reducer >> o
cat r4_* | sort -t$'\t' -k1,1 | python ../reducer2.py reducer >> o

# Sort result
cat o | sort -t$'\t' -k2,2 -nr | head -n 10



814	5.0
1653	5.0
1599	5.0
1536	5.0
1500	5.0
1467	5.0
1293	5.0
1201	5.0
1189	5.0
1122	5.0


sort: write failed: standard output: Broken pipe
sort: write error


# Join

## Reduce Join
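In a reduce-side join, the mappers tag each record with the dataset it came from, and the shuffle brings every record with the same join key (here the movie id) to one reducer, which pairs them up. The cells below tell the two sources apart by column count; the in-memory sketch here uses explicit `"mean"` / `"title"` tags instead. The function name, tags, ids, titles and means are all hypothetical.

```python
from collections import defaultdict


def reduce_side_join(means, titles):
    """Inner-join (movie_id, mean) with (movie_id, title) the way a reduce-side join does."""
    groups = defaultdict(list)
    # Map phase: tag each record with its source; shuffle phase: group by join key
    for movie_id, mean in means:
        groups[movie_id].append(("mean", mean))
    for movie_id, title in titles:
        groups[movie_id].append(("title", title))

    joined = []
    # Reduce phase: buffer all values of a key, split them by tag, pair them up
    for movie_id in sorted(groups):
        key_means = [v for tag, v in groups[movie_id] if tag == "mean"]
        key_titles = [v for tag, v in groups[movie_id] if tag == "title"]
        for title in key_titles:
            for mean in key_means:
                joined.append((movie_id, title, mean))
    return joined


means = [("1", 3.9), ("2", 3.2)]             # made-up means
titles = [("1", "Title A"), ("3", "Title C")]  # made-up titles
print(reduce_side_join(means, titles))       # ids 2 and 3 have no partner and are dropped
```

Note that the reducer has to hold every value of a key before it can emit anything; the Secondary Sorting section removes that requirement by controlling the order in which values arrive.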

In [2]:
%%writefile itemMapper.py
#!/usr/bin/env python

import sys


for line in sys.stdin:
    id, title, addn = line.strip().split('|', 2)
    print "{0}\t{1}\tTitle".format(id, title)

Overwriting itemMapper.py


In [3]:
%%writefile reduceJoin.py
#!/usr/bin/env python

import sys

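title = None # title of the current movie (from itemMapper.py)
mean = None  # mean rating of the current movie (from the reducer output "o")
id = None    # movie id currently being joined

for line in sys.stdin:
    tokens = line.strip().split("\t")
    if tokens[0] != id:
        if id is not None:
            print "{0}\t{1}\t{2}".format(id, title, mean)
        id = tokens[0]
        title = None
        mean = None

    # Records from "o" have 2 columns (id, mean); title records have 3 (id, title, "Title")
    if len(tokens) == 2:
        mean = tokens[1]
    else:
        title = tokens[1]

# Flush the last movie, which the loop never emits
if id is not None:
    print "{0}\t{1}\t{2}".format(id, title, mean)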

Overwriting reduceJoin.py


In [5]:
%%bash
cd tmp/
cat ../ml-100k/u.item | python ../itemMapper.py > itemTitles.tab
cat o itemTitles.tab | sort -t$'\t' -k1,1 | python ../reduceJoin.py | more

1	Toy Story (1995)	3.87831858407
10	Richard III (1995)	3.83146067416
100	Fargo (1996)	4.15551181102
1000	Lightning Jack (1994)	3.0
1001	Stupids, The (1996)	2.0
1002	Pest, The (1997)	1.875
1003	That Darn Cat! (1997)	2.25
1004	Geronimo: An American Legend (1993)	3.11111111111
1005	Double vie de Véronique, La (Double Life of Veronique, The) (1991)	3.68181818182
1006	Until the End of the World (Bis ans Ende der Welt) (1991)	2.82608695652
1007	Waiting for Guffman (1996)	4.12765957447
1008	I Shot Andy Warhol (1996)	3.37837837838
1009	Stealing Beauty (1996)	3.375
101	Heavy Metal (1981)	3.2602739726
1010	Basquiat (1996)	3.25
1011	2 Days in the Valley (1996)	3.22580645161
1012	Private Parts (1997)	3.53
1013	Anaconda (1997)	2.28947368421
1014	Romy and Michele's High School Reunion (1997)	3.0612244898
1015	Shiloh (1997)	2.83333333333
1016	Con Air (1997)	3.4598540146
1017	Trees Lounge (1996)	3.24
1018	Tie Me Up! Tie Me Down! (1990)	3.1875
1019	Die xue shuang xiong (Killer, The) (1989)	3.96774193548

## Secondary Sorting 
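The cells below encode a tag in the key string (`id|0` for the mean, `id|1` for the title) so that the sort delivers each movie's mean before its title, and the reducer can then stream through its input keeping only one mean in memory. The sketch here models the same idea with Python tuples as composite keys: sort on `(movie_id, tag)`, but group on `movie_id` alone. The tuple encoding and the function name are assumptions of this sketch; in Java MapReduce the same effect is usually obtained with a custom partitioner and grouping comparator.

```python
import itertools


def secondary_sort_join(means, titles):
    """Join means and titles; the reducer loop keeps at most one mean at a time."""
    # Composite key (movie_id, tag): tag 0 (mean) sorts before tag 1 (title)
    records = [((mid, 0), m) for mid, m in means] + [((mid, 1), t) for mid, t in titles]
    records.sort(key=lambda rec: rec[0])  # stands in for the framework's sort

    joined = []
    # Group on the natural key (movie_id) only, ignoring the tag
    for mid, group in itertools.groupby(records, key=lambda rec: rec[0][0]):
        mean = None
        for (_, tag), value in group:
            if tag == 0:
                mean = value  # the only state kept for this movie
            elif mean is not None:
                joined.append((mid, value, mean))
    return joined


means = [("2", 3.5), ("1", 4.0)]                                # made-up means
titles = [("1", "Title A"), ("2", "Title B"), ("3", "Title C")]  # made-up titles
print(secondary_sort_join(means, titles))
```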

In [11]:
%%writefile reformatMean.py
#!/usr/bin/env python
import sys

for line in sys.stdin:
    id, mean = line.strip().split("\t")
    print "{0}|0\t{1}".format(id, mean)
    

Overwriting reformatMean.py


In [13]:
%%writefile itemMapper1.py
#!/usr/bin/env python

import sys

for line in sys.stdin:
    id, title, addn = line.strip().split('|', 2)
    print "{0}|1\t{1}".format(id, title)

Overwriting itemMapper1.py


In [15]:
%%writefile reduceJoin1.py
#!/usr/bin/env python

import sys

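id = None    # movie id currently being joined
mean = None  # its mean rating, once the "id|0" record has been seen
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t", 1)
    curId, tag = key.split("|")
    if id != curId:
        # New movie: reset the per-movie state
        id = curId
        mean = None
    if tag == "0":
        # The sort delivers the mean ("id|0") before the title ("id|1")
        mean = value
    elif mean is not None:
        print "{0}\t{1}\t{2}".format(id, value, mean)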


Overwriting reduceJoin1.py


In [16]:
%%bash
cd tmp/
cat o | python ../reformatMean.py > o1.tab
cat ../ml-100k/u.item | python ../itemMapper1.py > o2.tab
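# LC_ALL=C makes sort compare raw bytes, so "id|0" always sorts directly before "id|1";
# some locales ignore punctuation such as "|" when collating, which can separate the two
cat o1.tab o2.tab | LC_ALL=C sort -t$'\t' -k1,1 | python ../reduceJoin1.py | more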

1000	Lightning Jack (1994)	3.0
1001	Stupids, The (1996)	2.0
1002	Pest, The (1997)	1.875
1003	That Darn Cat! (1997)	2.25
1004	Geronimo: An American Legend (1993)	3.11111111111
1005	Double vie de Véronique, La (Double Life of Veronique, The) (1991)	3.68181818182
1006	Until the End of the World (Bis ans Ende der Welt) (1991)	2.82608695652
1007	Waiting for Guffman (1996)	4.12765957447
1008	I Shot Andy Warhol (1996)	3.37837837838
1009	Stealing Beauty (1996)	3.375
1010	Basquiat (1996)	3.25
1011	2 Days in the Valley (1996)	3.22580645161
1012	Private Parts (1997)	3.53
1013	Anaconda (1997)	2.28947368421
1014	Romy and Michele's High School Reunion (1997)	3.0612244898
1015	Shiloh (1997)	2.83333333333
1016	Con Air (1997)	3.4598540146
1017	Trees Lounge (1996)	3.24
1018	Tie Me Up! Tie Me Down! (1990)	3.1875
1019	Die xue shuang xiong (Killer, The) (1989)	3.96774193548
1020	Gaslight (1944)	3.88571428571
1021	8 1/2 (1963)	3.81578947368
1022	Fast, Cheap & Out of Control (1997)	3.4375
1023	Fathers' Day (

## MapSide Join