In [3]:
from pyspark.sql import SparkSession, types, functions as F
from pyspark.sql.functions import col, to_timestamp, expr
from pyspark.sql.window import Window
import os

In [5]:
# Path to the credentials file
credential_location = './keys/credentials.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'gs://europe-west1-composer-env-0e6b8b9f-bucket/credentials.json'


In [7]:
# Initialize Spark session with BigQuery and GCS support
spark = SparkSession.builder \
    .appName("UK_Online_Retail_Data_Exploration") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.32.2") \
    .config("spark.jars", "gcs-connector-hadoop3-2.2.5.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credential_location) \
    .getOrCreate()


:: loading settings :: url = jar:file:/opt/anaconda3/envs/spark_env/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/victorianweke/.ivy2/cache
The jars for the packages stored in: /Users/victorianweke/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-586697c9-6beb-4ded-9e9a-c75c63ab04f0;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.32.2 in central
:: resolution report :: resolve 70ms :: artifacts dl 1ms
	:: modules in use:
	com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.32.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	--------------------------------------------------

In [9]:
df_spark = spark.read \
    .option("header", "true") \
    .csv('gs://online-retail-data-bucket/uk_online_retail_data/online_retail.csv')

In [10]:
df_spark.count()

                                                                                

541909

In [13]:
df_spark.show()

+-----+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|index|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+-----+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|    0|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|   17850.0|United Kingdom|
|    1|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|    2|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|   17850.0|United Kingdom|
|    3|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|    4|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|   17850.0|United Kingdom|
|    5|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|   17850.0|United Kingdom|
|    6|   536365|  

In [15]:
# Count duplicates
df_spark.groupBy(df_spark.columns).count().filter("count > 1").show()


                                                                                

+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+-----+
|index|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|count|
+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+-----+
+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+-----+



In [15]:
# Count nulls for each column
df_spark.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_spark.columns]).show()



+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+
|index|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+
|    0|        0|        0|       1454|       0|          0|        0|    135080|      0|
+-----+---------+---------+-----------+--------+-----------+---------+----------+-------+



                                                                                

In [17]:
df_spark.schema

