In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, max, avg
from pyspark.sql.functions import expr
from pyspark.sql.window import Window

# **Task 1.** Data Ingestion

# Step 1: Load Raw Data
- Objective: Load data from CSV files into DataFrames for processing.

In [3]:
# Load the datasets into Pandas DataFrames
transactions_df = pd.read_csv('raw_data/transactions.csv')
users_df = pd.read_csv('raw_data/users.csv')
products_df = pd.read_csv('raw_data/products.csv')

# Step 2: Inspect the Data
- Objective: Identify missing values, data type inconsistencies, and invalid records.

In [4]:
# Display information about each dataset
transactions_df.info()
users_df.info()
products_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   TransactionID    10000 non-null  object 
 1   CustomerID       10000 non-null  object 
 2   ProductID        10000 non-null  object 
 3   Category         10000 non-null  object 
 4   Quantity         10000 non-null  int64  
 5   Price            10000 non-null  float64
 6   TransactionDate  10000 non-null  object 
dtypes: float64(1), int64(1), object(5)
memory usage: 547.0+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   CustomerID  5000 non-null   object
 1   Name        5000 non-null   object
 2   Email       5000 non-null   object
 3   Age         5000 non-null   int64 
 4   Country     5000 non-null   object
 5   SignupDate  5000 non-nul

### **Summary for Data Cleaning**
- Total Rows:
  - Transactions: 10,000
  - Users: 5,000
  - Products: 1,000
- Total Columns:
  - Transactions: 7
  - Users: 6
  - Products: 6
- Columns to Convert:
  - `TransactionDate` → `datetime`
  - `SignupDate` → `datetime`
- Missing Data:
  - `ProductName` → 1 missing value.

In [5]:
# Display the first few rows of transactions data
transactions_df.head()

Unnamed: 0,TransactionID,CustomerID,ProductID,Category,Quantity,Price,TransactionDate
0,T000001,C3025,P00447,Sports,3,421.0,2024-04-22T10:24:22
1,T000002,C1947,P00707,Groceries,5,274.89,2024-05-03T14:42:23
2,T000003,C2163,P00561,Groceries,1,496.08,2024-06-14T22:07:18
3,T000004,C4859,P00168,Electronics,2,56.75,2024-02-22T02:06:49
4,T000005,C3592,P00806,Groceries,4,205.17,2024-01-01T14:50:28


In [6]:
# Display the first few rows of users data
users_df.head()

Unnamed: 0,CustomerID,Name,Email,Age,Country,SignupDate
0,C0001,Kyle Trevino,wallaceamanda@garcia-kim.com,54,Germany,2020-10-15
1,C0002,Brianna Taylor,danielfisher@gmail.com,45,USA,2021-06-02
2,C0003,Eric Larsen,riverajoshua@davis.com,25,Canada,2021-03-08
3,C0004,Zachary Campbell,robertbrown@clark.com,49,India,2023-04-12
4,C0005,Jesse Cunningham,taylor85@gmail.com,20,Germany,2022-08-24


In [7]:
# Display the first few rows of products data
products_df.head()

Unnamed: 0,ProductID,ProductName,Category,Brand,Price,StockQuantity
0,P00001,Machine,Fashion,Stewart and Sons,560.83,451
1,P00002,Chance,Home Appliances,"Bailey, Martinez and Blair",1480.03,503
2,P00003,Speak,Electronics,"Jenkins, Perry and Cook",261.54,559
3,P00004,Entire,Fashion,Rodriguez Group,1600.32,364
4,P00005,Medical,Sports,Wells-Lee,97.81,32


In [8]:
# Check for missing values
print(transactions_df.isnull().sum())
print(users_df.isnull().sum())
print(products_df.isnull().sum())

TransactionID      0
CustomerID         0
ProductID          0
Category           0
Quantity           0
Price              0
TransactionDate    0
dtype: int64
CustomerID    0
Name          0
Email         0
Age           0
Country       0
SignupDate    0
dtype: int64
ProductID        0
ProductName      1
Category         0
Brand            0
Price            0
StockQuantity    0
dtype: int64


In [9]:
# Identify the record with the missing ProductName
missing_product_record = products_df[products_df['ProductName'].isnull()]

# Display the record with missing ProductName
missing_product_record

Unnamed: 0,ProductID,ProductName,Category,Brand,Price,StockQuantity
930,P00931,,Groceries,"Johnson, Randall and Beard",1472.28,972


In [10]:
# Update the ProductName to 'None' for the record with ProductID 'P00931'
products_df.loc[products_df['ProductID'] == 'P00931', 'ProductName'] = 'None'

