In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.appName("Project2").getOrCreate()

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

output_dir = "output/"
task2_output_total = output_dir + "task2/task2_total.csv"
task2_output_avgsales = output_dir + "task2/task2_avgsales.csv"
task2_output_season = output_dir + "task2/task2_season.csv"
task3_output = output_dir + "task3/task3.csv"
task4_output = output_dir + "task4/task4.csv"

checkpoint_dir = "checkpoint/task5/"
task5_output = output_dir + "task5.csv"

In [24]:
train_df = spark.read.json("train.json")
reviwes_df = spark.read.json("test.json")
train_df.show()
#reviwes_df = reviwes_df.withColumn("helpful", F.col("helpful").cast("double"))
#reviwes_df.printSchema()
#print(reviwes_df.describe())
# Check if any rows have nulls in the 'helpful' column after casting
#invalid_rows = reviwes_df.filter(F.col("sentence").isNull())
# Show rows that could not be cast to double
#invalid_rows.show()

+----------+-------+--------------------+--------------------+--------------------+
|      asin|helpful|      main_image_url|       product_title|            sentence|
+----------+-------+--------------------+--------------------+--------------------+
|B000AO3L84|    1.7|http://ecx.images...|Canon 430EX Speed...|this flash is a s...|
|B001SEQPGK|    1.3|http://ecx.images...|Sony Cyber-shot D...|The pictures were...|
|0553386697|    1.9|http://ecx.images...|The Whole-Brain C...|A very good resou...|
|B006SUWZH2|   0.25|http://ecx.images...|Memorex Portable ...|We have it in a c...|
|B000W7F5SS|    0.9|http://ecx.images...|Harry Potter and ...|Again the makers ...|
|B000AO3L84|    2.0|http://ecx.images...|Canon 430EX Speed...|This flash is a g...|
|B00081NX5U|   0.73|http://ecx.images...|iPod Detachable R...|So I've had these...|
|B00000F1D3|    0.9|http://ecx.images...|             Believe|they're cd's or t...|
|B00000FCBH|    1.3|http://ecx.images...|  2Pac Greatest Hits|he proved that

In [25]:
reviwes_df.show()

+----------+-------+--------------------+--------------------+--------------------+
|      asin|helpful|      main_image_url|       product_title|            sentence|
+----------+-------+--------------------+--------------------+--------------------+
|B00VG90446|   1.07|http://ecx.images...|Flexion KS-902 Ki...|so it stays in pl...|
|B001196MG0|   1.33|http://ecx.images...|Savage 107X12-1 S...|Love this seamles...|
|B00081NX5U|   1.17|http://ecx.images...|iPod Detachable R...|very happy with m...|
|B003HC9JIW|    1.6|http://ecx.images...|Start! Walking At...|Even for someone ...|
|B00C30FCUI|   1.49|http://ecx.images...|Symphonized NRG P...|, those have alwa...|
|B001196MG0|   1.47|http://ecx.images...|Savage 107X12-1 S...|but after a year ...|
|B00AR1G3FS|   1.24|http://ecx.images...|Farewell Live Fro...|While not quite a...|
|B007R3AZNK|   0.67|http://ecx.images...|Driving Towards T...|Until now, Sloe G...|
|0761165975|    1.0|http://ecx.images...|The Wedding Plann...|I considered t

In [26]:
online_retail_df = spark.read.csv("Online-Retail.csv", inferSchema=True, header=True)

In [27]:
online_retail_df.select([F.count(F.when(col(c).isNull(), c)).alias(c) for c in online_retail_df.columns]).show()

# Drop rows with nulls in important columns like CustomerID, Description, etc.
online_retail_df = online_retail_df.dropna(subset=["CustomerID", "Description", "InvoiceDate", "Quantity", "UnitPrice"])
#online_retail_df.select([F.count(F.when(col(c).isNull(), c)).alias(c) for c in online_retail_df.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [28]:
#drop duplicates
online_retail_df = online_retail_df.dropDuplicates()

In [29]:
#cast the invoice date to timestamp
online_retail_df = online_retail_df.withColumn("InvoiceDate", F.to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))
online_retail_df = online_retail_df.withColumn("InvoiceDate", F.col("InvoiceDate").cast(TimestampType()))

