<a href="https://colab.research.google.com/github/iraj259/Machine-Learning/blob/main/FraudAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, log1p, isnan, count, sum as spark_sum, expr
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import time

In [36]:
# Create (or reuse) the Spark session that every later cell works through.
spark = (
    SparkSession.builder
    .appName("FraudDetection")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# Only surface warnings and errors from Spark's own logging.
spark.sparkContext.setLogLevel("WARN")

In [37]:
# Path to the transactions CSV, relative to the notebook's working directory.
csv_path = "./fraudDetection.csv"

In [40]:
# Explicit schema for the transactions file: avoids a schema-inference pass and
# pins the numeric types. Every column is declared nullable.
schema = (
    StructType()
    .add("step", IntegerType(), True)
    .add("type", StringType(), True)
    .add("amount", DoubleType(), True)
    .add("nameOrig", StringType(), True)
    .add("oldbalanceOrg", DoubleType(), True)
    .add("newbalanceOrig", DoubleType(), True)
    .add("nameDest", StringType(), True)
    .add("oldbalanceDest", DoubleType(), True)
    .add("newbalanceDest", DoubleType(), True)
    .add("isFraud", DoubleType(), True)
    .add("isFlaggedFraud", DoubleType(), True)
)

# Load the CSV (first line is the header) using the schema above.
df = spark.read.schema(schema).option("header", True).csv(csv_path)

# Quick sanity check of what was loaded.
print("Schema:")
df.printSchema()
print(f"Row count: {df.count()}")

Schema:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: double (nullable = true)
 |-- isFlaggedFraud: double (nullable = true)

Row count: 6362620


In [41]:
# One-row frame holding the number of NULL values in each column.
null_counts = df.select(
    *[
        spark_sum(col(column_name).isNull().cast("int")).alias(column_name)
        for column_name in df.columns
    ]
)
null_counts.show(truncate=False)

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|0   |0   |0     |0       |0            |0             |0       |0             |0             |0      |0             |
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [42]:
# Summary statistics for the numeric columns and the label.
numeric_stats = df.describe(
    "step",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "isFraud",
)
numeric_stats.show()

+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+
|summary|              step|           amount|    oldbalanceOrg|    newbalanceOrig|    oldbalanceDest|    newbalanceDest|             isFraud|
+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+--------------------+
|  count|           6362620|          6362620|          6362620|           6362620|           6362620|           6362620|             6362620|
|   mean|243.39724563151657|179861.9035491287|833883.1040744764| 855113.6685785812|1100701.6665196533|1224996.3982019224|0.001290820448180152|
| stddev|142.33197104913066|603858.2314629209|2888242.673037527|2924048.5029542595|3399180.1129944525|3674128.9421196915|0.035904796801604424|
|    min|                 1|              0.0|              0.0|               0.0|               0.0|               0.0|                 0.0|

In [43]:
# Remove the columns that are not used as model inputs, then keep only rows
# that actually carry a label.
df = (
    df.drop("isFlaggedFraud", "nameOrig", "nameDest")
    .where(col("isFraud").isNotNull())
)

In [44]:
# Drop rows that are exact duplicates across all remaining columns.
# NOTE(review): this runs after nameOrig/nameDest were dropped, so transactions
# that differ only in those identifiers are collapsed into one row. The recorded
# outputs show 6,362,620 rows at load and 6,353,880 + 8,197 = 6,362,077 rows in
# the class distribution below — confirm that removing those rows is intended.
df = df.dropDuplicates()

In [45]:
# Label distribution, pulled to the driver as a small pandas frame.
class_dist = (
    df.groupBy("isFraud")
    .count()
    .toPandas()
)
print(class_dist)

   isFraud    count
0      0.0  6353880
1      1.0     8197


In [46]:
# Per transaction type: row count, number of fraud rows, and their ratio.
fraud_by_type = (
    df.groupBy("type")
    .agg(
        count("type").alias("count"),
        spark_sum(col("isFraud")).alias("fraud_count"),
    )
    .withColumn("fraud_rate", col("fraud_count") / col("count"))
)
# Highest fraud rate first.
fraud_by_type.sort(col("fraud_rate").desc()).show()

+--------+-------+-----------+--------------------+
|    type|  count|fraud_count|          fraud_rate|
+--------+-------+-----------+--------------------+
|TRANSFER| 532909|     4097.0|0.007687991758442811|
|CASH_OUT|2237484|     4100.0|0.001832415337942...|
| CASH_IN|1399284|        0.0|                 0.0|
| PAYMENT|2150968|        0.0|                 0.0|
|   DEBIT|  41432|        0.0|                 0.0|
+--------+-------+-----------+--------------------+



In [None]:
# Per time step: transaction volume, number of fraud rows, and their ratio.
time_fraud = (
    df.groupBy("step")
    .agg(
        count("step").alias("volume"),
        spark_sum(col("isFraud")).alias("fraud_count"),
    )
    .withColumn("fraud_rate", col("fraud_count") / col("volume"))
)
# Show the first 24 steps in chronological order.
time_fraud.sort("step").show(n=24)

In [None]:
from pyspark.sql.functions import (round as spark_round)

In [None]:
# Balance movement on each side of the transaction.
df = df.withColumn("orig_balance_diff", col("oldbalanceOrg") - col("newbalanceOrig"))
df = df.withColumn("dest_balance_diff", col("newbalanceDest") - col("oldbalanceDest"))

# Flag: the originator had funds but its balance did not move.
df = df.withColumn(
    "orig_balance_zero",
    when((col("oldbalanceOrg") > 0) & (col("orig_balance_diff") == 0), 1.0).otherwise(0.0),
)

# Flag: a positive amount was sent but the destination balance did not move.
df = df.withColumn(
    "dest_balance_zero",
    when((col("amount") > 0) & (col("dest_balance_diff") == 0), 1.0).otherwise(0.0),
)

In [None]:
# Add a log(1 + x) version of every monetary column as `log_<column>`.
for money_col in ("amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"):
    df = df.withColumn("log_" + money_col, log1p(col(money_col)))

In [None]:
# Mark the feature-engineered frame for caching so the later split, training,
# and evaluation cells can reuse it.
df = df.cache()
print('Cached dataframe')

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Encode the categorical transaction type: string -> index -> one-hot vector.
type_indexer = StringIndexer(
    inputCol="type",
    outputCol="type_index",
    handleInvalid="keep",
)
type_encoder = OneHotEncoder(
    inputCols=["type_index"],
    outputCols=["type_vec"],
)

In [None]:
# Model input columns: the time step, the log-scaled monetary columns, the
# engineered balance features, and the one-hot transaction type.
feature_cols = (
    ['step']
    + ['log_' + base for base in ('amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest')]
    + ['orig_balance_diff', 'dest_balance_diff', 'orig_balance_zero', 'dest_balance_zero']
    + ['type_vec']
)

In [None]:
# Assemble the model inputs into a single vector column. The column list comes
# from `feature_cols` (defined in the previous cell) so there is exactly one
# place to edit when features are added or removed.
assembler = VectorAssembler(
    inputCols=list(feature_cols),
    outputCol='raw_features',
    handleInvalid='keep'
)

# Scale the assembled vector; the classifiers read the result from `features`.
scaler = StandardScaler(inputCol='raw_features', outputCol='features')

In [None]:
# 80/20 split with a fixed seed so the split is repeatable.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
print(f"Train count: {train_df.count()} Test count: {test_df.count()}")

# Class sizes inside the training split; the next cell uses them to decide
# whether (and how much) to oversample. Resampling is applied to train only.
fraud_count = train_df.where(col('isFraud') == 1.0).count()
nonfraud_count = train_df.where(col('isFraud') == 0.0).count()
print(f"Train fraud: {fraud_count} Train non-fraud: {nonfraud_count}")

In [None]:
# Oversample the fraud class in the training split so both classes end up at
# roughly the same size. Everything that depends on `ratio` lives inside the
# guard, so the cell is a no-op (instead of a NameError) when there are no fraud
# rows or the classes are already balanced.
if 0 < fraud_count < nonfraud_count:
    # Whole number of copies of each fraud row needed to approach parity.
    ratio = nonfraud_count // fraud_count
    fraud_df = train_df.filter(col('isFraud') == 1.0)
    # Emit every fraud row `ratio` times in one pass: exploding an array of
    # length `ratio` yields that many output rows per input row. This replaces
    # chaining ~`ratio` unions, which builds a very deep query plan.
    replicated = (
        fraud_df
        .withColumn('_copy', expr(f'explode(array_repeat(1, {ratio}))'))
        .drop('_copy')
    )
    train_df = train_df.filter(col('isFraud') == 0.0).union(replicated)
    print('After replication train counts -> fraud:', train_df.filter(col('isFraud')==1.0).count(), 'nonfraud:', train_df.filter(col('isFraud')==0.0).count())
else:
    print('Skipping replication -> fraud:', fraud_count, 'nonfraud:', nonfraud_count)

In [None]:
# Shared preprocessing: encode the type, assemble the feature vector, scale it.
preprocessing_stages = [type_indexer, type_encoder, assembler, scaler]

# One estimator per model family, all reading `features` and predicting `isFraud`.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='isFraud',
    maxIter=20,
)
rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='isFraud',
    numTrees=100,
)
gbt = GBTClassifier(
    featuresCol='features',
    labelCol='isFraud',
    maxIter=50,
)

