In [1]:
import os 
import sys

python_path="/home/hduser/miniconda3/envs/pyspark/bin/python"

os.environ['PYSPARK_PYTHON']= python_path
os.environ['PYSPARK_DRIVER_PYTHON']=python_path

In [2]:
from pyspark.sql import SparkSession

try: 
    spark.stop()
except:
    pass
    
spark = SparkSession.builder \
    .appName("Fraud_ETL_YARN_Job") \
    .master("yarn") \
    .config("spark.sql.warehouse.dir", "hdfs:/hadoop-master:9000/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

print("Spark version:", spark.version)
print("Spark master:", spark.sparkContext.master)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/01 06:36:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/01 06:36:07 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


Spark version: 4.1.0
Spark master: yarn


In [6]:
hdfs_input_path = "hdfs://hadoop-master:9000/hduser/project/fraud_detection/data/Dataset_Fraud_Detection/financial_fraud_detection_dataset.csv"
df = spark.read.csv(hdfs_input_path, header=True, inferSchema=True)
df.show(5)

                                                                                

+--------------+--------------------+--------------+----------------+-------+----------------+-----------------+--------+-----------+--------+----------+---------------------------+------------------------+--------------+-----------------+---------------+---------------+-----------+
|transaction_id|           timestamp|sender_account|receiver_account| amount|transaction_type|merchant_category|location|device_used|is_fraud|fraud_type|time_since_last_transaction|spending_deviation_score|velocity_score|geo_anomaly_score|payment_channel|     ip_address|device_hash|
+--------------+--------------------+--------------+----------------+-------+----------------+-----------------+--------+-----------+--------+----------+---------------------------+------------------------+--------------+-----------------+---------------+---------------+-----------+
|       T100000|2023-08-22 09:22:...|     ACC877572|       ACC388389| 343.78|      withdrawal|        utilities|   Tokyo|     mobile|   false|      

# *Exploratory Data Analysis (EDA)*

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, count, when, isnan, lit, col, avg, stddev, min, max, corr
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
import pandas as pd

In [8]:
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- sender_account: string (nullable = true)
 |-- receiver_account: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- location: string (nullable = true)
 |-- device_used: string (nullable = true)
 |-- is_fraud: boolean (nullable = true)
 |-- fraud_type: string (nullable = true)
 |-- time_since_last_transaction: double (nullable = true)
 |-- spending_deviation_score: double (nullable = true)
 |-- velocity_score: integer (nullable = true)
 |-- geo_anomaly_score: double (nullable = true)
 |-- payment_channel: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- device_hash: string (nullable = true)



In [9]:
total_rows = df.count()
print("Total baris data: ",  {total_rows})



Total baris data:  {5000000}


                                                                                

# Missing Value Checking and Handling

In [10]:
rows = []

for field in df.schema.fields:
    c = field.name
    dt = field.dataType

    if isinstance(dt, (DoubleType, FloatType)):
        missing = df.select(
            count(when(isnan(col(c)) | col(c).isNull(), c)).alias("missing")
        ).collect()[0]["missing"]
    else:
        missing = df.select(
            count(when(col(c).isNull(), c)).alias("missing")
        ).collect()[0]["missing"]

    rows.append((c, missing))

result_df = spark.createDataFrame(rows, ["column", "missing_count"])
result_df.show(truncate=False)

                                                                                

+---------------------------+-------------+
|column                     |missing_count|
+---------------------------+-------------+
|transaction_id             |0            |
|timestamp                  |0            |
|sender_account             |0            |
|receiver_account           |0            |
|amount                     |0            |
|transaction_type           |0            |
|merchant_category          |0            |
|location                   |0            |
|device_used                |0            |
|is_fraud                   |0            |
|fraud_type                 |4820447      |
|time_since_last_transaction|896513       |
|spending_deviation_score   |0            |
|velocity_score             |0            |
|geo_anomaly_score          |0            |
|payment_channel            |0            |
|ip_address                 |0            |
|device_hash                |0            |
+---------------------------+-------------+



In [11]:
cols_to_drop = [
    'transaction_id',
    'timestamp',
    'sender_account',
    'receiver_account',
    'fraud_type',
    'device_hash',
    'ip_address',
    'location'
]
df_clean = df.drop(*cols_to_drop)

In [12]:
df_filled = df_clean.fillna({'time_since_last_transaction': -1})

In [13]:
print("Jumlah baris saat ini:", {df_filled.count()})

rows = []

for field in df_filled.schema.fields:
    c = field.name
    dt = field.dataType

    if isinstance(dt, (DoubleType, FloatType)):
        missing = df_filled.select(
            count(when(isnan(col(c)) | col(c).isNull(), c)).alias("missing")
        ).collect()[0]["missing"]
    else:
        # STRING, BOOLEAN, TIMESTAMP, INTEGER, dll
        missing = df_filled.select(
            count(when(col(c).isNull(), c)).alias("missing")
        ).collect()[0]["missing"]

    rows.append((c, missing))

missing_df = spark.createDataFrame(rows, ["column", "missing_count"])
missing_df.show(truncate=False)

                                                                                

Jumlah baris saat ini: {5000000}


[Stage 95:>                                                         (0 + 1) / 1]

+---------------------------+-------------+
|column                     |missing_count|
+---------------------------+-------------+
|amount                     |0            |
|transaction_type           |0            |
|merchant_category          |0            |
|device_used                |0            |
|is_fraud                   |0            |
|time_since_last_transaction|0            |
|spending_deviation_score   |0            |
|velocity_score             |0            |
|geo_anomaly_score          |0            |
|payment_channel            |0            |
+---------------------------+-------------+



                                                                                

In [14]:
df.groupBy("is_fraud").count().show()



+--------+-------+
|is_fraud|  count|
+--------+-------+
|    true| 179553|
|   false|4820447|
+--------+-------+



                                                                                

In [15]:
df.groupBy("transaction_type").pivot("is_fraud").count().show()



+----------------+-------+-----+
|transaction_type|  false| true|
+----------------+-------+-----+
|      withdrawal|1203761|44874|
|         deposit|1205807|44786|
|        transfer|1205006|45328|
|         payment|1205873|44565|
+----------------+-------+-----+



                                                                                

In [16]:
df.groupBy("transaction_type").count().orderBy("count", ascending=False).show()



+----------------+-------+
|transaction_type|  count|
+----------------+-------+
|         deposit|1250593|
|         payment|1250438|
|        transfer|1250334|
|      withdrawal|1248635|
+----------------+-------+



                                                                                

In [17]:
df_filled.printSchema()

root
 |-- amount: double (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- device_used: string (nullable = true)
 |-- is_fraud: boolean (nullable = true)
 |-- time_since_last_transaction: double (nullable = false)
 |-- spending_deviation_score: double (nullable = true)
 |-- velocity_score: integer (nullable = true)
 |-- geo_anomaly_score: double (nullable = true)
 |-- payment_channel: string (nullable = true)



In [18]:
df.groupBy("merchant_category").count().orderBy("count", ascending=False).show()



+-----------------+------+
|merchant_category| count|
+-----------------+------+
|           retail|626319|
|           travel|625656|
|       restaurant|625483|
|    entertainment|625332|
|          grocery|624954|
|            other|624589|
|        utilities|624086|
|           online|623581|
+-----------------+------+



                                                                                

In [19]:
df.groupBy("device_used").count().orderBy("count", ascending=False).show()



+-----------+-------+
|device_used|  count|
+-----------+-------+
|     mobile|1251131|
|        web|1250071|
|        atm|1249640|
|        pos|1249158|
+-----------+-------+



                                                                                

In [20]:
numeric_cols = [
    'amount',
    'time_since_last_transaction',
    'spending_deviation_score',
    'velocity_score',
    'geo_anomaly_score'
]
df_filled = df_filled.withColumn("is_fraud", col ("is_fraud").cast(IntegerType()))

In [21]:
df_filled.select(numeric_cols).summary("mean", "stddev", "min", "25%", "50%", "75%", "max").show()

26/01/01 04:02:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 125:>                                                        (0 + 1) / 1]

+-------+------------------+---------------------------+------------------------+-----------------+-------------------+
|summary|            amount|time_since_last_transaction|spending_deviation_score|   velocity_score|  geo_anomaly_score|
+-------+------------------+---------------------------+------------------------+-----------------+-------------------+
|   mean|  358.934268763911|         1.0729165780591507|    -3.88115999999996...|       10.5013196| 0.5000292560000075|
| stddev|469.93331106593666|         3240.0977344690036|      1.0008069793965708|5.766842438453669|0.28863493504097754|
|    min|              0.01|         -8777.814181944444|                   -5.26|                1|                0.0|
|    25%|             26.57|        -1920.5026717130556|                   -0.68|                6|               0.25|
|    50%|            138.66|                       -1.0|                     0.0|               11|                0.5|
|    75%|            503.84|         192

                                                                                

In [22]:
df.groupBy("is_fraud").agg(
    avg("amount").alias("avg_amount"),
    avg("spending_deviation_score").alias("avg_spending_score"),
    avg("velocity_score").alias("avg_velocity"),
    avg("geo_anomaly_score").alias("avg_geo_score"),
    avg("time_since_last_transaction").alias("avg_time_since")
).show()



+--------+-----------------+--------------------+------------------+------------------+------------------+
|is_fraud|       avg_amount|  avg_spending_score|      avg_velocity|     avg_geo_score|    avg_time_since|
+--------+-----------------+--------------------+------------------+------------------+------------------+
|    true|358.5281994731318|6.518409606077314E-4|10.512377960824937|0.5004939488618937|1.7653716579192553|
|   false|358.9493941162692|-4.26852530481104...| 10.50090769590455| 0.500011947024837| 1.514836414934929|
+--------+-----------------+--------------------+------------------+------------------+------------------+



                                                                                

In [23]:
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features_corr")
df_vector = assembler.transform(df_filled).select("features_corr")

matrix = Correlation.corr(df_vector, "features_corr").head()
corr_array = matrix[0].toArray()

pdf_corr = pd.DataFrame(corr_array, columns=numeric_cols, index=numeric_cols)
print("Nilai mendekati 1 = Berhubungan kuat (positif)")
print("Nilai mendekati -1 = Berhubungan kuat (nehatif/kebalikan)")
print("Nilai 0 = Tidak ada kaitan sama sekali")
print(pdf_corr)

                                                                                

Nilai mendekati 1 = Berhubungan kuat (positif)
Nilai mendekati -1 = Berhubungan kuat (nehatif/kebalikan)
Nilai 0 = Tidak ada kaitan sama sekali
                               amount  time_since_last_transaction  \
amount                       1.000000                     0.000286   
time_since_last_transaction  0.000286                     1.000000   
spending_deviation_score     0.000799                     0.000182   
velocity_score              -0.000811                     0.000080   
geo_anomaly_score            0.000176                    -0.000028   

                             spending_deviation_score  velocity_score  \
amount                                       0.000799       -0.000811   
time_since_last_transaction                  0.000182        0.000080   
spending_deviation_score                     1.000000       -0.000155   
velocity_score                              -0.000155        1.000000   
geo_anomaly_score                            0.000472        0.000340 

In [24]:
print("Kolerasi Fitur Terhadap Target (is_fraud)")
for c in numeric_cols:
    val = df_filled.stat.corr(c, "is_fraud")
    print(f"{c} : {val:.4f}")

Kolerasi Fitur Terhadap Target (is_fraud)


                                                                                

amount : -0.0002


                                                                                

time_since_last_transaction : 0.0000


                                                                                

spending_deviation_score : 0.0002


                                                                                

velocity_score : 0.0004




geo_anomaly_score : 0.0003


                                                                                

In [25]:
from pyspark.ml.feature import StringIndexer

cat_cols = ['transaction_type', 'merchant_category', 'device_used', 'payment_channel']
stages = []

for col_name in cat_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index")
    stages.append(indexer)

In [26]:
input_cols = numeric_cols + [c + "_index" for c in cat_cols]

print(f"fitur final: {input_cols}")

fitur final: ['amount', 'time_since_last_transaction', 'spending_deviation_score', 'velocity_score', 'geo_anomaly_score', 'transaction_type_index', 'merchant_category_index', 'device_used_index', 'payment_channel_index']


In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

stages=[]

In [28]:
for c in cat_cols:
    indexer = StringIndexer(
        inputCol=c,
        outputCol=c + "_index",
        handleInvalid="keep"
    )
    encoder = OneHotEncoder(
        inputCols=[c + "_index"],
        outputCols=[c + "_ohe"]
    )
    stages += [indexer, encoder]


In [29]:
input_cols = numeric_cols + [c + "_ohe" for c in cat_cols]

assembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="features",
    handleInvalid="keep"
)