In [30]:
#check for outliers
online_retail_df.filter((col("Quantity") < 0) | (col("UnitPrice") < 0)).show()

# Drop rows where Quantity or UnitPrice are negative (common outlier check)
online_retail_df = online_retail_df.filter((F.col("Quantity") >= 0) & (F.col("UnitPrice") >= 0))
#online_retail_df.filter((col("Quantity") < 0) | (col("UnitPrice") < 0)).show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|  C538350|   85099F|JUMBO BAG STRAWBERRY|      -2|2010-12-10 15:01:00|     1.65|     13798|United Kingdom|
|  C538375|    22220|CAKE STAND LOVEBI...|      -1|2010-12-12 11:19:00|     9.95|     17126|United Kingdom|
|  C539726|    22791|T-LIGHT GLASS FLU...|     -10|2010-12-21 14:24:00|     1.25|     17007|United Kingdom|
|  C540307|    22084|PAPER CHAIN KIT E...|     -36|2011-01-06 12:58:00|     2.95|     15823|United Kingdom|
|  C542138|    20866|BLUE ROSE FABRIC ...|    -120|2011-01-25 17:21:00|     1.06|     17368|United Kingdom|
|  C543347|    22629| SPACEBOY LUNCH BOX |      -1|2011-02-07 12:44:00|     1.95|     12472|       Germany|
|  C544830|    22059|CERAMIC

In [31]:
#standardize for consistent description characters
online_retail_df = online_retail_df.withColumn("Description", F.upper(F.col("Description")))

#handle special cases in case for better performance for MLlib
#online_retail_df = online_retail_df.filter(~F.col("InvoiceNo").startswith("C"))

In [32]:
online_retail_df.show()



+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|     13047|United Kingdom|
|   536368|    22960|JAM MAKING SET WI...|       6|2010-12-01 08:34:00|     4.25|     13047|United Kingdom|
|   536388|    22915|ASSORTED BOTTLE T...|      12|2010-12-01 09:59:00|     0.42|     16250|United Kingdom|
|   536401|    21464|DISCO BALL ROTATO...|       1|2010-12-01 11:21:00|     4.25|     15862|United Kingdom|
|   536412|    22569|FELTCRAFT CUSHION...|       2|2010-12-01 11:49:00|     3.75|     17920|United Kingdom|
|   536425|    22645|CERAMIC HEART FAI...|      12|2010-12-01 12:08:00|     1.45|     13758|United Kingdom|
|   536488|    22376|AIRLINE

                                                                                

In [33]:
#-----------------------------------
#Task 2: Sales Data Aggregation and Feature Engineering
#-----------------------------------
#total sales per product per month
#get the month and year from invoicedate
online_retail_df = online_retail_df.withColumn("Month", F.month("InvoiceDate")).withColumn("Year", F.year("InvoiceDate"))

#make a revenue column, total quantity * price
online_retail_df = online_retail_df.withColumn("Revenue", col("Quantity") * col("UnitPrice"))

#total sales per product and month. calculated by summing total quantity * price
total_sales_df = online_retail_df.groupBy("StockCode", "Description", "Month", "Year").agg(F.sum("Revenue").alias("TotalSales"))
total_sales_df.show()



+---------+--------------------+-----+----+------------------+
|StockCode|         Description|Month|Year|        TotalSales|
+---------+--------------------+-----+----+------------------+
|    22692|DOORMAT WELCOME T...|    4|2011|             512.7|
|    22384|LUNCH BAG PINK PO...|    1|2011|            902.15|
|    21221|SET/4 BADGES CUTE...|    2|2011|             43.75|
|    22114|HOT WATER BOTTLE ...|   12|2010|1863.0500000000002|
|    22236|CAKE STAND 3 TIER...|    1|2011|403.04999999999995|
|    85213|MINI  ZINC GARDEN...|    3|2011|44.199999999999996|
|    22624|IVORY KITCHEN SCALES|    3|2011|            779.45|
|    20914|SET/5 RED RETROSP...|    3|2011| 962.6500000000001|
|    21051|      RIBBONS PURSE |   12|2010|              33.6|
|    21989|PACK OF 20 SKULL ...|    1|2011|115.59999999999998|
|    21615|4 LAVENDER BOTANI...|    1|2011|             165.0|
|    22807|SET OF 6 T-LIGHTS...|   12|2010|141.60000000000002|
|    21888|           BINGO SET|    2|2011|            

                                                                                

