In [0]:
from pyspark.sql import functions as F


# snake_case names assigned, in order, to the columns of the source table.
# toDF renames by position, so this list must follow the table's column order.
column_names = [
    "customer_id",
    "age",
    "gender",
    "item_purchased",
    "category",
    "purchase_amount_usd",
    "location",
    "season",
    "review_rating",
    "shipping_type",
    "previous_purchases",
    "payment_method",
]

df_raw = spark.table("project.customer_behaviour")
df = df_raw.toDF(*column_names)

df.show(5)


+-----------+---+------+--------------+--------+-------------------+-------------+------+-------------+-------------+------------------+--------------+
|customer_id|age|gender|item_purchased|category|purchase_amount_usd|     location|season|review_rating|shipping_type|previous_purchases|payment_method|
+-----------+---+------+--------------+--------+-------------------+-------------+------+-------------+-------------+------------------+--------------+
|          1| 55|  Male|        Blouse|Clothing|                 53|     Kentucky|Winter|          3.1|      Express|                14|         Venmo|
|          2| 19|  Male|       Sweater|Clothing|                 64|        Maine|Winter|          3.1|      Express|                 2|          Cash|
|          3| 50|  Male|         Jeans|Clothing|                 73|Massachusetts|Spring|          3.1|Free Shipping|                23|   Credit Card|
|          4| 21|  Male|       Sandals|Footwear|                 90| Rhode Island|Spring

In [0]:
# Target Spark SQL type for each column that should be numeric.
numeric_casts = {
    "customer_id": "int",
    "age": "int",
    "purchase_amount_usd": "double",
    "review_rating": "double",
    "previous_purchases": "int",
}

# withColumn on an existing name replaces the column in place, so the
# column order of df is unchanged by these casts.
for column_name, spark_type in numeric_casts.items():
    df = df.withColumn(column_name, F.col(column_name).cast(spark_type))

df.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- item_purchased: string (nullable = true)
 |-- category: string (nullable = true)
 |-- purchase_amount_usd: double (nullable = true)
 |-- location: string (nullable = true)
 |-- season: string (nullable = true)
 |-- review_rating: double (nullable = true)
 |-- shipping_type: string (nullable = true)
 |-- previous_purchases: integer (nullable = true)
 |-- payment_method: string (nullable = true)



In [0]:
# Bucket each customer into a decade-wide age band.
#
# Every band, including "60+", has its own explicit condition and there is no
# .otherwise() fallback: a null age satisfies none of the comparisons, so it
# produces a null age_group instead of being silently counted as "60+".
#
# NOTE(review): ages below 18 still fall into "18-29" because the first band
# has no lower bound — confirm the source data never contains younger customers.
age_col = F.col("age")

df = df.withColumn(
    "age_group",
    F.when(age_col < 30, "18-29")
     .when(age_col < 40, "30-39")
     .when(age_col < 50, "40-49")
     .when(age_col < 60, "50-59")
     .when(age_col >= 60, "60+")
)

df.select("customer_id", "age", "age_group").show(10)


+-----------+---+---------+
|customer_id|age|age_group|
+-----------+---+---------+
|          1| 55|    50-59|
|          2| 19|    18-29|
|          3| 50|    50-59|
|          4| 21|    18-29|
|          5| 45|    40-49|
|          6| 46|    40-49|
|          7| 63|      60+|
|          8| 27|    18-29|
|          9| 26|    18-29|
|         10| 57|    50-59|
+-----------+---+---------+
only showing top 10 rows


In [0]:
# Persist the prepared DataFrame as project.customer_behaviour_final,
# overwriting the table if it already exists.
(
    df.write
      .mode("overwrite")
      .saveAsTable("project.customer_behaviour_final")
)


In [0]:
# Purchase count and mean spend per category, largest category first.
category_summary = df.groupBy("category").agg(
    F.count("*").alias("num_purchases"),
    F.avg("purchase_amount_usd").alias("avg_purchase"),
)

category_summary.orderBy("num_purchases", ascending=False).show()


+-----------+-------------+------------------+
|   category|num_purchases|      avg_purchase|
+-----------+-------------+------------------+
|   Clothing|         1737|60.025331030512376|
|Accessories|         1240| 59.83870967741935|
|   Footwear|          599| 60.25542570951586|
|  Outerwear|          324| 57.17283950617284|
+-----------+-------------+------------------+



In [0]:
# Read the persisted table back and preview its first rows.
df_final = spark.read.table("project.customer_behaviour_final")
df_final.show(n=5)


