In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier

from pyspark.ml import Pipeline


In [2]:
# Stop any existing SparkContext to clean up resources
from pyspark import SparkContext
import time

try:
    sc = SparkContext.getOrCreate()
    sc.stop()
    time.sleep(2)  # Wait for Spark to clean up
except Exception as e:
    print(f"No existing SparkContext to stop: {e}")

# Create a fresh SparkSession with proper configuration
spark = SparkSession.builder \
    .appName("Fraud Detection Modeling") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

input_path = "../data/features/fraud_features_v2.parquet"
df_model = spark.read.parquet(input_path)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/14 09:35:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/14 09:35:08 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
26/01/14 09:35:08 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
26/01/14 09:35:08 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
26/01/14 09:35:08 WARN Utils: Service 'sparkDriver' could not bind on a random free port. You may check whether configuring an appropriate binding address.
26/01/14 09:35:08 WARN

No existing SparkContext to stop: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: bind(..) failed with error(-49): Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.



### Data Spliting (Time-based split)

Train: Jan â†’ May (80%)

Validation: June (15%)

Test: July (5%)

The dataset spans from January 1, 2023 to July 2, 2023.
To prevent temporal leakage, a time-based split was applied. Data from January to May was used for training, June for validation, and early July for final testing. This setup simulates a real-world fraud detection scenario where models are evaluated on future transactions.

In [3]:
train_df = df_model.filter(col("TX_DATE") < "2023-06-01")
val_df   = df_model.filter((col("TX_DATE") >= "2023-06-01") & (col("TX_DATE") < "2023-07-01"))
test_df  = df_model.filter(col("TX_DATE") >= "2023-07-01")


print("Train:", train_df.count())
print("Val:  ", val_df.count())
print("Test: ", test_df.count())


                                                                                

Train: 1447243
Val:   287840
Test:  19072


In [4]:
feature_cols = [
    "TX_AMOUNT",
    "LOG_TX_AMOUNT",
    "TX_TIME_SECONDS",
    "TX_TIME_DAYS",
    "TX_HOUR",
    "IS_NIGHT",
    "IS_WEEKEND"
]


In [5]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)


In [6]:
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=True,
    withStd=True
)

lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="TX_FRAUD"
)

pipeline = Pipeline(stages=[assembler, scaler, lr])
lr_model = pipeline.fit(train_df)
val_pred = lr_model.transform(val_df)
val_pred.select("TX_FRAUD", "probability", "prediction").show(5)


26/01/14 09:35:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

+--------+--------------------+----------+
|TX_FRAUD|         probability|prediction|
+--------+--------------------+----------+
|       0|[0.99998825603487...|       0.0|
|       0|[0.99999004502516...|       0.0|
|       1|[0.00146348402120...|       1.0|
|       1|[0.09028602988445...|       1.0|
|       0|[0.99993244944999...|       0.0|
+--------+--------------------+----------+
only showing top 5 rows


In [7]:

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="TX_FRAUD",
)

gbt_pipeline = Pipeline(stages=[assembler, gbt])
gbt_model = gbt_pipeline.fit(train_df)
pred = gbt_model.transform(val_df)
pred.select("TX_FRAUD", "probability", "prediction").show(5)



+--------+--------------------+----------+
|TX_FRAUD|         probability|prediction|
+--------+--------------------+----------+
|       0|[0.95339405751833...|       0.0|
|       0|[0.95339405751833...|       0.0|
|       1|[0.04364652142729...|       1.0|
|       1|[0.04364652142729...|       1.0|
|       0|[0.95321574928095...|       0.0|
+--------+--------------------+----------+
only showing top 5 rows


In [8]:
rf= RandomForestClassifier(
    featuresCol="features",
    labelCol="TX_FRAUD",
)

rf_pipeline = Pipeline(stages=[assembler, rf])
rf_model = rf_pipeline.fit(train_df)

rf_pred = rf_model.transform(val_df)
rf_pred.select("TX_FRAUD", "probability", "prediction").show(5)


+--------+--------------------+----------+
|TX_FRAUD|         probability|prediction|
+--------+--------------------+----------+
|       0|[0.98640462558873...|       0.0|
|       0|[0.98640462558873...|       0.0|
|       1|           [0.0,1.0]|       1.0|
|       1|           [0.0,1.0]|       1.0|
|       0|[0.98640462558873...|       0.0|
+--------+--------------------+----------+
only showing top 5 rows


In [10]:
# Save models and predictions for evaluation notebook
lr_model.write().overwrite().save("../models/lr_baseline_model")
gbt_model.write().overwrite().save("../models/gbt_baseline_model")
rf_model.write().overwrite().save("../models/rf_baseline_model")
# Save predictions (optional, can also recreate them in evaluation notebook)
val_pred.write.mode('overwrite').parquet("../data/predictions/lr_predictions.parquet")
pred.write.mode('overwrite').parquet("../data/predictions/gbt_predictions.parquet")
rf_pred.write.mode('overwrite').parquet("../data/predictions/rf_predictions.parquet")

print("Models and predictions saved successfully!")

spark.stop()


Models and predictions saved successfully!
