# Cài đặt Apache Spark & Java

In [None]:
# Cài Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Xoá các file .tgz cũ (nếu có) để tránh trùng tên
!rm -f spark-3.4.1-bin-hadoop3.tgz*

# Tải Spark bản 3.4.1 ổn định
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Kiểm tra file vừa tải
!ls -lh spark-3.4.1-bin-hadoop3.tgz

# Giải nén Spark
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Kiểm tra thư mục sau khi giải nén
!ls -l spark-3.4.1-bin-hadoop3

# Cài findspark để khởi tạo Spark trong Python
!pip install -q findspark


-rw-r--r-- 1 root root 371M Jun 19  2023 spark-3.4.1-bin-hadoop3.tgz
total 156
drwxr-xr-x 2 1000 1000  4096 Jun 19  2023 bin
drwxr-xr-x 2 1000 1000  4096 Jun 19  2023 conf
drwxr-xr-x 5 1000 1000  4096 Jun 19  2023 data
drwxr-xr-x 4 1000 1000  4096 Jun 19  2023 examples
drwxr-xr-x 2 1000 1000 20480 Jun 19  2023 jars
drwxr-xr-x 4 1000 1000  4096 Jun 19  2023 kubernetes
-rw-r--r-- 1 1000 1000 22982 Jun 19  2023 LICENSE
drwxr-xr-x 2 1000 1000  4096 Jun 19  2023 licenses
-rw-r--r-- 1 1000 1000 57842 Jun 19  2023 NOTICE
drwxr-xr-x 9 1000 1000  4096 Jun 19  2023 python
drwxr-xr-x 3 1000 1000  4096 Jun 19  2023 R
-rw-r--r-- 1 1000 1000  4605 Jun 19  2023 README.md
-rw-r--r-- 1 1000 1000   165 Jun 19  2023 RELEASE
drwxr-xr-x 2 1000 1000  4096 Jun 19  2023 sbin
drwxr-xr-x 2 1000 1000  4096 Jun 19  2023 yarn


# Cấu hình môi trường & Khởi tạo SparkSession

In [None]:
import os
import findspark

# Cấu hình biến môi trường
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

# Khởi tạo findspark
findspark.init()

from pyspark.sql import SparkSession

# Tạo SparkSession
spark = SparkSession.builder \
    .appName("FraudDetectionDemo") \
    .getOrCreate()

# Kiểm tra SparkSession
print("Spark version:", spark.version)



Spark version: 3.4.1


# Tải dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Đọc dữ liệu vào Spark DataFrame

In [None]:
file_path = "/content/drive/MyDrive/Colab Notebooks/Paysim.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.printSchema()
df.show(5)


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PA

# Tạo cột phân nhóm gian lận

Hệ thống gán nhãn gian lận (flag)

G1: Giao dịch isFraud = 1 và isFlaggedFraud = 1 → Flag đúng.

G2: Giao dịch isFraud = 1 và isFlaggedFraud = 0 → Bỏ sót.

G3: Giao dịch isFraud = 0 và isFlaggedFraud = 1 → Flag nhầm.


In [None]:
from pyspark.sql.functions import when, col

df = df.withColumn("fraud_group",
    when((col("isFraud") == 1) & (col("isFlaggedFraud") == 1), "Gian lận đúng flag")
    .when((col("isFraud") == 1) & (col("isFlaggedFraud") == 0), "Bỏ sót gian lận")
    .when((col("isFraud") == 0) & (col("isFlaggedFraud") == 1), "Flag sai")
    .otherwise("Bình thường")
)

df.select("isFraud", "isFlaggedFraud", "fraud_group").show(10)


+-------+--------------+---------------+
|isFraud|isFlaggedFraud|    fraud_group|
+-------+--------------+---------------+
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
|      1|             0|Bỏ sót gian lận|
|      1|             0|Bỏ sót gian lận|
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
|      0|             0|    Bình thường|
+-------+--------------+---------------+
only showing top 10 rows



# So sánh đặc trưng giữa ba nhóm

In [None]:
from pyspark.sql.functions import col


# Loại giao dịch không bao giờ gian lận
df.filter(col("isFraud") == 1).groupBy("type").count().show()


# Thống kê số lượng giao dịch theo nhóm gian lận và loại giao dịch
count_by_type = df.groupBy("fraud_group", "type").count().orderBy("fraud_group", "count", ascending=False)
count_by_type.show(20)

# Tính tiền trung bình theo nhóm gian lận
avg_amount = df.groupBy("fraud_group").avg("amount")
avg_amount.show()


+--------+-----+
|    type|count|
+--------+-----+
|TRANSFER|  578|
|CASH_OUT|  599|
+--------+-----+

+---------------+--------+------+
|    fraud_group|    type| count|
+---------------+--------+------+
|       Flag sai|CASH_OUT|     4|
|Bỏ sót gian lận|CASH_OUT|   599|
|Bỏ sót gian lận|TRANSFER|   578|
|    Bình thường|CASH_OUT|373066|
|    Bình thường| PAYMENT|353807|
|    Bình thường| CASH_IN|227128|
|    Bình thường|TRANSFER| 86218|
|    Bình thường|   DEBIT|  7175|
+---------------+--------+------+

