In [1]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import time

In [2]:
raw = spark.read.csv(
    "gs://fraud_detection_dataset_1/Synthetic_Financial_datasets_log.csv",
    header=True, inferSchema=True
)

                                                                                

In [3]:
df = (raw
      .drop("nameOrig", "nameDest", "isFlaggedFraud", "step")
      .withColumn("deltaOrig",           col("oldbalanceOrg")  - col("newbalanceOrig"))
      .withColumn("deltaDest",           col("newbalanceDest") - col("oldbalanceDest"))
      .withColumn("amount_to_orig_ratio", col("amount")/(col("oldbalanceOrg")+1))
      .withColumn("amount_to_dest_ratio", col("amount")/(col("oldbalanceDest")+1))
     )
si_model    = StringIndexer(inputCol="type", outputCol="type_idx", handleInvalid="keep").fit(df)
df_indexed  = si_model.transform(df)
ohe_model   = OneHotEncoder(inputCols=["type_idx"], outputCols=["type_vec"]).fit(df_indexed)
df_encoded  = ohe_model.transform(df_indexed)

                                                                                

In [4]:
full   = df_encoded.stat.sampleBy("isFraud", {0:1.0, 1:1.0}, seed=2025)
train  = full.stat.sampleBy("isFraud", {0:0.8, 1:0.8}, seed=2025)
total  = train.count()
frauds = train.filter(col("isFraud")==1).count()
w      = 1.0/(frauds/total)
train  = train.withColumn("weight", when(col("isFraud")==1, w).otherwise(1.0))
feature_cols = [
    'amount','oldbalanceOrg','newbalanceOrig',
    'oldbalanceDest','newbalanceDest',
    'deltaOrig','deltaDest',
    'amount_to_orig_ratio','amount_to_dest_ratio',
    'type_vec'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
scaler    = StandardScaler(inputCol="rawFeatures", outputCol="features", withStd=True, withMean=False)
rf        = RandomForestClassifier(labelCol="isFraud", featuresCol="features",
                                   weightCol="weight", numTrees=100, maxDepth=8, seed=42)

pipeline = Pipeline(stages=[assembler, scaler, rf])

                                                                                

In [5]:
start = time.time()
model = pipeline.fit(train)
print(f"100% sample — TRAIN TIME: {time.time() - start:.2f} sec")
model_path = "gs://fraud_detection_dataset_1/rf_100pct_model"
model.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

25/05/04 00:36:32 WARN DAGScheduler: Broadcasting large task binary with size 1294.2 KiB
                                                                                

100% sample — TRAIN TIME: 382.13 sec


                                                                                

Model saved to: gs://fraud_detection_dataset_1/rf_100pct_model


In [6]:
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.evaluation import MulticlassMetrics
model = PipelineModel.load("gs://fraud_detection_dataset_1/rf_100pct_model")
raw = spark.read.csv("gs://fraud_detection_dataset_1/Synthetic_Financial_datasets_log.csv",header=True, inferSchema=True)
df = (raw
      .drop("nameOrig", "nameDest", "isFlaggedFraud", "step")
      .withColumn("deltaOrig",           col("oldbalanceOrg")  - col("newbalanceOrig"))
      .withColumn("deltaDest",           col("newbalanceDest") - col("oldbalanceDest"))
      .withColumn("amount_to_orig_ratio", col("amount")/(col("oldbalanceOrg")+1))
      .withColumn("amount_to_dest_ratio", col("amount")/(col("oldbalanceDest")+1))
)
si_model   = StringIndexer(inputCol="type", outputCol="type_idx", handleInvalid="keep").fit(df)
df_indexed = si_model.transform(df)
ohe_model  = OneHotEncoder(inputCols=["type_idx"], outputCols=["type_vec"]).fit(df_indexed)
df_encoded = ohe_model.transform(df_indexed)

full  = df_encoded.stat.sampleBy("isFraud", {0:1.0, 1:1.0}, seed=2025)
train = full.stat.sampleBy("isFraud", {0:0.8, 1:0.8}, seed=2025)
test  = full.subtract(train)
preds = (model.transform(test)
               .withColumn("prob_arr", vector_to_array(col("probability")))
               .withColumn("pred_label",
                           when(col("prob_arr")[1] > 0.5, 1).otherwise(0))
)
pred_and_lbl = preds.select("pred_label","isFraud") \
                    .rdd.map(lambda r: (float(r[0]), float(r[1])))
metrics = MulticlassMetrics(pred_and_lbl)

print(f"Accuracy: {metrics.accuracy:.4f}")
print("=== Confusion Matrix ===")
print(metrics.confusionMatrix().toArray())
labels = sorted(pred_and_lbl.map(lambda x: x[1]).distinct().collect())
support = dict(test.groupBy("isFraud").count().rdd.map(lambda r: (float(r[0]), r[1])).collect())

print("\n=== Classification Report ===")
for lbl in labels:
    print(f"Class {int(lbl)} — precision: {metrics.precision(lbl):.4f}, "
          f"recall: {metrics.recall(lbl):.4f}, "
          f"f1-score: {metrics.fMeasure(lbl):.4f}, "
          f"support: {support.get(lbl,0)}")

25/05/04 00:40:08 WARN DAGScheduler: Broadcasting large task binary with size 1106.5 KiB
25/05/04 00:40:13 WARN DAGScheduler: Broadcasting large task binary with size 1119.1 KiB
25/05/04 00:40:28 WARN DAGScheduler: Broadcasting large task binary with size 1111.1 KiB


Accuracy: 0.9999
=== Confusion Matrix ===
[[1.235633e+06 1.250000e+02]
 [7.000000e+00 1.658000e+03]]





=== Classification Report ===
Class 0 — precision: 1.0000, recall: 0.9999, f1-score: 0.9999, support: 1237080
Class 1 — precision: 0.9299, recall: 0.9958, f1-score: 0.9617, support: 1672


                                                                                

In [7]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder,
    VectorAssembler, StandardScaler
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# 1) Load & feature‑engineer
raw = spark.read.csv(
    "gs://fraud_detection_dataset_1/Synthetic_Financial_datasets_log.csv",
    header=True, inferSchema=True
)
df = (raw
      .drop("nameOrig","nameDest","isFlaggedFraud","step")
      .withColumn("deltaOrig", col("oldbalanceOrg")  - col("newbalanceOrig"))
      .withColumn("deltaDest", col("newbalanceDest") - col("oldbalanceDest"))
      .withColumn("amount_to_orig_ratio", col("amount")/(col("oldbalanceOrg")+1))
      .withColumn("amount_to_dest_ratio",   col("amount")/(col("oldbalanceDest")+1))
)

# 2) Encode & assemble
si_model    = StringIndexer(inputCol="type", outputCol="type_idx", handleInvalid="keep").fit(df)
df_indexed  = si_model.transform(df)
ohe_model   = OneHotEncoder(inputCols=["type_idx"], outputCols=["type_vec"]).fit(df_indexed)
df_encoded  = ohe_model.transform(df_indexed)

# 3) **Stratified 75% sample**, then 80/20 split
sample75 = df_encoded.stat.sampleBy("isFraud", {0:0.75, 1:0.75}, seed=2025)
train75  = sample75.stat.sampleBy("isFraud", {0:0.8, 1:0.8}, seed=2025)
test75   = sample75.subtract(train75)

# 4) Compute class weights on TRAIN75
total75  = train75.count()
frauds75 = train75.filter(col("isFraud")==1).count()
w75      = 1.0/(frauds75/total75)
train75  = train75.withColumn("weight", when(col("isFraud")==1, w75).otherwise(1.0))

# 5) Build pipeline
feature_cols = [
    'amount','oldbalanceOrg','newbalanceOrig',
    'oldbalanceDest','newbalanceDest',
    'deltaOrig','deltaDest',
    'amount_to_orig_ratio','amount_to_dest_ratio',
    'type_vec'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
scaler    = StandardScaler(inputCol="rawFeatures", outputCol="features",
                            withStd=True, withMean=False)
rf75      = RandomForestClassifier(
    labelCol="isFraud", featuresCol="features", weightCol="weight",
    numTrees=100, maxDepth=8, seed=42
)
pipeline75 = Pipeline(stages=[assembler, scaler, rf75])

# 6) Train & time on 75% sample
start = time.time()
model75 = pipeline75.fit(train75)
print(f"75% sample — TRAIN TIME: {time.time() - start:.2f} sec")

# 7) Save the 75% model
model_path75 = "gs://fraud_detection_dataset_1/rf_75pct_model"
model75.write().overwrite().save(model_path75)
print(f"Model saved to: {model_path75}")

25/05/04 00:53:17 WARN DAGScheduler: Broadcasting large task binary with size 1273.2 KiB
                                                                                

75% sample — TRAIN TIME: 240.86 sec


                                                                                

Model saved to: gs://fraud_detection_dataset_1/rf_75pct_model


In [9]:
preds75 = (
    model75.transform(test75)
           .withColumn("prob_arr", vector_to_array(col("probability")))
           .withColumn("pred_label",
               when(col("prob_arr")[1] > 0.5, 1).otherwise(0))
)

rdd75 = preds75.select("pred_label","isFraud") \
               .rdd.map(lambda r: (float(r[0]), float(r[1])))
metrics75 = MulticlassMetrics(rdd75)

print(f"Accuracy: {metrics75.accuracy:.4f}")
print("=== Confusion Matrix ===")
print(metrics75.confusionMatrix().toArray())

# Classification report
labels75  = sorted(rdd75.map(lambda x: x[1]).distinct().collect())
support75 = dict(test75.groupBy("isFraud").count().rdd.map(lambda r: (float(r[0]), r[1])).collect())
print("\n=== Classification Report ===")
for lbl in labels75:
    print(f"Class {int(lbl)} — "
          f"precision: {metrics75.precision(lbl):.4f}, "
          f"recall:    {metrics75.recall(lbl):.4f}, "
          f"f1-score:  {metrics75.fMeasure(lbl):.4f}, "
          f"support:   {support75.get(lbl,0)}")

25/05/04 00:55:54 WARN DAGScheduler: Broadcasting large task binary with size 1027.5 KiB
25/05/04 00:55:54 WARN DAGScheduler: Broadcasting large task binary with size 1039.9 KiB
25/05/04 00:56:01 WARN DAGScheduler: Broadcasting large task binary with size 1032.1 KiB


Accuracy: 0.9999
=== Confusion Matrix ===
[[9.33532e+05 9.00000e+01]
 [2.00000e+00 1.25600e+03]]





=== Classification Report ===
Class 0 — precision: 1.0000, recall:    0.9999, f1-score:  1.0000, support:   933622
Class 1 — precision: 0.9331, recall:    0.9984, f1-score:  0.9647, support:   1258


                                                                                

In [10]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder,
    VectorAssembler, StandardScaler
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# 1) Load & feature-engineer
raw = spark.read.csv(
    "gs://fraud_detection_dataset_1/Synthetic_Financial_datasets_log.csv",
    header=True, inferSchema=True
)
df = (raw
      .drop("nameOrig","nameDest","isFlaggedFraud","step")
      .withColumn("deltaOrig", col("oldbalanceOrg")  - col("newbalanceOrig"))
      .withColumn("deltaDest", col("newbalanceDest") - col("oldbalanceDest"))
      .withColumn("amount_to_orig_ratio", col("amount")/(col("oldbalanceOrg")+1))
      .withColumn("amount_to_dest_ratio",   col("amount")/(col("oldbalanceDest")+1))
)

# 2) Encode & assemble
si_model    = StringIndexer(inputCol="type", outputCol="type_idx", handleInvalid="keep").fit(df)
df_indexed  = si_model.transform(df)
ohe_model   = OneHotEncoder(inputCols=["type_idx"], outputCols=["type_vec"]).fit(df_indexed)
df_encoded  = ohe_model.transform(df_indexed)

# 3) **Stratified 50% sample**, then 80/20 split
sample50 = df_encoded.stat.sampleBy("isFraud", {0: 0.50, 1: 0.50}, seed=2025)
train50  = sample50.stat.sampleBy("isFraud", {0: 0.80, 1: 0.80}, seed=2025)
test50   = sample50.subtract(train50)

# 4) Compute class weights on TRAIN50
total50  = train50.count()
frauds50 = train50.filter(col("isFraud") == 1).count()
w50      = 1.0 / (frauds50 / total50)
train50  = train50.withColumn("weight", when(col("isFraud") == 1, w50).otherwise(1.0))

# 5) Build pipeline
feature_cols = [
    'amount','oldbalanceOrg','newbalanceOrig',
    'oldbalanceDest','newbalanceDest',
    'deltaOrig','deltaDest',
    'amount_to_orig_ratio','amount_to_dest_ratio',
    'type_vec'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
scaler    = StandardScaler(inputCol="rawFeatures", outputCol="features",
                            withStd=True, withMean=False)
rf50      = RandomForestClassifier(
    labelCol="isFraud", featuresCol="features", weightCol="weight",
    numTrees=100, maxDepth=8, seed=42
)
pipeline50 = Pipeline(stages=[assembler, scaler, rf50])

# 6) Train & time on 50% sample
start = time.time()
model50 = pipeline50.fit(train50)
print(f"50% sample — TRAIN TIME: {time.time() - start:.2f} sec")

# 7) Save the 50% model
model_path50 = "gs://fraud_detection_dataset_1/rf_50pct_model"
model50.write().overwrite().save(model_path50)
print(f"Model saved to: {model_path50}")

# 8) Predict & metrics
preds50 = (
    model50.transform(test50)
           .withColumn("prob_arr", vector_to_array(col("probability")))
           .withColumn("pred_label",
               when(col("prob_arr")[1] > 0.5, 1).otherwise(0))
)

rdd50 = preds50.select("pred_label","isFraud") \
               .rdd.map(lambda r: (float(r[0]), float(r[1])))
metrics50 = MulticlassMetrics(rdd50)

print(f"Accuracy: {metrics50.accuracy:.4f}")
print("=== Confusion Matrix ===")
print(metrics50.confusionMatrix().toArray())

# Classification report
labels50  = sorted(rdd50.map(lambda x: x[1]).distinct().collect())
support50 = dict(test50.groupBy("isFraud").count().rdd.map(lambda r: (float(r[0]), r[1])).collect())
print("\n=== Classification Report ===")
for lbl in labels50:
    print(f"Class {int(lbl)} — "
          f"precision: {metrics50.precision(lbl):.4f}, "
          f"recall:    {metrics50.recall(lbl):.4f}, "
          f"f1-score:  {metrics50.fMeasure(lbl):.4f}, "
          f"support:   {support50.get(lbl,0)}")


25/05/04 01:02:18 WARN DAGScheduler: Broadcasting large task binary with size 1132.3 KiB
                                                                                

50% sample — TRAIN TIME: 167.04 sec


                                                                                

Model saved to: gs://fraud_detection_dataset_1/rf_50pct_model


                                                                                

Accuracy: 1.0000
=== Confusion Matrix ===
[[6.25801e+05 1.50000e+01]
 [2.00000e+00 8.51000e+02]]





=== Classification Report ===
Class 0 — precision: 1.0000, recall:    1.0000, f1-score:  1.0000, support:   626015
Class 1 — precision: 0.9827, recall:    0.9977, f1-score:  0.9901, support:   806


                                                                                

In [11]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder,
    VectorAssembler, StandardScaler
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# 1) Load & feature-engineer
raw = spark.read.csv(
    "gs://fraud_detection_dataset_1/Synthetic_Financial_datasets_log.csv",
    header=True, inferSchema=True
)
df = (raw
      .drop("nameOrig","nameDest","isFlaggedFraud","step")
      .withColumn("deltaOrig", col("oldbalanceOrg")  - col("newbalanceOrig"))
      .withColumn("deltaDest", col("newbalanceDest") - col("oldbalanceDest"))
      .withColumn("amount_to_orig_ratio", col("amount")/(col("oldbalanceOrg")+1))
      .withColumn("amount_to_dest_ratio",   col("amount")/(col("oldbalanceDest")+1))
)

# 2) Encode & assemble
si_model    = StringIndexer(inputCol="type", outputCol="type_idx", handleInvalid="keep").fit(df)
df_indexed  = si_model.transform(df)
ohe_model   = OneHotEncoder(inputCols=["type_idx"], outputCols=["type_vec"]).fit(df_indexed)
df_encoded  = ohe_model.transform(df_indexed)

# 3) **Stratified 20% sample**, then 80/20 split
sample20 = df_encoded.stat.sampleBy("isFraud", {0: 0.20, 1: 0.20}, seed=2025)
train20  = sample20.stat.sampleBy("isFraud", {0: 0.80, 1: 0.80}, seed=2025)
test20   = sample20.subtract(train20)

# 4) Compute class weights on TRAIN20
total20  = train20.count()
frauds20 = train20.filter(col("isFraud") == 1).count()
w20      = 1.0 / (frauds20 / total20)
train20  = train20.withColumn("weight", when(col("isFraud") == 1, w20).otherwise(1.0))

# 5) Build pipeline
feature_cols = [
    'amount','oldbalanceOrg','newbalanceOrig',
    'oldbalanceDest','newbalanceDest',
    'deltaOrig','deltaDest',
    'amount_to_orig_ratio','amount_to_dest_ratio',
    'type_vec'
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
scaler    = StandardScaler(inputCol="rawFeatures", outputCol="features",
                            withStd=True, withMean=False)
rf20      = RandomForestClassifier(
    labelCol="isFraud", featuresCol="features", weightCol="weight",
    numTrees=100, maxDepth=8, seed=42
)
pipeline20 = Pipeline(stages=[assembler, scaler, rf20])

# 6) Train & time on 20% sample
start = time.time()
model20 = pipeline20.fit(train20)
print(f"20% sample — TRAIN TIME: {time.time() - start:.2f} sec")

# 7) Save the 20% model
model_path20 = "gs://fraud_detection_dataset_1/rf_20pct_model"
model20.write().overwrite().save(model_path20)
print(f"Model saved to: {model_path20}")

# 8) Predict & metrics
preds20 = (
    model20.transform(test20)
            .withColumn("prob_arr", vector_to_array(col("probability")))
            .withColumn("pred_label",
                when(col("prob_arr")[1] > 0.5, 1).otherwise(0))
)

rdd20 = preds20.select("pred_label","isFraud") \
               .rdd.map(lambda r: (float(r[0]), float(r[1])))
metrics20 = MulticlassMetrics(rdd20)

print(f"Accuracy: {metrics20.accuracy:.4f}")
print("=== Confusion Matrix ===")
print(metrics20.confusionMatrix().toArray())

# Classification report
labels20  = sorted(rdd20.map(lambda x: x[1]).distinct().collect())
support20 = dict(test20.groupBy("isFraud").count().rdd.map(lambda r: (float(r[0]), r[1])).collect())
print("\n=== Classification Report ===")
for lbl in labels20:
    print(f"Class {int(lbl)} — "
          f"precision: {metrics20.precision(lbl):.4f}, "
          f"recall:    {metrics20.recall(lbl):.4f}, "
          f"f1-score:  {metrics20.fMeasure(lbl):.4f}, "
          f"support:   {support20.get(lbl,0)}")


                                                                                

20% sample — TRAIN TIME: 68.70 sec


                                                                                

Model saved to: gs://fraud_detection_dataset_1/rf_20pct_model


                                                                                

Accuracy: 1.0000
=== Confusion Matrix ===
[[2.52905e+05 9.00000e+00]
 [0.00000e+00 3.25000e+02]]





=== Classification Report ===
Class 0 — precision: 1.0000, recall:    1.0000, f1-score:  1.0000, support:   252914
Class 1 — precision: 0.9731, recall:    1.0000, f1-score:  0.9863, support:   325


                                                                                