## Install PySpark and Java

In [None]:
# Installing PySpark and Java
# Installing  Java and the compatible version of PySpark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark (adjusting the version when needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2bin-hadoop3.2.tgz

# Unzip the downloaded file
!tar xf spark-3.1.2-bin-hadoop3.2.tgz



In [None]:
! pip install openpyxl



## Set up the Environment Variables

In [None]:
# Configuring the environment for Java and Spark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


In [None]:
!pip install pyspark==3.1.2


Collecting pyspark==3.1.2
  Using cached pyspark-3.1.2-py2.py3-none-any.whl
Collecting py4j==0.10.9 (from pyspark==3.1.2)
  Using cached py4j-0.10.9-py2.py3-none-any.whl.metadata (1.3 kB)
Using cached py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.3
    Uninstalling pyspark-3.5.3:
      Successfully uninstalled pyspark-3.5.3
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
import pyspark
print("PySpark version:", pyspark.__version__)


PySpark version: 3.1.2


## Creating a Spark Session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("WalmartSalesDataPipeline") \
    .master("local[*]") \
    .getOrCreate()


# Load the Data

In [None]:
# Load the data into DataFrames
#customers_df = spark.read.csv("/content/customers.tsv", header=True, inferSchema=True)



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Start Spark session
spark = SparkSession.builder.appName("AddHeadersToTSV").getOrCreate()

# Define the schema with the required column names and data types
schema = StructType([
    StructField("Customer_ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Zip_Code", StringType(), True)
])

# Load TSV data with the defined schema
file_path = "/content/customers.tsv"
customers_df = spark.read.option("delimiter", "\t").csv(file_path, schema=schema, header=False)

# Show the result
#df.show()


In [None]:
# Show the result
customers_df.show()

+-----------+----------------+-------------+-----+--------+
|Customer_ID|            Name|         City|State|Zip_Code|
+-----------+----------------+-------------+-----+--------+
|      11039|     Mary Torres|       Caguas|   PR|     725|
|       5623|      Jose Haley|     Columbus|   OH|   43207|
|       5829|      Mary Smith|      Houston|   TX|   77015|
|       6336|  Richard Maddox|       Caguas|   PR|     725|
|       1708|  Margaret Booth|    Arlington|   TX|   76010|
|      10227|  Mary Henderson|       Caguas|   PR|     725|
|        839|     Lisa Walker|       Caguas|   PR|     725|
|       7604|   Jonathan Hill|      Phoenix|   AZ|   85040|
|       6485|Carolyn Sheppard|Pompano Beach|   FL|   33063|
|       4737|    Mary Mendoza|       Caguas|   PR|     725|
|       5973|   Michael Smith|       Caguas|   PR|     725|
|       9205|    James Holmes|     Hilliard|   OH|   43026|
|        138|     Mary Dawson|       Caguas|   PR|     725|
|        371|    Adam Marquez|  San Anto

In [None]:
#sales_df = spark.read.csv("/content/salestxns.tsv", header=True, inferSchema=True)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Start Spark session
spark = SparkSession.builder.appName("AddHeadersToTSV").getOrCreate()

# Define the schema with the required column names and data types
schema = StructType([
    StructField("Sales_Txn_ID", StringType(), True),
    StructField("Category_ID", StringType(), True),
    StructField("Category_Name", StringType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Product_Name", StringType(), True),
    StructField("Price", StringType(), True),
    StructField("Quantity", StringType(), True),
    StructField("Customer_ID", StringType(), True)
])

# Load TSV data with the defined schema
file_path = "/content/salestxns.tsv"
sales_df = spark.read.option("delimiter", "\t").csv(file_path, schema=schema, header=False)

In [None]:
sales_df.show()

+------------+-----------+--------------------+----------+--------------------+------+--------+-----------+
|Sales_Txn_ID|Category_ID|       Category_Name|Product_ID|        Product_Name| Price|Quantity|Customer_ID|
+------------+-----------+--------------------+----------+--------------------+------+--------+-----------+
|           1|         43|    Camping & Hiking|       957|Diamondback Women...|299.98|       1|      11599|
|           2|         48|        Water Sports|      1073|Pelican Sunstream...|199.99|       1|        256|
|           3|         24|     Women's Apparel|       502|Nike Men's Dri-FI...|    50|       5|        256|
|           4|         18|      Men's Footwear|       403|Nike Men's CJ Eli...|129.99|       1|        256|
|           5|         40|         Accessories|       897|Team Golf New Eng...| 24.99|       2|       8827|
|           6|         17|              Cleats|       365|Perfect Fitness P...| 59.99|       5|       8827|
|           7|         24|  

## Data Preprocessing ✅



#### Checking for Data Nullity

- Creating a function that calculates the count of nulls for each column in a DataFrame.

In [None]:
from pyspark.sql import functions as F

# Function to count null values in each column
def count_nulls(df):
    return df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])


