### Read the input file customers.tsv to create the customer RDD. The RDD elements are the lines of type string from the input file

In [0]:
from pyspark import SparkContext, SparkConf

# Create a SparkConf object
conf = SparkConf().setAppName("Optimization using Broadcast Variable")

# Read the TSV file into an RDD
customer = sc.textFile("/FileStore/tables/customers.tsv")
salestxn = sc.textFile("/FileStore/tables/salestxns.tsv")

# Display the RDD
customer.take(3)
salestxn.take(3)

Out[25]: ["1\t43\tCamping & Hiking\t957\tDiamondback Women's Serene Classic Comfort Bi\t299.98\t1\t11599",
 '2\t48\tWater Sports\t1073\tPelican Sunstream 100 Kayak\t199.99\t1\t256',
 "3\t24\tWomen's Apparel\t502\tNike Men's Dri-FIT Victory Golf Polo\t50\t5\t256"]

### We now need to transform the RDD into a pairRDD having a key,value pair as the RDD elements with customer id as the key and customer name as the value

In [0]:
customer = customer.map(lambda x: x.split("\t"))
#customer.collect()
customerRDD = customer.map(lambda x: (x[0],x[1]))
customerRDD.collect()

Out[26]: [('11039', 'Mary Torres'),
 ('5623', 'Jose Haley'),
 ('5829', 'Mary Smith'),
 ('6336', 'Richard Maddox'),
 ('1708', 'Margaret Booth'),
 ('10227', 'Mary Henderson'),
 ('839', 'Lisa Walker'),
 ('7604', 'Jonathan Hill'),
 ('6485', 'Carolyn Sheppard'),
 ('4737', 'Mary Mendoza'),
 ('5973', 'Michael Smith'),
 ('9205', 'James Holmes'),
 ('138', 'Mary Dawson'),
 ('371', 'Adam Marquez'),
 ('9285', 'Gloria Smith'),
 ('1209', 'Mary Webb'),
 ('3021', 'Nancy Alvarado'),
 ('3354', 'Russell Flores'),
 ('11684', 'Denise Smith'),
 ('11144', 'Jose Dickerson'),
 ('5252', 'Mary Smith'),
 ('8688', 'Michelle Rose'),
 ('8900', 'Russell Peterson'),
 ('5577', 'Mary Smith'),
 ('11161', 'Mary Pennington'),
 ('4494', 'Mary Smith'),
 ('7565', 'Jean Donovan'),
 ('11218', 'Louis Novak'),
 ('10124', 'Mary Santos'),
 ('6052', 'Mary Harding'),
 ('9756', 'Mary White'),
 ('5748', 'Shawn Mann'),
 ('11748', 'James Smith'),
 ('3852', 'Roy Potts'),
 ('668', 'Phillip Lewis'),
 ('1236', 'Virginia Robertson'),
 ('6991'

### Collecting it as Map

In [0]:
customerAsMap=customerRDD.collectAsMap()
print(customerAsMap)

{'11039': 'Mary Torres', '5623': 'Jose Haley', '5829': 'Mary Smith', '6336': 'Richard Maddox', '1708': 'Margaret Booth', '10227': 'Mary Henderson', '839': 'Lisa Walker', '7604': 'Jonathan Hill', '6485': 'Carolyn Sheppard', '4737': 'Mary Mendoza', '5973': 'Michael Smith', '9205': 'James Holmes', '138': 'Mary Dawson', '371': 'Adam Marquez', '9285': 'Gloria Smith', '1209': 'Mary Webb', '3021': 'Nancy Alvarado', '3354': 'Russell Flores', '11684': 'Denise Smith', '11144': 'Jose Dickerson', '5252': 'Mary Smith', '8688': 'Michelle Rose', '8900': 'Russell Peterson', '5577': 'Mary Smith', '11161': 'Mary Pennington', '4494': 'Mary Smith', '7565': 'Jean Donovan', '11218': 'Louis Novak', '10124': 'Mary Santos', '6052': 'Mary Harding', '9756': 'Mary White', '5748': 'Shawn Mann', '11748': 'James Smith', '3852': 'Roy Potts', '668': 'Phillip Lewis', '1236': 'Virginia Robertson', '6991': 'Mary Smith', '6941': 'Jeffrey Palmer', '12425': 'Mary Smith', '5013': 'Mary Peters', '759': 'Donna Smith', '8829': 

### Converting the Dictionary as a Broadcast Variable

In [0]:
broadcastCustomer=sc.broadcast(customerAsMap)

### Converting Sales Transaction into a Paired RDD

In [0]:
salestxn1 = salestxn.map(lambda x: x.split("\t"))
#salestxn1.collect()
#salestxn1.collect()
salesRDD = salestxn1.map(lambda x:((x[3],x[4],x[5],x[7]),x[6]))
salesRDD.count()

Out[44]: 172198

### Sum of Quantities of Same Product Bought by Same Customer in Seperate Multiple Transactions

In [0]:
multrans=salesRDD.reduceByKey(lambda x,y: x+y)
multrans.take(3)

Out[45]: [(('957', "Diamondback Women's Serene Classic Comfort Bi", '299.98', '11599'),
  '1'),
 (('403', "Nike Men's CJ Elite 2 TD Football Cleat", '129.99', '256'), '1111'),
 (('365', 'Perfect Fitness Perfect Rip Deck', '59.99', '8827'), '5511')]

### Creating Final RDD from the products

In [0]:
finalRDD = multrans.map(lambda x: ((x[0][3],broadcastCustomer.value.get(x[0][3],None), x[0][0], x[0][1], x[0][2], x[1][0]), (int(x[1][0]) * float(x[0][2]))))
finalRDD.collect()

Out[64]: [(('11599',
   None,
   '957',
   "Diamondback Women's Serene Classic Comfort Bi",
   '299.98',
   '1'),
  299.98),
 (('256',
   None,
   '403',
   "Nike Men's CJ Elite 2 TD Football Cleat",
   '129.99',
   '1'),
  129.99),
 (('8827', None, '365', 'Perfect Fitness Perfect Rip Deck', '59.99', '5'),
  299.95),
 (('8827', None, '502', "Nike Men's Dri-FIT Victory Golf Polo", '50', '3'),
  150.0),
 (('11318', None, '1014', "O'Brien Men's Neoprene Life Vest", '49.98', '2'),
  99.96),
 (('11318',
   None,
   '403',
   "Nike Men's CJ Elite 2 TD Football Cleat",
   '129.99',
   '1'),
  129.99),
 (('4530', None, '1073', 'Pelican Sunstream 100 Kayak', '199.99', '1'),
  199.99),
 (('4530',
   None,
   '957',
   "Diamondback Women's Serene Classic Comfort Bi",
   '299.98',
   '1'),
  299.98),
 (('4530', None, '926', 'Glove It Imperial Golf Towel', '15.99', '5'), 79.95),
 (('2911', None, '1014', "O'Brien Men's Neoprene Life Vest", '49.98', '4'),
  199.92),
 (('5648', None, '1073', 'Pelican 

In [0]:
finalRDD=finalRDD.map(lambda x: ((x[0][3],broadcastCustomer.value[x[0][3]], x[0][0], x[0][1], x[0][2], x[1][0]), (int(x[1][0]) * float(x[0][2]))))
finalRDD.take(3)

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-2336174887685501>", line 2, in <module>
    finalRDD.take(3)
  File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
  File "/databricks/spark/python/pyspark/rdd.py", line 2859, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/databricks/spark/python/pyspark/context.py", line 2414, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptions.py", line 228, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/p

