In [1]:
# Read the clearance.txt file from HDFS into an RDD.
lines = sc.textFile('hdfs:/user/cloudera/clearance.txt')
lines.take(5)

['COMMODITY_DESC,HS6',
 'HCPL-2631-500E X 1000 MAX9260GCB/V+T  X 750 MAX/14+ 750P/REEL AAVAGO/15+ 1K/REEL SHIP DATE 21TH MAR PO#1094090#1094087 BANKFEES USD25                 NOT GIVEN,854290',
 '00A-RAMAN SH 00A-RAMAN-SH (RAMAN SAMPLE H         OLDER),847330',
 'RIBBON                                            BALLET SHOES,580632',
 'AIR CRAFT SPARE PARTS,847150']

In [2]:
# print out total number of lines
lines.count()

5001

In [3]:
desc_code_pairs = lines.map(lambda l : l.split(","))
desc_code_pairs.take(10)

[['COMMODITY_DESC', 'HS6'],
 ['HCPL-2631-500E X 1000 MAX9260GCB/V+T  X 750 MAX/14+ 750P/REEL AAVAGO/15+ 1K/REEL SHIP DATE 21TH MAR PO#1094090#1094087 BANKFEES USD25                 NOT GIVEN',
  '854290'],
 ['00A-RAMAN SH 00A-RAMAN-SH (RAMAN SAMPLE H         OLDER)', '847330'],
 ['RIBBON                                            BALLET SHOES', '580632'],
 ['AIR CRAFT SPARE PARTS', '847150'],
 ['CHOCOLATE SAMPLE(SAMPLES ONLY NOT FOR RESALE)', '180690'],
 ['14KR DIA- SG SEE INVOICE 14KR DIA SET RING GOLD 66RD LABOUR DIA USED #080716-2HK; 300716HK 14KY DIA RING GOLD 56 RD 4PC LABOUR GROOVING DIA USED #270716HK 14KW DIA SET RING GOLD 66 RD LABOUR DIA USED',
  '711319'],
 ['SPECTACLE FRAME AND SUNGLASS WITH CASE', '490900'],
 ['PATCH CARD 8544.4299 HARNESS/PATCHCORD 85         44.4299 HARNESS/PATCHCORD 0367560023 NCR          484-0101588 0366670025 NCR 445-0732884',
  '854442'],
 ['AIRCRAFT ENGINE PARTS COOLER AIR NO 4 BRG P/N D187', '980100']]

In [4]:
# Transfer desc-code pairs to key-code tuples
def extractKeyWord(desc_code):
    return [(key, desc_code[1]) for key in desc_code[0].split(" ") if key != "" ]

key_code_tuples =  desc_code_pairs.flatMap(extractKeyWord)
key_code_tuples.take(20)

[('COMMODITY_DESC', 'HS6'),
 ('HCPL-2631-500E', '854290'),
 ('X', '854290'),
 ('1000', '854290'),
 ('MAX9260GCB/V+T', '854290'),
 ('X', '854290'),
 ('750', '854290'),
 ('MAX/14+', '854290'),
 ('750P/REEL', '854290'),
 ('AAVAGO/15+', '854290'),
 ('1K/REEL', '854290'),
 ('SHIP', '854290'),
 ('DATE', '854290'),
 ('21TH', '854290'),
 ('MAR', '854290'),
 ('PO#1094090#1094087', '854290'),
 ('BANKFEES', '854290'),
 ('USD25', '854290'),
 ('NOT', '854290'),
 ('GIVEN', '854290')]

In [14]:
# tried to use readuceByKey

# function to create required data structure - return ({code : codeCount}, totalCodes)
def valueCombiner(value1, value2):
    dict1 = value1[0]
    total1 = value1[1]
    dict2 = value2[0]
    total2 = value2[1]
        
    for (key2, count2) in dict2.items():
        if (key2 in dict1):
            dict1[key2] = dict1[key2] + count2
        else:
            dict1[key2] = count2
        
    total1 += total2
        
    return (dict1, total1)

# function to print out final result
def printResult(keyValue):
    key = keyValue[0]
    codeDict = keyValue[1][0]
    total = keyValue[1][1]
    #initialize result
    result = ""
    for (code, count) in codeDict.items():
        perc = str("{0}%".format(100 * count//total))
        result += ", " + code + "(" + perc + ")"
    
    return(key, result[2:])
    

counts = key_code_tuples.map(lambda kctuple : (kctuple[0], ({kctuple[1]:1}, 1))) \
                        .reduceByKey(valueCombiner) \
                        .map(printResult) \
                        .sortByKey() 
                        
        
counts.take(10)
# counts.lookup("SCARVES")
counts.coalesce(1).saveAsTextFile('hdfs:/user/cloudera/outputDir')

In [128]:
# create code combiner - return ({code : codeCount}, totalCodes)
def createCodeCombiner(code):
    return ({code : 1 }, 1)

# # code combiner function
def codeCombiner(value1, code):
    dict1 = value1[0]
    total = value1[1]
      
    if (code in dict1):
        dict1[code] = dict1[code] + 1
    else:
        dict1[code] = 1
    total += 1
        
    return (dict1, total)

# def codeMerger()
def valueMerger(value1, value2):
    dict1 = value1[0]
    total1 = value1[1]
    dict2 = value2[0]
    total2 = value2[1]
        
    for (key2, count2) in dict2.items():
        if (key2 in dict1):
            dict1[key2] = dict1[key2] + count2
        else:
            dict1[key2] = count2
        
    total1 += total2
        
    return (dict1, total1)

# function to print out final result
def mapResult1(keyValue):
    key = keyValue[0]
    codeDict = keyValue[1][0]
    total = keyValue[1][1]
    #initialize result
    result = ""
    for (code, count) in codeDict.items():
        perc = str("{0}%".format(100*count//total))
        result += ", " + code + "(" + perc + ")"
    
    return(key, result[2:])

key_code_counts = key_code_tuples.combineByKey(createCodeCombiner, codeCombiner, valueMerger) \
                                 .map(mapResult1) \
                                 .sortByKey()
key_code_counts.take(10)

[('""',
  '848690(6%), 580632(6%), 711319(6%), 600642(6%), 900130(12%), 490199(43%), 841989(12%), 870899(6%)'),
 ('"""', '870899(100%)'),
 ('"""100', '610910(100%)'),
 ('"""GLOVE', '902790(100%)'),
 ('"""PARTS', '847170(100%)'),
 ('"""WAFER', '854129(100%)'),
 ('""BUTTERFLY', '940360(100%)'),
 ('""CS300R-13""', '841989(100%)'),
 ('""D""', '854442(100%)'),
 ('""FLG', '300691(100%)')]