In [34]:
#average revenue per customer
#get average revnue for each customer id
avg_revnue_df = online_retail_df.groupBy("CustomerID").agg(F.avg("Revenue").alias("AverageRevenue")).orderBy(F.desc("AverageRevenue"))
avg_revnue_df.show()



+----------+------------------+
|CustomerID|    AverageRevenue|
+----------+------------------+
|     12346|           77183.6|
|     16446|           56157.5|
|     15098|           13305.5|
|     15749|           4453.43|
|     15195|            3861.0|
|     13135|            3096.0|
|     17846|            2033.1|
|     18087|2027.8599999999997|
|     16532|1687.1999999999998|
|     16000|1377.0777777777776|
|     16754|            1001.2|
|     12755| 952.9874999999998|
|     18133| 931.4999999999999|
|     12798| 872.1299999999999|
|     17949|           835.864|
|     17553|             743.8|
|     15299| 643.8585714285715|
|     16308|             640.0|
|     16986|             624.4|
|     18080|            615.75|
+----------+------------------+
only showing top 20 rows



                                                                                

In [35]:
#seasonal patterns for top selling products
#get products by stock code and their highest total revnue as sum
top_products_df = online_retail_df.groupBy("StockCode").agg(F.sum("Revenue").alias("TotalRevenue")).orderBy(F.desc("TotalRevenue"))
    
#join with df to get montly data, group by product and month for each of their total revenue
seasonal_pattern = online_retail_df.join(top_products_df.limit(10), "StockCode").groupBy("StockCode", "Description", "Month").agg(F.sum("Revenue").alias("MonthlySales"))
seasonal_pattern.show()

                                                                                

+---------+--------------------+-----+------------------+
|StockCode|         Description|Month|      MonthlySales|
+---------+--------------------+-----+------------------+
|   85123A|WHITE HANGING HEA...|    4| 9581.650000000001|
|    84879|ASSORTED COLOUR B...|    1|2704.1899999999996|
|    23166|MEDIUM CERAMIC TO...|    9|            397.26|
|    47566|       PARTY BUNTING|    1|1815.1499999999999|
|        M|              MANUAL|    8|           2989.54|
|   85123A|WHITE HANGING HEA...|    2|4912.6500000000015|
|   85099B|JUMBO BAG RED RET...|   10| 9763.059999999998|
|    22423|REGENCY CAKESTAND...|    1|10765.499999999998|
|   85099B|JUMBO BAG RED RET...|    7| 5654.599999999999|
|     POST|             POSTAGE|   11|          10349.95|
|    47566|       PARTY BUNTING|   11|3715.7099999999996|
|   85123A|WHITE HANGING HEA...|   11|13849.929999999997|
|    47566|       PARTY BUNTING|    9| 4386.999999999999|
|     POST|             POSTAGE|    2|            3166.0|
|        M|   

In [42]:
#customer liftime value: total revenue per customer
clv_df = online_retail_df.groupBy("CustomerID", "StockCode").agg(F.sum("Revenue").alias("CustomerLifetimeValue"))
clv_df.show()



+----------+---------+---------------------+
|CustomerID|StockCode|CustomerLifetimeValue|
+----------+---------+---------------------+
|     15363|    22382|                 16.5|
|     17235|   85184C|   35.400000000000006|
|     17454|    21931|                 19.5|
|     13113|    22423|                700.8|
|     14498|    22457|                 11.8|
|     15059|    21175|   49.199999999999996|
|     13198|    20751|   25.200000000000003|
|     16609|    22969|                 34.8|
|     15719|    22411|                47.19|
|     16992|    22500|                 19.8|
|     12668|    23078|                 30.0|
|     14298|    22608|               157.32|
|     13081|    22132|   30.599999999999998|
|     15129|    22360|   35.400000000000006|
|     14156|    22113|                  7.5|
|     15529|    10002|   15.299999999999999|
|     13506|    84077|   13.919999999999998|
|     18116|    21381|                 5.07|
|     17406|    22795|                 13.5|
|     1786

                                                                                