- Applying this function to both datasets to get a summary of missing values for each column.

In [None]:
# Count missing values in customers_df
null_counts_customers = count_nulls(customers_df)
null_counts_customers.show()

# Count missing values in sales_df
null_counts_sales = count_nulls(sales_df)
null_counts_sales.show()


+-----------+----+----+-----+--------+
|Customer_ID|Name|City|State|Zip_Code|
+-----------+----+----+-----+--------+
|          0|   0|   0|    0|       0|
+-----------+----+----+-----+--------+

+------------+-----------+-------------+----------+------------+-----+--------+-----------+----------------+-----+
|Sales_Txn_ID|Category_ID|Category_Name|Product_ID|Product_Name|Price|Quantity|Customer_ID|Transaction_Date|Month|
+------------+-----------+-------------+----------+------------+-----+--------+-----------+----------------+-----+
|           0|          0|            0|         0|           0|    0|       0|          0|               0|    0|
+------------+-----------+-------------+----------+------------+-----+--------+-----------+----------------+-----+



#### Convert necessary columns to appropriate data types.

In [None]:
from pyspark.sql.types import IntegerType, FloatType

# Convert data types for necessary columns
sales_df = sales_df.withColumn("Price", sales_df["Price"].cast(FloatType())) \
                   .withColumn("Quantity", sales_df["Quantity"].cast(IntegerType()))


## SQL Queries ⚡

#### 1. Total Number of Customers:
How many unique customers are there in the dataset?


In [None]:
unique_customers = customers_df.select("Customer_ID").distinct().count()
print("Total number of unique customers:", unique_customers)


Total number of unique customers: 1244


#### 2.Total Sales by State:
What is the total sales amount for each state?

In [None]:
from pyspark.sql import functions as F

# Join sales and customers data
joined_df = sales_df.join(customers_df, on="Customer_ID", how="inner")

# Calculate total sales by state
total_sales_by_state = joined_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                                .groupBy("State") \
                                .agg(F.sum("Total_Price").alias("Total_Sales"))

total_sales_by_state.show()


+-----+------------------+
|State|       Total_Sales|
+-----+------------------+
|   AZ|  48702.6809425354|
|   SC| 4144.680107116699|
|   LA| 24449.42046356201|
|   MN| 3549.600028991699|
|   NJ| 52303.09112358093|
|   DC| 8798.760160446167|
|   OR| 9544.780143737793|
|   VA|30488.970476150513|
|   RI| 5424.410140991211|
|   KY| 2749.700065612793|
|   MI| 83347.09171104431|
|   NV| 47103.61082458496|
|   WI| 24561.30038833618|
|   ID|10098.950242996216|
|   CA|503205.49938964844|
|   CT|19206.770456314087|
|   NC| 45275.89086151123|
|   MD| 51982.49096298218|
|   DE|1305.7600326538086|
|   MO| 34749.06068229675|
+-----+------------------+
only showing top 20 rows



#### 3. Top 10 Most Purchased Products:
Which are the top 10 most purchased products based on the quantity sold?

In [None]:
top_products = sales_df.groupBy("Product_Name") \
                       .agg(F.sum("Quantity").alias("Total_Quantity")) \
                       .orderBy(F.desc("Total_Quantity")) \
                       .limit(10)

top_products.show()


+--------------------+--------------+
|        Product_Name|Total_Quantity|
+--------------------+--------------+
|Perfect Fitness P...|         73698|
|Nike Men's Dri-FI...|         62956|
|O'Brien Men's Neo...|         57803|
|Nike Men's Free 5...|         36680|
|Under Armour Girl...|         31735|
|Nike Men's CJ Eli...|         22246|
|Field & Stream Sp...|         17325|
|Pelican Sunstream...|         15500|
|Diamondback Women...|         13729|
|ENO Atlas Hammock...|           998|
+--------------------+--------------+



#### 4. Average Transaction Value:
What is the average price of transactions across all sales?


In [None]:
avg_transaction_value = sales_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                                .agg(F.avg("Total_Price").alias("Avg_Transaction_Value"))

avg_transaction_value.show()


+---------------------+
|Avg_Transaction_Value|
+---------------------+
|    199.3206689847089|
+---------------------+



#### 5. Top 5 Customers by Expenditure:
Who are the top 5 customers by total amount spent?

