# Leveraging Apache Spark for Efficient Retail Data Processing at Retail World

## Background

RetailWorld, a prominent retail chain with numerous stores across Metropolia, faces the challenge of processing and analyzing substantial volumes of daily sales data. With real-time data streaming from multiple sources, RetailWorld needs to clean, transform, and aggregate this data to derive actionable insights such as total Sales and Revenue per Product, Total Sales and Revenue per Store, Sales and Revenue per Promotion Type and Stock Analysis per Product.

This dataset is a modified <a href="https://www.kaggle.com/datasets/berkayalan/retail-sales-data?select=sales.csv">sales</a> dataset taken from the Kaggle website. This data is collected from a Turkish retail company, covering the period from the beginning of 2017 to the end of 2019.
It currently consists of 1033435 records.


## Dataset Description


**product_id**: This attribute represents the unique identifier for each product in the dataset. Each product is assigned a specific ID (e.g., P0001).

**store_id**: This attribute represents the unique identifier for each store where the product is sold. Each store is assigned a specific ID (e.g., S0002).

**date**: This attribute represents the date of sales data. It indicates when the sales, revenue, stock, and other information were recorded for a particular product in a specific store.

**sales**: This attribute represents the number of units of the product sold on a given date in a particular store. It indicates the quantity of the product that was purchased.

**revenue**: This attribute represents the total revenue generated from the sales of the product on a given date in a specific store. It is calculated by multiplying the number of units sold (sales) by the price per unit (price).

**stock**: This attribute represents the quantity of the product available in stock at the beginning of the day on the specified date in the given store.

**price**: This attribute represents the price per unit of the product on a given date in a specific store. It indicates the amount charged to the customer for each unit of the product.

**promo_type_1**: This attribute represents the type of promotion (if any) applied to the product. It indicates the first type of promotional activity associated with the product, such as discounts, special offers, or marketing campaigns.

**promo_bin_1**: This attribute represents the specific promotional bin (if any) associated with the first type of promotion. It provides additional details about the nature or category of the promotion.

**promo_type_2**: This attribute represents the type of secondary promotion (if any) applied to the product. It indicates another type of promotional activity associated with the product, similar to promo_type_1 but potentially different in nature or timing.

These attributes collectively provide detailed information about the sales, revenue, pricing, and promotional activities associated with each product in various stores over time.


## Challenges
Traditional data processing tools are inadequate for handling the velocity and volume of incoming sales data, leading to delays in analysis and decision-making. These delays hinder RetailWorld's ability to respond swiftly to market demands and optimize inventory and sales strategies.

## Solution: Apache Spark
To address these challenges, RetailWorld requires a scalable and efficient solution. Apache Spark, with its distributed computing architecture and robust processing capabilities, is the ideal solution for RetailWorld's data analytics needs. Spark's ability to parallelize data processing tasks across a cluster of nodes enables rapid aggregation and analysis of large datasets. Additionally, its fault-tolerant design ensures reliability and resilience against failures, making it a dependable choice for RetailWorld's critical data processing tasks.

## Import Libraries

In [1]:
# import libraries
import findspark
findspark.init()

# Importing SparkSession from pyspark.sql module
from pyspark.sql import SparkSession

In [2]:
# Suppressing warnings by defining a function 'warn' that does nothing
def warn(*args, **kwargs):
    pass

# Importing the 'warnings' module to handle warnings
import warnings

# Overriding the 'warn' function in the 'warnings' module with the defined function to suppress warnings
warnings.warn = warn

# Filtering out all warnings to be ignored
warnings.filterwarnings('ignore')

## Initializing SparkContext

In [22]:
from pyspark import SparkContext
from pyspark import SparkConf
from datetime import datetime

# Set Spark configuration
conf = SparkConf().setAppName("RetailStoreSalesAnalysis") \
                  .set("spark.executor.memory", "4g") \
                  .set("spark.driver.memory", "4g") \
                  .set("spark.network.timeout", "10000000") \
                  .set("spark.executor.heartbeatInterval", "10000000") \
                  .set("spark.driver.maxResultSize", "2g")

# Initialise Spark context
sc = SparkContext(master="local", conf=conf)

In [4]:
filename = 'Retailsales.csv'

## Load Data

In [28]:
raw_data = sc.textFile(filename, 20)

In [29]:
print(f"Total number of lines: {raw_data.count()}")

Total number of lines: 1033435


