In [13]:
import sys

# findspark.init() locates the Spark installation and puts its pyspark package
# on sys.path, so it has to run *before* anything is imported from pyspark.
import findspark
findspark.init()

import pandas as pd
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [28]:
# create spark conf (each setter returns the conf, so the calls chain)
conf = SparkConf().setAppName('Ch04').setMaster('local')

# create spark context; getOrCreate() hands back an already-running context
# instead of failing, so this cell can be re-run without restarting the kernel
sc = SparkContext.getOrCreate(conf=conf)

# create paired RDD
# Each line in the file contains a transaction date, time, customer ID,
# product ID, quantity, and product price, delimited with hash signs.
tranFile = sc.textFile('./../first-edition/ch04/ch04_data_transactions.txt')
tranData = tranFile.map(lambda line: line.split('#'))
# key every transaction (a list of 6 strings) by its integer customer ID, field 2
transByCust = tranData.map(lambda t: (int(t[2]), t))

In [29]:
# find the number of unique customer IDs
numCustomers = transByCust.keys().distinct().count()
print('Num of Customers: {}'.format(numCustomers))
print()

# investigate count by keys -- countByKey() is a Spark action that returns a
# {customer ID: number of transactions} dict to the driver, so run it once
# and reuse the result below
countsByCust = transByCust.countByKey()
print(countsByCust)
print()

# the total number of transactions is the sum of the per-customer counts
numOfTrans = sum(countsByCust.values())
print('Num of transactions: {}'.format(numOfTrans))
print()

# find customer who made most purchases
import operator
(cid, purch) = max(countsByCust.items(), key=operator.itemgetter(1))
print('customer with most purchases: {} ({} purchases)'.format(cid, purch))
print()

# we need to give this person a complimentary bear doll (product ID '4');
# the customer ID comes from `cid` rather than being copied from the output
complTrans = [['2018-05-31', '12:00 PM', str(cid), '4', '1', '0.00']]

Num of Customers: 100

defaultdict(<class 'int'>, {51: 18, 99: 12, 79: 13, 86: 9, 63: 12, 23: 13, 49: 8, 97: 12, 94: 12, 91: 13, 20: 8, 38: 9, 46: 9, 56: 17, 11: 8, 59: 9, 8: 10, 85: 9, 27: 7, 84: 9, 54: 7, 74: 11, 6: 7, 35: 10, 39: 11, 47: 13, 17: 13, 40: 10, 57: 8, 80: 7, 87: 10, 52: 9, 30: 5, 62: 6, 41: 12, 71: 10, 61: 8, 95: 8, 5: 11, 2: 15, 78: 11, 13: 12, 4: 11, 100: 12, 19: 6, 98: 11, 53: 19, 89: 9, 15: 10, 45: 11, 12: 7, 32: 14, 16: 8, 1: 9, 72: 7, 14: 8, 7: 10, 28: 11, 43: 12, 93: 12, 70: 8, 73: 7, 65: 10, 50: 14, 3: 13, 69: 7, 60: 4, 76: 15, 66: 11, 90: 8, 10: 7, 34: 14, 83: 12, 64: 10, 18: 9, 81: 9, 44: 8, 21: 13, 88: 5, 58: 13, 24: 9, 26: 11, 77: 11, 36: 5, 22: 10, 31: 14, 96: 8, 29: 9, 33: 9, 68: 12, 75: 10, 25: 12, 48: 5, 82: 13, 9: 7, 67: 5, 37: 7, 55: 13, 92: 8, 42: 7})

Num of transactions: 1000

customer with most purchases: 53 (19 purchases)



In [30]:
# Find transactions made by the person with the most purchases; `cid` was
# computed in the count cell, so the customer ID is not hard-coded here
for t in transByCust.lookup(cid):
    print(', '.join(t))

2015-03-30, 6:18 AM, 53, 42, 5, 2197.85
2015-03-30, 4:42 AM, 53, 44, 6, 9182.08
2015-03-30, 2:51 AM, 53, 59, 5, 3154.43
2015-03-30, 5:57 PM, 53, 31, 5, 6649.27
2015-03-30, 6:11 AM, 53, 33, 10, 2353.72
2015-03-30, 9:46 PM, 53, 93, 1, 2889.03
2015-03-30, 4:15 PM, 53, 72, 7, 9157.55
2015-03-30, 2:42 PM, 53, 94, 1, 921.65
2015-03-30, 8:30 AM, 53, 38, 5, 4000.92
2015-03-30, 6:06 AM, 53, 12, 6, 2174.02
2015-03-30, 3:44 AM, 53, 47, 1, 7556.32
2015-03-30, 10:25 AM, 53, 30, 2, 5107.0
2015-03-30, 1:48 AM, 53, 58, 4, 718.93
2015-03-30, 9:31 AM, 53, 18, 4, 8214.79
2015-03-30, 9:04 AM, 53, 68, 4, 9246.59
2015-03-30, 1:51 AM, 53, 40, 1, 4095.5
2015-03-30, 1:53 PM, 53, 85, 9, 1630.24
2015-03-30, 6:51 PM, 53, 100, 1, 1694.52
2015-03-30, 7:39 PM, 53, 100, 8, 7885.35


