In [0]:
from pyspark.sql import SparkSession
df = spark.read.csv("/FileStore/tables/raw_transactions.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("raw_transactions")


In [0]:
df.display()

transaction_id,customer_id,transaction_date,transaction_amount,payment_method,product_category
T001,C001,2024-11-10,100.0,Credit Card,Electronics
T002,C002,2024-11-12,50.0,PayPal,Clothing
T003,C001,2024-11-13,75.0,Credit Card,Books
T004,C003,2024-11-14,120.0,Debit Card,Electronics
T005,C002,2024-11-15,60.0,PayPal,Beauty
T006,C003,2024-11-16,30.0,UPI,Clothing
T007,C001,2024-11-17,90.0,Credit Card,Home
T008,C004,2024-11-18,110.0,Credit Card,Electronics
T009,C005,2024-11-19,95.0,Debit Card,Books
T010,C002,2024-11-20,85.0,UPI,Clothing


In [0]:
from pyspark.sql.functions import avg, count, max

features_df = df.groupBy("customer_id").agg(
    count("*").alias("total_transactions"),
    avg("transaction_amount").alias("avg_transaction_amount"),
    max("transaction_date").alias("last_transaction_date")
)

In [0]:
features_df.display()

customer_id,total_transactions,avg_transaction_amount,last_transaction_date
C003,2,75.0,2024-11-16
C004,1,110.0,2024-11-18
C005,1,95.0,2024-11-19
C001,3,88.33333333333333,2024-11-17
C002,3,65.0,2024-11-20


In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS feature_store")

Out[5]: DataFrame[]

In [0]:
features_df.write.format("delta").mode("overwrite").saveAsTable("feature_store_customer_features")
# Step 1: Load Features + Labels
from pyspark.sql import SparkSession

labels_df = spark.createDataFrame([
    ("C001", 1),
    ("C002", 0),
    ("C003", 1),
    ("C004", 0),
    ("C005", 1)
], ["customer_id", "label"])

features_df = spark.read.table("feature_store_customer_features")

training_df = labels_df.join(features_df, on="customer_id", how="inner")
training_df.show()

+-----------+-----+------------------+----------------------+---------------------+
|customer_id|label|total_transactions|avg_transaction_amount|last_transaction_date|
+-----------+-----+------------------+----------------------+---------------------+
|       C001|    1|                 3|     88.33333333333333|           2024-11-17|
|       C002|    0|                 3|                  65.0|           2024-11-20|
|       C003|    1|                 2|                  75.0|           2024-11-16|
|       C004|    0|                 1|                 110.0|           2024-11-18|
|       C005|    1|                 1|                  95.0|           2024-11-19|
+-----------+-----+------------------+----------------------+---------------------+



In [0]:
#Step 2: Prepare Data for ML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

feature_cols = ["total_transactions", "avg_transaction_amount"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled_df = assembler.transform(training_df)

final_df = assembled_df.select("features", "label")


In [0]:
# Step 3: Train a Model
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=42)
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

In [0]:
test_data.display()
train_data.display()

features,label
"Map(vectorType -> dense, length -> 2, values -> List(3.0, 88.33333333333333))",1


features,label
"Map(vectorType -> dense, length -> 2, values -> List(3.0, 65.0))",0
"Map(vectorType -> dense, length -> 2, values -> List(2.0, 75.0))",1
"Map(vectorType -> dense, length -> 2, values -> List(1.0, 110.0))",0
"Map(vectorType -> dense, length -> 2, values -> List(1.0, 95.0))",1


In [0]:
# Step 4: Evaluate Model
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.2f}")

Test AUC: 1.00
