# Tiền xử lý dữ liệu gian lận giao dịch (Fraud Detection Preprocessing)

Notebook này thực hiện:
1. Đọc dữ liệu từ HDFS (namenode)
2. Thống kê mô tả và EDA cơ bản
3. Làm sạch dữ liệu
4. Feature Engineering
5. Lưu kết quả dưới dạng Parquet vào HDFS

## 1. Import thư viện và cấu hình Spark

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import warnings
warnings.filterwarnings('ignore')

# Khởi tạo Spark Session với cấu hình HDFS
spark = SparkSession.builder \
    .appName("Fraud Detection Preprocessing") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

Spark Version: 3.3.0


## 2. Đọc dữ liệu từ HDFS

In [2]:
# Đường dẫn HDFS
hdfs_input_path = "hdfs://namenode:9000/data/input/paysim_train.csv"

# Đọc dữ liệu từ HDFS
df_spark = spark.read.csv(
    hdfs_input_path,
    header=True,
    inferSchema=True
)

print(f"Đọc dữ liệu từ HDFS: {hdfs_input_path}")
print(f"Số dòng: {df_spark.count():,}")
print(f"Số cột: {len(df_spark.columns)}")
print("\nSchema:")
df_spark.printSchema()

Đọc dữ liệu từ HDFS: hdfs://namenode:9000/data/input/paysim_train.csv
Số dòng: 5,726,358
Số cột: 11

Schema:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



## 3. Thống kê mô tả và EDA cơ bản

In [3]:
# Xem mẫu dữ liệu
print("Mẫu 10 dòng đầu tiên:")
df_spark.show(10, truncate=False)

Mẫu 10 dòng đầu tiên:
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|type    |amount  |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|1   |PAYMENT |1864.28 |C1666544295|21249.0      |19384.72      |M2044282225|0.0           |0.0           |0      |0             |
|1   |TRANSFER|181.0   |C1305486145|181.0        |0.0           |C553264065 |0.0           |0.0           |1      |0             |
|1   |CASH_OUT|181.0   |C840083671 |181.0        |0.0           |C38997010  |21182.0       |0.0           |1      |0             |
|1   |PAYMENT |11668.14|C2048537720|41554.0      |29885.86      |M1230701703|0.0           |0.0           |0      |0             |
|1   |PAYMENT |7817.71 |C90045638  |53860.0      |46042.29   

In [4]:
# Chuyển sang Pandas để thống kê chi tiết
# Lấy mẫu để tránh tràn bộ nhớ
total_count = df_spark.count()
sample_size = 100000 if total_count > 100000 else total_count
train_data = df_spark.limit(sample_size).toPandas()

print("\n" + "="*80)
print("THÔNG TIN CÁC CỘT:")
print("="*80)
print(train_data.info())

print("\n" + "="*80)
print("THỐNG KÊ MÔ TẢ CÁC CỘT SỐ:")
print("="*80)
print(train_data.describe().T)

# Tỷ lệ gian lận (isFraud)
fraud_ratio = train_data['isFraud'].mean()
print("\n" + "="*80)
print(f"TỶ LỆ GIAN LẬN: {fraud_ratio:.6f} ({fraud_ratio*100:.4f}%)")
print("="*80)

# Đếm số lượng từng loại giao dịch
print("\n" + "="*80)
print("SỐ LƯỢNG MỖI LOẠI GIAO DỊCH:")
print("="*80)
print(train_data['type'].value_counts())
print("\nTỷ lệ:")
print(train_data['type'].value_counts(normalize=True) * 100)


THÔNG TIN CÁC CỘT:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 11 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   step            100000 non-null  int32  
 1   type            100000 non-null  object 
 2   amount          100000 non-null  float64
 3   nameOrig        100000 non-null  object 
 4   oldbalanceOrg   100000 non-null  float64
 5   newbalanceOrig  100000 non-null  float64
 6   nameDest        100000 non-null  object 
 7   oldbalanceDest  100000 non-null  float64
 8   newbalanceDest  100000 non-null  float64
 9   isFraud         100000 non-null  int32  
 10  isFlaggedFraud  100000 non-null  int32  
