In [1]:
def mapper() :
    for line in sys.stdin :
        data = line.strip().split("\t")
        date, time, store, item, cost, payment = data
        print ("{0}\t{1}".format(store, cost))

    

데이터가 많을수록 데이터의 예외가 많기 때문에, 어떤 데이터를 받더라도 Mapper가 계속 동작하도록 해야한다.

따라서 MapReudce 코드에서는 Defensive하게 작성하는 것이 중요하다.

In [3]:
# 항목이 6개가 아닐 경우
def mapper() :
    import sys
    
    for line in sys.stdin :
        data = line.strip().split("\t")
        
        if len(data) == 6 :
            date, time, store, item, cost, payment = data
            print ("{0}\t{1}".format(store, cost))


Reducer는 (Key, Value) 쌍을 "Hadoop Streaming"을 통해 받는다. 

Hadoop에서 자체적으로 Shuffle & Sort를 한다. 

In [4]:
def reducer() :
    import sys
    
    salesTotal = 0
    oldKey = None

    for line in sys.stdin :
        data = line.strip().split("\t")
            # store, sales
        if len(data) != 2 :
            continue
            
        thisKey, thisSale = data
            # thisKey = store, thisSale = sale
        
        if oldKey and oldKey != thisKey :
            print ("{0}\t{1}".format(oldKey, salesTotal))
            
            salesTotal = 0
        
        oldKey = thisKey
        salesTotal += float(thisSale)
        
        # 하지만 마지막 Key가 출력되지 않았다 !!
        
        if oldKey != None :
            print ("{0}\t{1}".format(oldKey, salesTotal))

이제 Hadoop을 사용하기 전에, 터미널을 통해 미리 Mapper와 Reducer를 확인해 볼 것이다.

In [21]:
# purchase의 10행만 실험을 위해 사용
! head -n 10 purchases.txt > test.txt
! cat test.txt

2012-01-01	09:00	San Jose	Men's Clothing	214.05	Amex
2012-01-01	09:00	Fort Worth	Women's Clothing	153.57	Visa
2012-01-01	09:00	San Diego	Music	66.08	Cash
2012-01-01	09:00	Pittsburgh	Pet Supplies	493.51	Discover
2012-01-01	09:00	Omaha	Children's Clothing	235.63	MasterCard
2012-01-01	09:00	Stockton	Men's Clothing	247.18	MasterCard
2012-01-01	09:00	Austin	Cameras	379.6	Visa
2012-01-01	09:00	New York	Consumer Electronics	296.8	Cash
2012-01-01	09:00	Corpus Christi	Toys	25.38	Discover
2012-01-01	09:00	Fort Worth	Toys	213.88	Visa


In [22]:
! cat test.txt | python mapper.py

San Jose	214.05
Fort Worth	153.57
San Diego	66.08
Pittsburgh	493.51
Omaha	235.63
Stockton	247.18
Austin	379.6
New York	296.8
Corpus Christi	25.38
Fort Worth	213.88


In [23]:
# 이번에는 Reducer까지 사용
# Sorting은 직접 해준다 (Hadoop에서는 자동)
! cat test.txt | python mapper.py | sort | python reducer.py

Austin	379.6
Corpus Christi	25.38
Fort Worth	367.45
New York	296.8
Omaha	235.63
Pittsburgh	493.51
San Diego	66.08
San Jose	214.05
Stockton	247.18


##### - Implementation
Mapping, Sorting, Reducing을 하는 쉘 & 파이썬 문법은 다음과 같다.

In [2]:
# Mapper

import sys


#with open('./purchases.txt.ignore') as f:
#        for line in f :
for line in sys.stdin :

    data = line.strip().split("\t")

    if len(data) == 6 :
        date, time, store, item, cost, payment = data
        print("{0}\t{1}".format(item, cost))


In [3]:
# Sorting

import sys

temp_list = list()
i = 0

for line in sys.stdin :
    item = line.strip().split("\t")
    temp_list.append(tuple(item))

temp_list = sorted(temp_list, key = lambda temp_list : temp_list[0])

for item in temp_list :
    print(item)


In [4]:
# Reducer


import sys
import re

oldKey = None
salesTotal = 0

r = re.compile(r'\([\'\"]([\w \']+)[\'\"][ .,]*[\'\"](\d+.?\d*)[\'\"]\)')

for line in sys.stdin :
    m = r.search(line.strip())
    data = m.group(1,2)

    if len(data) != 2 :
        continue
    thisKey, thisSale = data

    if oldKey and oldKey != thisKey :
        print ("{0}\t{1}".format(oldKey, salesTotal))

        salesTotal = 0

    oldKey = thisKey
    salesTotal += float(thisSale)

if oldKey != None :
    print ("{0}\t{1}".format(oldKey, salesTotal))

In [6]:
! cat purchases.txt.ignore | python3 Project1_mapper.py | python3 Project1_sorter.py | \
python3 Project1_reducer.py

Baby	57491808.43999965
Books	57450757.91000004
CDs	57410753.04000111
Cameras	57299046.64000087
Children's Clothing	57624820.94000126
Computers	57315406.319999866
Consumer Electronics	57452374.12999909
Crafts	57418154.50000017
DVDs	57649212.13999929
Garden	57539833.109999545
Health and Beauty	57481589.560001
Men's Clothing	57621279.04000138
Music	57495489.700000465
Pet Supplies	57197250.24000008
Sporting Goods	57599085.890000574
Toys	57463477.10999907
Video Games	57513165.5800005
Women's Clothing	57434448.96999881


위와 동일한 작업을 수행하는 하둡 명령어는 아래와 같다.

In [8]:
!hadoop jar <jar path> \
-file /home/cloudera/Repo/mapper.py -mapper mapper.py \
-file /home/cloudera/Repo/reducer.py -reducer reducer.py \
-input inputt/ -output outputdir

/bin/sh: 1: cannot open jar: No such file
/bin/sh: 1: hadoop: not found


이를 통해 동일한 결과물을 얻을 수 있다.

사실 하둡에서 쓴 파이썬 코드와 쉘에서 쓴 파이썬 코드는 약간 다른데,

쉘에 내장된 sort 프로그램을 사용하지 않고 직접 Sorting 코드를 구현했기에 Reducer 코드도 약간 달라졌다!