# Each pipeline is the shared preprocessing followed by its classifier.
pipeline_lr = Pipeline(stages=[*preprocessing_stages, lr])
pipeline_rf = Pipeline(stages=[*preprocessing_stages, rf])
pipeline_gbt = Pipeline(stages=[*preprocessing_stages, gbt])

In [None]:
# Train every pipeline on the training split and score it on the held-out test
# split. Fitting, prediction, and metric computation all sit inside the loop
# body so that each model gets its own entry in `models` and `metrics`
# (previously only the last pipeline was fitted and evaluated).
evaluator = BinaryClassificationEvaluator(labelCol='isFraud', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

models = {}   # name -> fitted PipelineModel
metrics = {}  # name -> dict of evaluation numbers

for name, pipeline in [('LR', pipeline_lr), ('RF', pipeline_rf), ('GBT', pipeline_gbt)]:
    print(f"Training {name} ...")
    t0 = time.time()
    model = pipeline.fit(train_df)
    t1 = time.time()
    print(f"{name} training time: {t1-t0:.2f}s")
    models[name] = model

    # Predictions on the untouched test split.
    preds = model.transform(test_df)
    auc = evaluator.evaluate(preds)

    # Confusion-matrix cells from a single aggregation over the predictions
    # (one pass instead of four separate filter + count jobs).
    confusion = {
        (row['isFraud'], row['prediction']): row['count']
        for row in preds.groupBy('isFraud', 'prediction').count().collect()
    }
    tp = confusion.get((1.0, 1.0), 0)
    tn = confusion.get((0.0, 0.0), 0)
    fp = confusion.get((0.0, 1.0), 0)
    fn = confusion.get((1.0, 0.0), 0)

    # Guard each ratio against an empty denominator.
    precision = tp / (tp+fp) if (tp+fp) > 0 else 0.0
    recall = tp / (tp+fn) if (tp+fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    metrics[name] = {
        'auc': auc,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'tp': tp,
        'tn': tn,
        'fp': fp,
        'fn': fn,
        'train_time_s': t1-t0
    }
    print(f"{name} metrics -> AUC: {auc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

In [None]:
# Dump the collected metrics, one model per line.
print('\n=== Metrics Summary ===')
for model_name, model_metrics in metrics.items():
    print(model_name, model_metrics)

In [None]:
# Print the random forest's feature importances, if an RF pipeline was trained.
# Each lookup is guarded so a missing model or stage produces a message rather
# than a NameError.
rf_model = models.get('RF')
rf_stage = None
if rf_model is not None:
    # The fitted forest is one of the stages of the fitted pipeline.
    rf_stage = next(
        (s for s in rf_model.stages if s.__class__.__name__ == 'RandomForestClassificationModel'),
        None,
    )

if rf_stage is not None:
    importances = rf_stage.featureImportances
    print('RF feature importances vector:', importances)
else:
    print('RF model not available; skipping feature importances')

In [None]:
# Persist every fitted pipeline under ./saved_models/<name>_pipeline.
# Saving is best-effort: a failure for one model is reported and the loop moves on.
model_dir = './saved_models'
for label, fitted_pipeline in models.items():
    target = f"{model_dir}/{label}_pipeline"
    try:
        fitted_pipeline.write().overwrite().save(target)
    except Exception as err:
        print(f"Warning: could not save {label} -> {err}")
    else:
        print(f"Saved {label} to {target}")

In [None]:
# Time a random-forest fit on increasing fractions of the training split.
# The fit and the timing live inside the loop so that every fraction is
# measured (previously only the last sample was fitted).
# Note: `train_df` here is whatever the oversampling cell left behind.
sample_sizes = [0.01, 0.05, 0.1, 0.25, 0.5]
benchmarks = []  # (fraction, seconds) pairs
for frac in sample_sizes:
    sample_train = train_df.sample(withReplacement=False, fraction=frac, seed=42)
    t0 = time.time()
    pipeline_rf.fit(sample_train)  # fitted model is discarded; only the time matters
    t1 = time.time()
    benchmarks.append((frac, t1-t0))
    print(f"RF fit time for {frac*100:.1f}% of train: {t1-t0:.2f}s")

In [None]:
import json
# Write the per-model metrics to a JSON file next to the notebook.
with open('metrics_summary.json', 'w') as metrics_file:
    metrics_file.write(json.dumps(metrics))

# Write the benchmark timings as a single-part CSV with a header row.
bench_df = spark.createDataFrame(benchmarks, schema=['fraction', 'seconds'])
bench_df.coalesce(1).write.csv('./benchmarks', mode='overwrite', header=True)