In [None]:
top_customers = joined_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                         .groupBy("Customer_ID", "Name") \
                         .agg(F.sum("Total_Price").alias("Total_Expenditure")) \
                         .orderBy(F.desc("Total_Expenditure")) \
                         .limit(5)

top_customers.show()


+-----------+-----------------+-----------------+
|Customer_ID|             Name|Total_Expenditure|
+-----------+-----------------+-----------------+
|       9371|   Mary Patterson|9299.030221939087|
|        664|    Bobby Jimenez|8394.260208129883|
|      12431|        Mary Rios|8073.150127410889|
|      10591| Deborah Humphrey|7889.050121307373|
|       9271|Christopher Smith|7665.250144958496|
+-----------+-----------------+-----------------+



#### 6. Product Purchases by a Specific Customer:
List all products purchased by a specific customer (e.g., customer with ID 256)

In [None]:
customer_id = 256  # Example Customer ID

customer_purchases = sales_df.filter(sales_df["Customer_ID"] == customer_id) \
                             .select("Product_Name", "Quantity", (F.col("Price") * F.col("Quantity")).alias("Total_Spent"))

customer_purchases.show()




+--------------------+--------+-----------+
|        Product_Name|Quantity|Total_Spent|
+--------------------+--------+-----------+
|Pelican Sunstream...|       1|     199.99|
|Nike Men's Dri-FI...|       5|      250.0|
|Nike Men's CJ Eli...|       1|     129.99|
|Team Golf St. Lou...|       5|     124.95|
|TYR Boys' Team Di...|       5|  199.95001|
|Field & Stream Sp...|       1|     399.98|
|Field & Stream Sp...|       1|     399.98|
|Nike Men's Dri-FI...|       5|      250.0|
|Nike Men's CJ Eli...|       1|     129.99|
|Nike Men's CJ Eli...|       1|     129.99|
|Perfect Fitness P...|       5|     299.95|
|O'Brien Men's Neo...|       5|      249.9|
|Nike Men's CJ Eli...|       1|     129.99|
|O'Brien Men's Neo...|       4|     199.92|
|Under Armour Wome...|       1|      54.97|
|Nike Women's Temp...|       4|      120.0|
|Nike Men's Dri-FI...|       1|       50.0|
|Nike Men's Dri-FI...|       2|      100.0|
|Nike Men's Dri-FI...|       2|      100.0|
|Diamondback Women...|       1| 

#### 7. Monthly Sales Trends:
Assuming there is a date field, analyze the sales trends over the months. Which month had the highest sales?


- Since There is no 'Date' and 'Month' data column available in `Sales_df` dataset, so we can not extract Monthly Sales Data from it .

- Since the document doesn’t indicate the presence of a date field in `sales_df`, we need to add a date column manually .

- We will generate random dates within a specified range for each transaction, we can add a `Transaction_Date` column with dates spread randomly over a given range.


So What the things we need to able to get the desired output...? ☝


We can implement three things here:


1) **Define the Date Range** Choose a start and end date.

2)**Generate Random Dates** within that range for each row.

3)**Extract the Month from the random** dates for monthly analysis.



In [None]:
import random
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from datetime import date, timedelta

# Define the start and end dates for the random date range
start_date = date(2023, 1, 1)
end_date = date(2023, 12, 31)

# Function to generate a random date within the specified range
def random_date(start, end):
    delta = end - start
    random_days = random.randint(0, delta.days)
    return start + timedelta(days=random_days)

# Register the function as a UDF
random_date_udf = F.udf(lambda: random_date(start_date, end_date), DateType())   # UDF = User Defined Function

# Add a Transaction_Date column with random dates for each transaction
sales_df = sales_df.withColumn("Transaction_Date", random_date_udf())

# Extract the month from the new Transaction_Date column
sales_df = sales_df.withColumn("Month", F.month("Transaction_Date"))

# Calculate monthly sales
monthly_sales = sales_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                        .groupBy("Month") \
                        .agg(F.sum("Total_Price").alias("Monthly_Sales")) \
                        .orderBy(F.desc("Monthly_Sales"))

# Show the result
sales_df.show()


+------------+-----------+--------------------+----------+--------------------+------+--------+-----------+----------------+-----+
|Sales_Txn_ID|Category_ID|       Category_Name|Product_ID|        Product_Name| Price|Quantity|Customer_ID|Transaction_Date|Month|
+------------+-----------+--------------------+----------+--------------------+------+--------+-----------+----------------+-----+
|           1|         43|    Camping & Hiking|       957|Diamondback Women...|299.98|       1|      11599|      2023-08-20|    8|
|           2|         48|        Water Sports|      1073|Pelican Sunstream...|199.99|       1|        256|      2023-12-01|   12|
|           3|         24|     Women's Apparel|       502|Nike Men's Dri-FI...|  50.0|       5|        256|      2023-03-20|    3|
|           4|         18|      Men's Footwear|       403|Nike Men's CJ Eli...|129.99|       1|        256|      2023-12-16|   12|
|           5|         40|         Accessories|       897|Team Golf New Eng...| 24.