stages.append(assembler)

In [30]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
model_pipeline = pipeline.fit(df_filled)
df_final = model_pipeline.transform(df_filled)

                                                                                

In [31]:
data_ready = df_final.select("features", "is_fraud")
data_ready.show(5, truncate=False)

+--------------------------------------------------------------------------+--------+
|features                                                                  |is_fraud|
+--------------------------------------------------------------------------+--------+
|(25,[0,1,2,3,4,8,15,17,23],[343.78,-1.0,-0.21,3.0,0.22,1.0,1.0,1.0,1.0])  |0       |
|(25,[0,1,2,3,4,8,16,19,22],[419.65,-1.0,-0.14,7.0,0.96,1.0,1.0,1.0,1.0])  |0       |
|(25,[0,1,2,3,4,5,14,20,23],[2773.86,-1.0,-1.78,20.0,0.89,1.0,1.0,1.0,1.0])|0       |
|(25,[0,1,2,3,4,5,16,20,21],[1666.22,-1.0,-0.6,6.0,0.37,1.0,1.0,1.0,1.0])  |0       |
|(25,[0,1,2,3,4,7,15,17,22],[24.43,-1.0,0.79,13.0,0.27,1.0,1.0,1.0,1.0])   |0       |
+--------------------------------------------------------------------------+--------+
only showing top 5 rows