+-----------+---+------+--------------+--------+-------------------+-------------+------+-------------+-------------+------------------+--------------+---------+
|customer_id|age|gender|item_purchased|category|purchase_amount_usd|     location|season|review_rating|shipping_type|previous_purchases|payment_method|age_group|
+-----------+---+------+--------------+--------+-------------------+-------------+------+-------------+-------------+------------------+--------------+---------+
|          1| 55|  Male|        Blouse|Clothing|               53.0|     Kentucky|Winter|          3.1|      Express|                14|         Venmo|    50-59|
|          2| 19|  Male|       Sweater|Clothing|               64.0|        Maine|Winter|          3.1|      Express|                 2|          Cash|    18-29|
|          3| 50|  Male|         Jeans|Clothing|               73.0|Massachusetts|Spring|          3.1|Free Shipping|                23|   Credit Card|    50-59|
|          4| 21|  Male|    

In [0]:
# Summary statistics for every column of the final table.
overall_stats = df_final.describe()
overall_stats.show()

# Number of rows per season.
season_counts = df_final.groupBy("season").count()
season_counts.show()

# Mean purchase amount within each age band.
spend_by_age_group = df_final.groupBy("age_group").agg(F.avg("purchase_amount_usd"))
spend_by_age_group.show()


+-------+------------------+-----------------+------+--------------+-----------+-------------------+--------+------+------------------+--------------+------------------+--------------+---------+
|summary|       customer_id|              age|gender|item_purchased|   category|purchase_amount_usd|location|season|     review_rating| shipping_type|previous_purchases|payment_method|age_group|
+-------+------------------+-----------------+------+--------------+-----------+-------------------+--------+------+------------------+--------------+------------------+--------------+---------+
|  count|              3900|             3900|  3900|          3900|       3900|               3900|    3900|  3900|              3900|          3900|              3900|          3900|     3900|
|   mean|            1950.5|44.06846153846154|  NULL|          NULL|       NULL|  59.76435897435898|    NULL|  NULL| 3.749948717948712|          NULL| 25.35153846153846|          NULL|     NULL|
| stddev|1125.97735323584

In [0]:
# Ten most frequently purchased items.
item_counts = df_final.groupBy("item_purchased").count()
item_counts.orderBy(F.col("count").desc()).show(10)


+--------------+-----+
|item_purchased|count|
+--------------+-----+
|         Pants|  171|
|        Blouse|  171|
|       Jewelry|  171|
|         Shirt|  169|
|         Dress|  166|
|       Sweater|  164|
|        Jacket|  163|
|    Sunglasses|  161|
|          Belt|  161|
|          Coat|  161|
+--------------+-----+
only showing top 10 rows


In [0]:
# Mean review rating per category, best-rated category first.
# The aggregate keeps Spark's generated column name "avg(review_rating)".
rating_by_category = df_final.groupBy("category").agg(F.avg("review_rating"))
rating_by_category.orderBy(F.col("avg(review_rating)").desc()).show()


+-----------+------------------+
|   category|avg(review_rating)|
+-----------+------------------+
|   Footwear|3.7906510851419055|
|Accessories|3.7686290322580676|
|  Outerwear| 3.746913580246914|
|   Clothing|3.7231433506044884|
+-----------+------------------+



In [0]:
# Correlation coefficient between customer age and purchase amount.
df_final.corr("age", "purchase_amount_usd")


-0.010423647378686559

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


In [0]:
# Reload the final table as the starting point for modelling.
df_ml = spark.table("project.customer_behaviour_final")

# Preview the identifier, the numeric columns, and the purchase amount.
preview_cols = [
    "customer_id",
    "age",
    "review_rating",
    "previous_purchases",
    "purchase_amount_usd",
]
df_ml.select(*preview_cols).show(5)


+-----------+---+-------------+------------------+-------------------+
|customer_id|age|review_rating|previous_purchases|purchase_amount_usd|
+-----------+---+-------------+------------------+-------------------+
|          1| 55|          3.1|                14|               53.0|
|          2| 19|          3.1|                 2|               64.0|
|          3| 50|          3.1|                23|               73.0|
|          4| 21|          3.5|                49|               90.0|
|          5| 45|          2.7|                31|               49.0|
+-----------+---+-------------+------------------+-------------------+
only showing top 5 rows


