In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Base table: the partitioned silver-layer sales data.
sales = spark.table("main.silver.sales_partitioned")

# Per-customer window ordered by purchase date, used for time-based features.
w = Window.partitionBy("customer_id").orderBy("order_date")

features_df = (
    sales
    .withColumn("order_ts", F.col("order_date").cast("timestamp"))
    # Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday),
    # so {1, 7} is the weekend.
    .withColumn("day_of_week", F.dayofweek("order_date"))
    .withColumn("is_weekend", F.col("day_of_week").isin([1, 7]))
    # NOTE(review): if order_date is a DATE rather than a timestamp, order_ts
    # is always midnight and this column is a constant 0 — confirm the type.
    .withColumn("hour", F.hour("order_ts"))
    # log1p(x) == log(x + 1) but stays accurate for very small amounts.
    .withColumn("amount_log", F.log1p("amount"))
    .withColumn(
        "time_since_first_purchase",
        # Seconds elapsed since the customer's earliest purchase.
        # min() is used instead of first(): an ascending sort places NULL
        # dates first, so first() would return NULL for every row of a
        # customer that has even one NULL order_date. min() skips NULLs and,
        # over this ascending window, yields the same earliest timestamp.
        F.unix_timestamp("order_ts") -
        F.unix_timestamp(F.min("order_ts").over(w))
    )
)

# Save as ML-ready table; "overwrite" replaces the table's existing contents.
features_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.silver.sales_features")


Descriptive Statistics

In [0]:
# Summary statistics (count, mean, stddev, min, max) for the raw and the
# log-transformed amount columns of the saved feature table.
(
    spark.table("main.silver.sales_features")
    .select("amount", "amount_log")
    .describe()
    .show()
)


+-------+--------------------+--------------------+
|summary|              amount|          amount_log|
+-------+--------------------+--------------------+
|  count|            10000000|            10000000|
|   mean|   499.8968656063042|   5.915524583957402|
| stddev|   288.6036901293912|  0.9757680879532915|
|    min|6.692588383483411E-5|6.692364439778755E-5|
|    max|   999.9998477423807|   6.908754627209695|
+-------+--------------------+--------------------+



In [0]:
# Order count and average amount, split by the weekend flag stored in the
# feature table.
(
    spark.table("main.silver.sales_features")
    .groupBy("is_weekend")
    .agg(
        F.count("*").alias("orders"),
        F.avg("amount").alias("avg_amount"),
    )
    .show()
)


+----------+-------+------------------+
|is_weekend| orders|        avg_amount|
+----------+-------+------------------+
|      true|2876960| 499.8911069889734|
|     false|7123040|499.89919148285685|
+----------+-------+------------------+



In [0]:
# Correlation between order amount and seconds since the customer's first
# purchase; the bare expression is the cell's displayed result.
(
    spark.table("main.silver.sales_features")
    .stat
    .corr("amount", "time_since_first_purchase")
)


0.0004580052161373298

Statistical Analysis & ML Prep (Executed)

In [0]:
# Load the partitioned silver sales table that the analysis cells below use.
sales = spark.read.table("main.silver.sales_partitioned")


In [0]:
# Basic summary statistics for the amount column only.
sales.select("amount").describe().show()


+-------+--------------------+
|summary|              amount|
+-------+--------------------+
|  count|            10000000|
|   mean|  499.89686560631293|
| stddev|   288.6036901293912|
|    min|6.692588383483411E-5|
|    max|   999.9998477423807|
+-------+--------------------+



Advanced statistics (adds the median, which describe() does not report)

In [0]:
from pyspark.sql import functions as F

# One-row summary of `amount`: mean, standard deviation, min, max, and the
# median taken as the 0.5 percentile.
(
    sales
    .select(
        F.mean("amount").alias("mean_amount"),
        F.stddev("amount").alias("std_amount"),
        F.min("amount").alias("min_amount"),
        F.max("amount").alias("max_amount"),
        F.expr("percentile(amount, 0.5)").alias("median_amount"),
    )
    .show()
)


+------------------+-----------------+--------------------+-----------------+------------------+
|       mean_amount|       std_amount|          min_amount|       max_amount|     median_amount|
+------------------+-----------------+--------------------+-----------------+------------------+
|499.89686560630616|288.6036901293912|6.692588383483411E-5|999.9998477423807|499.87714913657385|
+------------------+-----------------+--------------------+-----------------+------------------+



Create weekday/weekend flag

In [0]:
# Flag orders whose date falls on day 1 or day 7 of Spark's dayofweek
# numbering (Sunday and Saturday respectively).
sales_flagged = sales.withColumn("is_weekend", F.dayofweek("order_date").isin(1, 7))


Compare metrics

In [0]:
# Compare weekend vs. weekday orders: volume, average ticket, total revenue.
(
    sales_flagged
    .groupBy("is_weekend")
    .agg(
        F.count("*").alias("orders"),
        F.avg("amount").alias("avg_amount"),
        F.sum("amount").alias("total_revenue"),
    )
    .show()
)


+----------+-------+------------------+-------------------+
|is_weekend| orders|        avg_amount|      total_revenue|
+----------+-------+------------------+-------------------+
|      true|2876960|499.89110698896263|1.438166719162966E9|
|     false|7123040| 499.8991914828593|3.560801936900066E9|
+----------+-------+------------------+-------------------+



Identify Correlations
Numeric correlations

In [0]:
# Correlation between order amount and product_id.
# NOTE(review): product_id looks like an identifier rather than a measured
# quantity, so this coefficient may not be meaningful — confirm intent.
sales.stat.corr(col1="amount", col2="product_id")


-3.503152685638964e-05

In [0]:
# Add the numeric day of week so it can be correlated against amount.
sales_corr = sales.withColumn("day_of_week", F.dayofweek("order_date"))

# Bare expression: the coefficient is the cell's displayed result.
sales_corr.stat.corr(col1="amount", col2="day_of_week")


0.00020067576463052224

Feature Engineering for ML (Final Output)
Create ML-ready feature table

In [0]:
from pyspark.sql.window import Window

# Per-customer window ordered by purchase date, used for time-based features.
w = Window.partitionBy("customer_id").orderBy("order_date")

# NOTE(review): the first cell of this notebook builds the same frame with an
# additional `hour` column and writes it to the same table; confirm which
# schema is the intended final one.
features_df = (
    sales
    .withColumn("order_ts", F.col("order_date").cast("timestamp"))
    # Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday),
    # so {1, 7} is the weekend.
    .withColumn("day_of_week", F.dayofweek("order_date"))
    .withColumn("is_weekend", F.col("day_of_week").isin([1, 7]))
    # log1p(x) == log(x + 1) but stays accurate for very small amounts.
    .withColumn("amount_log", F.log1p("amount"))
    .withColumn(
        "time_since_first_purchase",
        # Seconds elapsed since the customer's earliest purchase.
        # min() is used instead of first(): an ascending sort places NULL
        # dates first, so first() would return NULL for every row of a
        # customer that has even one NULL order_date. min() skips NULLs and,
        # over this ascending window, yields the same earliest timestamp.
        F.unix_timestamp("order_ts") -
        F.unix_timestamp(F.min("order_ts").over(w))
    )
)


Save ML Feature Table

In [0]:
# Persist the feature frame as a Delta table; "overwrite" mode replaces the
# table's existing contents.
(
    features_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.silver.sales_features")
)