In [32]:
data_ready.write.mode("overwrite").parquet(
    "hdfs://hadoop-master:9000/hduser/project/fraud_detection/data/ready_for_training"
)

                                                                                

In [34]:
spark = SparkSession.builder.appName("Fraud_Project").getOrCreate()

26/01/01 04:07:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [38]:
spark = SpakSession.builder.appName("Fraud_Project").getOrCreate()

NameError: name 'SpakSession' is not defined

In [39]:
path = "hdfs://hadoop-master:9000/hduser/project/fraud_detection/data/ready_for_training"
df = spark.read.parquet(path)

print(f"Total data bersih: {df.count()} baris")

Total data bersih: 5000000 baris


## Split Data

In [40]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

print(f"Jumlah data training: {train_data.count()}")
print(f"Jumlah data test: {test_data.count()}")

                                                                                

Jumlah data training: 3999208




Jumlah data test: 1000792


                                                                                

## Modelling

In [41]:
rf = RandomForestClassifier(labelCol="is_fraud", featuresCol="features", numTrees=50, maxDepth=10, maxBins=128)

In [42]:
model = rf.fit(train_data)

26/01/01 04:15:44 WARN DAGScheduler: Broadcasting large task binary with size 1858.2 KiB
26/01/01 04:16:45 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
26/01/01 04:17:52 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
26/01/01 04:19:10 WARN DAGScheduler: Broadcasting large task binary with size 1461.1 KiB
                                                                                