dtypes: float64(5), int32(3), object(3)
memory usage: 7.2+ MB
None

THỐNG KÊ MÔ TẢ CÁC CỘT SỐ:
                   count          mean           std   min         25%  \
step            100000.0  8.671590e+00  1.812917e+00  1.00      8.0000   
amount          100000.0  1.7

In [5]:
# Kiểm tra missing values
print("\n" + "="*80)
print("KIỂM TRA MISSING VALUES:")
print("="*80)
missing = train_data.isnull().sum()
if missing.sum() == 0:
    print("Không có missing values trong dữ liệu")
else:
    print(missing[missing > 0])


KIỂM TRA MISSING VALUES:
Không có missing values trong dữ liệu


In [6]:
# Phân tích gian lận theo loại giao dịch
print("\n" + "="*80)
print("TỶ LỆ GIAN LẬN THEO LOẠI GIAO DỊCH:")
print("="*80)
fraud_by_type = train_data.groupby('type')['isFraud'].agg(['sum', 'count', 'mean'])
fraud_by_type.columns = ['Số_gian_lận', 'Tổng_giao_dịch', 'Tỷ_lệ']
fraud_by_type['Tỷ_lệ_%'] = fraud_by_type['Tỷ_lệ'] * 100
print(fraud_by_type)


TỶ LỆ GIAN LẬN THEO LOẠI GIAO DỊCH:
          Số_gian_lận  Tổng_giao_dịch     Tỷ_lệ   Tỷ_lệ_%
type                                                     
CASH_IN             0           20169  0.000000  0.000000
CASH_OUT           56           31256  0.001792  0.179166
DEBIT               0             992  0.000000  0.000000
PAYMENT             0           39006  0.000000  0.000000
TRANSFER           54            8577  0.006296  0.629591


## 4. Làm sạch dữ liệu (Data Cleaning)

In [7]:
# Lọc chỉ lấy 2 loại giao dịch: TRANSFER và CASH_OUT
# (dựa theo phân tích EDA, chỉ 2 loại này có gian lận)
print("Bước 1: Lọc loại giao dịch TRANSFER và CASH_OUT")
df_cleaned = df_spark.filter(
    (col("type") == "TRANSFER") | (col("type") == "CASH_OUT")
)

print(f"Số dòng trước khi lọc: {df_spark.count():,}")
print(f"Số dòng sau khi lọc: {df_cleaned.count():,}")

# Mã hóa cột type thành số (0: CASH_OUT, 1: TRANSFER)
print("\nBước 2: Mã hóa cột 'type'")
df_cleaned = df_cleaned.withColumn(
    "type_encoded",
    when(col("type") == "CASH_OUT", 0).otherwise(1)
)

# Xóa các cột không cần thiết
print("\nBước 3: Xóa các cột ID không hữu ích")
columns_to_drop = ['nameOrig', 'nameDest', 'isFlaggedFraud']
df_cleaned = df_cleaned.drop(*columns_to_drop)

print(f"Đã xóa các cột: {columns_to_drop}")
print(f"Số cột còn lại: {len(df_cleaned.columns)}")

# Hiển thị schema sau khi làm sạch
print("\nSchema sau khi làm sạch:")
df_cleaned.printSchema()

Bước 1: Lọc loại giao dịch TRANSFER và CASH_OUT


Số dòng trước khi lọc: 5,726,358
Số dòng sau khi lọc: 2,493,276

Bước 2: Mã hóa cột 'type'

Bước 3: Xóa các cột ID không hữu ích
Đã xóa các cột: ['nameOrig', 'nameDest', 'isFlaggedFraud']
Số cột còn lại: 9

Schema sau khi làm sạch:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- type_encoded: integer (nullable = false)



## 5. Feature Engineering

In [8]:
print("Tạo các đặc trưng mới...")

eps = 1e-9

# 1. Sai số cân bằng nguồn (Origin)
df_features = df_cleaned.withColumn(
    "errorBalanceOrig",
    col("oldbalanceOrg") - col("newbalanceOrig") - col("amount")
)

