In [1]:
import pandas as pd
import numpy as np
from pathlib import Path

# 基础路径
base_path = Path(r"C:\Users\田\Desktop\python实操\kaggle\Synthetic Financial Datasets For Fraud Detection")  # 你的原始路径

In [2]:
PS = pd.read_csv(base_path / 'PS_20174392719_1491204439457_log.csv')

In [3]:
PS

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
0,1,PAYMENT,9839.64,C1231006815,170136.00,160296.36,M1979787155,0.00,0.00,0,0
1,1,PAYMENT,1864.28,C1666544295,21249.00,19384.72,M2044282225,0.00,0.00,0,0
2,1,TRANSFER,181.00,C1305486145,181.00,0.00,C553264065,0.00,0.00,1,0
3,1,CASH_OUT,181.00,C840083671,181.00,0.00,C38997010,21182.00,0.00,1,0
4,1,PAYMENT,11668.14,C2048537720,41554.00,29885.86,M1230701703,0.00,0.00,0,0
...,...,...,...,...,...,...,...,...,...,...,...
6362615,743,CASH_OUT,339682.13,C786484425,339682.13,0.00,C776919290,0.00,339682.13,1,0
6362616,743,TRANSFER,6311409.28,C1529008245,6311409.28,0.00,C1881841831,0.00,0.00,1,0
6362617,743,CASH_OUT,6311409.28,C1162922333,6311409.28,0.00,C1365125890,68488.84,6379898.11,1,0
6362618,743,TRANSFER,850002.52,C1685995037,850002.52,0.00,C2080388513,0.00,0.00,1,0


In [4]:
PS.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6362620 entries, 0 to 6362619
Data columns (total 11 columns):
 #   Column          Dtype  
---  ------          -----  
 0   step            int64  
 1   type            object 
 2   amount          float64
 3   nameOrig        object 
 4   oldbalanceOrg   float64
 5   newbalanceOrig  float64
 6   nameDest        object 
 7   oldbalanceDest  float64
 8   newbalanceDest  float64
 9   isFraud         int64  
 10  isFlaggedFraud  int64  
dtypes: float64(5), int64(3), object(3)
memory usage: 534.0+ MB


In [5]:
PS.describe()

Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
count,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0
mean,243.3972,179861.9,833883.1,855113.7,1100702.0,1224996.0,0.00129082,2.514687e-06
std,142.332,603858.2,2888243.0,2924049.0,3399180.0,3674129.0,0.0359048,0.001585775
min,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,156.0,13389.57,0.0,0.0,0.0,0.0,0.0,0.0
50%,239.0,74871.94,14208.0,0.0,132705.7,214661.4,0.0,0.0
75%,335.0,208721.5,107315.2,144258.4,943036.7,1111909.0,0.0,0.0
max,743.0,92445520.0,59585040.0,49585040.0,356015900.0,356179300.0,1.0,1.0


In [15]:
  # 数据集: Synthetic Financial Dataset for Fraud Detection (PaySim)
  # 来源: https://www.kaggle.com/datasets/ealaxi/paysim1
  # 特点: 模拟银行交易，含 isFraud 标签，适用于团伙欺诈检测

<big>PySpark + RandomForest<big>

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, when, expr
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# 初始化 Spark 会话
spark = SparkSession.builder \
    .appName("GraphFeatureExtractionForRiskControl") \
    .getOrCreate()

# ----------------------------
# 1. 加载数据（适配 PaySim 格式）
# ----------------------------
print("正在加载金融欺诈数据（PaySim 格式）...")
df = spark.read.csv(r'C:\Users\田\Desktop\python实操\kaggle\Synthetic Financial Datasets For Fraud Detection\PS_20174392719_1491204439457_log.csv',
                    header=True, inferSchema=True)  # 替换为你的文件名

# 保留关键字段：欺诈标签 + 发起方账户 + 金额
cols_to_keep = ['isFraud', 'amount', 'nameOrig']
df = df.select(cols_to_keep)

# 处理缺失值（用特殊值填充）
for column in cols_to_keep:
    df = df.withColumn(column, when(col(column).isNull(), -999).otherwise(col(column)))

print(f"数据加载完成，共 {df.count()} 条交易记录")

# ----------------------------
# 2. 构建“共享实体”
# ----------------------------
print("\n正在构建共享实体（以发起方账户 nameOrig 作为实体）...")

# 实体ID = 发起方账户（nameOrig）
df = df.withColumn('entity_id', col('nameOrig'))

print(f"共构建 {df.select('entity_id').distinct().count()} 个唯一实体")

# ----------------------------
# 3. 提取图衍生特征（使用窗口函数）
# ----------------------------
print("\n正在提取图衍生特征...")

# 特征1: 该实体的历史欺诈率
df = df.withColumn('entity_fraud_rate',
                   expr('avg(isFraud) over (partition by entity_id)'))

# 特征2: 该实体的总交易次数
df = df.withColumn('entity_tx_count',
                   expr('count(*) over (partition by entity_id)'))

# 特征3: 该实体的平均交易金额
df = df.withColumn('entity_mean_amt',
                   expr('avg(amount) over (partition by entity_id)'))  # 注意：字段是 amount

# ----------------------------
# 4. 准备训练数据
# ----------------------------
print("\n准备训练数据...")

# 基础特征（无图）
base_features = ['amount']

# 图增强特征
graph_features = ['entity_fraud_rate', 'entity_tx_count', 'entity_mean_amt']

# 合并
all_features = base_features + graph_features

# 注意：label 列必须存在，且不能在 features 中
assembler = VectorAssembler(inputCols=all_features, outputCol="features")

# 分割训练测试集
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# ----------------------------
# 5. 训练模型 & 评估
# ----------------------------
print("\n训练 Random Forest 模型...")

rf = RandomForestClassifier(labelCol="isFraud", featuresCol="features", seed=42)

pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="isFraud",  # 注意：这里要匹配实际 label 列名
    metricName="areaUnderROC"
)
auc_score = evaluator.evaluate(predictions)

print(f"AUC score: {auc_score:.4f}")

正在加载金融欺诈数据（PaySim 格式）...
数据加载完成，共 6362620 条交易记录

正在构建共享实体（以发起方账户 nameOrig 作为实体）...
共构建 6353307 个唯一实体

正在提取图衍生特征...

准备训练数据...

训练 Random Forest 模型...
AUC score: 0.9989


In [14]:
spark.stop()