## Weekly Sales Data Processing With RDDs in PySpark

This notebook processes product sales data on a weekly basis using PySpark's RDD (Resilient Distributed Dataset) abstraction. The dataset consists of shipping records, and the goal is to calculate the total quantity of products shipped in each week.

The workflow includes:
- Initializing a Spark session and loading the dataset.
- Removing the header and extracting relevant fields.
- Keeping only orders whose status starts with "Shipped".
- Identifying Mondays as weekly boundaries.
- Aggregating quantities shipped per SKU for each week.
- Exporting the final result to a CSV file.

### 🔹 Initialize SparkSession and SparkContext

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Weekly-Sales").getOrCreate()
sc = spark.sparkContext

### 🔹 Load Data

In [2]:
lines = sc.textFile("file:///D:/Intrduction_To_Big_Data/22120384/src/Task_2.2/asr.csv")

print(f"Number of lines before removing header: {lines.count()}")

# remove header
header = lines.first()
lines = lines.filter(lambda x: x != header)

print(f"Number of lines after removing header: {lines.count()}")

Number of lines before removing header: 128976
Number of lines after removing header: 128975


### 🔹 Extract Relevant Fields

In [3]:
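# Extract the relevant columns, splitting each line only once.
# Note: a plain split(",") assumes that no field contains a quoted comma.
def extract_fields(line):
    fields = line.split(",")
    return (
        fields[9],   # Category (used as the SKU in this notebook)
        fields[2],   # Date
        fields[3],   # Status
        fields[13],  # Quantity
    )

extracted_data = lines.map(extract_fields)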
extracted_data.take(5)

[('Set', '04-30-22', 'Cancelled', '0'),
 ('kurta', '04-30-22', 'Shipped - Delivered to Buyer', '1'),
 ('kurta', '04-30-22', 'Shipped', '1'),
 ('Western Dress', '04-30-22', 'Cancelled', '0'),
 ('Top', '04-30-22', 'Shipped', '1')]
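
Splitting on every comma only works while no field contains a quoted comma. As a hedged sketch (not part of the original notebook), the hypothetical helpers `parse_csv_line` and `extract_fields_csv` below use Python's standard `csv` module to honour quoted fields. The column positions 9, 2, 3 and 13 come from the cell above; the sample row is invented for illustration.

```python
import csv

def parse_csv_line(line):
    """Split one CSV line into fields, honouring double-quoted commas."""
    return next(csv.reader([line]))

def extract_fields_csv(line):
    """Return (category, date, status, quantity) from one CSV line."""
    fields = parse_csv_line(line)
    return fields[9], fields[2], fields[3], fields[13]

# Invented row with 14 columns; column 9 contains a quoted comma
sample = '0,id,04-30-22,Shipped,a,b,c,d,e,"Set, Large",f,g,h,1'
print(extract_fields_csv(sample))
```

In the notebook this could replace the lambda as `lines.map(extract_fields_csv)`, assuming no field spans multiple lines.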

### 🔹 Define Date and Quantity Conversion Functions

In [4]:
from datetime import datetime, timedelta

# Convert a date string in MM-DD-YY format to a date object (None if it cannot be parsed)
def safe_parse_date(date_str):
    try:
        return datetime.strptime(date_str.strip(), "%m-%d-%y").date()
    except Exception:
        return None

# Convert a quantity string to an integer (None if it cannot be parsed)
def safe_int(value):
    try:
        return int(value)
    except (ValueError, TypeError):
        return None
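
To illustrate how the two helpers behave on clean and malformed input, the sketch below repeats their definitions so that it runs on its own. The sample strings are invented.

```python
from datetime import datetime

def safe_parse_date(date_str):
    try:
        return datetime.strptime(date_str.strip(), "%m-%d-%y").date()
    except Exception:
        return None

def safe_int(value):
    try:
        return int(value)
    except (ValueError, TypeError):
        return None

print(safe_parse_date(" 04-30-22 "))  # surrounding whitespace is stripped first
print(safe_parse_date("30-04-22"))    # month 30 is invalid, so None
print(safe_int("3"))                  # plain integer string
print(safe_int(""))                   # empty string, so None
print(safe_int("1.0"))                # int() rejects decimal strings, so None
```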


### 🔹 Filter Shipped Orders and Format Data

In [5]:
# Keep only records whose status starts with "Shipped"
shipped_data = extracted_data.filter(lambda x: x[2].startswith("Shipped"))

# Convert data types
parsed_data = shipped_data.map(lambda x: (x[0], safe_parse_date(x[1]), safe_int(x[3])))

# Find the average quantity of shipped items
valid_quantities = parsed_data.filter(lambda x: x[2] is not None)\
                              .map(lambda x: x[2])
avg_quantity = int(round(valid_quantities.mean()))

# Drop records whose date could not be parsed and replace missing quantities with the average quantity
transformed_data = parsed_data.filter(lambda x: x[1] is not None)\
                              .map(lambda x: (x[0], x[1], x[2] if x[2] is not None else avg_quantity))

transformed_data.take(5)

[('kurta', datetime.date(2022, 4, 30), 1),
 ('kurta', datetime.date(2022, 4, 30), 1),
 ('Top', datetime.date(2022, 4, 30), 1),
 ('Set', datetime.date(2022, 4, 30), 1),
 ('Set', datetime.date(2022, 4, 30), 1)]
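
The imputation step can be checked on a small in-memory list. This is a plain-Python sketch of the same logic with invented quantities, not the RDD code itself. Note that Python's `round` rounds halves to the nearest even integer, so a mean of exactly 2.5 would become 2.

```python
# Invented quantities; None marks a value that failed to parse
quantities = [1, 2, None, 4]

# Average over the valid values only, rounded to an integer
valid = [q for q in quantities if q is not None]
avg_quantity = int(round(sum(valid) / len(valid)))

# Replace each missing value with the rounded average
filled = [q if q is not None else avg_quantity for q in quantities]
print(filled)
```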

### 🔹 Identify Mondays as Week Boundaries

In [6]:
# Find all Mondays among the distinct dates in the dataset (weekday() == 0 means Monday)
dates = transformed_data.map(lambda x: x[1])\
                        .distinct()\
                        .sortBy(lambda x: x)\
                        .collect()
mondays = [date for date in dates if date.weekday() == 0]

### 🔹 Calculate Total Quantity Shipped per Week

In [7]:
# Build a list of tuples (report_date, SKU, total_quantity)
loaded_data = []

for monday in mondays:
    # Select the records from the 7 days before this Monday: [monday - 7 days, monday)
    start_date = monday - timedelta(days=7)
    end_date = monday
    weekly_data = transformed_data.filter(lambda x: start_date <= x[1] < end_date)

    # Sum the quantities for each SKU in that week, using the Monday as the report date
    weekly_sales = weekly_data.map(lambda x: (x[0], x[2]))\
                              .reduceByKey(lambda x, y: x + y)\
                              .map(lambda x: (monday.strftime("%d-%m-%Y"), x[0], x[1]))

    loaded_data.extend(weekly_sales.collect())

# Sort chronologically; dd-mm-YYYY strings do not sort in date order when compared as text
loaded_data.sort(key=lambda x: datetime.strptime(x[0], "%d-%m-%Y"))
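
The loop above launches one Spark job per Monday. A possible alternative, sketched here under assumptions, keys each record by its report date and aggregates everything in a single pass. The helper `report_monday` is hypothetical: it returns the first Monday strictly after a date, which matches the `[monday - 7 days, monday)` window used above. Unlike the loop, it also yields report dates for Mondays that never appear in the dataset. A plain dictionary stands in for `reduceByKey` so that the sketch runs without Spark, and the records are invented.

```python
from collections import defaultdict
from datetime import date, timedelta

def report_monday(d):
    """Return the first Monday strictly after d (Monday has weekday() == 0)."""
    return d + timedelta(days=7 - d.weekday())

# Invented records in the same shape as transformed_data: (sku, date, quantity)
records = [
    ("kurta", date(2022, 4, 30), 1),  # Saturday
    ("kurta", date(2022, 5, 1), 2),   # Sunday
    ("Top", date(2022, 5, 2), 1),     # Monday, counted in the following report
]

# Local stand-in for reduceByKey: sum quantities per (report date, sku)
totals = defaultdict(int)
for sku, day, qty in records:
    totals[(report_monday(day), sku)] += qty

# Keys are (date, str) tuples, so sorting them is chronological
for (monday, sku), qty in sorted(totals.items()):
    print(monday.strftime("%d-%m-%Y"), sku, qty)
```

With an RDD, the same idea could be written as `transformed_data.map(lambda x: ((report_monday(x[1]), x[0]), x[2])).reduceByKey(lambda a, b: a + b)`, followed by a single `collect()`.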

In [8]:
# Write to CSV file
header = ["report_date", "sku", "total_quantity"]
output_file = "./output.csv"

with open(output_file, "w") as f:
    f.write(",".join(header) + "\n")
    for row in loaded_data:
        f.write(",".join(map(str, row)) + "\n")
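
Joining values with `","` produces a malformed row if a value ever contains a comma. As a hedged sketch, Python's standard `csv.writer` quotes such values automatically. The rows below are invented, and an in-memory `io.StringIO` buffer stands in for the output file so that the example has no side effects.

```python
import csv
import io

# Invented rows in the same shape as loaded_data: (report_date, sku, total_quantity)
rows = [("02-05-2022", "kurta", 3), ("02-05-2022", "Set, Large", 1)]

buffer = io.StringIO()  # stand-in for open("./output.csv", "w", newline="")
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["report_date", "sku", "total_quantity"])
writer.writerows(rows)

print(buffer.getvalue())
```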

### 🔹 Stop SparkContext

In [9]:
sc.stop()