# 2. Sai số cân bằng đích (Destination)
df_features = df_features.withColumn(
    "errorBalanceDest",
    col("oldbalanceDest") + col("amount") - col("newbalanceDest")
)

# 3. Tỷ lệ amount trên số dư cũ
df_features = df_features.withColumn(
    "amount_over_oldbalance",
    col("amount") / (col("oldbalanceOrg") + eps)
)

# 4. Giờ trong ngày (hour từ step)
df_features = df_features.withColumn(
    "hour",
    (col("step") % 24).cast("int")
)

# 5. Log transformation của amount
df_features = df_features.withColumn(
    "amount_log",
    log1p(col("amount"))
)

print("Đã tạo các đặc trưng:")
print("- errorBalanceOrig: Sai số cân bằng tài khoản nguồn")
print("- errorBalanceDest: Sai số cân bằng tài khoản đích")
print("- amount_over_oldbalance: Tỷ lệ amount/số dư cũ")
print("- hour: Giờ trong ngày")
print("- amount_log: Log(amount + 1)")
# Hiển thị mẫu dữ liệu
print("\nMẫu dữ liệu sau khi tạo features:")
df_features.select(
    "type", "type_encoded", "amount", "amount_log",
    "errorBalanceOrig", "errorBalanceDest",
    "amount_over_oldbalance", "hour", "isFraud"
).show(10)

Tạo các đặc trưng mới...
Đã tạo các đặc trưng:
- errorBalanceOrig: Sai số cân bằng tài khoản nguồn
- errorBalanceDest: Sai số cân bằng tài khoản đích
- amount_over_oldbalance: Tỷ lệ amount/số dư cũ
- hour: Giờ trong ngày
- amount_log: Log(amount + 1)

Mẫu dữ liệu sau khi tạo features:
+--------+------------+---------+------------------+--------------------+-------------------+----------------------+----+-------+
|    type|type_encoded|   amount|        amount_log|    errorBalanceOrig|   errorBalanceDest|amount_over_oldbalance|hour|isFraud|
+--------+------------+---------+------------------+--------------------+-------------------+----------------------+----+-------+
|TRANSFER|           1|    181.0| 5.204006687076795|                 0.0|              181.0|    0.9999999999944752|   1|      1|
|CASH_OUT|           0|    181.0| 5.204006687076795|                 0.0|            21363.0|    0.9999999999944752|   1|      1|
|TRANSFER|           1| 215310.3|12.279840166879954|           -

In [9]:
# Thống kê các features mới
print("\nThống kê mô tả các features mới:")
df_features.select(
    "errorBalanceOrig",
    "errorBalanceDest",
    "amount_over_oldbalance",
    "hour",
    "amount_log"
).describe().show()


Thống kê mô tả các features mới:
+-------+--------------------+--------------------+----------------------+------------------+------------------+
|summary|    errorBalanceOrig|    errorBalanceDest|amount_over_oldbalance|              hour|        amount_log|
+-------+--------------------+--------------------+----------------------+------------------+------------------+
|  count|             2493276|             2493276|               2493276|           2493276|           2493276|
|   mean|  -285652.8224099819| -28527.016244154267|  1.573648988800993...|15.309927982301197|11.927381616160313|
| stddev|   873446.1929679182|   592043.6992647167|  7.594543656054228E14| 4.006872273311488| 1.232706449444181|
|    min|      -9.244551664E7|-7.588572563000001E7|                   0.0|                 0|               0.0|
|    max|0.010000001639127731|               1.0E7|        9.244551664E16|                23|18.342130030468418|
+-------+--------------------+--------------------+-----------

In [10]:
# Chọn các cột cuối cùng để lưu
final_columns = [
    "step",
    "type",
    "type_encoded",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "isFraud",
    "errorBalanceOrig",
    "errorBalanceDest",
    "amount_over_oldbalance",
    "hour",
    "amount_log"
]

df_final = df_features.select(*final_columns)