# Verify the update
products_df[products_df['ProductID'] == 'P00931']

Unnamed: 0,ProductID,ProductName,Category,Brand,Price,StockQuantity
930,P00931,,Groceries,"Johnson, Randall and Beard",1472.28,972


# Step 3: Handle Data Type Inconsistencies
- Objective: Ensure each column has the appropriate data type.

In [11]:
# Convert columns to appropriate data types
transactions_df["TransactionDate"] = pd.to_datetime(transactions_df["TransactionDate"])
users_df["SignupDate"] = pd.to_datetime(users_df["SignupDate"])

In [12]:
# Display information about each dataset
transactions_df.info()
users_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   TransactionID    10000 non-null  object        
 1   CustomerID       10000 non-null  object        
 2   ProductID        10000 non-null  object        
 3   Category         10000 non-null  object        
 4   Quantity         10000 non-null  int64         
 5   Price            10000 non-null  float64       
 6   TransactionDate  10000 non-null  datetime64[ns]
dtypes: datetime64[ns](1), float64(1), int64(1), object(4)
memory usage: 547.0+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   CustomerID  5000 non-null   object        
 1   Name        5000 non-null   object        
 2   Email       5000 non-null   ob

# Step 4: Handle Duplicate Records
- Objective: Identify and remove duplicate rows from the datasets.

In [13]:
# Check for duplicates
print("Duplicate records in transactions:", transactions_df.duplicated().sum())
print("Duplicate records in users:", users_df.duplicated().sum())
print("Duplicate records in products:", products_df.duplicated().sum())

# Remove duplicates
transactions_df = transactions_df.drop_duplicates()
users_df = users_df.drop_duplicates()
products_df = products_df.drop_duplicates()

Duplicate records in transactions: 0
Duplicate records in users: 0
Duplicate records in products: 0


# Step 5: Remove Invalid Records
- Objective: Filter out rows with invalid or out-of-range values.

In [14]:
# Remove invalid records
transactions_df = transactions_df[(transactions_df["Price"] > 0) & (transactions_df["Quantity"] > 0)]
users_df = users_df[users_df["Age"] > 0]
products_df = products_df[products_df["StockQuantity"] >= 0]

# Step 6: Save Cleaned Data to Staging Area

In [16]:
# Save cleaned data to CSV files
transactions_df.to_csv("cleaned_data/transactions_cleaned.csv", index=False)
users_df.to_csv("cleaned_data/users_cleaned.csv", index=False)
products_df.to_csv("cleaned_data/products_cleaned.csv", index=False)

# **Task 2.** Data Transformation

# Step 1: Initialize PySpark and Load Data

In [17]:
# Initialize SparkSession
spark = SparkSession.builder.appName("DataTransformation").getOrCreate()

# Load the cleaned data
transactions_df = spark.read.csv("cleaned_data/transactions_cleaned.csv", header=True, inferSchema=True)
users_df = spark.read.csv("cleaned_data/users_cleaned.csv", header=True, inferSchema=True)
products_df = spark.read.csv("cleaned_data/products_cleaned.csv", header=True, inferSchema=True)

# Step 2: Create CustomerTransactionSummary
- Logic:
1. `TotalSpent`: Sum of `Price` * `Quantity` for each `CustomerID`.
2. `TotalTransactions`: Count of transactions for each `CustomerID`.
3. `LastTransactionDate`: Latest transaction date for each `CustomerID`.

In [18]:
# Add a new column for TotalPrice (Price * Quantity)
transactions_df = transactions_df.withColumn("TotalPrice", col("Price") * col("Quantity"))

# Perform aggregations
customer_transaction_summary = transactions_df.groupBy("CustomerID") \
    .agg(
        sum("TotalPrice").alias("TotalSpent"),
        count("TransactionID").alias("TotalTransactions"),
        max("TransactionDate").alias("LastTransactionDate")
    )

# Display the results
customer_transaction_summary.show()

+----------+------------------+-----------------+-------------------+
|CustomerID|        TotalSpent|TotalTransactions|LastTransactionDate|
+----------+------------------+-----------------+-------------------+
|     C3656|            3128.3|                4|2024-10-06 02:06:27|
|     C3970|3828.7400000000002|                5|2024-11-24 05:49:35|
|     C1875|           3303.34|                2|2024-09-12 05:35:40|
|     C2983|            4403.5|                9|2024-10-30 07:21:02|
|     C0059|           1037.19|                2|2024-05-14 13:02:21|
|     C2430|           2028.51|                3|2024-09-30 23:19:58|
|     C4863|            386.69|                1|2024-10-18 17:48:26|
|     C1602|3039.0600000000004|                3|2024-06-29 05:06:22|
|     C1804|             594.3|                3|2024-08-02 01:40:58|
|     C4932|2716.2599999999998|                4|2024-08-15 19:54:58|
|     C4202|            609.06|                2|2024-07-04 06:56:18|
|     C2320|        

