In [1]:
import pyspark.sql.types as typ

# (column name, Spark type) pairs, listed in the order the schema fields are created.
labels = [
    ("step", typ.IntegerType()),
    ("type", typ.StringType()),
    ("amount", typ.FloatType()),
    ("nameOrig", typ.StringType()),
    ("oldbalanceOrg", typ.FloatType()),
    ("newbalanceOrig", typ.FloatType()),
    ("nameDest", typ.StringType()),
    ("oldbalanceDest", typ.FloatType()),
    ("newbalanceDest", typ.FloatType()),
    ("isFraud", typ.IntegerType()),
    ("isFlaggedFraud", typ.IntegerType()),
]

# Every field is declared nullable.
schema = typ.StructType(
    [typ.StructField(name, dtype, nullable=True) for name, dtype in labels]
)

# Read the CSV with the explicit schema; the first line of the file is a header.
df = spark.read.csv(
    "data/PS_20174392719_1491204439457_log.csv.gz",
    header=True,
    schema=schema,
)

In [2]:
# Preview the first five rows of the loaded DataFrame.
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [3]:
import pandas as pd
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import xgboost as xgb

In [4]:
import warnings

# Suppress DeprecationWarning messages from this point on.
warnings.simplefilter("ignore", category=DeprecationWarning)

In [6]:
%%time
import pyspark.sql.functions as fn
df = spark.read.csv('data/PS_20174392719_1491204439457_log.csv.gz',
                    header=True,
                    schema=schema)
df_nan = df.select([fn.count(fn.when(fn.isnan(c), c)).alias(c) for c in df.columns])
df_nan.show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+

CPU times: user 29.6 ms, sys: 6.31 ms, total: 35.9 ms
Wall time: 34.5 s


In [14]:
%%time
import multiprocessing

num_cpu = multiprocessing.cpu_count()
df = spark.read.csv('data/PS_20174392719_1491204439457_log.csv.gz',
                    header=True,
                    schema=schema)
df = df.repartition(4 * num_cpu)
df_nan = df.select([fn.count(fn.when(fn.isnan(c), c)).alias(c) for c in df.columns])

df_nan.show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+

CPU times: user 16.3 ms, sys: 1.56 ms, total: 17.9 ms
Wall time: 24.2 s


In [15]:
# List every distinct transaction type present in the data.
df.select("type").distinct().show()

+--------+
|    type|
+--------+
|TRANSFER|
| CASH_IN|
|CASH_OUT|
| PAYMENT|
|   DEBIT|
+--------+



In [16]:
# Keep only the rows labelled as fraud, then list the transaction types among them.
df_fraud = df.where("isFraud = 1")
df_fraud.select("type").distinct().show()

+--------+
|    type|
+--------+
|TRANSFER|
|CASH_OUT|
+--------+



In [17]:
# Number of fraudulent rows for each of the two transaction types.
num_transfer = df_fraud.where("type = 'TRANSFER'").count()
num_cashout = df_fraud.where("type = 'CASH_OUT'").count()
print(num_transfer, num_cashout)

4097 4116


In [109]:
# Restrict to TRANSFER and CASH_OUT rows and discard the account-name and
# isFlaggedFraud columns; the remaining columns keep their original order.
X = df.where(fn.col("type").isin("TRANSFER", "CASH_OUT"))
X = X.drop("nameOrig", "nameDest", "isFlaggedFraud")


# Integer code assigned to each retained transaction type.
map_dict = {
    "TRANSFER": 0,
    "CASH_OUT": 1,
}


def type_map(type_):
    """Look up the integer code for a transaction type name.

    Raises KeyError when ``type_`` is not a key of ``map_dict``.
    """
    code = map_dict[type_]
    return code
    
# Expose the Python mapper as a Spark UDF that returns integers.
type_map_int = fn.udf(type_map, returnType=typ.IntegerType())

# Append the encoded type as a new column after the existing ones.
X = X.withColumn("type_integer", type_map_int("type"))

In [46]:
# List the distinct values of the derived type_integer column.
X.select("type_integer").distinct().show()

+------------+
|type_integer|
+------------+
|           1|
|           0|
+------------+



In [49]:
# Split X by the isFraud label.
Xfraud = X.filter("isFraud = 1")
Xnonfraud = X.filter("isFraud = 0")

In [57]:
# Fraction of fraud rows whose destination balances are both zero although the
# transferred amount is non-zero.
(
    Xfraud
    .where(
        (fn.col("oldBalanceDest") == 0)
        & (fn.col("newBalanceDest") == 0)
        & (fn.col("amount") != 0)
    )
    .count()
    / Xfraud.count()
)

0.4955558261293072

