In [0]:
# project name: Project_1_Extraction_and_cleaning
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum, window, desc

# Create a SparkSession
spark = SparkSession.builder.appName("Online Retail Cleaning").getOrCreate()

# 1. Ingestion: Load data into DataFrame
file_path1 = "dbfs:/user/hive/warehouse/online_retail_data_2009_10"

file_path2 = "dbfs:/user/hive/warehouse/online_retail_data_2010_11"

# Load the Delta Table into a DataFrame
df1 = spark.read.format("delta").load(file_path1)

df2 = spark.read.format("delta").load(file_path2)

# CLEANING
# Combine datasets using union
combined_df = df1.unionByName(df2)

# Drop rows with null values in key columns
cleaned_df = combined_df.dropna(subset=["Invoice", "StockCode", "InvoiceDate", "Customer ID", "Quantity"])

# Remove duplicate rows
cleaned_df = cleaned_df.dropDuplicates()

In [0]:
# # Run the notebook that contains the cleaned_df DataFrame
# %run /Users/azuser2370_mml.local@techademy.com/Project_1_Extraction_and_cleaning


In [0]:
# Use the DataFrame directly
cleaned_df.show(10)

+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|     InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
| 503063|    21495|         SKULLS WRAP|      25|29-03-2010 17:17| 0.42|      17664|United Kingdom|
| 503063|    21380|WOODEN HAPPY BIRT...|       6|29-03-2010 17:17| 2.95|      17664|United Kingdom|
| 503063|    20974|12 PENCILS SMALL ...|      24|29-03-2010 17:17| 0.65|      17664|United Kingdom|
| 503063|    20973|12 PENCIL SMALL T...|      24|29-03-2010 17:17| 0.65|      17664|United Kingdom|
| 503063|    10125|MINI FUNKY DESIGN...|      20|29-03-2010 17:17| 0.85|      17664|United Kingdom|
| 503063|   85226C|BLUE PULL BACK RA...|      12|29-03-2010 17:17| 0.55|      17664|United Kingdom|
| 503063|    22127|PARTY CONES CARNI...|      12|29-03-2010 17:17| 1.25|      17664|United Kingdom|


+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|     InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|01-12-2009 07:45| 6.95|      13085|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|01-12-2009 07:45| 6.75|      13085|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|01-12-2009 07:45| 6.75|      13085|United Kingdom|
| 489434|    22041|RECORD FRAME 7" S...|      48|01-12-2009 07:45|  2.1|      13085|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|01-12-2009 07:45| 1.25|      13085|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|01-12-2009 07:45| 1.65|      13085|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|01-12-2009 07:45| 1.25|      13085|United Kingdom|


+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|     InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
| 536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26| 2.55|      17850|United Kingdom|
| 536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26| 3.39|      17850|United Kingdom|
| 536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26| 2.75|      17850|United Kingdom|
| 536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26| 3.39|      17850|United Kingdom|
| 536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26| 3.39|      17850|United Kingdom|
| 536365|    22752|SET 7 BABUSHKA NE...|       2|01-12-2010 08:26| 7.65|      17850|United Kingdom|
| 536365|    21730|GLASS STAR FROSTE...|       6|01-12-2010 08:26| 4.25|      17850|United Kingdom|


In [0]:
from pyspark.sql.functions import col, round

# Add a TotalPrice column rounded to 2 decimal places
transformed_df = cleaned_df.withColumn("TotalPrice", round(col("Quantity") * col("Price"), 2))
print("Transformed Dataset with TotalPrice:")
transformed_df.show(5)



Transformed Dataset with TotalPrice:
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+----------+
|Invoice|StockCode|         Description|Quantity|     InvoiceDate|Price|Customer ID|       Country|TotalPrice|
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+----------+
| 503063|    21380|WOODEN HAPPY BIRT...|       6|29-03-2010 17:17| 2.95|      17664|United Kingdom|      17.7|
| 503063|   85226C|BLUE PULL BACK RA...|      12|29-03-2010 17:17| 0.55|      17664|United Kingdom|       6.6|
| 503063|    22127|PARTY CONES CARNI...|      12|29-03-2010 17:17| 1.25|      17664|United Kingdom|      15.0|
| 503063|    22236|CAKE STAND 3 TIER...|       1|29-03-2010 17:17|12.75|      17664|United Kingdom|     12.75|
| 503063|    22088|PAPER BUNTING COL...|       6|29-03-2010 17:17| 2.95|      17664|United Kingdom|      17.7|
+-------+---------+--------------------+--------+----------------+-----+---