# Step 3: Create ProductPerformance
- Logic:
1. `TotalSales`: Sum of `Price` * `Quantity` for each `ProductID`.
2. `AveragePrice`: Average Price for each `ProductID`.
3. `UnitsSold`: Total quantity sold for each `ProductID`.


In [19]:
# Perform aggregations
product_performance = transactions_df.groupBy("ProductID") \
    .agg(
        sum("TotalPrice").alias("TotalSales"),
        avg("Price").alias("AveragePrice"),
        sum("Quantity").alias("UnitsSold")
    )

# Display the results
product_performance.show()

+---------+------------------+------------------+---------+
|ProductID|        TotalSales|      AveragePrice|UnitsSold|
+---------+------------------+------------------+---------+
|   P00704|           5037.93|272.12699999999995|       18|
|   P00256|           4920.84|           219.185|       28|
|   P00217|          16331.53|308.06999999999994|       51|
|   P00525|           7819.93|238.25111111111113|       29|
|   P00566| 7752.539999999999| 282.3573333333333|       33|
|   P00336|6407.1900000000005|253.90499999999997|       26|
|   P00339| 5708.109999999999|250.46142857142857|       21|
|   P00113|           6551.28|225.15500000000003|       31|
|   P00685|           5544.34|295.53222222222223|       22|
|   P00882|           8075.56| 259.4245454545454|       34|
|   P00606|16735.969999999998|        290.941875|       52|
|   P00710|           9272.94| 186.9166666666667|       46|
|   P00389| 5038.719999999999| 287.6171428571429|       19|
|   P00062| 5143.880000000001|199.236363

# Step 4: Save the Transformed Tables

In [23]:
# Save CustomerTransactionSummary
customer_transaction_summary.coalesce(1).write.mode("overwrite").csv("transformed_data/customer_transaction_summary.csv", header=True)

# Save ProductPerformance
product_performance.coalesce(1).write.mode("overwrite").csv("transformed_data/product_performance.csv", header=True)


# Step 5: Verify the Transformed Data
- To ensure the transformed data is accurate, you can load and display it:

In [29]:
# Load and display CustomerTransactionSummary
customer_summary = spark.read.csv("transformed_data/customer_transaction_summary.csv", header=True, inferSchema=True)
customer_summary.show()

# Load and display ProductPerformance
product_perf = spark.read.csv("transformed_data/product_performance.csv", header=True, inferSchema=True)
product_perf.show()

+----------+------------------+-----------------+-------------------+
|CustomerID|        TotalSpent|TotalTransactions|LastTransactionDate|
+----------+------------------+-----------------+-------------------+
|     C3656|            3128.3|                4|2024-10-06 02:06:27|
|     C3970|3828.7400000000002|                5|2024-11-24 05:49:35|
|     C1875|           3303.34|                2|2024-09-12 05:35:40|
|     C2983|            4403.5|                9|2024-10-30 07:21:02|
|     C0059|           1037.19|                2|2024-05-14 13:02:21|
|     C2430|           2028.51|                3|2024-09-30 23:19:58|
|     C4863|            386.69|                1|2024-10-18 17:48:26|
|     C1602|3039.0600000000004|                3|2024-06-29 05:06:22|
|     C1804|             594.3|                3|2024-08-02 01:40:58|
|     C4932|2716.2599999999998|                4|2024-08-15 19:54:58|
|     C4202|            609.06|                2|2024-07-04 06:56:18|
|     C2320|        

# **Task 5.** Optimization

- ### **Caching or Persisting Data**

In [2]:
from pyspark import StorageLevel
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("Optimization").getOrCreate()

# Load the data
df_transactions = spark.read.csv("cleaned_data/transactions_cleaned.csv", header=True, inferSchema=True)

# Cache the DataFrame for faster access
df_transactions.cache()

# Or use persist to control storage levels
df_transactions.persist(StorageLevel.MEMORY_AND_DISK)

# Perform operations
daily_sales = df_transactions.groupBy("TransactionDate").agg({"Price": "sum"})
daily_sales.show()