print("Các cột trong dataset cuối cùng:")
for i, col_name in enumerate(final_columns, 1):
    print(f"   {i:2d}. {col_name}")

print(f"\nKích thước dataset cuối cùng:")
print(f"Số dòng: {df_final.count():,}")
print(f"Số cột: {len(df_final.columns)}")

# Hiển thị mẫu
print("\nMẫu dữ liệu cuối cùng:")
df_final.show(10)

Các cột trong dataset cuối cùng:
    1. step
    2. type
    3. type_encoded
    4. amount
    5. oldbalanceOrg
    6. newbalanceOrig
    7. oldbalanceDest
    8. newbalanceDest
    9. isFraud
   10. errorBalanceOrig
   11. errorBalanceDest
   12. amount_over_oldbalance
   13. hour
   14. amount_log

Kích thước dataset cuối cùng:
Số dòng: 2,493,276
Số cột: 14

Mẫu dữ liệu cuối cùng:
+----+--------+------------+---------+-------------+--------------+--------------+--------------+-------+--------------------+-------------------+----------------------+----+------------------+
|step|    type|type_encoded|   amount|oldbalanceOrg|newbalanceOrig|oldbalanceDest|newbalanceDest|isFraud|    errorBalanceOrig|   errorBalanceDest|amount_over_oldbalance|hour|        amount_log|
+----+--------+------------+---------+-------------+--------------+--------------+--------------+-------+--------------------+-------------------+----------------------+----+------------------+
|   1|TRANSFER|           1|    

## 6. Lưu dữ liệu vào HDFS dưới dạng Parquet

In [11]:
# Lưu dữ liệu đã xử lý vào HDFS dưới dạng Parquet
hdfs_output_path = "hdfs://namenode:9000/data/output/paysim_preprocessed.parquet"

print(f"Đang lưu dữ liệu vào HDFS: {hdfs_output_path}")

df_final.write \
    .mode("overwrite") \
    .parquet(hdfs_output_path)

print(f"Đã lưu thành công!")
print(f"Đường dẫn: {hdfs_output_path}")

Đang lưu dữ liệu vào HDFS: hdfs://namenode:9000/data/output/paysim_preprocessed.parquet
Đã lưu thành công!
Đường dẫn: hdfs://namenode:9000/data/output/paysim_preprocessed.parquet


## 7. Kiểm tra kết quả đã lưu

In [12]:
# Đọc lại file parquet để kiểm tra
print("Đọc lại file Parquet để kiểm tra...")
df_verify = spark.read.parquet(hdfs_output_path)

print(f"\nĐã đọc thành công file Parquet từ HDFS")
print(f"Số dòng: {df_verify.count():,}")
print(f"Số cột: {len(df_verify.columns)}")

print("\nSchema của file Parquet:")
df_verify.printSchema()

print("\nMẫu dữ liệu từ Parquet:")
df_verify.show(5)

# Thống kê tỷ lệ fraud
fraud_count = df_verify.filter(col("isFraud") == 1).count()
total_count_verify = df_verify.count()
fraud_rate = fraud_count / total_count_verify

print(f"\nThống kê sau xử lý:")
print(f"Tổng giao dịch: {total_count_verify:,}")
print(f"Giao dịch gian lận: {fraud_count:,}")
print(f"Tỷ lệ gian lận: {fraud_rate:.6f} ({fraud_rate*100:.4f}%)")

Đọc lại file Parquet để kiểm tra...

Đã đọc thành công file Parquet từ HDFS
Số dòng: 2,493,276
Số cột: 14

Schema của file Parquet:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- type_encoded: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- errorBalanceOrig: double (nullable = true)
 |-- errorBalanceDest: double (nullable = true)
 |-- amount_over_oldbalance: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- amount_log: double (nullable = true)


Mẫu dữ liệu từ Parquet:
+----+--------+------------+---------+-------------+--------------+--------------+--------------+-------+-------------------+------------------+----------------------+----+------------------+
|step|    type|type_encoded|   amount

In [13]:
# Dừng Spark Session
spark.stop()
print("\nĐã dừng Spark Session")


Đã dừng Spark Session