In [None]:
# Calculate monthly sales
monthly_sales = sales_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                        .groupBy("Month") \
                        .agg(F.sum("Total_Price").alias("Monthly_Sales")) \
                        .orderBy(F.desc("Monthly_Sales"))

# Show the result
monthly_sales.show()

+-----+------------------+
|Month|     Monthly_Sales|
+-----+------------------+
|    8|2958404.6940670013|
|    3|2943308.9335212708|
|    1| 2935582.263519287|
|   12| 2912767.702472687|
|    7| 2908226.104948044|
|   10|2891314.8834323883|
|    5| 2888638.664302826|
|   11|2844694.1200847626|
|    6|2831771.3922538757|
|    4|2797452.9413547516|
|    9|2763058.1299476624|
|    2| 2647400.727924347|
+-----+------------------+



####8. Category with Highest Sales:
Which product category generated the highest total sales revenue?


In [None]:
category_sales = sales_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                         .groupBy("Category_Name") \
                         .agg(F.sum("Total_Price").alias("Total_Category_Sales")) \
                         .orderBy(F.desc("Total_Category_Sales")) \
                         .limit(1)

category_sales.show()


+-------------+--------------------+
|Category_Name|Total_Category_Sales|
+-------------+--------------------+
|      Fishing|   6929653.690338135|
+-------------+--------------------+



#### 9. State-wise Sales Comparison:
Compare the total sales between two specific states (e.g., Texas vs. Ohio). Which state had higher sales?

In [None]:
states_to_compare = ["TX", "OH"]  # Example states

state_comparison = joined_df.filter(joined_df.State.isin(states_to_compare)) \
                            .withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                            .groupBy("State") \
                            .agg(F.sum("Total_Price").alias("Total_Sales")) \
                            .orderBy(F.desc("Total_Sales"))

state_comparison.show()


+-----+-----------------+
|State|      Total_Sales|
+-----+-----------------+
|   TX|184629.3032875061|
|   OH|82342.95152282715|
+-----+-----------------+



#### 10. Detailed Customer Purchase Report:
Generate a detailed report showing each customer along with their total purchases, the total number of transactions they have made, and the average transaction value.


The issue might be here is that there is no `Name` column in `sales_df` but there is in `customers_df`. So we have just to join these two dataset to get the desired outcome of  "Detailed Customer Purchase Report" having both `Name` and `Customer_ID`.


We can do it as flowing: ⚡


## Verify Join Condition ✅
Ensure that both sales_df and customers_df have matching column names for the join key `Coustomer_ID`.

In [None]:
print(sales_df.columns)
print(customers_df.columns)


['Sales_Txn_ID', 'Category_ID', 'Category_Name', 'Product_ID', 'Product_Name', 'Price', 'Quantity', 'Customer_ID', 'Transaction_Date', 'Month']
['Customer_ID', 'Name', 'City', 'State', 'Zip_Code']


 ## Specify The Join Type ✅
 Try a left join to retain all sales_df rows, even if no matching Name exists in customers_df.

In [None]:
# Perform a left join to retain all rows from sales_df and match with customers_df
joined_df = sales_df.join(customers_df, on="Customer_Id", how="left")


In [None]:
customer_report = joined_df.withColumn("Total_Price", F.col("Price") * F.col("Quantity")) \
                           .groupBy("Customer_Id", "Name") \
                           .agg(F.sum("Total_Price").alias("Total_Purchases"),
                                F.count("Sales_Txn_Id").alias("Total_Transactions"),
                                F.avg("Price").alias("Avg_Transaction_Value"))

customer_report.show(5)



+-----------+---------------+-----------------+------------------+---------------------+
|Customer_Id|           Name|  Total_Purchases|Total_Transactions|Avg_Transaction_Value|
+-----------+---------------+-----------------+------------------+---------------------+
|       5534|Elizabeth Smith|3594.650074005127|                19|   154.98842440153422|
|      11404|     Helen Cook|4017.410053253174|                21|   107.70333562578473|
|       2669|   Dorothy Buck|5090.280059814453|                26|   115.44961797274075|
|       5778|   Evelyn Smith|6587.310104370117|                28|   162.80821868351526|
|       8530|  William Smith|379.8599967956543|                 3|    46.65000025431315|
+-----------+---------------+-----------------+------------------+---------------------+
only showing top 5 rows