+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|     InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+----------------+-----+-----------+--------------+
| 503063|    47566|       PARTY BUNTING|       5|29-03-2010 17:17| 4.65|      17664|United Kingdom|
| 503063|    21495|         SKULLS WRAP|      25|29-03-2010 17:17| 0.42|      17664|United Kingdom|
| 503063|    22084|PAPER CHAIN KIT E...|       6|29-03-2010 17:17| 2.95|      17664|United Kingdom|
| 503063|    21380|WOODEN HAPPY BIRT...|       6|29-03-2010 17:17| 2.95|      17664|United Kingdom|
| 503063|    20974|12 PENCILS SMALL ...|      24|29-03-2010 17:17| 0.65|      17664|United Kingdom|
| 503063|    20829|GLITTER HANGING B...|       8|29-03-2010 17:17|  2.1|      17664|United Kingdom|
| 503063|    20973|12 PENCIL SMALL T...|      24|29-03-2010 17:17| 0.65|      17664|United Kingdom|


In [0]:
# Group data by Country and calculate total sales, rounded to 2 decimal places
country_sales_df = transformed_df.groupBy("Country") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("TotalSales", round(col("TotalSales"), 2))  # Round to 2 decimal places
print("Total Sales Per Country:")
country_sales_df.show()


Total Sales Per Country:
+------------------+----------+
|           Country|TotalSales|
+------------------+----------+
|            Sweden|  87421.52|
|         Singapore|  13158.16|
|           Germany| 411959.16|
|               RSA|   1933.74|
|            France| 320046.26|
|            Greece|  18995.49|
|European Community|   1291.75|
|           Belgium|  63208.89|
|           Finland|  29514.45|
|             Malta|   5192.22|
|       Unspecified|   7370.46|
|           Nigeria|    140.39|
|             Italy|   30254.1|
|              EIRE| 573509.76|
|         Lithuania|   4892.68|
|            Norway|  35455.91|
|             Spain|  91013.44|
|           Denmark|  64459.59|
|       West Indies|    536.41|
|          Thailand|   3070.54|
+------------------+----------+
only showing top 20 rows



In [0]:
# Group data by StockCode and Description, calculate total sales, rounded to 2 decimal places
top_products_df = transformed_df.groupBy("StockCode", "Description") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("TotalSales", round(col("TotalSales"), 2)) \
    .orderBy(col("TotalSales").desc()) \
    .limit(10)
print("Top 10 Best-Selling Products:")
top_products_df.show()

Top 10 Best-Selling Products:
+---------+--------------------+----------+
|StockCode|         Description|TotalSales|
+---------+--------------------+----------+
|    22423|REGENCY CAKESTAND...| 261110.95|
|   85123A|WHITE HANGING HEA...| 237678.61|
|   85099B|JUMBO BAG RED RET...| 132180.02|
|    84879|ASSORTED COLOUR B...| 123631.87|
|     POST|             POSTAGE| 110338.51|
|    47566|       PARTY BUNTING| 102089.38|
|    22086|PAPER CHAIN KIT 5...|  75388.48|
|    79321|       CHILLI LIGHTS|   68453.5|
|   85099F|JUMBO BAG STRAWBERRY|  63615.53|
|    21137|BLACK RECORD COVE...|  63009.83|
+---------+--------------------+----------+



In [0]:
# Group data by Customer ID and calculate total sales
top_customers_df = transformed_df \
    .groupBy("Customer ID") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("TotalSales", round(col("TotalSales"), 2)) \
    .orderBy(col("TotalSales").desc()) \
    .limit(10)

# Display top customers by sales
print("Top 10 Customers by Sales:")
top_customers_df.show(truncate=False)

Top 10 Customers by Sales:
+-----------+----------+
|Customer ID|TotalSales|
+-----------+----------+
|18102      |570380.61 |
|14646      |523342.07 |
|14156      |296063.44 |
|14911      |265757.91 |
|17450      |231390.55 |
|13694      |190020.84 |
|17511      |168491.62 |
|12415      |143269.29 |
|16684      |141502.25 |
|15061      |124961.98 |
+-----------+----------+



In [0]:
# Grouping sales by product category (Description) and calculating total sales
sales_by_category_df = transformed_df \
    .groupBy("Description") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("TotalSales", round(col("TotalSales"), 2)) \
    .orderBy(col("TotalSales").desc())

# Display the top categories based on sales
print("Sales by Product Category (Top 10):")
sales_by_category_df.show(truncate=False)