In [30]:
raw_data.collect()

['product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2',
 'P0001,S0002,1/2/2017,0,0,8,6.25,PR14,,PR03',
 'P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03',
 'P0001,S0013,1/2/2017,2,10.59,0,6.25,PR14,,PR03',
 'P0001,S0023,1/2/2017,0,0,6,6.25,PR14,,PR03',
 'P0001,S0025,1/2/2017,0,0,1,6.25,PR14,,PR03',
 'P0001,S0027,1/2/2017,0,0,7,6.25,PR14,,PR03',
 'P0001,S0040,1/2/2017,0,0,19,6.25,PR14,,PR03',
 'P0001,S0049,1/2/2017,0,0,8,6.25,PR14,,PR03',
 'P0001,S0050,1/2/2017,0,0,5,6.25,PR14,,PR03',
 'P0001,S0051,1/2/2017,0,0,6,6.25,PR14,,PR03',
 'P0001,S0055,1/2/2017,0,0,6,6.25,PR14,,PR03',
 'P0001,S0056,1/2/2017,1,5.3,6,6.25,PR14,,PR03',
 'P0001,S0062,1/2/2017,0,0,2,6.25,PR14,,PR03',
 'P0001,S0063,1/2/2017,0,0,7,6.25,PR14,,PR03',
 'P0001,S0066,1/2/2017,0,0,1,6.25,PR14,,PR03',
 'P0001,S0078,1/2/2017,0,0,6,6.25,PR14,,PR03',
 'P0001,S0082,1/2/2017,0,0,4,6.25,PR14,,PR03',
 'P0001,S0083,1/2/2017,0,0,10,6.25,PR14,,PR03',
 'P0001,S0087,1/2/2017,0,0,10,6.25,PR14,,PR03',
 'P

In [31]:
# Debugging
print(type(raw_data))  # This should show <class 'pyspark.rdd.RDD'>
print(raw_data.take(5))  # Inspect the first 5 records for debugging

<class 'pyspark.rdd.RDD'>
['product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2', 'P0001,S0002,1/2/2017,0,0,8,6.25,PR14,,PR03', 'P0001,S0012,1/2/2017,1,5.3,0,6.25,PR14,,PR03', 'P0001,S0013,1/2/2017,2,10.59,0,6.25,PR14,,PR03', 'P0001,S0023,1/2/2017,0,0,6,6.25,PR14,,PR03']


In [32]:
# Debugging
try:
    header = raw_data.first()
    print(f"Header: {header}")
except Exception as e:
    print(f"Error encountered: {e}")

Header: product_id,store_id,date,sales,revenue,stock,price,promo_type_1,promo_bin_1,promo_type_2


## Parse and Clean Data

The `parse_line` function is defined to parse each line of the CSV file into a structured format, extracting fields like __product ID, store ID, date, sales, revenue__ etc. The header line is removed from the RDD. The parsed data is filtered to remove records with missing or invalid data, such as zero or negative sales or price.

In [33]:
# Parse and clean data
def parse_line(line):
    # Split the line by comma to get fields
    fields = line.split(",")
    # Return a dictionary with parsed fields
    return {
        'product_id': fields[0],
        'store_id': fields[1],
        'date': fields[2],
        'sales': float(fields[3]),
        'revenue': float(fields[4]),
        'stock': float(fields[5]),
        'price': float(fields[6]),
        'promo_type_1': fields[7],
        'promo_type_2': fields[9]
    }

# Remove the header line
header = raw_data.first()

raw_data_no_header = raw_data.filter(lambda line: line!= header)

# Parse the lines into a structured format
parsed_data = raw_data_no_header.map(parse_line)
parsed_data = parsed_data.filter(lambda x: x is not None)

# Filter out records with missing or invalid data
cleaned_data = parsed_data.filter(lambda x: x['sales'] > 0 and x['price'] > 0)

## Partitioning

The number of partitions in the cleaned data RDD is chekced and printed

In [34]:
# Check the number of partitions
print(f"Number of partitions in cleaned_data: {cleaned_data.getNumPartitions()}")

Number of partitions in cleaned_data: 20


## Partition-wise Count:

Here a function `count_in_partition` is defined to count the number of records in each partition of RDD. This function is applied using `mapPartitionsWithIndex` to get the count of records in each partition, and the results are printed.

In [35]:
# Function to count the number of records in each parition
def count_in_partition(index, iterator):
    count = sum(1 for _ in iterator)
    yield (index, count)
    
# Get the count of records in each partition
partitions_info = cleaned_data.mapPartitionsWithIndex(count_in_partition).collect()
print("Number of records in each partitions:")
for partition, count in partitions_info:
    print(f"Partition {partition}: {count} records")

Number of records in each partitions:
Partition 0: 9542 records
Partition 1: 10008 records
Partition 2: 9344 records
Partition 3: 9963 records
Partition 4: 9654 records
Partition 5: 9673 records
Partition 6: 9673 records
Partition 7: 9718 records
Partition 8: 10325 records
Partition 9: 9634 records
Partition 10: 10435 records
Partition 11: 9323 records
Partition 12: 9686 records
Partition 13: 9335 records
Partition 14: 9760 records
Partition 15: 10780 records
Partition 16: 10194 records
Partition 17: 10326 records
Partition 18: 9299 records
Partition 19: 9989 records


## Aggregations

Several aggregations are performed on the cleaned data RDD:
- Total sales and revenue per product.
- Total sales and revenue per store.
- Average price per product.
- Sales and revenue per promotion type 1 and promotion type 2.
- Stock analysis per product.
- Each aggregation is performed using map to transform the data into key-value pairs and reduceByKey to aggregate the values for each key.

### Total Sales and Revenue per Product:
This aggregation calculates the total sales and revenue for each product.
It first maps each record in cleaned_data to a key-value pair, where the key is the product ID and the value is a tuple containing sales and revenue.
Then, it uses `reduceByKey` to aggregate the sales and revenue values for each product ID.

In [38]:
# Aggregation 1
sales_revenue_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

## Total Sales and Revenue per Store:

This aggregation calculates the total sales and revenue for each store. Similar to the first aggregation, it maps each record to a key-value pair with the store ID as the key and a tuple containing sales and revenue as the value. It then uses reduceByKey to aggregate the sales and revenue values for each store ID.

In [39]:
# Aggregation 2
sales_revenue_per_store = cleaned_data.map(lambda x: (x['store_id'], (x['sales'], x['revenue']))) \
                                    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

### Average Price per Product:
This aggregation calculates the average price for each product. It first maps each record to a key-value pair with the product ID as the key and a tuple containing the price and a count of 1. Then, it uses reduceByKey to aggregate the total price and count of prices for each product. Finally, it calculates the average price by dividing the total price by the count.

In [41]:
# Aggregation 3
total_price_count_per_product = cleaned_data.map(lambda x: (x['product_id'], (x['price'], 1))) \
                                    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

average_price_per_product = total_price_count_per_product.mapValues(lambda x: x[0]/x[1])

## Sales and Revenue per Promotion Type:
These aggregations calculate the total sales and revenue for each promotion type (promo_type_1 and promo_type_2). Each record is mapped to a key-value pair with the promotion type as the key and a tuple containing sales and revenue as the value. Then, reduceByKey is used to aggregate the sales and revenue values for each promotion type.

In [42]:
# Aggregation 4
sales_revenue_per_promo_1 = cleaned_data.map(lambda x: (x['promo_1'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sales_revenue_per_promo_2 = cleaned_data.map(lambda x: (x['promo_2'], (x['sales'], x['revenue']))) \
                                        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

## Stock Analysis per Product:
This aggregation calculates the total stock for each product. Each record is mapped to a key-value pair with the product ID as the key and the stock as the value. Then, reduceByKey is used to aggregate the stock values for each product.

In [43]:
# Aggregation 5
stock_per_product = cleaned_data.map(lambda x: (x['product_id'], x['stock'])) \
                                .reduceByKey(lambda a, b: a+b)

## Saving Results

print the results of each aggregation

In [44]:
# Print results
print("Total Sales and Revenue per Product:")
print("=" * 35)
for product in sales_revenue_per_product.collect():
    # Create the format string with appropriate padding
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    # Print the values using the format string
    print(format_string.format(str(product[0]), str(round(product[1][0],2)), str(round(product[1][1],2))))

print("\n\nTotal Sales and Revenue per Store:")
print("=" * 35)
for store in sales_revenue_per_store.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(store[0]), str(round(store[1][0],2)), str(round(store[1][1],2))))

print("\n\nAverage Price per Product:")
print("=" * 30)

for product in average_price_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

print("\n\nSales and Revenue per Promotion Type 1:")
print("=" * 40)
for promo in sales_revenue_per_promo_1.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"
    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nSales and Revenue per Promotion Type 2:")
print("=" * 40)
for promo in sales_revenue_per_promo_2.collect():
    format_string = f"{{:<5}} | {{:<9}} | {{:<9}}"

    print(format_string.format(str(promo[0]), str(round(promo[1][0],2)), str(round(promo[1][1],2))))

print("\n\nStock per Product:")
print("=" * 20)
for product in stock_per_product.collect():
    format_string = f"{{:<5}} | {{:<9}}"
    print(format_string.format(str(product[0]), str(round(product[1],2))))

Total Sales and Revenue per Product:
P0169 | 593.0     | 2257.85  
P0261 | 9501.0    | 36815.99 
P0333 | 11697.0   | 19883.82 
P0336 | 531.0     | 10193.62 
P0437 | 179.0     | 1242.63  
P0454 | 970.0     | 1677.28  
P0590 | 19334.0   | 8694.91  
P0608 | 1480.0    | 10499.17 
P0695 | 256.0     | 1168.7   
P0343 | 12.0      | 136.34   
P0511 | 33.0      | 873.84   
P0599 | 27.0      | 259.09   
P0466 | 210.0     | 920.53   
P0477 | 37.0      | 122.48   
P0018 | 1340.0    | 2374.91  
P0177 | 212.0     | 1484.38  
P0195 | 630.0     | 3278.5   
P0220 | 797.0     | 1291.14  
P0272 | 97.0      | 606.68   
P0296 | 6745.0    | 4614.89  
P0345 | 620.0     | 1823.99  
P0348 | 6134.0    | 11476.97 
P0704 | 3102.0    | 5246.25  
P0693 | 9.0       | 287.67   
P0625 | 45.0      | 452.36   
P0495 | 14.0      | 82.69    
P0160 | 25.0      | 54.34    
P0055 | 1136.0    | 3650.86  
P0057 | 233.0     | 2478.77  
P0097 | 138.0     | 1506.68  
P0109 | 308.0     | 2345.69  
P0183 | 241.0     | 1108.49  
P02

S0106 | 5526.99   | 13176.46 
S0066 | 5332.09   | 25313.23 
S0010 | 6727.69   | 26428.78 
S0088 | 4986.0    | 8089.97  
S0131 | 3765.14   | 10557.32 
S0049 | 3585.01   | 9918.76  
S0014 | 3299.35   | 10988.31 
S0021 | 2482.0    | 6224.09  
S0070 | 2127.76   | 5411.76  
S0052 | 3348.71   | 12547.09 
S0137 | 3242.0    | 9251.87  
S0045 | 1985.11   | 5521.61  
S0083 | 2125.0    | 7728.11  
S0082 | 4982.66   | 14191.6  
S0074 | 3085.91   | 8910.0   
S0044 | 1378.0    | 2432.52  
S0085 | 37937.73  | 117882.5 
S0026 | 19473.19  | 82572.55 
S0073 | 2260.0    | 6968.0   
S0080 | 6184.0    | 10228.03 
S0089 | 1167.0    | 2664.06  
S0022 | 5135.39   | 21876.56 
S0127 | 721.0     | 1484.54  
S0012 | 4023.0    | 11399.66 
S0096 | 6031.13   | 15773.65 
S0135 | 1951.95   | 3637.91  
S0117 | 2948.0    | 4662.88  
S0119 | 786.0     | 1787.48  
S0090 | 1010.24   | 9229.75  
S0093 | 668.0     | 2148.98  
S0053 | 2300.43   | 4328.38  
S0023 | 5920.52   | 24434.22 
S0124 | 3068.0    | 5961.35  
S0013 | 39

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 193) (DESKTOP-SVRSI70 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1237, in process
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 840, in func
    return f(iterator)
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\Ning\AppData\Local\Temp\ipykernel_21452\2366400287.py", line 2, in <lambda>
KeyError: 'promo_1'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1247, in main
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 1237, in process
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 5434, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 840, in func
    return f(iterator)
  File "H:\Anaconda3\lib\site-packages\pyspark\rdd.py", line 3983, in combineLocally
    merger.mergeValues(iterator)
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "H:\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\Ning\AppData\Local\Temp\ipykernel_21452\2366400287.py", line 2, in <lambda>
KeyError: 'promo_1'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


## Cleanup

In [45]:
# Stop the spark context
sc.stop()