In [31]:
# Next Task: give 5% discount for two or more Barbie Shopping Mall Playsets (ID = 25)
def applyDiscount(tran, productId=25, minQuantity=2, priceFactor=0.95):
    """Return `tran` with its price discounted when the purchase qualifies.

    `tran` is a transaction as a list of strings:
    [date, time, customer ID, product ID, quantity, price].

    A transaction qualifies when its product ID equals `productId` and its
    quantity is at least `minQuantity`. Its price is then multiplied by
    `priceFactor` and stored back as a string. The defaults encode the
    "5% off two or more Barbie playsets" rule.

    A qualifying transaction is returned as a *new* list, so the caller's record
    is never mutated. A non-qualifying transaction is returned as-is.
    """
    if int(tran[3]) == productId and int(tran[4]) >= minQuantity:
        discounted = list(tran)
        discounted[5] = str(float(tran[5]) * priceFactor)
        return discounted
    return tran

In [32]:
# apply the discount -- applyDiscount already takes a single transaction,
# so it is handed to mapValues directly instead of through a wrapping lambda
transByCust = transByCust.mapValues(applyDiscount)

In [33]:
# Next Task: Add complimentary toothbrush (ID 70) to
# customers who bought five or more dictionaries (ID 81)
from copy import copy

def addToothbrush(tran):
    """Return `tran`, plus a free-toothbrush transaction when it qualifies.

    `tran` is a transaction as a list of strings:
    [date, time, customer ID, product ID, quantity, price].

    When it records five or more dictionaries (product ID 81), the result is
    [tran, gift]. `gift` is a copy of `tran` with product '70', quantity '1'
    and price '0.00'. Otherwise the result is [tran]. The list return shape
    is what flatMapValues expects.
    """
    if int(tran[3]) == 81 and int(tran[4]) >= 5:
        clonedTran = copy(tran)
        # assign (not compare) so the clone really becomes the free toothbrush
        # rather than a full-price duplicate of the dictionary purchase
        clonedTran[5] = '0.00'
        clonedTran[4] = '1'
        clonedTran[3] = '70'
        return [tran, clonedTran]
    return [tran]

In [34]:
# apply complimentary toothbrush -- addToothbrush returns a list per
# transaction, which flatMapValues flattens; pass the function directly
transByCust = transByCust.flatMapValues(addToothbrush)

In [35]:
# how many transactions do we have now? Every element of the pair RDD is one
# transaction, so count() yields the same total as summing countByKey()
numOfTrans = transByCust.count()
print('Num of transactions: {}'.format(numOfTrans))
print()

Num of transactions: 1006



In [36]:
# Next Task: Find the customer who spent the most money

# first convert values from string to float (field 5 holds the price)
amounts = transByCust.mapValues(lambda t: float(t[5]))

# fold values by key, starting each partial sum from a zero value of 0
totals = amounts.foldByKey(0, operator.add).collect()

# sort descending by total and show the top 3 customers
sorted(totals, key=operator.itemgetter(1), reverse=True)[:3]

[(76, 100049.00000000001), (53, 88829.76000000001), (56, 85906.94)]

In [37]:
# What happens if we seed foldByKey with 100,000 instead of 0?
# The zero value is folded in once for every partition that holds a given key,
# not just once per key, so every total is inflated by a multiple of 100,000.
# In the output below each total grew by exactly 100,000 compared with the
# previous cell, which suggests each key's values sat in a single partition.
totals = amounts.foldByKey(100000, operator.add).collect()

# sort descending by total and show the top 3 customers
sorted(totals, key=operator.itemgetter(1), reverse=True)[:3]

[(76, 200049.0), (53, 188829.75999999998), (56, 185906.94000000003)]

In [38]:
# Next Task: Give a pair of pajamas (ID 63) to the customer who spent the most
# money. The customer is derived from the unseeded per-customer totals rather
# than hard-coded, because `totals` currently holds the 100,000-seeded sums.
topSpender = max(amounts.reduceByKey(operator.add).collect(),
                 key=operator.itemgetter(1))[0]
pajamas = ['2018-05-31', '12:00 PM', str(topSpender), '63', '1', '0.00']
# append only once, so re-running this cell does not hand out a second gift
if pajamas not in complTrans:
    complTrans.append(pajamas)

In [39]:
# With the transactions complete, we can merge the
# complimentary transactions into the main dataset.
# keyBy pairs each transaction with its integer customer ID (field 2),
# matching the layout of transByCust.
complTrans_tuple = sc.parallelize(complTrans).keyBy(lambda t: int(t[2]))
# NOTE(review): this rebinds transByCust, so re-running the cell unions the
# complimentary transactions a second time; also saveAsTextFile is expected to
# refuse an existing output directory -- confirm before re-running.
transByCust = transByCust.union(complTrans_tuple)
# drop the customer-ID keys and write each transaction back out '#'-delimited
transByCust.values().map(lambda fields: '#'.join(fields)).saveAsTextFile('ch04_output_transByCust')

In [40]:
# stop the SparkContext `sc` that was created for this chapter's examples
sc.stop()

### 4.2 Understanding Data Partitioning and Reducing Data Shuffling