# In [0]:
# Databricks Notebook: Fraud_Detection_Baseline_Serverless.py

# COMMAND ----------
# MAGIC %md
# MAGIC # Fraud Detection Baseline (Serverless-Compatible)
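# MAGIC This notebook builds a fraud detection baseline: provider-level features are aggregated with PySpark, then a scikit-learn model is trained on the resulting pandas DataFrame. It is intended to be suitable for Databricks Serverless.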

# COMMAND ----------
# 1. Setup and Imports

import pyspark.sql.functions as F
from pyspark.sql.functions import expr, col, regexp_replace, to_date, year, current_date, avg
from pyspark.sql import SparkSession

import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    precision_score, recall_score, f1_score, roc_auc_score,
    classification_report, confusion_matrix
)

import mlflow
import mlflow.sklearn

# Constants shared by the cells below.
LABEL_COL = "PotentialFraud"
MLFLOW_EXPERIMENT_NAME = "/Shared/Provider_Fraud_Detection_Graph_Embeddings"

# Obtain the Spark session (getOrCreate reuses an active session when one exists).
_session_builder = SparkSession.builder.appName("ProviderFraudDetectionServerless")
spark = _session_builder.getOrCreate()

# Select the MLflow experiment for this notebook's runs, then switch the
# model registry to the "databricks-uc" URI.
mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
mlflow.set_registry_uri("databricks-uc")

# Echo the effective MLflow configuration for the run log.
print(f"MLflow Experiment: {MLFLOW_EXPERIMENT_NAME}")
print(f"MLflow Registry URI: {mlflow.get_registry_uri()}")

# COMMAND ----------
# 2. Load Data

print("Loading Databricks tables...")
# Fully qualified source tables, listed in the same order as the DataFrames
# they are unpacked into below.
_source_tables = (
    "workspace.default.provider_train",
    "workspace.default.beneficiary_train",
    "workspace.default.inpatient_claim_train",
    "workspace.default.outpatient_claim_train",
)
(
    df_providers_train,
    df_beneficiary_train,
    df_inpatient_train,
    df_outpatient_train,
) = [spark.read.table(_table_name) for _table_name in _source_tables]
print("Tables loaded.")

# COMMAND ----------
# 3. Clean and Parse DOB

print("Cleaning DOB column...")
# Normalise "M/d/yy" dates of birth to "yyyy-MM-dd". Values that do not match
# that layout are passed through unchanged and are only kept later if they
# already parse as "yyyy-MM-dd".
#
# One anchored pattern with three capture groups: month, day, two-digit year.
# It is a raw string with single backslashes so the regex engine receives `\d`
# (a digit class) rather than an escaped literal backslash.
_SHORT_DOB_PATTERN = r"^(\d{1,2})/(\d{1,2})/(\d{2})$"

_dob_month = F.regexp_extract("DOB", _SHORT_DOB_PATTERN, 1)
_dob_day = F.regexp_extract("DOB", _SHORT_DOB_PATTERN, 2)
_dob_yy = F.regexp_extract("DOB", _SHORT_DOB_PATTERN, 3)

# Century pivot: a two-digit year >= 30 is read as 19yy, otherwise as 20yy.
# The comparison is done on strings: for zero-padded two-digit values the
# lexicographic order equals the numeric order, and it never has to cast the
# empty string that regexp_extract yields for rows that do not match.
# Native column expressions replace the former Python UDF, so no row can fail
# inside Python with int("") and no Python worker round-trip is needed.
_dob_century = F.when(_dob_yy >= F.lit("30"), F.lit("19")).otherwise(F.lit("20"))
_dob_year = F.concat(_dob_century, _dob_yy)

df_beneficiary_train = df_beneficiary_train.withColumn(
    "DOB_fixed",
    F.when(
        F.col("DOB").rlike(_SHORT_DOB_PATTERN),
        F.concat_ws(
            "-",
            _dob_year,
            F.lpad(_dob_month, 2, "0"),
            F.lpad(_dob_day, 2, "0"),
        ),
    ).otherwise(F.col("DOB"))
)

# Parse to date
df_beneficiary_train = df_beneficiary_train.withColumn(
    "DOB_clean",
    to_date("DOB_fixed", "yyyy-MM-dd")
)

# Filter invalid or unrealistic DOBs: keep rows whose DOB parsed and whose
# birth year lies between 1900 and (current year - 18), inclusive.
print("Filtering invalid DOBs...")
df_beneficiary_train = df_beneficiary_train.filter(
    (col("DOB_clean").isNotNull()) &
    (year("DOB_clean") <= year(current_date()) - 18) &
    (year("DOB_clean") >= 1900)
)

# COMMAND ----------
# 4. Feature Engineering

print("Aggregating inpatient claims...")
# Per-provider inpatient features. The mapping goes from output column name to
# aggregate expression; dict insertion order fixes the output column order.
_inpatient_stay_days = F.datediff("DischargeDt", "AdmissionDt")
_inpatient_metrics = {
    "Inpatient_ClaimCount": F.count("ClaimID"),
    "Inpatient_TotalClaimAmt": F.sum("InscClaimAmtReimbursed"),
    "Inpatient_AvgClaimAmt": F.avg("InscClaimAmtReimbursed"),
    "Inpatient_UniquePatients_Claim": F.countDistinct("BeneID"),
    "Inpatient_UniqueAttendingPhysicians": F.countDistinct("AttendingPhysician"),
    "Inpatient_UniqueOperatingPhysicians": F.countDistinct("OperatingPhysician"),
    "Inpatient_UniqueOtherPhysicians": F.countDistinct("OtherPhysician"),
    # Mean number of days between admission and discharge.
    "Inpatient_AvgAdmissionDuration": F.avg(_inpatient_stay_days),
    "Inpatient_TotalDeductiblePaid": F.sum("DeductibleAmtPaid"),
}
df_inpatient_agg = df_inpatient_train.groupBy("ProviderID").agg(
    *[_metric.alias(_alias) for _alias, _metric in _inpatient_metrics.items()]
)

print("Aggregating outpatient claims...")
# Per-provider outpatient features, declared as (output column, aggregate)
# pairs in the order the columns should appear.
_outpatient_metrics = [
    ("Outpatient_ClaimCount", F.count("ClaimID")),
    ("Outpatient_TotalClaimAmt", F.sum("InscClaimAmtReimbursed")),
    ("Outpatient_AvgClaimAmt", F.avg("InscClaimAmtReimbursed")),
    ("Outpatient_UniquePatients_Claim", F.countDistinct("BeneID")),
    ("Outpatient_UniqueAttendingPhysicians", F.countDistinct("AttendingPhysician")),
    ("Outpatient_UniqueOperatingPhysicians", F.countDistinct("OperatingPhysician")),
    ("Outpatient_UniqueOtherPhysicians", F.countDistinct("OtherPhysician")),
]
_outpatient_by_provider = df_outpatient_train.groupBy("ProviderID")
df_outpatient_agg = _outpatient_by_provider.agg(
    *[_metric.alias(_alias) for _alias, _metric in _outpatient_metrics]
)

print("Aggregating beneficiary demographics...")
# Every distinct (provider, beneficiary) pair seen in either claim table.
_pair_cols = ("ProviderID", "BeneID")
_inpatient_pairs = df_inpatient_train.select(*_pair_cols)
_outpatient_pairs = df_outpatient_train.select(*_pair_cols)
df_all_claims_bene_link = _inpatient_pairs.unionAll(_outpatient_pairs).distinct()

# Beneficiary columns averaged per provider: source column -> output column.
_mean_columns = {
    "Gender": "Avg_Bene_Gender",
    "Race": "Avg_Bene_Race",
    "RenalDiseaseIndicator": "Avg_Bene_RenalDiseaseIndicator",
    "NoOfMonths_PartACov": "Avg_Bene_PartACovMonths",
    "NoOfMonths_PartBCov": "Avg_Bene_PartBCovMonths",
}
_chronic_conditions = [
    "Alzheimer", "Heartfailure", "KidneyDisease", "Cancer", "ObstrPulmonary",
    "Depression", "Diabetes", "IschemicHeart", "Osteoporasis", "rheumatoidarthritis",
]

# Age is the difference between the current calendar year and the birth year.
_age_in_years = year(current_date()) - year(col("DOB_clean"))

_demographic_aggs = [
    F.countDistinct("BeneID").alias("Total_UniquePatients"),
    avg(_age_in_years).alias("Avg_Patient_Age"),
]
_demographic_aggs.extend(
    F.mean(_source).alias(_target) for _source, _target in _mean_columns.items()
)
_demographic_aggs.extend(
    F.avg(f"ChronicCond_{_cond}").alias(f"Avg_ChronicCond_{_cond}")
    for _cond in _chronic_conditions
)

# Left join: pairs without a row in df_beneficiary_train are kept, with null
# demographic values.
_pairs_with_demographics = df_all_claims_bene_link.join(
    df_beneficiary_train, "BeneID", "left"
)
df_bene_agg = _pairs_with_demographics.groupBy("ProviderID").agg(*_demographic_aggs)

print("Joining features into final dataset...")
# Start from the labelled provider list and left-join each feature table, so
# every provider is kept even when it has no rows in a given table.
df_provider_features_baseline = (
    df_providers_train.select("ProviderID", LABEL_COL)
    .join(df_inpatient_agg, "ProviderID", "left")
    .join(df_outpatient_agg, "ProviderID", "left")
    .join(df_bene_agg, "ProviderID", "left")
)

# Everything except the key and the label is treated as a model feature.
numerical_cols = [c for c in df_provider_features_baseline.columns if c not in ["ProviderID", LABEL_COL]]

# Replace nulls left by the joins with 0. This is done in a single projection
# rather than one withColumn call per feature: chaining withColumn in a loop
# adds a projection per call and can produce very large query plans.
# Column order and names are unchanged.
_feature_names = set(numerical_cols)
df_provider_features_baseline = df_provider_features_baseline.select(
    *[
        F.coalesce(F.col(_name), F.lit(0)).alias(_name)
        if _name in _feature_names
        else F.col(_name)
        for _name in df_provider_features_baseline.columns
    ]
)

print(f"Final feature count: {len(numerical_cols)}")

# COMMAND ----------
# 5. Train Baseline Model with scikit-learn

print("Training RandomForest with scikit-learn...")
# Collect the provider-level feature table into a pandas DataFrame.
pdf = df_provider_features_baseline.toPandas()

# Feature matrix and target vector.
X, y = pdf[numerical_cols], pdf[LABEL_COL]

# 70/30 split, stratified on the label, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

# 100-tree random forest with a fixed seed; fit() returns the estimator itself.
clf = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train, y_train)

y_pred = clf.predict(X_test)
# Column 1 of predict_proba is the probability of the second class in clf.classes_.
y_proba = clf.predict_proba(X_test)[:, 1]

# NOTE(review): these scorers are called with sklearn's default pos_label=1,
# so this presumably expects a 0/1-encoded label column - confirm against the
# contents of the provider table.
precision, recall, f1 = (
    _scorer(y_test, y_pred) for _scorer in (precision_score, recall_score, f1_score)
)
auc = roc_auc_score(y_test, y_proba)

# Print each summary under its heading.
for _heading, _summarize in (
    ("Classification Report:", classification_report),
    ("Confusion Matrix:", confusion_matrix),
):
    print(_heading)
    print(_summarize(y_test, y_pred))

# COMMAND ----------
# 6. Log to MLflow

print("Logging metrics to MLflow...")
# Record the baseline run: parameters, held-out metrics and the fitted model.
with mlflow.start_run(run_name="Baseline_Tabular_Model_Sklearn"):
    mlflow.log_params({
        "model_type": "RandomForestClassifier (sklearn)",
        "features": "tabular_only",
        # Read from the fitted estimator so the logged value cannot drift from
        # the configuration that was actually trained.
        "num_trees": clf.n_estimators,
    })

    mlflow.log_metrics({
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "auc_roc": auc,
    })

    mlflow.sklearn.log_model(clf, "baseline_fraud_model_sklearn")
    # Also write the precision to a text file (third argument True = overwrite).
    # dbutils is not imported in this file; it is expected to be provided by
    # the Databricks notebook runtime.
    dbutils.fs.put("/tmp/baseline_precision.txt", str(precision), True)

print("Baseline experiment logged successfully.")

# COMMAND ----------
# 7. Save Features to Delta Table

print("Saving baseline features to Delta table for next notebook...")
# Overwrite the hand-off table with the current provider feature set.
df_provider_features_baseline.write.mode("overwrite").saveAsTable("workspace.default.temp_provider_features_for_augmented_run")

print("\u2705 Baseline features saved.")
# The emoji is written as one \U escape (U+1F3D1). The UTF-16 surrogate pair
# "\ud83c\udfd1" would create two lone surrogates in a Python 3 str, which
# cannot be encoded to UTF-8 when printed.
print("\U0001F3D1 Notebook complete. Proceed to the graph-based experiment in Notebook 2.")