+-------------------+----------+
|    TransactionDate|sum(Price)|
+-------------------+----------+
|2024-05-27 14:43:59|    242.28|
|2024-06-18 15:33:19|    391.44|
|2024-09-27 11:00:31|    230.29|
|2024-09-28 19:15:48|      32.8|
|2024-06-11 04:04:08|    286.92|
|2024-04-04 16:52:36|    381.54|
|2024-01-07 22:48:24|    274.52|
|2024-09-30 04:06:18|    194.18|
|2024-12-01 20:07:12|    240.37|
|2024-11-11 19:02:32|     88.68|
|2024-11-28 03:16:13|    184.03|
|2024-10-01 02:21:14|     48.23|
|2024-09-14 12:45:58|    173.39|
|2024-03-08 05:59:10|     425.4|
|2024-09-26 17:21:25|    457.06|
|2024-01-28 09:36:38|    455.55|
|2024-11-22 15:53:45|     81.28|
|2024-01-06 18:37:48|     46.96|
|2024-11-02 10:41:47|    295.85|
|2024-10-19 06:45:57|    309.71|
+-------------------+----------+
only showing top 20 rows



#### **When to Use**:
- Cache when a DataFrame is reused multiple times in the same job.
- Use `persist` if the DataFrame is too large for memory, as it spills to disk.

- ### **Partition data by date**

### **Step 1: Define the Quarters**
1. **Quarters for Partitioning**:
   - Q1: `Dec 24, 2023` to `Mar 23, 2024`
   - Q2: `Mar 24, 2024` to `Jun 23, 2024`
   - Q3: `Jun 24, 2024` to `Sep 23, 2024`
   - Q4: `Sep 24, 2024` to `Dec 23, 2024`

2. **Create a Helper Column**:
   - Add a column `Quarter` to categorize each transaction into one of these intervals.

### **Step 2: Add Quarter Column**
- Use PySpark to calculate the quarter dynamically:

In [4]:
from pyspark.sql.functions import when, col, lit

# Load the cleaned transactions DataFrame
transactions_df = spark.read.csv("cleaned_data/transactions_cleaned.csv", header=True, inferSchema=True)

# Add Quarter column based on TransactionDate
partitioned_df = transactions_df.withColumn(
    "Quarter",
    when(
        (col("TransactionDate") >= lit("2023-12-24")) & (col("TransactionDate") <= lit("2024-03-23")), lit("Q1")
    ).when(
        (col("TransactionDate") >= lit("2024-03-24")) & (col("TransactionDate") <= lit("2024-06-23")), lit("Q2")
    ).when(
        (col("TransactionDate") >= lit("2024-06-24")) & (col("TransactionDate") <= lit("2024-09-23")), lit("Q3")
    ).when(
        (col("TransactionDate") >= lit("2024-09-24")) & (col("TransactionDate") <= lit("2024-12-23")), lit("Q4")
    )
)


### **Step 3: Partition the Data**
- Partition the data by the `Quarter` column and save it:

In [5]:
# Partition by Quarter and save to Parquet
partitioned_df.write.partitionBy("Quarter").parquet("partitioned_data/transactions_by_quarter")

### **Step 4: Verify the Partitioned Data**
Read the partitioned data and view a specific quarter:

In [8]:
# Load and filter data for a Q1 partition
q1_data = spark.read.parquet("partitioned_data/transactions_by_quarter/Quarter=Q1")
q1_data.show()

+-------------+----------+---------+---------------+--------+------+-------------------+
|TransactionID|CustomerID|ProductID|       Category|Quantity| Price|    TransactionDate|
+-------------+----------+---------+---------------+--------+------+-------------------+
|      T000004|     C4859|   P00168|    Electronics|       2| 56.75|2024-02-22 02:06:49|
|      T000005|     C3592|   P00806|      Groceries|       4|205.17|2024-01-01 14:50:28|
|      T000016|     C2344|   P00326|          Books|       1|416.48|2024-01-05 08:32:31|
|      T000019|     C2247|   P00573|Home Appliances|       4|176.98|2024-02-10 16:45:18|
|      T000020|     C3193|   P00595|         Sports|       1|166.41|2024-01-13 11:54:10|
|      T000025|     C2300|   P00245|Home Appliances|       1| 338.4|2024-02-07 10:11:27|
|      T000027|     C1584|   P00134|        Fashion|       5|345.76|2024-03-06 04:16:41|
|      T000033|     C0370|   P00183|        Fashion|       4|111.68|2024-01-25 00:02:18|
|      T000038|     C

### **Benefits of Partitioning**
1. **Query Optimization**:
   - Queries filtering by `Quarter` will only read relevant partitions, reducing data scan size.
2. **Parallel Processing**:
   - Spark processes each partition independently, improving performance on distributed clusters.