In [1]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
%load_ext autoreload
%autoreload 2
import src.preprocess as preprocess
from src.config import get_spark

In [2]:
# Obtain the Spark session from the project helper in src.config.
# (The string is presumably used as the application name — confirm in get_spark.)
spark = get_spark('Fraudulent Transactions Data')
# Use the canonical upper-case level name listed in the Spark documentation
# (ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN).
spark.sparkContext.setLogLevel("WARN")

In [3]:
# Load the raw CSV: the first line is treated as the header and Spark infers column types.
raw_csv_path = '../data_raw/Fraud.csv'
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

In [4]:
# Split on `step`: rows up to and including the cutoff form the training set,
# rows after it form the test set.
step_cutoff = 600
train_df = df.filter(df['step'] <= step_cutoff)
test_df = df.filter(df['step'] > step_cutoff)

In [5]:
# Make the training rows queryable from Spark SQL as `transactions`, and bind a
# second name (`train_copy`) to the same DataFrame for the exploration below.
train_df.createOrReplaceTempView('transactions')
train_copy = train_df

In [6]:
# Print the first five rows.
train_copy.show(n=5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [7]:
# Print each column's name, inferred type and nullability.
train_copy.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [8]:
# Summary statistics for every column, printed as a table.
summary_stats = train_copy.describe()
summary_stats.show()

+-------+------------------+--------+------------------+-----------+------------------+------------------+-----------+------------------+-----------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|    newbalanceOrig|   nameDest|    oldbalanceDest|   newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+------------------+-----------+------------------+-----------------+--------------------+--------------------+
|  count|           6259047| 6259047|           6259047|    6259047|           6259047|           6259047|    6259047|           6259047|          6259047|             6259047|             6259047|
|   mean|236.28799176615865|    NULL|179835.62435716562|       NULL|  837227.951295238| 858822.8135473176|       NULL|1099869.2750744992|1224634.919953178|0.001056550621843...|1.278149852525472...|
| stddev|1

所有特征均无缺失值，数值类型特征的取值均为非负数（≥ 0）。

In [9]:
# Column names in schema order; the per-column cells below index into this list.
cols = train_copy.columns

In [10]:

# Distinct-value count and Spark dtype of `step` (cols[0]).
column = cols[0]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+--------------------+
|count(DISTINCT step)|
+--------------------+
|                 600|
+--------------------+



[('step', 'int')]

In [11]:
# Distinct-value count and Spark dtype of `type` (cols[1]).
column = cols[1]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+--------------------+
|count(DISTINCT type)|
+--------------------+
|                   5|
+--------------------+



[('type', 'string')]

In [12]:
# Distinct-value count and Spark dtype of `amount` (cols[2]).
column = cols[2]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+----------------------+
|count(DISTINCT amount)|
+----------------------+
|               5244191|
+----------------------+



[('amount', 'double')]

In [13]:
# Distinct-value count and Spark dtype of `nameOrig` (cols[3]).
column = cols[3]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------+
|count(DISTINCT nameOrig)|
+------------------------+
|                 6250007|
+------------------------+



[('nameOrig', 'string')]

In [14]:
# Distinct-value count and Spark dtype of `oldbalanceOrg` (cols[4]).
column = cols[4]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+-----------------------------+
|count(DISTINCT oldbalanceOrg)|
+-----------------------------+
|                      1819590|
+-----------------------------+



[('oldbalanceOrg', 'double')]

In [15]:
# Distinct-value count and Spark dtype of `newbalanceOrig` (cols[5]).
column = cols[5]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------------+
|count(DISTINCT newbalanceOrig)|
+------------------------------+
|                       2636970|
+------------------------------+



[('newbalanceOrig', 'double')]

In [16]:
# Distinct-value count and Spark dtype of `nameDest` (cols[6]).
column = cols[6]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------+
|count(DISTINCT nameDest)|
+------------------------+
|                 2676208|
+------------------------+



[('nameDest', 'string')]

In [17]:
# Distinct-value count and Spark dtype of `oldbalanceDest` (cols[7]).
column = cols[7]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------------+
|count(DISTINCT oldbalanceDest)|
+------------------------------+
|                       3558946|
+------------------------------+



[('oldbalanceDest', 'double')]

In [18]:
# Distinct-value count and Spark dtype of `newbalanceDest` (cols[8]).
column = cols[8]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------------+
|count(DISTINCT newbalanceDest)|
+------------------------------+
|                       3494405|
+------------------------------+



[('newbalanceDest', 'double')]

In [19]:
# Distinct-value count and Spark dtype of `isFraud` (cols[9]).
column = cols[9]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+-----------------------+
|count(DISTINCT isFraud)|
+-----------------------+
|                      2|
+-----------------------+



[('isFraud', 'int')]

In [20]:
# Distinct-value count and Spark dtype of `isFlaggedFraud` (cols[10]).
column = cols[10]
train_copy.agg(F.countDistinct(column)).show()
train_copy.select(column).dtypes

+------------------------------+
|count(DISTINCT isFlaggedFraud)|
+------------------------------+
|                             2|
+------------------------------+



[('isFlaggedFraud', 'int')]

通过查看每个特征的不同取值个数（基数）和数据类型，可以初步判断：  
`step` 可作为时间序列类型特征  
`type` 类别数少，可直接作为类别特征  
`amount` `oldbalanceOrg` `newbalanceOrig` `oldbalanceDest` `newbalanceDest` 这几个特征都可以作为连续数值特征  
`isFraud` 作为目标变量  
`isFlaggedFraud` 为旧系统预测，可以在最后和我的模型比较评分  
其中 `nameOrig` 几乎每行取值都不同，近似为唯一标识符特征；`nameDest` 则为高基数类别特征。由于二者的取值都以少数几个字母开头，因此可尝试提取首字母作为类别特征。

In [21]:
# Class balance of the target column.
# Keep the query result in `y_distribution` and print it in a separate statement:
# `show()` only prints and returns None, so chaining it onto the assignment would
# leave the variable holding None instead of the DataFrame.
y_distribution = spark.sql(
    '''
    select isFraud, count(*) as count
    from transactions
    group by isFraud
    '''
)
y_distribution.show()

+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   6613|
|      0|6252434|
+-------+-------+



通过查看y的分布可以看出实际欺诈极少，比例大约为1:945，为极度不平衡的类别目标。

In [22]:
# Number of transactions per `type`, most frequent first.
type_counts_sql = '''
    select type, count(*) as count
    from transactions
    group by type
    order by count desc
    '''
spark.sql(type_counts_sql).show()

+--------+-------+
|    type|  count|
+--------+-------+
|CASH_OUT|2204075|
| PAYMENT|2116354|
| CASH_IN|1375225|
|TRANSFER| 522784|
|   DEBIT|  40609|
+--------+-------+



In [23]:
# Row count and fraud rate per 10,000-wide `amount` bucket, collected to pandas for display.
amount_bins_sql = '''
    select floor(amount/10000)*10000 as amount_bin,
          count(*) as count,
          mean(isFraud) as fraud_rate
    from transactions
    group by amount_bin
    order by amount_bin
    '''
spark.sql(amount_bins_sql).toPandas()

Unnamed: 0,amount_bin,count,fraud_rate
0,0,1263915,0.000182
1,10000,711603,0.000228
2,20000,368077,0.000481
3,30000,235252,0.000676
4,40000,180679,0.000697
...,...,...,...
2453,69330000,1,0.000000
2454,69880000,1,0.000000
2455,71170000,1,0.000000
2456,73820000,1,0.000000


In [24]:
# Row count and fraud rate per 10,000-wide `oldbalanceOrg` bucket, collected to pandas.
oldbalance_orig_bins_sql = '''
    select floor(oldbalanceOrg/10000)*10000 as oldbalanceOrg_bin,
            count(*) as count,
            mean(isFraud) as fraud_rate
    from transactions
    group by oldbalanceOrg_bin
    order by oldbalanceOrg_bin
    '''
spark.sql(oldbalance_orig_bins_sql).toPandas()

Unnamed: 0,oldbalanceOrg_bin,count,fraud_rate
0,0,2834754,0.000091
1,10000,496959,0.000326
2,20000,381477,0.000461
3,30000,229758,0.000688
4,40000,169938,0.000730
...,...,...,...
3403,43680000,1,0.000000
3404,43810000,1,0.000000
3405,44890000,1,1.000000
3406,49580000,1,1.000000


In [25]:
# Row count and fraud rate per 10,000-wide `newbalanceOrig` bucket, collected to pandas.
newbalance_orig_bins_sql = '''
    select floor(newbalanceOrig/10000)*10000 as newbalanceOrig_bin,
            count(*) as count,
            mean(isFraud) as fraud_rate
    from transactions
    group by newbalanceOrig_bin
    order by newbalanceOrig_bin
    '''
spark.sql(newbalance_orig_bins_sql).toPandas()

Unnamed: 0,newbalanceOrig_bin,count,fraud_rate
0,0,3763182,0.001729
1,10000,162968,0.000000
2,20000,117192,0.000000
3,30000,93107,0.000000
4,40000,87010,0.000000
...,...,...,...
3415,41430000,1,0.000000
3416,41690000,1,0.000000
3417,43670000,1,0.000000
3418,43680000,1,0.000000


值得注意的是， `newbalanceOrig` 特征与欺诈率呈断崖式关系：余额低于约1万元时欺诈率显著，之后急剧下降至接近0。  
接下来，我将查看0-10000区间内的具体欺诈率分布。

In [26]:
# Zoom into the first bucket: row count and fraud rate for every exact
# `newbalanceOrig` value below 10,000, collected to pandas.
newbalance_orig_detail_sql = '''
    select newbalanceOrig,
        count(*) as count,
        mean(isFraud) as fraud_rate
    from transactions
    where newbalanceOrig<10000
    group by newbalanceOrig
    order by newbalanceOrig
    '''
spark.sql(newbalance_orig_detail_sql).toPandas()

Unnamed: 0,newbalanceOrig,count,fraud_rate
0,0.00,3554124,0.001831
1,0.01,1,0.000000
2,0.03,1,0.000000
3,0.05,1,0.000000
4,0.12,1,0.000000
...,...,...,...
188657,9999.54,1,0.000000
188658,9999.57,1,0.000000
188659,9999.87,1,0.000000
188660,9999.88,1,0.000000


因为 `newbalanceOrig` 为 0 时的欺诈率远高于其他取值，因此构造二值特征 `newbalanceOrig_isZero` 以单独识别该风险模式。  
同时，考虑到 `newbalanceOrig` 与 `amount`、`oldbalanceOrg` 存在线性依赖关系（amount = oldbalanceOrg - newbalanceOrig），
为保持模型简洁性与稳健性，移除原始变量 `newbalanceOrig`。

In [27]:
# Add a 0/1 flag for rows whose `newbalanceOrig` is exactly zero, then drop the raw column.
# The chain starts from `train_df` (the frame `train_copy` was bound to earlier) rather
# than from `train_copy` itself, so re-running this cell does not fail on the
# already-dropped `newbalanceOrig` column; on a top-to-bottom run the result is unchanged.
train_copy = (
    train_df
    .withColumn('newbalanceOrig_isZero', F.when(F.col('newbalanceOrig') == 0, 1).otherwise(0))
    .drop('newbalanceOrig')
)

In [28]:
# Row count and fraud rate per 10,000-wide `oldbalanceDest` bucket, collected to pandas.
oldbalance_dest_bins_sql = '''
    select floor(oldbalanceDest/10000)*10000 as oldbalanceDest_bin,
            count(*) as count,
            mean(isFraud) as fraud_rate
    from transactions
    group by oldbalanceDest_bin
    order by oldbalanceDest_bin
    '''
spark.sql(oldbalance_dest_bins_sql).toPandas()

Unnamed: 0,oldbalanceDest_bin,count,fraud_rate
0,0,2703671,0.001613
1,10000,40839,0.001224
2,20000,37957,0.001871
3,30000,35551,0.001688
4,40000,33761,0.001451
...,...,...,...
7187,328190000,1,0.000000
7188,355180000,1,0.000000
7189,355380000,2,0.000000
7190,355550000,1,0.000000


In [29]:
# Row count and fraud rate per 10,000-wide `newbalanceDest` bucket, collected to pandas.
newbalance_dest_bins_sql = '''
    select floor(newbalanceDest/10000)*10000 as newbalanceDest_bin,
            count(*) as count,
            mean(isFraud) as fraud_rate
    from transactions
    group by newbalanceDest_bin
    order by newbalanceDest_bin
    '''
spark.sql(newbalance_dest_bins_sql).toPandas()

Unnamed: 0,newbalanceDest_bin,count,fraud_rate
0,0,2430316,0.001367
1,10000,33168,0.000814
2,20000,33299,0.000841
3,30000,33756,0.001066
4,40000,33891,0.000826
...,...,...,...
7492,355180000,1,0.000000
7493,355380000,2,0.000000
7494,355550000,2,0.000000
7495,356010000,1,0.000000


以上所有连续数值类特征（`amount` `oldbalanceOrg` `oldbalanceDest` `newbalanceDest`）都严重右偏且跨越多个数量级，因此考虑对它们进行 **log1p 变换**处理以平滑分布。  

In [30]:
# Replace each skewed monetary column with its log1p transform: append `<name>_log`
# for every listed column (in this order), then drop the raw columns.
skewed_cols = ['amount', 'oldbalanceOrg', 'oldbalanceDest', 'newbalanceDest']
with_logs = train_copy
for raw_name in skewed_cols:
    with_logs = with_logs.withColumn(raw_name + '_log', F.log1p(F.col(raw_name)))
train_copy = with_logs.drop(*skewed_cols)

另对 `nameOrig` 和 `nameDest` 提取首字母后进行组合，构造交叉特征 `trans_direct`，表示交易发起方与接收方之间的转账方向（如 C→M、C→C、M→C 等）。

In [31]:
# Build `trans_direct` from the first character of each party's id
# (originator first, destination second, joined by '_'), then drop both id columns.
orig_initial = F.substring(F.col('nameOrig'), 1, 1)
dest_initial = F.substring(F.col('nameDest'), 1, 1)
train_copy = (
    train_copy
    .withColumn('trans_direct', F.concat_ws('_', orig_initial, dest_initial))
    .drop('nameDest', 'nameOrig')
)

In [32]:
# Print the first five rows of the engineered frame.
train_copy.show(n=5)

+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+------------+
|step|    type|isFraud|isFlaggedFraud|newbalanceOrig_isZero|       amount_log| oldbalanceOrg_log|oldbalanceDest_log|newbalanceDest_log|trans_direct|
+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+------------+
|   1| PAYMENT|      0|             0|                    0|9.194276028581655| 12.04435927383651|               0.0|               0.0|         C_M|
|   1| PAYMENT|      0|             0|                    0|7.531166454857185| 9.964112174352563|               0.0|               0.0|         C_M|
|   1|TRANSFER|      1|             0|                    1|5.204006687076795| 5.204006687076795|               0.0|               0.0|         C_C|
|   1|CASH_OUT|      1|             0|                    1|5.204006687076795| 5.204006687076795| 9.960954

In [33]:
# Register the engineered frame under a second view name for the SQL queries that follow.
train_copy.createOrReplaceTempView('transactions_1')

In [34]:
# Row count and fraud rate per `trans_direct` value, ordered by row count.
direction_rates_sql = '''
    select trans_direct,
            count(*) as count,
            mean(isFraud) as fraud_rate
    from transactions_1
    group by trans_direct
    order by count
    '''
spark.sql(direction_rates_sql).show()

+------------+-------+--------------------+
|trans_direct|  count|          fraud_rate|
+------------+-------+--------------------+
|         C_M|2116354|                 0.0|
|         C_C|4142693|0.001596304626000...|
+------------+-------+--------------------+



该交叉特征显示，不同交易方向的欺诈率差异显著：  
客户→商户（C_M）为正常交易，占比约 34%，欺诈率为 0%；  
客户→客户（C_C）欺诈率显著更高（0.16%），表明欺诈行为更常发生于客户间的转账场景。  
因此，trans_direct 是一个具有较强区分度的结构型特征。  

In [35]:
# Row count and fraud rate per (`trans_direct`, `type`) pair, ordered by row count.
direction_type_rates_sql = '''
    select trans_direct,
           `type`,
           count(*) as count,
           mean(isFraud) as fraud_rate
    from transactions_1
    group by trans_direct, `type`
    order by count
    '''
spark.sql(direction_type_rates_sql).show()

+------------+--------+-------+--------------------+
|trans_direct|    type|  count|          fraud_rate|
+------------+--------+-------+--------------------+
|         C_C|   DEBIT|  40609|                 0.0|
|         C_C|TRANSFER| 522784|0.006306619942461896|
|         C_C| CASH_IN|1375225|                 0.0|
|         C_M| PAYMENT|2116354|                 0.0|
|         C_C|CASH_OUT|2204075|0.001504486008869934|
+------------+--------+-------+--------------------+



作为探索，再对交易方向（`trans_direct`）与交易类型（`type`）进行交叉，构造交叉特征 `trans_direct_payment`，用于捕捉不同交易模式下的欺诈差异。  
结果显示，C_C_TRANSFER 和 C_C_CASH_OUT 组合的欺诈率显著高于其他类别，该交叉特征能有效提高模型的风险识别能力。  
在构造交叉特征 `trans_direct_payment` 的同时，保留原始特征 `trans_direct` 与 `type`。其中 `trans_direct` 体现账户间的资金流向结构，而 `trans_direct_payment` 捕捉具体的方向 × 行为模式差异。

In [36]:
# Cross the transfer direction with the transaction type (e.g. `C_M_PAYMENT`).
direction_and_type = F.concat_ws('_', F.col('trans_direct'), F.col('type'))
train_copy = train_copy.withColumn('trans_direct_payment', direction_and_type)

In [37]:
# Print the first five rows, now including the crossed feature.
train_copy.show(n=5)

+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+------------+--------------------+
|step|    type|isFraud|isFlaggedFraud|newbalanceOrig_isZero|       amount_log| oldbalanceOrg_log|oldbalanceDest_log|newbalanceDest_log|trans_direct|trans_direct_payment|
+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+------------+--------------------+
|   1| PAYMENT|      0|             0|                    0|9.194276028581655| 12.04435927383651|               0.0|               0.0|         C_M|         C_M_PAYMENT|
|   1| PAYMENT|      0|             0|                    0|7.531166454857185| 9.964112174352563|               0.0|               0.0|         C_M|         C_M_PAYMENT|
|   1|TRANSFER|      1|             0|                    1|5.204006687076795| 5.204006687076795|               0.0|               0.0|         C_C|  

In [38]:
# Run the project's preprocessing function on both raw splits (train first, then test).
train_cleaned, test_cleaned = (
    preprocess.preprocess(split) for split in (train_df, test_df)
)

In [39]:
# Print the first five rows of the preprocessed training set.
train_cleaned.show(n=5)

+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+---------------------+------------------------------+-----+
|step|    type|isFraud|isFlaggedFraud|newbalanceOrig_isZero|       amount_log| oldbalanceOrg_log|oldbalanceDest_log|newbalanceDest_log|nameOrig__X__nameDest|nameOrig__X__nameDest__X__type|index|
+----+--------+-------+--------------+---------------------+-----------------+------------------+------------------+------------------+---------------------+------------------------------+-----+
|   1| PAYMENT|      0|             0|                    0|9.194276028581655| 12.04435927383651|               0.0|               0.0|                  C_M|                   C_M_PAYMENT|    0|
|   1| PAYMENT|      0|             0|                    0|7.531166454857185| 9.964112174352563|               0.0|               0.0|                  C_M|                   C_M_PAYMENT|    1|
|   1|TRANSFER|      1|  

In [40]:
# Persist both preprocessed splits as Parquet, replacing any existing output.
for cleaned, target_path in (
    (train_cleaned, '../data_processed/train_cleaned.parquet'),
    (test_cleaned, '../data_processed/test_cleaned.parquet'),
):
    cleaned.write.mode('overwrite').parquet(target_path)