StructType([StructField('index', StringType(), True), StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', StringType(), True), StructField('InvoiceDate', StringType(), True), StructField('UnitPrice', StringType(), True), StructField('CustomerID', StringType(), True), StructField('Country', StringType(), True)])

In [17]:
schema = types.StructType([
    types.StructField("index", types.IntegerType(), True),
    types.StructField("InvoiceNo", types.StringType(), True),
    types.StructField("StockCode", types.StringType(), True),
    types.StructField("Description", types.StringType(), True),
    types.StructField("Quantity", types.IntegerType(), True),
    types.StructField("InvoiceDate", types.StringType(), True),   
    types.StructField("UnitPrice", types.FloatType(), True),
    types.StructField("CustomerID", types.StringType(), True),
    types.StructField("Country", types.StringType(), True)
])

In [19]:
# Load the data with new schema
df_spark = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('gs://online-retail-data-bucket/uk_online_retail_data/online_retail.csv')

In [21]:
# Explicitly convert InvoiceDate to datetime format
#df_spark = df_spark.withColumn("InvoiceDate",  F.to_date("InvoiceDate", "MM/dd/yyyy HH:mm"))

df_spark = df_spark.withColumn("InvoiceDate", expr("try_to_timestamp(InvoiceDate, 'M/d/yyyy H:mm')"))


In [23]:
df_spark.show()

+-----+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|index|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+-----+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|    0|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|    1|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|    2|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|    3|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|    4|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|    5|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65| 

In [25]:
# Initialize BigQuery client
project_id = 'genial-upgrade-455023-g5'  
dataset_id = 'online_retail'  
table_id = 'retail'  


In [27]:
# Step 1: Identify the most common CustomerID per InvoiceNo (excluding null CustomerIDs)
window_spec_customer = Window.partitionBy("InvoiceNo").orderBy(F.desc("count"))

most_common_customer = (df_spark
    .filter(F.col("CustomerID").isNotNull())
    .groupBy("InvoiceNo", "CustomerID")
    .count()
    .withColumn("rank", F.row_number().over(window_spec_customer))
    .filter(F.col("rank") == 1)
    .select("InvoiceNo", "CustomerID")
)

# Rename the CustomerID column in most_common_customer to avoid ambiguity
most_common_customer = most_common_customer.withColumnRenamed("CustomerID", "MostCommonCustomerID")

# Step 2: Join back to the original DataFrame to fill missing CustomerID values
df_filled_customer = df_spark.alias("df").join(
    most_common_customer.alias("mc"),
    on="InvoiceNo",
    how="left"
).withColumn(
    "CustomerID", 
    F.coalesce(F.col("df.CustomerID"), F.col("mc.MostCommonCustomerID"))
)

# Drop 'MostCommonCustomerID' as we no longer need it
df_filled_customer = df_filled_customer.drop("MostCommonCustomerID")

# Option: Handle null CustomerID by setting a default value
default_customer_id = df_spark.filter(F.col("CustomerID").isNotNull()).groupBy("CustomerID").count().orderBy(F.desc("count")).first()[0]

df_filled_customer = df_filled_customer.withColumn(
    "CustomerID",
    F.when(F.col("CustomerID").isNull(), default_customer_id).otherwise(F.col("CustomerID"))
)

# Step 3: Identify the most common Description per StockCode
window_spec_description = Window.partitionBy("StockCode")

mode_desc = (df_spark
    .groupBy("StockCode", "Description")
    .count()
    .withColumn("rank", F.row_number().over(Window.partitionBy("StockCode").orderBy(F.desc("count"))))
    .filter(F.col("rank") == 1)
    .select("StockCode", "Description")
)

# Rename Description to avoid ambiguity
most_common_description = mode_desc.withColumnRenamed("Description", "MostCommonDescription")

# Step 4: Join back to the original DataFrame to fill missing Description values
df_filled_final = df_filled_customer.alias("df").join(
    most_common_description.alias("mc"),
    on="StockCode",
    how="left"
).withColumn(
    "Description", 
    F.coalesce(F.col("df.Description"), F.col("mc.MostCommonDescription"))
)

# Drop 'MostCommonDescription' as it's no longer needed
df_filled_final = df_filled_final.drop("MostCommonDescription")

# Option: Handle null Description by setting a default value
default_description = df_spark.filter(F.col("Description").isNotNull()).groupBy("Description").count().orderBy(F.desc("count")).first()[0]

df_filled_final = df_filled_final.withColumn(
    "Description",
    F.when(F.col("Description").isNull(), default_description).otherwise(F.col("Description"))
)

# Step 5: Extract Date Components
df_filled_final = df_filled_final.withColumn(
    "Year", F.year("InvoiceDate"))

df_filled_final = df_filled_final.withColumn(
    "Month", F.month("InvoiceDate"))

df_filled_final = df_filled_final.withColumn(
    "MonthName", F.date_format("InvoiceDate", "MMMM"))

df_filled_final = df_filled_final.withColumn(
    "DayOfWeek", F.dayofweek("InvoiceDate"))

df_filled_final = df_filled_final.withColumn(
    "NameOfDay", F.date_format("InvoiceDate", "EEEE"))

# Step 6: Compute Sales Volume per Product
df_sales = df_filled_final.withColumn("SalesValue", F.col("Quantity") * F.col("UnitPrice"))

sales_volume = df_sales.groupBy("StockCode").agg(
    F.sum("SalesValue").alias("TotalSales"),
    F.sum("Quantity").alias("TotalQuantity")
)

# Step 7: Compute Profit Margin per Product (Assume Cost Price = 80% of Unit Price)
df_profit = df_sales.withColumn("CostPrice", F.col("UnitPrice") * 0.8) \
                    .withColumn("ProfitMargin", (F.col("UnitPrice") - F.col("CostPrice")) / F.col("UnitPrice"))

profit_margin = df_profit.groupBy("StockCode").agg(
    F.avg("ProfitMargin").alias("AvgProfitMargin"),
    F.sum("SalesValue").alias("TotalSales_Profit")  # Renamed to avoid ambiguity
)

# Step 8: Merge Sales and Profit Data
product_performance = sales_volume.join(profit_margin, on="StockCode", how="inner")

# Step 8.1: Classify Sales Volume and Profit Margin
window_spec_sales = Window.orderBy(F.col("TotalSales"))
sales_volume = sales_volume.withColumn("SalesCategory", 
    F.when(F.ntile(3).over(window_spec_sales) == 1, "Low")
     .when(F.ntile(3).over(window_spec_sales) == 2, "Medium")
     .otherwise("High")
)

# Step 8.2: Classify Profit Margin
window_spec_profit = Window.orderBy(F.col("AvgProfitMargin"))
product_performance = product_performance.withColumn(
    "ProfitCategory",
    F.when(F.ntile(3).over(window_spec_profit) == 1, "Low")
     .when(F.ntile(3).over(window_spec_profit) == 2, "Medium")
     .otherwise("High")
)

# Ensure SalesCategory is merged with product_performance**
product_performance = product_performance.join(
    sales_volume.select("StockCode", "SalesCategory"), 
    on="StockCode", 
    how="left"
)

# Step 9: Merge with Main Dataset
df_final = df_filled_final.join(product_performance, on="StockCode", how="left")

# Drop duplicate TotalSales_Profit and rename the column to TotalSales
df_final = df_final.drop("TotalSales_Profit")

# Rename the correctly merged TotalSales
df_final = df_final.withColumnRenamed("TotalSales", "TotalSales")

# Step 10: Reapply the schema by selecting columns and casting to the correct types
window_spec = Window.orderBy(F.lit(1))  # Ensuring a stable order for indexing
df_final = df_final.withColumn("index", F.row_number().over(window_spec))

df_final = df_final.select(
    F.col("index").cast(types.IntegerType()),
    F.col("InvoiceNo").cast(types.StringType()),
    F.col("StockCode").cast(types.StringType()),
    F.col("Description").cast(types.StringType()),
    F.col("Quantity").cast(types.IntegerType()),
    F.col("InvoiceDate").cast(types.TimestampType()),
    F.col("UnitPrice").cast(types.FloatType()),
    F.col("CustomerID").cast(types.IntegerType()),  # Changed from FloatType to IntegerType
    F.col("Country").cast(types.StringType()),
    F.col("Year").cast(types.IntegerType()),
    F.col("Month").cast(types.IntegerType()),
    F.col("MonthName").cast(types.StringType()),
    F.col("DayOfWeek").cast(types.IntegerType()),
    F.col("NameOfDay").cast(types.StringType()),
    F.col("TotalSales").cast(types.FloatType()),
    F.col("TotalQuantity").cast(types.IntegerType()),
    F.col("AvgProfitMargin").cast(types.FloatType()),
    F.col("SalesCategory").cast(types.StringType()),
    F.col("ProfitCategory").cast(types.StringType())
)


# Check the final schema of the dataframe
df_final.printSchema()

# Write to BigQuery
# df_final.write.format("bigquery") \
#     .option("temporaryGcsBucket", "mortgage-data-bucket") \
#     .option("project", project_id) \
#     .option("dataset", dataset_id) \
#     .option("table", table_id) \
#     .option("partitionField", "InvoiceDate")    \
#     .option("clusteredFields", "CustomerID")    \
#     .mode("overwrite") \
#     .save()


                                                                                

root
 |-- index: integer (nullable = false)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- MonthName: string (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- NameOfDay: string (nullable = true)
 |-- TotalSales: float (nullable = true)
 |-- TotalQuantity: integer (nullable = true)
 |-- AvgProfitMargin: float (nullable = true)
 |-- SalesCategory: string (nullable = true)
 |-- ProfitCategory: string (nullable = true)

