#### RDD Operations Exercise: Customer and Sales Transaction Data Sets with a Broadcast Variable

We have customer details in one file, customers.tsv, and sales transaction details in another file, salestxns.tsv.
Each sales transaction contains only the customer id, not the customer name. The transactions file may also contain several entries for the same item when a customer bought it in separate transactions. We have to generate output that contains the customer id and customer name along with the product id, product name, price, total quantity ordered and total amount paid.

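Since the customer file is small (1244 records) and the sales transactions file is much larger (172198 records), we will load the customer details into a dictionary on the driver and broadcast it across the cluster, so that each worker machine keeps one cached, read-only copy instead of receiving a copy with every task. We will use this broadcast variable to add the customer name to each sales transaction, which already carries the customer id.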

In the application flow we also need to aggregate the sales transactions for the same product when a customer ordered it in multiple transactions.

Import findspark and initialize it with the Spark installation path.

In [None]:
import findspark
findspark.init('/usr/local/spark')

Import PySpark and create SparkContext

In [None]:
import pyspark
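# getOrCreate returns the existing SparkContext if this cell is run again,
# instead of failing because only one SparkContext may be active at a time
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName('RDD Operations Broadcast Variable Example'))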

Read the customers data input file to create an input RDD

In [None]:
custRDD1=sc.textFile("customers.tsv")

Display the count and a sample of the RDD elements for debugging. The RDD elements are the lines of the input file, as strings.

In [None]:
custRDD1.count()

In [None]:
custRDD1.take(3)

In [None]:
type(custRDD1.first())

###### We need to transform the RDD elements into key,value pairs with the required key and value.

The function for custRDD:
 - Splits the input string at tabs.
 - Uses the customer id as the key and the customer name as the value.
 - Returns the (key, value) tuple; mapping it over every line produces a pair RDD.

In [None]:
def make_cust_pairRDD(line_str):
    # Split one tab-separated line and return (customer_id, customer_name)
    fields = line_str.split('\t')
    return (fields[0], fields[1])
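
Because the pair function is plain Python that Spark calls once per line, it can be checked without a cluster. The sketch below is a hypothetical, more defensive variant (`parse_cust_line` is an illustrative name, not part of the original code) that strips the line ending and returns a list, so it could be used with `flatMap` to drop blank or malformed lines instead of failing with an `IndexError`. The sample lines are made up.

```python
def parse_cust_line(text):
    """Return [(customer_id, customer_name)], or [] for a malformed line.

    Hypothetical variant of make_cust_pairRDD intended for
    custRDD1.flatMap(parse_cust_line), which silently drops bad lines.
    """
    fields = text.rstrip("\r\n").split("\t")
    if len(fields) < 2 or not fields[0]:
        return []
    return [(fields[0], fields[1])]

# Hypothetical sample lines, not taken from customers.tsv
sample = ["101\tAlice Smith\textra", "", "102\tBob Lee"]
pairs = [p for line in sample for p in parse_cust_line(line)]
print(pairs)  # [('101', 'Alice Smith'), ('102', 'Bob Lee')]
```

With Spark this would read `custRDD2 = custRDD1.flatMap(parse_cust_line)`. Whether dropping bad lines is acceptable depends on the data, which the exercise does not describe.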

Use map to transform each element of custRDD1 into a key-value pair with the function above. The lambda wrapper is optional: `custRDD1.map(make_cust_pairRDD)` is equivalent.

In [None]:
custRDD2=custRDD1.map(lambda line: make_cust_pairRDD(line))

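Persisting is worthwhile when an RDD is reused by several actions or transformations: after it is computed the first time, Spark keeps it (in memory by default) instead of recomputing it from the input file each time. persist() is lazy; the RDD is actually cached when the first action runs on it.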

In [None]:
custRDD2.persist()

In [None]:
custRDD2.take(5)

In [None]:
type(custRDD2.first())

In [None]:
custDict=custRDD2.collectAsMap()

The action collectAsMap returns the pair RDD to the driver as a dictionary (called a Map in other programming languages).
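
collectAsMap brings every pair back to the driver, so it only suits small RDDs such as the customer data. PySpark builds an ordinary `dict` from the collected pairs, so if a customer id appeared twice, only the value that comes last in the collected list would survive. The plain-Python sketch below mimics that with hypothetical pairs and shows one way to detect duplicate ids.

```python
from collections import Counter

# Hypothetical (customer_id, name) pairs as they might come back from collect()
collected = [("101", "Alice Smith"), ("102", "Bob Lee"), ("101", "Alice S.")]

# collectAsMap() behaves like dict(collected): later duplicates overwrite earlier ones
cust_map = dict(collected)
print(cust_map)  # {'101': 'Alice S.', '102': 'Bob Lee'}

# Ids that occur more than once
dupes = [k for k, n in Counter(k for k, _ in collected).items() if n > 1]
print(dupes)  # ['101']
```

On the RDD itself, `custRDD2.countByKey()` returns per-key counts that could be checked the same way before calling collectAsMap.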

In [None]:
type(custDict)

In [None]:
for k, v in custDict.items():
    print(k, v)

In [None]:
custDict['6342']

In [None]:
custDict['11599']

Since some customer details are missing from the customers file, it is safer to look up a customer name with the .get method: it returns None for a missing id, whereas indexing with [] raises a KeyError.

In [None]:
custDict.get('6342')

In [None]:
custDict.get('11599')
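
`dict.get` also accepts a default value. A placeholder such as `'UNKNOWN'` (an arbitrary choice, not part of the original exercise) can make missing names easier to spot in the output than `None`. The dictionary below is hypothetical.

```python
cust_dict = {"101": "Alice Smith"}  # hypothetical data; "999" is deliberately absent

print(cust_dict.get("101"))             # Alice Smith
print(cust_dict.get("999"))             # None
print(cust_dict.get("999", "UNKNOWN"))  # UNKNOWN

try:
    cust_dict["999"]
except KeyError as err:
    print("KeyError:", err)             # KeyError: '999'
```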

We now create a broadcast variable, custDictBroadcast, from custDict, so that each machine in the cluster receives one read-only copy of the dictionary.

In [None]:
custDictBroadcast=sc.broadcast(custDict)

In [None]:
type(custDictBroadcast)

In [None]:
custDictBroadcast.value

In [None]:
type(custDictBroadcast.value)

In [None]:
custDictBroadcast.value['6342']

In [None]:
custDictBroadcast.value['11599']

The same applies to the broadcast dictionary: because some customer details are missing from the customers file, use .get on custDictBroadcast.value rather than indexing with [].

In [None]:
custDictBroadcast.value.get('6342')

In [None]:
custDictBroadcast.value.get('11599')

In [None]:
for k, v in custDictBroadcast.value.items():
    print(k, v)

Read the salestxns data input file to create another input RDD

In [None]:
salesRDD1=sc.textFile("salestxns.tsv")

Display the count and a sample of the RDD elements for debugging. The RDD elements are the lines of the input file, as strings.

In [None]:
salesRDD1.count()

In [None]:
salesRDD1.take(3)

In [None]:
type(salesRDD1.first())

###### We need to transform the RDD elements into key,value pairs with the required key and value.

The function for salesRDD:
 - Splits the input string at tabs.
 - Uses a tuple of customer id, product id, product name and product price as the key, and the product quantity (converted to int) as the value.
 - Returns the (key, value) tuple; mapping it over every line produces a pair RDD.
 - Later in the code we use reduceByKey to sum the quantities of the same product bought by the same customer in separate transactions.

In [None]:
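def make_sales_pairRDD(line_str):
    # Columns used: 7 = customer id, 3 = product id, 4 = product name,
    # 5 = price, 6 = quantity
    fields = line_str.split('\t')
    return ((fields[7], fields[3], fields[4], fields[5]), int(fields[6]))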
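
Since the key is a tuple of strings and only the quantity is converted, the function is easy to check in isolation. The sample line below is hypothetical: the original code only tells us, through the indices it uses, that column 3 is the product id, 4 the product name, 5 the price, 6 the quantity and 7 the customer id, so columns 0 to 2 hold placeholders.

```python
def make_sales_pair(text):
    """Same logic as make_sales_pairRDD: ((cust_id, prod_id, prod_name, price), qty)."""
    f = text.rstrip("\r\n").split("\t")
    return ((f[7], f[3], f[4], f[5]), int(f[6]))

# Hypothetical line: placeholders c0-c2 in columns 0-2
line = "c0\tc1\tc2\tP10\tWidget\t59.99\t3\t101"
key, qty = make_sales_pair(line)
print(key)  # ('101', 'P10', 'Widget', '59.99')
print(qty)  # 3
```

The price stays a string inside the key, so two transactions only aggregate together if their price text is identical.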

In [None]:
salesRDD2=salesRDD1.map(lambda line: make_sales_pairRDD(line))

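Since the sales RDD is more than 100 times larger than the customer RDD, we can increase parallelism by spreading it over more partitions with partitionBy. When no partition function is given, PySpark hash-partitions the pair RDD by key (its default partitionFunc is portable_hash). partitionBy returns a new RDD, so the result must be assigned back; calling it without assignment leaves salesRDD2 neither partitioned nor persisted.

In [None]:
salesRDD2 = salesRDD2.partitionBy(10).persist()

Display the salesRDD2 elements for debugging purposes.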

In [None]:
salesRDD2.count()

In [None]:
salesRDD2.take(3)

In [None]:
type(salesRDD2.first())
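
Hash partitioning sends each record to partition `partitionFunc(key) % numPartitions`, so every record with the same key lands in the same partition, which is what the following reduceByKey relies on. The sketch below is an illustrative stand-in for partitionBy (`hash_partition` is a hypothetical name) that substitutes Python's built-in `hash` for PySpark's `portable_hash`. String hashing is randomized per Python process, so the partition numbers printed here can change between runs and will not match Spark's.

```python
from collections import defaultdict

def hash_partition(pairs, num_partitions):
    """Group (key, value) pairs into buckets by hash(key) % num_partitions.

    Illustrative stand-in for RDD.partitionBy; uses built-in hash, not portable_hash.
    """
    buckets = defaultdict(list)
    for k, v in pairs:
        buckets[hash(k) % num_partitions].append((k, v))
    return buckets

# Hypothetical pairs shaped like salesRDD2 elements
pairs = [(("101", "P10", "Widget", "59.99"), 3),
         (("102", "P11", "Gadget", "5.00"), 1),
         (("101", "P10", "Widget", "59.99"), 2)]

parts = hash_partition(pairs, 10)
for idx in sorted(parts):
    print(idx, parts[idx])
```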

In [None]:
salesRDD3 = salesRDD2.reduceByKey(lambda va,vb: va+vb)
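
reduceByKey merges all values that share a key with a function that should be associative and commutative (addition qualifies), because Spark applies it in no fixed order, first within each partition and then across partitions. The plain-Python sketch below (the helper `reduce_by_key` is hypothetical) shows the result it produces on a few made-up pairs.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine values that share a key with func, like rdd.reduceByKey(func).

    In-memory stand-in for a small list; Spark instead combines inside each
    partition first and then merges the partial results.
    """
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return list(acc.items())

# Hypothetical pairs shaped like salesRDD2 elements
pairs = [(("101", "P10", "Widget", "59.99"), 3),
         (("102", "P11", "Gadget", "5.00"), 1),
         (("101", "P10", "Widget", "59.99"), 2)]

totals = reduce_by_key(pairs, add)
print(totals)
# [(('101', 'P10', 'Widget', '59.99'), 5), (('102', 'P11', 'Gadget', '5.00'), 1)]
```

`operator.add` can replace the lambda directly, as in `salesRDD2.reduceByKey(add)`. Spark does not guarantee the order of the resulting elements.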

In [None]:
salesRDD3.count()

In [None]:
salesRDD3.take(3)

Map each element of salesRDD3 to a flat tuple of customer id, product id, product name, price, total quantity purchased and total amount paid (price * total quantity, rounded to 2 decimal places).
Once that is done, we can use the broadcast variable to look up the customer name required by the final output format in the problem statement.

In [None]:
# fld = ((cust_id, prod_id, prod_name, price_str), total_qty)
salesRDD4=salesRDD3.map(lambda fld : (fld[0][0],fld[0][1],fld[0][2],float(fld[0][3]),fld[1],
                                      round(float(fld[0][3])*fld[1],2)))
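
The lambda above is dense. The same conversion written as a named function (`to_row` is an illustrative name) makes the tuple unpacking explicit and could be passed to map directly, as in `salesRDD3.map(to_row)`. The input pair is hypothetical. Prices are converted with float, as in the original; exact currency arithmetic would need `decimal.Decimal`, which the exercise does not use.

```python
def to_row(kv):
    """((cust_id, prod_id, prod_name, price_str), total_qty) ->
    (cust_id, prod_id, prod_name, price, total_qty, total_amount)."""
    (cust_id, prod_id, prod_name, price_str), total_qty = kv
    price = float(price_str)
    return (cust_id, prod_id, prod_name, price, total_qty, round(price * total_qty, 2))

row = to_row((("101", "P10", "Widget", "59.99"), 5))  # hypothetical reduced pair
print(row)  # ('101', 'P10', 'Widget', 59.99, 5, 299.95)
```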

In [None]:
salesRDD4.take(5)

In [None]:
type(salesRDD4.first())

In [None]:
# Insert the customer name after the customer id; .get returns None for unknown ids
salesRDD5=salesRDD4.map(lambda fld : (fld[0],custDictBroadcast.value.get(fld[0]),fld[1],fld[2],fld[3],fld[4],fld[5]))

In [None]:
for line in salesRDD5.take(15):
    print(line)

In [None]:
salesRDD5.saveAsTextFile('cust_sales55')
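
saveAsTextFile writes `str(element)` for each element, so the tuples above are stored as Python tuple literals, and the call fails if the output directory already exists. If tab-separated output is preferred (an assumption about the desired format, not a requirement of the exercise), each row can be formatted first with a hypothetical helper such as `to_tsv`:

```python
def to_tsv(row, missing=""):
    """Join a result tuple into one tab-separated line; None becomes `missing`."""
    return "\t".join(missing if v is None else str(v) for v in row)

row = ("101", None, "P10", "Widget", 59.99, 5, 299.95)  # hypothetical result row
print(repr(to_tsv(row)))  # '101\t\tP10\tWidget\t59.99\t5\t299.95'
```

In the notebook this would be `salesRDD5.map(to_tsv).saveAsTextFile('cust_sales_tsv')`, where the directory name is only an example.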

The code above has several print statements and display cells for illustration and debugging. Some of it can also be improved: for example, salesRDD5 is not needed, because the customer name can be added in the same map that builds salesRDD4, saving one transformation. Look for other such improvements and rewrite the code.
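
One possible answer to the salesRDD5 point, sketched in plain Python: a hypothetical function `to_final_row` builds the final tuple, including the customer name, directly from a reduced pair, so salesRDD4 and salesRDD5 collapse into a single map. The lookup dictionary is passed in as an argument; in the notebook it would be `custDictBroadcast.value`, read inside the lambda so that each executor uses its broadcast copy: `salesRDD3.map(lambda kv: to_final_row(kv, custDictBroadcast.value))`. The data below is hypothetical.

```python
def to_final_row(kv, cust_lookup):
    """Build (cust_id, cust_name, prod_id, prod_name, price, qty, amount)
    in one step from ((cust_id, prod_id, prod_name, price_str), qty)."""
    (cust_id, prod_id, prod_name, price_str), qty = kv
    price = float(price_str)
    return (cust_id, cust_lookup.get(cust_id), prod_id, prod_name,
            price, qty, round(price * qty, 2))

lookup = {"101": "Alice Smith"}  # hypothetical stand-in for custDictBroadcast.value
reduced = [(("101", "P10", "Widget", "59.99"), 5),
           (("999", "P11", "Gadget", "5.00"), 1)]

final_rows = [to_final_row(kv, lookup) for kv in reduced]
for r in final_rows:
    print(r)
```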

Also work out how to produce the same output by joining the two RDDs (customers and sales transactions) instead of using a broadcast variable.
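
One way to approach the join variant is to key the reduced sales by customer id and call `leftOuterJoin` with custRDD2. A left outer join keeps sales whose customer is missing from customers.tsv, pairing them with `None`, which matches what `.get` produced. The plain-Python sketch below emulates the shape of that join result with hypothetical data (`left_outer_join` is an illustrative helper, not a Spark function). Unlike the broadcast approach, a real RDD join shuffles both RDDs across the cluster.

```python
def left_outer_join(left, right):
    """Emulate RDD.leftOuterJoin on lists: (k, v), (k, w) -> (k, (v, w or None))."""
    right_index = {}
    for k, w in right:
        right_index.setdefault(k, []).append(w)
    out = []
    for k, v in left:
        for w in right_index.get(k, [None]):
            out.append((k, (v, w)))
    return out

# Hypothetical data. In Spark, sales_by_cust could come from
# salesRDD3.map(lambda kv: (kv[0][0], (kv[0][1], kv[0][2], kv[0][3], kv[1])))
cust_pairs = [("101", "Alice Smith")]
sales_by_cust = [("101", ("P10", "Widget", "59.99", 5)),
                 ("999", ("P11", "Gadget", "5.00", 1))]

joined = left_outer_join(sales_by_cust, cust_pairs)
rows = [(cid, name, pid, pname, float(price), qty, round(float(price) * qty, 2))
        for cid, ((pid, pname, price, qty), name) in joined]
for r in rows:
    print(r)
```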