Sales by Product Category (Top 10):
+-----------------------------------+----------+
|Description                        |TotalSales|
+-----------------------------------+----------+
|REGENCY CAKESTAND 3 TIER           |261110.95 |
|WHITE HANGING HEART T-LIGHT HOLDER |237678.61 |
|JUMBO BAG RED RETROSPOT            |132180.02 |
|ASSORTED COLOUR BIRD ORNAMENT      |123631.87 |
|POSTAGE                            |110338.51 |
|PARTY BUNTING                      |102089.38 |
|PAPER CHAIN KIT 50'S CHRISTMAS     |75388.48  |
|CHILLI LIGHTS                      |68453.5   |
|JUMBO BAG STRAWBERRY               |63615.53  |
|BLACK RECORD COVER FRAME           |63009.83  |
|ROTATING SILVER ANGELS T-LIGHT HLDR|55577.77  |
|WOOD BLACK BOARD ANT WHITE FINISH  |54200.36  |
|VINTAGE UNION JACK BUNTING         |53888.62  |
|EDWARDIAN PARASOL NATURAL          |53563.46  |
|JUMBO  BAG BAROQUE BLACK WHITE     |51617.3   |
|RABBIT NIGHT LIGHT                 |51042.84  |
|HEART OF WICKER LARGE           

In [0]:
# Group by Country and calculate total sales
country_sales_df = transformed_df \
    .groupBy("Country") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("TotalSales", round(col("TotalSales"), 2)) \
    .orderBy(col("TotalSales").desc()) \
    .limit(5)

# Display top 5 countries
print("Top 5 Countries by Sales:")
country_sales_df.show(truncate=False)

Top 5 Countries by Sales:
+--------------+-------------+
|Country       |TotalSales   |
+--------------+-------------+
|United Kingdom|1.348250507E7|
|EIRE          |573509.76    |
|Netherlands   |548330.7     |
|Germany       |411959.16    |
|France        |320046.26    |
+--------------+-------------+



In [0]:
from pyspark.sql.functions import when

# Define segmentation based on total sales
customer_segmentation_df = transformed_df \
    .groupBy("Customer ID") \
    .sum("TotalPrice") \
    .withColumnRenamed("sum(TotalPrice)", "TotalSales") \
    .withColumn("SalesSegment", when(col("TotalSales") < 50, "Low")
                .when((col("TotalSales") >= 50) & (col("TotalSales") < 200), "Medium")
                .otherwise("High"))

# Display customer segmentation
print("Customer Segmentation by Sales:")
customer_segmentation_df.show(truncate=False)

Customer Segmentation by Sales:
+-----------+------------------+------------+
|Customer ID|TotalSales        |SalesSegment|
+-----------+------------------+------------+
|17043      |2593.6499999999996|High        |
|17499      |1296.84           |High        |
|15173      |1399.26           |High        |
|13723      |1194.08           |High        |
|17048      |1991.4499999999998|High        |
|15432      |301.26            |High        |
|16742      |2334.9700000000003|High        |
|16896      |103.94999999999999|Medium      |
|15194      |11781.519999999997|High        |
|17971      |161.94            |Medium      |
|15437      |393.45000000000005|High        |
|15057      |3184.5600000000004|High        |
|18196      |1457.19           |High        |
|15375      |60.96             |Medium      |
|16530      |48.35             |Low         |
|13401      |-1834.14          |Low         |
|17703      |1094.84           |High        |
|15322      |1314.1499999999999|High        |
|1

In [0]:
# Get the column names of the DataFrame
column_names = customer_segmentation_df.columns

# Display the column names
print("Column Names:", column_names)


Column Names: ['Customer ID', 'TotalSales', 'SalesSegment']


In [0]:
customer_segmentation_df = customer_segmentation_df \
    .withColumnRenamed("Customer ID", "Customer_ID")

In [0]:
# LOADING
# write the data to the Delta table
delta_table_path = "dbfs:/user/hive/warehouse/customer_segmentation"
customer_segmentation_df.write.format("delta").mode("overwrite").save(delta_table_path)

# Register the Delta table in the Hive metastore
spark.sql(f"CREATE TABLE IF NOT EXISTS customer_segmentation USING DELTA LOCATION '{delta_table_path}'")


# Verify the table by running a query
spark.sql("SELECT * FROM customer_segmentation").show(truncate=False)


+-----------+------------------+------------+
|Customer_ID|TotalSales        |SalesSegment|
+-----------+------------------+------------+
|17043      |2593.6499999999996|High        |
|17499      |1296.84           |High        |
|15173      |1399.26           |High        |
|13723      |1194.08           |High        |
|17048      |1991.4499999999998|High        |
|15432      |301.26            |High        |
|16742      |2334.9700000000003|High        |
|16896      |103.94999999999999|Medium      |
|15194      |11781.519999999997|High        |
|17971      |161.94            |Medium      |
|15437      |393.45000000000005|High        |
|15057      |3184.5600000000004|High        |
|18196      |1457.19           |High        |
|15375      |60.96             |Medium      |
|16530      |48.35             |Low         |
|13401      |-1834.14          |Low         |
|17703      |1094.84           |High        |
|15322      |1314.1499999999999|High        |
|14846      |406.79999999999995|Hi