In [37]:
#product popularity: counted by unique transaction made under each stock code
product_popularity = online_retail_df.groupBy("StockCode").agg(F.countDistinct("InvoiceNo").alias("PopularityScore"))
product_popularity.show()



+---------+---------------+
|StockCode|PopularityScore|
+---------+---------------+
|    21889|            449|
|    21259|            237|
|    22728|            613|
|    21452|            133|
|    21894|             71|
|    22121|            114|
|    21248|             52|
|    22254|             41|
|    21249|             79|
|    90143|              7|
|    22596|            234|
|    84881|              5|
|    23318|            329|
|    23459|             19|
|    21331|              7|
|   90210B|              6|
|    20868|             31|
|    23843|              1|
|    22314|             93|
|    21535|            310|
+---------+---------------+
only showing top 20 rows



                                                                                

In [38]:
#seasonal trends:
#make season column based on month
online_retail_df = online_retail_df.withColumn("Season",
                    F.when(col("Month").isin(12, 1, 2), "Winter") #if month is in one of these numbers
                    .when(col("Month").isin(3, 4, 5), "Spring")
                    .when(col("Month").isin(6, 7, 8), "Summer")
                    .when(col("Month").isin(9, 10, 11), "Fall")
                    )
    
#total revenue of each product and season
seasonal_trends = online_retail_df.groupBy("StockCode", "Season").agg(F.sum("Revenue").alias("SeasonalSales"))
seasonal_trends.show()



+---------+------+------------------+
|StockCode|Season|     SeasonalSales|
+---------+------+------------------+
|    21110|Winter|            1349.3|
|    22668|Winter|            1411.7|
|    22966|Winter|2459.8599999999997|
|   90002D|Winter|              30.0|
|    37446|Winter| 659.5999999999998|
|    22252|Spring|             97.94|
|    22760|Winter| 849.8999999999999|
|   84558A|Winter|268.45000000000005|
|    22149|Spring| 2487.700000000001|
|    22421|Winter|            159.48|
|    22301|Winter|1107.4500000000003|
|    22537|Spring|            190.26|
|    22107|Winter|            448.74|
|    21564|Spring|430.70000000000005|
|   84575A|Winter|              5.95|
|   84952C|Spring|              30.0|
|   15056P|Winter|             708.7|
|    22452|Winter|            520.25|
|    84818|Winter|             408.0|
|    21463|Spring|382.65000000000003|
+---------+------+------------------+
only showing top 20 rows



                                                                                

In [39]:
#write the aggregated data to the directory as csv files
#try:
#    total_sales_df.write.csv(task2_output_total, header=True)
#    avg_revnue_df.write.csv(task2_output_avgsales, header=True)
#    seasonal_pattern.write.csv(task2_output_season, header=True)
#except ValueError as e:
#    print(f"error {e}")

In [43]:
# Join features into a consolidated DataFrame
forecasting_df = total_sales_df.join(clv_df, "StockCode", "left") \
                               .join(product_popularity, "StockCode", "left") \
                               .join(seasonal_trends, "StockCode", "left")

# Fill nulls with 0 or appropriate values for ML training
forecasting_df = forecasting_df.fillna(0)

# Display the consolidated DataFrame structure
forecasting_df.show()


                                                                                

+---------+--------------------+-----+----+-----------------+----------+---------------------+---------------+------+-----------------+
|StockCode|         Description|Month|Year|       TotalSales|CustomerID|CustomerLifetimeValue|PopularityScore|Season|    SeasonalSales|
+---------+--------------------+-----+----+-----------------+----------+---------------------+---------------+------+-----------------+
|    20914|SET/5 RED RETROSP...|    3|2011|962.6500000000001|     14107|   17.700000000000003|            764|Spring|3220.250000000001|
|    20914|SET/5 RED RETROSP...|    3|2011|962.6500000000001|     14107|   17.700000000000003|            764|Summer|5201.250000000004|
|    20914|SET/5 RED RETROSP...|    3|2011|962.6500000000001|     14107|   17.700000000000003|            764|  Fall|4641.490000000001|
|    20914|SET/5 RED RETROSP...|    3|2011|962.6500000000001|     14107|   17.700000000000003|            764|Winter|4208.540000000003|
|    20914|SET/5 RED RETROSP...|    3|2011|962.6