In [43]:
predictions = model.transform(test_data)

## Evaluasi Model

In [49]:
auc_evaluation = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="probability", metricName="areaUnderROC")

auc = auc_evaluation.evaluate(predictions)
print(f"AUC Score: {auc*100:.2f}%")

26/01/01 04:52:02 WARN DAGScheduler: Broadcasting large task binary with size 1400.0 KiB
[Stage 244:>                                                        (0 + 2) / 2]

AUC Score: 59.16%


                                                                                

In [48]:
print("Hasil Evaluasi Model")
print(f"Accuracy: {accuracy*100:.2f}%")
print(f"F1 Score: {f1*100:.2f}%")
print(f"AUC Score: {auc*100:.2f}%")

Hasil Evaluasi Model
Accuracy: 96.40%
F1 Score: 94.64%
AUC Score: 59.16%


In [51]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction")

precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 =  multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

print("\nConfusion Matrix")
predictions.groupBy("is_fraud", "prediction").count().show()

26/01/01 04:58:28 WARN DAGScheduler: Broadcasting large task binary with size 1407.9 KiB
26/01/01 04:59:10 WARN DAGScheduler: Broadcasting large task binary with size 1407.9 KiB
26/01/01 04:59:59 WARN DAGScheduler: Broadcasting large task binary with size 1407.9 KiB
                                                                                

Precision: 0.9294
Recall: 0.9640
F1 Score: 0.9464

Confusion Matrix


26/01/01 05:01:18 WARN DAGScheduler: Broadcasting large task binary with size 1401.3 KiB
26/01/01 05:03:13 WARN DAGScheduler: Broadcasting large task binary with size 1391.8 KiB
                                                                                

+--------+----------+------+
|is_fraud|prediction| count|
+--------+----------+------+
|       1|       0.0| 35988|
|       0|       0.0|964804|
+--------+----------+------+



26/01/01 05:46:36 ERROR TransportClient: Failed to send RPC RPC 7949047576278228036 to /192.168.149.129:53382
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
26/01/01 05:46:37 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 2 at RPC address 192.168.149.129:44614, but got no response. Marking as agent lost.
java.io.IOException: Failed to send RPC RPC 7949047576278228036 to /192.168.149.129:53382: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:395)
	at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:372)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:604)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:571)

In [None]:
spark.stop()