In [1]:
pip install pyspark



In [2]:
#Reading customers dataset

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSVReader").getOrCreate()

df = spark.read.format("csv").option("header", True).option("delimiter", ",").load("/content/drive/MyDrive/Datasets/customers.csv")

df.show(5)

spark.stop()

+-----------+--------------+---------+-----+-------+
|Customer_Id|          Name|     City|State|Zipcode|
+-----------+--------------+---------+-----+-------+
|      11039|   Mary Torres|   Caguas|   PR|    725|
|       5623|    Jose Haley| Columbus|   OH|  43207|
|       5829|    Mary Smith|  Houston|   TX|  77015|
|       6336|Richard Maddox|   Caguas|   PR|    725|
|       1708|Margaret Booth|Arlington|   TX|  76010|
+-----------+--------------+---------+-----+-------+
only showing top 5 rows



In [4]:
#Reading sales transaction dataset

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSVReader").getOrCreate()

df = spark.read.format("csv").option("header", True).option("delimiter", ",").load("/content/drive/MyDrive/Datasets/salestxns.csv")

df.show(5)

spark.stop()

+------------+-----------+----------------+----------+--------------------+------+---+-----------+
|Sales_Txn_Id|Category_Id|   Category_Name|Product_Id|        Product_Name| Price|Qty|Customer_Id|
+------------+-----------+----------------+----------+--------------------+------+---+-----------+
|           1|         43|Camping & Hiking|       957|Diamondback Women...|299.98|  1|      11599|
|           2|         48|    Water Sports|      1073|Pelican Sunstream...|199.99|  1|        256|
|           3|         24| Women's Apparel|       502|Nike Men's Dri-FI...|    50|  5|        256|
|           4|         18|  Men's Footwear|       403|Nike Men's CJ Eli...|129.99|  1|        256|
|           5|         40|     Accessories|       897|Team Golf New Eng...| 24.99|  2|       8827|
+------------+-----------+----------------+----------+--------------------+------+---+-----------+
only showing top 5 rows



In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,avg
import pandas as pd

# Initialize Spark session
spark = SparkSession.builder.appName("CustomerSalesAggregation").getOrCreate()

# Load Customer Data
df = spark.read.format("csv").option("header", True).load("/content/drive/MyDrive/Datasets/customers.csv")

# Collect the relevant customer data into a Python list and broadcast it
broadcast_customer_data = spark.sparkContext.broadcast(
    df.select("Customer_Id", "Name").collect()
)

# Load Sales Transaction Data
sales_df = spark.read.format("csv").option("header", True).load("/content/drive/MyDrive/Datasets/salestxns.csv")

# Join Sales Transaction Data with Broadcasted Customer Data
joined_df = sales_df.join(
    spark.createDataFrame(broadcast_customer_data.value, schema=["Customer_Id", "Name"]),
    "Customer_Id"
)

# Perform Aggregations
aggregated_df = joined_df.groupBy("Customer_Id", "Name", "Product_Id", "Product_Name", "Price") \
    .agg(
        sum("Qty").alias("Total_Quantity"),
        sum(col("Qty") * col("Price")).alias("Total_Amount_Paid")
    )

# Display the Result
aggregated_df.show(truncate=False)

print('Top Customers by Total Amount Paid')
top_customers = aggregated_df.groupBy("Customer_Id", "Name").agg(sum("Total_Amount_Paid").alias("Total_Amount_Paid"))
top_customers = top_customers.orderBy(col("Total_Amount_Paid").desc())
top_customers.show()

print('Product Analysis')
product_sales = aggregated_df.groupBy("Product_Id", "Product_Name").agg(
    sum("Total_Quantity").alias("Total_Quantity"),
    sum("Total_Amount_Paid").alias("Total_Amount_Paid")
)
product_sales = product_sales.orderBy(col("Total_Quantity").desc())
product_sales.show()

print('Average Transaction Value')
avg_transaction_value = aggregated_df.groupBy("Customer_Id", "Name").agg(
    avg("Total_Amount_Paid").alias("Average_Transaction_Value")
)
avg_transaction_value = avg_transaction_value.orderBy(col("Average_Transaction_Value").desc())
avg_transaction_value.show()

# Stop Spark Session
spark.stop()

+-----------+----------------+----------+---------------------------------------------+------+--------------+------------------+
|Customer_Id|Name            |Product_Id|Product_Name                                 |Price |Total_Quantity|Total_Amount_Paid |
+-----------+----------------+----------+---------------------------------------------+------+--------------+------------------+
|9285       |Gloria Smith    |235       |Under Armour Hustle Storm Medium Duffle Bag  |34.99 |5.0           |174.95000000000002|
|8582       |Edward Smith    |403       |Nike Men's CJ Elite 2 TD Football Cleat      |129.99|1.0           |129.99            |
|9119       |Mary Smith      |1014      |O'Brien Men's Neoprene Life Vest             |49.98 |3.0           |149.94            |
|1598       |Mary Green      |403       |Nike Men's CJ Elite 2 TD Football Cleat      |129.99|4.0           |519.96            |
|10447      |Mary Smith      |502       |Nike Men's Dri-FIT Victory Golf Polo         |50    |27.