In [58]:
# Fraction of non-fraud rows whose destination balances are both zero although the
# transferred amount is non-zero.
(
    Xnonfraud
    .where(
        (fn.col("oldBalanceDest") == 0)
        & (fn.col("newBalanceDest") == 0)
        & (fn.col("amount") != 0)
    )
    .count()
    / Xnonfraud.count()
)

0.0006176245277308345

In [110]:
def make_flag(x, y, z):
    """Return -1.0 when x and y are both zero while z is non-zero; otherwise return x.

    x: the value to keep or replace.
    y: the companion value that must also be zero for the replacement to apply.
    z: the value that must be non-zero for the replacement to apply.
    """
    both_zero = x == 0 and y == 0
    return -1. if both_zero and z != 0 else x
# Rebind the name to a float-returning Spark UDF; from here on `make_flag` is the
# UDF, not the plain Python function.
make_flag = fn.udf(f=make_flag, returnType=typ.FloatType())

In [111]:
# Columns removed from X in this step: the two destination balances (re-added below
# in flagged form) and the string `type` column.
rep_col = ['oldbalanceDest', 'newbalanceDest', "type"]

# Start from the columns that are kept as-is ...
cols = [c for c in X.columns if c not in rep_col]
# ... then append the destination balances passed through make_flag.
cols.append(make_flag('oldbalanceDest', 'newbalanceDest', "amount").alias("oldBalanceDest"))
cols.append(make_flag('newbalanceDest', 'oldbalanceDest', "amount").alias("newBalanceDest"))

X = X.select(cols)

In [112]:
# Inspect the current column names of X.
X.columns

['step',
 'amount',
 'oldbalanceOrg',
 'newbalanceOrig',
 'isFraud',
 'type_integer',
 'oldBalanceDest',
 'newBalanceDest']

In [113]:
import numpy as np

def make_nan(x, y, z):
    """Return NaN when x and y are both zero while z is non-zero; otherwise return x.

    x: the value to keep or replace.
    y: the companion value that must also be zero for the replacement to apply.
    z: the value that must be non-zero for the replacement to apply.
    """
    # Guard clause: anything outside the "both zero, z non-zero" case passes through.
    if x != 0 or y != 0 or z == 0:
        return x
    return np.nan
# Rebind the name to a float-returning Spark UDF; from here on `make_nan` is the
# UDF, not the plain Python function.
make_nan = fn.udf(f=make_nan, returnType=typ.FloatType())

In [114]:
# Originator balance columns replaced in this step.
rep_col = ['oldbalanceOrg', 'newbalanceOrig']

# Start from the columns that are kept as-is ...
cols = [c for c in X.columns if c not in rep_col]
# ... then append the originator balances passed through make_nan, under new aliases.
cols.append(make_nan('oldbalanceOrg', 'newbalanceOrig', "amount").alias("oldBalanceOrig"))
cols.append(make_nan('newbalanceOrig', 'oldbalanceOrg', "amount").alias("newBalanceOrig"))

X = X.select(cols)

In [115]:
# Display the leading rows of X (DataFrame.show() prints 20 rows by default).
X.show()

+----+---------+-------+------------+--------------+--------------+--------------+--------------+
|step|   amount|isFraud|type_integer|oldBalanceDest|newBalanceDest|oldBalanceOrig|newBalanceOrig|
+----+---------+-------+------------+--------------+--------------+--------------+--------------+
|   1|    181.0|      1|           0|          -1.0|          -1.0|         181.0|           0.0|
|   1|    181.0|      1|           1|       21182.0|           0.0|         181.0|           0.0|
|   1|229133.94|      0|           1|        5083.0|      51513.44|       15325.0|           0.0|
|   1| 215310.3|      0|           0|       22425.0|           0.0|         705.0|           0.0|
|   1|311685.88|      0|           0|        6267.0|     2719173.0|       10835.0|           0.0|
|   1|110414.71|      0|           1|      288800.0|       2415.16|      26845.41|           0.0|
|   1|  56953.9|      0|           1|       70253.0|      64106.18|       1942.02|           0.0|
|   1|  5346.89|    

[new columns](https://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark/33683462)

In [118]:
# Balance-consistency residuals: each is 0 when the recorded balances agree with the
# transaction amount.
#   - Originator: the amount leaves the account, so new + amount should equal old.
#   - Recipient:  the amount arrives, so old + amount should equal new. The previous
#     version reused the originator formula here, which yields roughly 2 * amount for
#     consistent rows instead of 0.
# NOTE(review): assumes `amount` moves from nameOrig to nameDest -- confirm against
# the dataset's column documentation.
X = X.withColumn("errorBalanceOrig", X.newBalanceOrig + X.amount - X.oldBalanceOrig)
X = X.withColumn("errorBalanceDest", X.oldBalanceDest + X.amount - X.newBalanceDest)

[Build Estimator](https://spark.apache.org/docs/preview/ml-guide.html)