In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

# Obtain (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("CreditCardFraudDetection")
    .getOrCreate()
)

# Load the CSV; the first row supplies column names and column types
# are inferred from the data rather than declared.
read_options = {"header": True, "inferSchema": True}
df = spark.read.csv("creditcard.csv", **read_options)

In [None]:
# Discard rows that contain nulls before assembling features.
df = df.dropna()

# Every column other than the 'Class' target is used as a model input.
feature_cols = [name for name in df.columns if name != 'Class']

# Stage 1: pack the input columns into a single vector column.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_unscaled",
)

# Stage 2: standardize that vector (centered with withMean, scaled with withStd).
scaler = StandardScaler(
    inputCol="features_unscaled",
    outputCol="features",
    withStd=True,
    withMean=True,
)

preprocessing_stages = [assembler, scaler]
pipeline = Pipeline(stages=preprocessing_stages)

In [None]:
# Fit the preprocessing pipeline on the cleaned DataFrame, then apply it
# to that same DataFrame.
# NOTE(review): the scaler is fitted on all rows, before the train/test
# split performed later in the notebook — confirm this is intended.
fitted_pipeline = pipeline.fit(df)
df_transformed = fitted_pipeline.transform(df)

# Keep only the scaled feature vector and the target, renamed to "label".
label_col = col("Class").alias("label")
final_df = df_transformed.select("features", label_col)

# Collect the result into a local pandas DataFrame for scikit-learn.
pandas_df = final_df.toPandas()

In [None]:
import databricks.connect as db_connect
import mlflow.tracking._model_registry.utils
import numpy as np
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score

# Point MLflow's model registry at Unity Catalog through the public API.
# This does not depend on MLflow internals, so it keeps working even if
# the private helper patched below is renamed or removed.
mlflow.set_registry_uri("databricks-uc")

# Workaround retained as a fallback: replace MLflow's private helper that
# derives the registry URI from the Spark session so it always returns
# "databricks-uc".
# NOTE(review): this patches a private attribute; if MLflow renames it the
# assignment silently does nothing. Presumably needed for the Databricks
# Connect session imported above — confirm whether it can be dropped now
# that the registry URI is set explicitly.
mlflow.tracking._model_registry.utils._get_registry_uri_from_spark_session = lambda: "databricks-uc"

# Authenticate MLflow for the rest of the notebook.
mlflow.login()

In [None]:
# Stack the per-row feature vectors into one 2-D array; take labels as a 1-D array.
feature_rows = pandas_df["features"].values
X = np.vstack(feature_rows)
y = pandas_df["label"].values

# Hold out 20% of the rows for testing, stratified on the label and
# seeded so the split is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)


In [None]:
# Train a random-forest classifier inside a single MLflow run, recording
# its hyperparameters, hold-out metrics, and the fitted model.
with mlflow.start_run() as run:
    n_estimators = 100
    max_depth = 8
    random_state = 42

    # Log every value handed to the estimator — including the seed — so the
    # run can be reproduced from its tracked parameters alone.
    mlflow.log_params({
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "random_state": random_state,
    })

    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=random_state)
    clf.fit(X_train, y_train)

    # Evaluate on the held-out split.
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, output_dict=True)

    # The report dict is keyed by the label rendered as a string; '1' selects
    # the metrics for label value 1.
    # NOTE(review): presumably label 1 is the fraud class — confirm.
    label_one = report['1']
    mlflow.log_metrics({
        "accuracy": acc,
        "precision": label_one['precision'],
        "recall": label_one['recall'],
        "f1_score": label_one['f1-score'],
    })

    mlflow.sklearn.log_model(clf, "random_forest_model")

    # Use the run object bound by the context manager rather than
    # re-querying the active run.
    print("Run logged under run_id:", run.info.run_id)