#### RDD Operations Exercise: customer and salestxn data sets with a broadcast variable

Customer details are stored in one file, customers.tsv, and sales transaction details in another, salestxns.tsv.
Each sales transaction carries only the customer id, not the customer name. A customer may also have several entries for the same item if it was bought in separate transactions. The output must contain the customer id and customer name along with the product id, product name, price, total quantity ordered and total amount paid.

Since the customer file is small (1244 records) and the sales transactions file is much larger (172198 records), we will load the customer details into a variable and broadcast it across the cluster. We will use this broadcast variable to add the customer name to each sales transaction.

The application flow also needs to aggregate a customer's sales transactions for the same product when that product was ordered in multiple transactions.

Set the Spark installation path, then import and initialize findspark

In [1]:
import os
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/anaconda3/bin/python3.7'
os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python3.7'
os.environ["SPARK_HOME"] = '/opt/cloudera/parcels/CDH/lib/spark'
import findspark
findspark.init()

Import PySpark and create SparkContext

In [2]:
import pyspark
sc = pyspark.SparkContext(appName='BroadcastVar')

Read the customers data input file to create an input RDD

In [3]:
custRDD1=sc.textFile("customers.tsv")

Display count and a sample of the RDD elements for debugging purposes. The RDD elements are the lines of type string from the input file.

In [4]:
custRDD1.count()

1244

###### We need to transform the RDD elements into key,value pairs with the required key and value.

The function for custRDD:
 - Splits the input string at tabs.
 - Uses customer id as the key and customer name as the value.
 - Returns the (key, value) tuple, so that mapping it over the RDD yields a pair RDD.

In [7]:
def make_cust_pairRDD(line):
    # The parameter is not called `str`, which would shadow the built-in type.
    fields = line.split('\t')
    return (fields[0], fields[1])
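
The pairing steps above assume every line has at least two tab-separated fields. A minimal defensive sketch is shown below; the helper name `parse_customer` and the sample lines are hypothetical, and the real customers.tsv may carry more columns than id and name.

```python
def parse_customer(line):
    """Return (customer_id, customer_name), or None if the line is malformed."""
    fields = line.rstrip('\n').split('\t')
    # A usable line needs a non-empty id and a name column.
    if len(fields) < 2 or not fields[0]:
        return None
    return (fields[0], fields[1])

# Hypothetical sample lines: one valid, two malformed.
sample_lines = ["6342\tKimberly Blair", "broken line without a tab", ""]
pairs = [p for p in map(parse_customer, sample_lines) if p is not None]
print(pairs)
```

With an RDD, the same function could be applied as `custRDD1.map(parse_customer).filter(lambda p: p is not None)`.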

Use map with a lambda function to transform each element of custRDD1 into a key,value pair using the above function.

In [8]:
custRDD2=custRDD1.map(lambda line: make_cust_pairRDD(line))

Persisting an RDD is useful when it will be reused by several transformations or actions, because it avoids recomputing the RDD from the beginning each time.

In [9]:
custRDD2.persist()

PythonRDD[3] at RDD at PythonRDD.scala:53

In [10]:
custDict=custRDD2.collectAsMap()

The RDD operation collectAsMap returns the contents of the pair RDD to the driver as a Python dictionary (called a map in some other programming languages).
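
As far as the result is concerned, collectAsMap behaves like building a dictionary from the collected pairs, so a key that occurs more than once keeps only one value. The plain-Python sketch below illustrates this with made-up pairs; the duplicate id and its names are hypothetical and are not taken from customers.tsv.

```python
# Pairs as they might come back from collect(); the duplicate id is hypothetical.
pairs = [
    ("6342", "Kimberly Blair"),
    ("256", "First Name"),
    ("256", "Second Name"),
]

# With dict(), a later pair overwrites an earlier one that has the same key.
cust_dict = dict(pairs)
print(len(cust_dict))
print(cust_dict["256"])
```

Because the whole dictionary is held on the driver, this approach suits small lookup tables such as the customer data here.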

In [11]:
custDict['6342']

'Kimberly Blair'

In [12]:
custDict['11599']

KeyError: '11599'

Since some customer details are missing from the customers file, it is better to use the .get method to look up the customer name for a given customer id. It returns None instead of raising KeyError when the id is absent.

In [13]:
custDict.get('6342')

'Kimberly Blair'

In [14]:
custDict.get('11599')
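
The difference between the two lookups can be reproduced with an ordinary dictionary. In this small sketch the placeholder string `"UNKNOWN"` is an assumption for illustration; the notebook itself relies on the default of None.

```python
cust_dict = {"6342": "Kimberly Blair"}

# Present key: .get behaves like indexing.
print(cust_dict.get("6342"))

# Missing key: .get returns None instead of raising KeyError.
print(cust_dict.get("11599"))

# An optional second argument supplies a placeholder for missing ids.
print(cust_dict.get("11599", "UNKNOWN"))
```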

We will define a broadcast variable, custDictBroadcast, from custDict for use in our application.

In [15]:
custDictBroadcast=sc.broadcast(custDict)

In [16]:
type(custDictBroadcast)

pyspark.broadcast.Broadcast

In [17]:
custDictBroadcast.value

{'11039': 'Mary Torres',
 '5623': 'Jose Haley',
 '5829': 'Mary Smith',
 '6336': 'Richard Maddox',
 '1708': 'Margaret Booth',
 '10227': 'Mary Henderson',
 '839': 'Lisa Walker',
 '7604': 'Jonathan Hill',
 '6485': 'Carolyn Sheppard',
 '4737': 'Mary Mendoza',
 '5973': 'Michael Smith',
 '9205': 'James Holmes',
 '138': 'Mary Dawson',
 '371': 'Adam Marquez',
 '9285': 'Gloria Smith',
 '1209': 'Mary Webb',
 '3021': 'Nancy Alvarado',
 '3354': 'Russell Flores',
 '11684': 'Denise Smith',
 '11144': 'Jose Dickerson',
 '5252': 'Mary Smith',
 '8688': 'Michelle Rose',
 '8900': 'Russell Peterson',
 '5577': 'Mary Smith',
 '11161': 'Mary Pennington',
 '4494': 'Mary Smith',
 '7565': 'Jean Donovan',
 '11218': 'Louis Novak',
 '10124': 'Mary Santos',
 '6052': 'Mary Harding',
 '9756': 'Mary White',
 '5748': 'Shawn Mann',
 '11748': 'James Smith',
 '3852': 'Roy Potts',
 '668': 'Phillip Lewis',
 '1236': 'Virginia Robertson',
 '6991': 'Mary Smith',
 '6941': 'Jeffrey Palmer',
 '12425': 'Mary Smith',
 '5013': 'Mary 

In [18]:
type(custDictBroadcast.value)

dict

In [19]:
custDictBroadcast.value['6342']

'Kimberly Blair'

In [None]:
custDictBroadcast.value['11599']

The broadcast value is the same dictionary, so this lookup raises KeyError for the missing customer id as well. As before, the .get method is the safer way to look up the customer name for a given customer id.

Read the salestxns.tsv input file to create another input RDD

In [20]:
salesRDD1=sc.textFile("salestxns.tsv")

Display count and a sample of the RDD elements for debugging purposes. The RDD elements are the lines of type string from the input file.

In [21]:
salesRDD1.count()

172198

In [22]:
salesRDD1.take(3)

["1\t43\tCamping & Hiking\t957\tDiamondback Women's Serene Classic Comfort Bi\t299.98\t1\t11599",
 '2\t48\tWater Sports\t1073\tPelican Sunstream 100 Kayak\t199.99\t1\t256',
 "3\t24\tWomen's Apparel\t502\tNike Men's Dri-FIT Victory Golf Polo\t50\t5\t256"]

###### We need to transform the RDD elements into key,value pairs with the required key and value.

The function for salesRDD:
 - Splits the input string at tabs.
 - Uses a tuple containing customer id, product id, product name and product price as the key and product quantity as the value.
 - Returns the (key, value) tuple, so that mapping it over the RDD yields a pair RDD.
 - Later in the code we will use reduceByKey to sum the quantities of the same product bought by the same customer in separate transactions.
 

In [23]:
def make_sales_pairRDD(line):
    # Key: (customer id, product id, product name, price); value: quantity.
    fields = line.split('\t')
    return ((fields[7], fields[3], fields[4], fields[5]), int(fields[6]))
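
To make the column positions easier to read, the same key and value can be built with tuple unpacking. This is only a sketch: the helper name `make_sales_pair` is hypothetical, the names of the first three columns are guesses from the sample rows, and the unpacking assumes exactly eight tab-separated fields per line.

```python
def make_sales_pair(line):
    """Return ((cust_id, product_id, product_name, price), quantity)."""
    # The first three column names are guesses; only the last five are used.
    (txn_id, category_id, category,
     product_id, product_name, price, qty, cust_id) = line.split('\t')
    return ((cust_id, product_id, product_name, price), int(qty))

# One of the sample rows from salestxns.tsv.
sample = "2\t48\tWater Sports\t1073\tPelican Sunstream 100 Kayak\t199.99\t1\t256"
key, quantity = make_sales_pair(sample)
print(key)
print(quantity)
```

Unlike indexing, unpacking raises ValueError on a line with a different number of columns, which surfaces malformed input early.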

In [24]:
salesRDD2=salesRDD1.map(lambda line: make_sales_pairRDD(line))

Display the RDD2 elements for debug purposes.

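Since the sales RDD is more than 100 times larger than the customer RDD, we can increase the parallelism by increasing the number of partitions with partitionBy.
PySpark uses its default hash partitioner in this case. partitionBy returns a new RDD, so the result must be assigned for later steps to use the repartitioned, persisted data.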

In [25]:
salesRDD2 = salesRDD2.partitionBy(10).persist()  # keep the repartitioned, persisted RDD
salesRDD2

MapPartitionsRDD[11] at mapPartitions at PythonRDD.scala:133
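
The idea behind hash partitioning can be sketched in plain Python: hash the key and take the remainder modulo the number of partitions, so equal keys always land in the same partition. The function `partition_index` below is a simplified stand-in and uses Python's built-in `hash`, whereas PySpark's default is its own `portable_hash`; the keys are shortened, made-up examples.

```python
def partition_index(key, num_partitions):
    # Simplified stand-in for a hash partitioner: hash the key, take the modulus.
    return hash(key) % num_partitions

# Shortened (customer id, product id) keys; the first and last are equal.
keys = [("256", "1073"), ("256", "502"), ("8827", "365"), ("256", "1073")]
indices = [partition_index(k, 10) for k in keys]

# Equal keys map to the same partition within one run.
print(indices[0] == indices[3])
```

The actual index values differ between Python processes unless string hashing is fixed, so only the "same key, same partition" property is meaningful here.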

In [26]:
salesRDD2.count()

172198

In [27]:
salesRDD3 = salesRDD2.reduceByKey(lambda va,vb: va+vb)

In [28]:
salesRDD3.count()

92875

In [29]:
salesRDD3.take(3)

[(('11599', '957', "Diamondback Women's Serene Classic Comfort Bi", '299.98'),
  1),
 (('256', '403', "Nike Men's CJ Elite 2 TD Football Cleat", '129.99'), 4),
 (('8827', '365', 'Perfect Fitness Perfect Rip Deck', '59.99'), 12)]
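
What reduceByKey computes can be mimicked on a small list without Spark. In this sketch the helper `reduce_by_key` is hypothetical and the pairs are made up, with shortened keys; it shows only the result, not how Spark distributes the work.

```python
def reduce_by_key(pairs, func):
    """Combine values that share a key, mimicking the result of reduceByKey."""
    result = {}
    for key, value in pairs:
        # First occurrence stores the value; later ones are merged with func.
        result[key] = func(result[key], value) if key in result else value
    return result

# Made-up (key, quantity) pairs with repeated keys.
pairs = [
    (("256", "403"), 1),
    (("8827", "365"), 5),
    (("256", "403"), 3),
    (("8827", "365"), 7),
]
totals = reduce_by_key(pairs, lambda va, vb: va + vb)
print(totals)
```

The function passed in should be associative and commutative, as addition is, because Spark may combine values in any order across partitions.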

Map the key and value of each element of salesRDD3 into a flat record of customer id, product id, product name, price, total quantity purchased and total amount paid (price * total quantity).
Once that is done, we can use the broadcast variable to look up the customer name, as required by the final format in the problem statement.

In [30]:
salesRDD4=salesRDD3.map(lambda fld : (fld[0][0],fld[0][1],fld[0][2],float(fld[0][3]),fld[1], 
                                      round(float(fld[0][3])*fld[1],2)))
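
The index-heavy lambda can be easier to follow as a named function with unpacking. The sketch below performs the same computation on one element taken from the salesRDD3 sample output; the function name `flatten_sale` is hypothetical.

```python
def flatten_sale(pair):
    """Turn ((cust_id, product_id, name, price), qty) into a flat record."""
    (cust_id, product_id, product_name, price), qty = pair
    unit_price = float(price)
    # Total amount paid, rounded to two decimal places.
    amount = round(unit_price * qty, 2)
    return (cust_id, product_id, product_name, unit_price, qty, amount)

sample = (("256", "403", "Nike Men's CJ Elite 2 TD Football Cleat", "129.99"), 4)
print(flatten_sale(sample))
```

In the notebook it could be used as `salesRDD3.map(flatten_sale)`.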

In [31]:
salesRDD4.take(20)

[('11599',
  '957',
  "Diamondback Women's Serene Classic Comfort Bi",
  299.98,
  1,
  299.98),
 ('256', '403', "Nike Men's CJ Elite 2 TD Football Cleat", 129.99, 4, 519.96),
 ('8827', '365', 'Perfect Fitness Perfect Rip Deck', 59.99, 12, 719.88),
 ('8827', '502', "Nike Men's Dri-FIT Victory Golf Polo", 50.0, 3, 150.0),
 ('11318', '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 2, 99.96),
 ('11318',
  '403',
  "Nike Men's CJ Elite 2 TD Football Cleat",
  129.99,
  3,
  389.97),
 ('4530', '1073', 'Pelican Sunstream 100 Kayak', 199.99, 2, 399.98),
 ('4530',
  '957',
  "Diamondback Women's Serene Classic Comfort Bi",
  299.98,
  4,
  1199.92),
 ('4530', '926', 'Glove It Imperial Golf Towel', 15.99, 5, 79.95),
 ('2911', '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 10, 499.8),
 ('5648', '1073', 'Pelican Sunstream 100 Kayak', 199.99, 6, 1199.94),
 ('918', '365', 'Perfect Fitness Perfect Rip Deck', 59.99, 4, 239.96),
 ('918', '191', "Nike Men's Free 5.0+ Running Shoe", 99.99, 7, 699

In [32]:
type(salesRDD3.first())

tuple

In [31]:
salesRDD5=salesRDD4.map(lambda fld : (fld[0],custDictBroadcast.value.get(fld[0]),fld[1],fld[2],fld[3],fld[4],fld[5]))
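
The lookup step can be written as a plain function that takes the dictionary as an argument, which lets it be checked without a cluster. In this sketch the function name `attach_name`, the optional `default` argument and the first sample record are hypothetical.

```python
def attach_name(record, lookup, default=None):
    """Insert the customer name after the customer id in a flat sales record."""
    cust_id = record[0]
    return (cust_id, lookup.get(cust_id, default)) + tuple(record[1:])

lookup = {"6342": "Kimberly Blair"}

# Hypothetical purchase for a known customer.
known = ("6342", "1073", "Pelican Sunstream 100 Kayak", 199.99, 1, 199.99)
# Customer id that is absent from the lookup table.
missing = ("11599", "957", "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98)

print(attach_name(known, lookup))
print(attach_name(missing, lookup))
```

In the Spark job, the dictionary would come from the broadcast variable, for example `salesRDD4.map(lambda r: attach_name(r, custDictBroadcast.value))`.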

In [25]:
salesRDD5.take(100)

[('3730', None, '365', 'Perfect Fitness Perfect Rip Deck', 59.99, 3, 179.97),
 ('3730', None, '502', "Nike Men's Dri-FIT Victory Golf Polo", 50.0, 2, 100.0),
 ('3730',
  None,
  '957',
  "Diamondback Women's Serene Classic Comfort Bi",
  299.98,
  4,
  1199.92),
 ('3925',
  None,
  '502',
  "Nike Men's Dri-FIT Victory Golf Polo",
  50.0,
  15,
  750.0),
 ('8743', None, '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 7, 349.86),
 ('8743',
  None,
  '403',
  "Nike Men's CJ Elite 2 TD Football Cleat",
  129.99,
  3,
  389.97),
 ('2640',
  None,
  '403',
  "Nike Men's CJ Elite 2 TD Football Cleat",
  129.99,
  4,
  519.96),
 ('2640', None, '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 1, 49.98),
 ('5004',
  None,
  '1014',
  "O'Brien Men's Neoprene Life Vest",
  49.98,
  31,
  1549.38),
 ('5004',
  None,
  '403',
  "Nike Men's CJ Elite 2 TD Football Cleat",
  129.99,
  5,
  649.95),
 ('6413',
  None,
  '627',
  "Under Armour Girls' Toddler Spine Surge Runni",
  39.99,
  3,
  119.97

In [28]:
for line in salesRDD5.take(30):
    print(line)

('11599', None, '957', "Diamondback Women's Serene Classic Comfort Bi", 299.98, 1, 299.98)
('256', None, '403', "Nike Men's CJ Elite 2 TD Football Cleat", 129.99, 4, 519.96)
('8827', None, '365', 'Perfect Fitness Perfect Rip Deck', 59.99, 12, 719.88)
('8827', None, '502', "Nike Men's Dri-FIT Victory Golf Polo", 50.0, 3, 150.0)
('11318', None, '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 2, 99.96)
('11318', None, '403', "Nike Men's CJ Elite 2 TD Football Cleat", 129.99, 3, 389.97)
('4530', None, '1073', 'Pelican Sunstream 100 Kayak', 199.99, 2, 399.98)
('4530', None, '957', "Diamondback Women's Serene Classic Comfort Bi", 299.98, 4, 1199.92)
('4530', None, '926', 'Glove It Imperial Golf Towel', 15.99, 5, 79.95)
('2911', None, '1014', "O'Brien Men's Neoprene Life Vest", 49.98, 10, 499.8)
('5648', None, '1073', 'Pelican Sunstream 100 Kayak', 199.99, 6, 1199.94)
('918', None, '365', 'Perfect Fitness Perfect Rip Deck', 59.99, 4, 239.96)
('918', None, '191', "Nike Men's Free 5.0+ Runni

In [None]:
salesRDD5.saveAsTextFile('cust_sales55')

The above code includes several display statements for illustration and debugging. Some of the code can also be improved. For example, we do not need salesRDD5: the required result can be produced without it, saving one transformation. Figure out any other such improvements and rewrite the code.

Also figure out how to get the same output by joining the two RDDs (customers and salestxns).