In [0]:
# Keep just the numeric predictor columns plus the label (purchase_amount_usd).
model_cols = [
    "age",
    "review_rating",
    "previous_purchases",
    "purchase_amount_usd",
]
df_model = df_ml.select(*model_cols)

df_model.printSchema()
df_model.show(5)


root
 |-- age: integer (nullable = true)
 |-- review_rating: double (nullable = true)
 |-- previous_purchases: integer (nullable = true)
 |-- purchase_amount_usd: double (nullable = true)

+---+-------------+------------------+-------------------+
|age|review_rating|previous_purchases|purchase_amount_usd|
+---+-------------+------------------+-------------------+
| 55|          3.1|                14|               53.0|
| 19|          3.1|                 2|               64.0|
| 50|          3.1|                23|               73.0|
| 21|          3.5|                49|               90.0|
| 45|          2.7|                31|               49.0|
+---+-------------+------------------+-------------------+
only showing top 5 rows


In [0]:
# Columns packed into the single vector column that Spark ML estimators expect.
feature_cols = ["age", "review_rating", "previous_purchases"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Keep only the assembled feature vector and the label.
df_assembled = (
    assembler
    .transform(df_model)
    .select("features", "purchase_amount_usd")
)

df_assembled.show(5, truncate=False)


+---------------+-------------------+
|features       |purchase_amount_usd|
+---------------+-------------------+
|[55.0,3.1,14.0]|53.0               |
|[19.0,3.1,2.0] |64.0               |
|[50.0,3.1,23.0]|73.0               |
|[21.0,3.5,49.0]|90.0               |
|[45.0,2.7,31.0]|49.0               |
+---------------+-------------------+
only showing top 5 rows


In [0]:
# 80/20 random split; the fixed seed keeps the split repeatable.
split_frames = df_assembled.randomSplit([0.8, 0.2], seed=42)
train_df, test_df = split_frames

n_train_rows = train_df.count()
n_test_rows = test_df.count()
print(f"Training rows: {n_train_rows}, Test rows: {n_test_rows}")


Training rows: 3177, Test rows: 723


In [0]:

# Column wiring for the regression: feature vector in, purchase amount as the
# label, fitted values written to "prediction".
lr_columns = {
    "featuresCol": "features",
    "labelCol": "purchase_amount_usd",
    "predictionCol": "prediction",
}
lr = LinearRegression(**lr_columns)

# Fit on the training split only.
lr_model = lr.fit(train_df)

print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)


Coefficients: [-0.011641482307935012,1.0037846958164585,0.02035850551562408]
Intercept: 56.18078273881139


In [0]:

# Score the held-out split and preview actual vs. predicted purchase amounts.
predictions = lr_model.transform(test_df)

preview_cols = ["features", "purchase_amount_usd", "prediction"]
predictions.select(*preview_cols).show(10, truncate=False)

# Both evaluators compare the same label and prediction columns; only the
# metric differs.
shared_eval_args = {
    "labelCol": "purchase_amount_usd",
    "predictionCol": "prediction",
}
evaluator_rmse = RegressionEvaluator(metricName="rmse", **shared_eval_args)
evaluator_r2 = RegressionEvaluator(metricName="r2", **shared_eval_args)

rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"RMSE: {rmse:.2f}")
print(f"R²: {r2:.3f}")


+---------------+-------------------+------------------+
|features       |purchase_amount_usd|prediction        |
+---------------+-------------------+------------------+
|[18.0,2.6,17.0]|82.0               |58.92717086015696 |
|[18.0,2.7,22.0]|43.0               |59.129341857316724|
|[18.0,2.9,11.0]|79.0               |59.10615523580815 |
|[18.0,2.9,32.0]|58.0               |59.533683851636255|
|[18.0,3.1,21.0]|62.0               |59.51049723012768 |
|[18.0,3.4,1.0] |71.0               |59.40446252856014 |
|[18.0,3.6,14.0]|25.0               |59.86988003942655 |
|[18.0,3.9,16.0]|77.0               |60.21173245920273 |
|[18.0,4.2,32.0]|25.0               |60.83860395619765 |
|[18.0,4.2,33.0]|32.0               |60.858962461713276|
+---------------+-------------------+------------------+
only showing top 10 rows
RMSE: 23.79
R²: -0.001


In [0]:
# Destination for the fitted model.
model_path = "/Volumes/workspace/project/model_store/customer_spend_lr_model"

# overwrite() lets the save replace a model already stored at this path.
model_writer = lr_model.write().overwrite()
model_writer.save(model_path)
