In [0]:
from pyspark.ml.clustering import BisectingKMeans  # Use IsolationForest if available
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, lit, current_timestamp, array
from sklearn.ensemble import IsolationForest

# Score every client with an Isolation Forest fitted on per-client transaction
# aggregates, then write the per-client results to silver.model_outputs.

# Feature columns fed to the model (produced by the aggregation in step 2).
FEATURE_COLS = ["avg_txn_amount", "txn_count", "max_rw_count", "max_rw_sum"]
CONTAMINATION = 0.05  # expected share of anomalous clients, passed to IsolationForest
RANDOM_STATE = 42  # fixed seed so re-runs produce the same scores

# 1. Load the enriched transactions
transactions = spark.table("silver.transactions_enriched")

# 2. Aggregate features per client.
# Explicit aliases instead of dict-style agg, so downstream code does not depend
# on Spark's auto-generated "avg(...)"/"max(...)" column names.
features_df = transactions.groupBy("client_id").agg(
    F.avg("transaction_amount").alias("avg_txn_amount"),
    F.count("transaction_id").alias("txn_count"),
    F.max("rolling_txn_count_30d").alias("max_rw_count"),
    F.max("rolling_txn_sum_30d").alias("max_rw_sum"),
)

# 3. Convert to pandas for scikit-learn (one row per client, collected on the driver)
features_pd = features_df.toPandas()
if features_pd.empty:
    # IsolationForest cannot be fitted on zero samples; fail with a clear message
    # instead of an opaque sklearn error (the table is not touched either way).
    raise ValueError("silver.transactions_enriched has no client rows to score")
# Fill nulls in the feature columns only. Filling client_id too would turn a null
# key into a fake client 0 and can mix str/int values in that column, which
# breaks schema inference in spark.createDataFrame below.
features_pd[FEATURE_COLS] = features_pd[FEATURE_COLS].fillna(0)

# 4. Train the Isolation Forest and score every client
model = IsolationForest(contamination=CONTAMINATION, random_state=RANDOM_STATE)
features = features_pd[FEATURE_COLS]
# fit_predict returns -1 for anomalies and 1 for normal points; store 1/0 flags.
features_pd["ml_risk_score"] = (model.fit_predict(features) == -1).astype(int)
# decision_function is lower for more abnormal points; negate it so that a
# higher ml_anomaly_score means "more anomalous".
features_pd["ml_anomaly_score"] = -model.decision_function(features)

# 5. Convert back to Spark in the silver.model_outputs layout.
# NOTE(review): output_type says "anomaly_score", but output_value holds the 0/1
# anomaly flag and the continuous score lands in confidence_level — confirm this
# mapping with the consumers of silver.model_outputs.
scored_df = (
    spark.createDataFrame(features_pd[["client_id", "ml_risk_score", "ml_anomaly_score"]])
    .select(
        col("client_id"),
        col("ml_risk_score").alias("output_value"),
        col("ml_anomaly_score").alias("confidence_level"),
    )
    .withColumn("model_name", lit("Isolation forest"))
    .withColumn("output_type", lit("anomaly_score"))
    .withColumn("evaluated_at", current_timestamp())
)

# 6. Persist the results.
# NOTE(review): mode("overwrite") replaces the whole table, including rows that
# other model_name values may have written — confirm this job owns the table.
scored_df.write.mode("overwrite").saveAsTable("silver.model_outputs")