+---------------+------------------+
|    fraud_group|       avg(amount)|
+---------------+------------------+
|    Bình thường| 157542.1819822183|
|Bỏ sót gian lận|1157280.1532965186|
|       Flag sai|            5000.0|
+---------------+------------------+



# Random Forest

Xây dựng một pipeline ML hoàn chỉnh từ tiền xử lý (chuyển đổi biến), tạo vector đặc trưng, huấn luyện mô hình Random Forest, dự đoán và đánh giá hiệu suất mô hình

=> Dự đoán xem một giao dịch có phải gian lận (fraud) hay không dựa trên các đặc trưng đầu vào.



ML:  Sử dụng tập dữ liệu mẫu (df_sample) để huấn luyện mô hình Random Forest (model) có khả năng phát hiện gian lận.

Phát hiện và cảnh báo tài khoản có dấu hiệu gian lận: sử dụng mô hình  (`model`) để **dự đoán và phân tích rủi ro gian lận** trên toàn bộ tập dữ liệu (`df`),  đưa ra cảnh báo đối với những tài khoản có xác suất cao hoặc nhiều lần bị nghi ngờ gian lận.






In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when, col, avg

# Sample 10% dữ liệu gốc
df_sample = df.sample(fraction=0.1, seed=42)


# Oversample isFraud = 1 để dữ liệu cân bằng hơn
fraud_df = df_sample.filter(col("isFraud") == 1)
nonfraud_df = df_sample.filter(col("isFraud") == 0)
fraud_df_oversampled = fraud_df.sample(withReplacement=True, fraction=10.0, seed=42)

df_balanced = nonfraud_df.union(fraud_df_oversampled)

# Tạo các đặc trưng hành vi vì các đặc trưng nguyên thủy (amount, oldbalanceOrg,...) không mô tả được hành vi

df_balanced = df_balanced.withColumn("diff_balance_orig", col("oldbalanceOrg") - col("newbalanceOrig")) #Tiền thật sự bị trừ
df_balanced = df_balanced.withColumn("diff_balance_dest", col("newbalanceDest") - col("oldbalanceDest")) #Tiền thực sự cộng vào
df_balanced = df_balanced.withColumn("amount_ratio", col("amount") / (col("oldbalanceOrg") + 1))  #Tỉ lệ tiền giao dịch

# Chuyển cột 'type' sang số với StringIndexer
type_indexer = StringIndexer(inputCol="type", outputCol="typeIndex")

# Tạo các features
assembler = VectorAssembler(
    inputCols=[
        "typeIndex", "amount", "oldbalanceOrg", "newbalanceOrig",
        "oldbalanceDest", "newbalanceDest", "diff_balance_orig",
        "diff_balance_dest", "amount_ratio"
    ],
    outputCol="features"
)

# Chuyển nhãn 'isFraud' thành label
label_indexer = StringIndexer(inputCol="isFraud", outputCol="label")

# Khởi tạo Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=50, maxDepth=10)

# Tạo pipeline
pipeline = Pipeline(stages=[type_indexer, assembler, label_indexer, rf])

# Chia tập train/test
train_df, test_df = df_balanced.randomSplit([0.8, 0.2], seed=42)

# Huấn luyện mô hình
model = pipeline.fit(train_df)

# Dự đoán
predictions = model.transform(test_df)

# Đánh giá
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

accuracy = evaluator_acc.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
f1 = evaluator_f1.evaluate(predictions)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

Accuracy: 0.9981945170333064
Precision: 0.9981572758566477
Recall: 0.9981945170333064
F1 Score: 0.9981451159075917


In [None]:
from pyspark.sql.functions import avg, count, when, col

# 1. Dự đoán cho toàn bộ dữ liệu (hoặc df_sample)
predictions_all = model.transform(test_df)

# 2. Lấy xác suất dự đoán gian lận (lấy cột 'probability' từ vector xác suất)
# probability là Vector( [p(not_fraud), p(fraud)] ), ta lấy phần tử thứ 2
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

get_fraud_prob = udf(lambda v: float(v[1]), DoubleType())
predictions_all = predictions_all.withColumn("fraud_prob", get_fraud_prob(col("probability")))

# 3. Tổng hợp theo tài khoản nameOrig: trung bình xác suất gian lận, số lần dự đoán gian lận
account_risk = predictions_all.groupBy("nameOrig") \
    .agg(
        avg("fraud_prob").alias("avg_fraud_prob"),
        count(when(col("prediction") == 1, True)).alias("count_flagged_fraud"),
        count("*").alias("total_transactions")
    ) \
    .orderBy(col("avg_fraud_prob").desc())


# 4. Hiển thị 20 tài khoản đầu tiên theo mức độ rủi ro (dù có cảnh báo hay không)
account_risk.show(5)


+-----------+------------------+-------------------+------------------+
|   nameOrig|    avg_fraud_prob|count_flagged_fraud|total_transactions|
+-----------+------------------+-------------------+------------------+
| C127639062|0.9984903947627225|                  2|                 2|
| C874465366|0.9984903947627225|                  1|                 1|
|C1267904754|0.9984903947627225|                  2|                 2|
| C262990065|0.9984903947627225|                  3|                 3|
| C365968372| 0.998279868446933|                  2|                 2|
+-----------+------------------+-------------------+------------------+
only showing top 5 rows

