In [1]:
import pyspark

In [2]:
import pyspark.sql.functions as func
from pyspark.sql import SparkSession


In [3]:

import findspark
findspark.init()


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExample").getOrCreate()


In [4]:
orig_df = spark.read.csv("./PS_20174392719_1491204439457_log.csv", inferSchema = True, header = True)

In [5]:
orig_df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [6]:
orig_df.show(truncate = False, vertical = True)

-RECORD 0---------------------
 step           | 1           
 type           | PAYMENT     
 amount         | 9839.64     
 nameOrig       | C1231006815 
 oldbalanceOrg  | 170136.0    
 newbalanceOrig | 160296.36   
 nameDest       | M1979787155 
 oldbalanceDest | 0.0         
 newbalanceDest | 0.0         
 isFraud        | 0           
 isFlaggedFraud | 0           
-RECORD 1---------------------
 step           | 1           
 type           | PAYMENT     
 amount         | 1864.28     
 nameOrig       | C1666544295 
 oldbalanceOrg  | 21249.0     
 newbalanceOrig | 19384.72    
 nameDest       | M2044282225 
 oldbalanceDest | 0.0         
 newbalanceDest | 0.0         
 isFraud        | 0           
 isFlaggedFraud | 0           
-RECORD 2---------------------
 step           | 1           
 type           | TRANSFER    
 amount         | 181.0       
 nameOrig       | C1305486145 
 oldbalanceOrg  | 181.0       
 newbalanceOrig | 0.0         
 nameDest       | C553264065  
 oldbala

In [7]:
def count_type(df):
    return df.groupBy('type').agg(func.count(func.when(func.col('isFraud') == 0, True)).alias('Not_Fraud'),\
                      func.count(func.when(func.col('isFraud') == 1,True)).alias('Fraud')).show()

In [8]:
orig_df.rdd.isEmpty()

False

In [9]:
(orig_df != 'NULL')

True

In [10]:
orig_df.select([func.count(func.when(func.isnan(c),c)).alias(c) for c in orig_df.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [11]:
orig_df.select([func.count(func.when(func.col(c).isNull(),c)).alias(c) for c in orig_df.columns]).show()

+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+



In [12]:
orig_df.describe().show(vertical = True)

-RECORD 0------------------------------
 summary        | count                
 step           | 6362620              
 type           | 6362620              
 amount         | 6362620              
 nameOrig       | 6362620              
 oldbalanceOrg  | 6362620              
 newbalanceOrig | 6362620              
 nameDest       | 6362620              
 oldbalanceDest | 6362620              
 newbalanceDest | 6362620              
 isFraud        | 6362620              
 isFlaggedFraud | 6362620              
-RECORD 1------------------------------
 summary        | mean                 
 step           | 243.39724563151657   
 type           | null                 
 amount         | 179861.9035491287    
 nameOrig       | null                 
 oldbalanceOrg  | 833883.1040744764    
 newbalanceOrig | 855113.6685785812    
 nameDest       | null                 
 oldbalanceDest | 1100701.6665196533   
 newbalanceDest | 1224996.3982019224   
 isFraud        | 0.001290820448180152 


In [13]:
median_list = orig_df.approxQuantile(['step','amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest'], [0.5], 0.25)
name_list = ['step','amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest']
median_tuple = list(zip(name_list, median_list))

In [14]:
median_tuple

[('step', [257.0]),
 ('amount', [73199.38]),
 ('oldbalanceOrg', [14910.0]),
 ('newbalanceOrig', [866.26]),
 ('oldbalanceDest', [157086.55]),
 ('newbalanceDest', [224333.11])]

In [15]:
def shape(df):
    return df.count(), len(df.columns)
shape(orig_df)

(6362620, 11)

In [16]:
df = orig_df

In [17]:
def IQR(feature):
    q1 = df.approxQuantile(feature, [0.25], 0.05)
    q3 = df.approxQuantile(feature, [0.75], 0.05)
    iqr = q3[0] - q1[0]
    rang = 1.5*iqr
    return(q1[0]-rang, q3[0] + rang)

In [18]:
li = ['step','amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest']
for i in li:
    lower , upper = IQR(i)
    df = df.filter((func.col(i) > lower) & (func.col(i) < upper))
df.show()    

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|  4024.36|C1265012928|       2671.0|           0.0|M1176932104|           0.0|           0.0|      0|             0|
|   1|   DEBIT|  9644.94|C1900366749|       4465.0|           0.0| C99760839

In [19]:
shape(df)

(2852632, 11)

In [20]:
count_type(df)

+--------+---------+-----+
|    type|Not_Fraud|Fraud|
+--------+---------+-----+
|TRANSFER|   137946| 1133|
| CASH_IN|    10976|    0|
|CASH_OUT|  1271195|  956|
| PAYMENT|  1413759|    0|
|   DEBIT|    16667|    0|
+--------+---------+-----+



In [21]:
count_type(df.filter((df.oldbalanceOrg != 0.0) | (df.newbalanceOrig != 0.0)))

+--------+---------+-----+
|    type|Not_Fraud|Fraud|
+--------+---------+-----+
|TRANSFER|    68261| 1133|
| CASH_IN|    10976|    0|
|CASH_OUT|   718075|  934|
| PAYMENT|   652194|    0|
|   DEBIT|    12245|    0|
+--------+---------+-----+



In [22]:
df = df.filter((df.oldbalanceOrg != 0.0) | (df.newbalanceOrig != 0.0))

In [23]:
df = df.filter((df.type == "TRANSFER") | (df.type == "CASH_OUT"))
df.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1|CASH_OUT|229133.94| C905080434|      15325.0|           0.0| C476402209|        5083.0|      51513.44|      0|             0|
|   1|TRANSFER| 215310.3|C1670993182|        705.0|           0.0|C1100439041|       22425.0|           0.0|      0|             0|
|   1|CASH_OUT|110414.71| C768216420|     26845.41|           0.0|C150951433

In [24]:
shape(df)

(788403, 11)

In [25]:
count_type(df)

+--------+---------+-----+
|    type|Not_Fraud|Fraud|
+--------+---------+-----+
|TRANSFER|    68261| 1133|
|CASH_OUT|   718075|  934|
+--------+---------+-----+



In [26]:
df = df.select('type', 'amount','oldbalanceOrg','newbalanceOrig','isFraud')

In [27]:
df_nf = df.filter(df['isFraud'] == 0)
df_f = df.filter(df['isFraud'] == 1)
nf_count = df_nf.count()
f_count = df_f.count()
ratio = nf_count / f_count
df_overs = df_f.sample(withReplacement = True, fraction = ratio)
df = df_nf.unionAll(df_overs)
df.show(2)

+--------+---------+-------------+--------------+-------+
|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|
+--------+---------+-------------+--------------+-------+
|CASH_OUT|229133.94|      15325.0|           0.0|      0|
|TRANSFER| 215310.3|        705.0|           0.0|      0|
+--------+---------+-------------+--------------+-------+
only showing top 2 rows



In [28]:
count_type(df)

+--------+---------+------+
|    type|Not_Fraud| Fraud|
+--------+---------+------+
|TRANSFER|    68261|431353|
|CASH_OUT|   718075|355757|
+--------+---------+------+



In [29]:
catCols = [x for (x, dataType) in df.dtypes if dataType == "string"]
numCols = [x for (x, dataType) in df.dtypes if dataType == "double"]
numCols, catCols

(['amount', 'oldbalanceOrg', 'newbalanceOrig'], ['type'])

In [30]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [31]:
stages = []
for i in catCols:
    indexer = StringIndexer(inputCol= i, outputCol= i + "_Index")
    encoder = OneHotEncoder(inputCols= [indexer.getOutputCol()],\
                outputCols=[i + "_classVec"])
    stages += [indexer, encoder]
    assemblerInputs = [c + "_classVec" for c in catCols] + numCols  
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    stages += [assembler]

In [32]:
from pyspark.ml import Pipeline

In [33]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['features'] + df.columns
df = df.select(selectedCols)
df.show(5)

+--------------------+--------+---------+-------------+--------------+-------+----------+-------------+--------------------+
|            features|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_Index|type_classVec|            features|
+--------------------+--------+---------+-------------+--------------+-------+----------+-------------+--------------------+
|[1.0,229133.94,15...|CASH_OUT|229133.94|      15325.0|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,229133.94,15...|
|[0.0,215310.3,705...|TRANSFER| 215310.3|        705.0|           0.0|      0|       1.0|    (1,[],[])|[0.0,215310.3,705...|
|[1.0,110414.71,26...|CASH_OUT|110414.71|     26845.41|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,110414.71,26...|
|[1.0,56953.9,1942...|CASH_OUT|  56953.9|      1942.02|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,56953.9,1942...|
|[1.0,23261.3,2041...|CASH_OUT|  23261.3|     20411.53|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,23261.3,2041...|


In [34]:
from pyspark.ml.feature import StandardScaler

In [35]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)


scalerModel = scaler.fit(df)


scaledData = scalerModel.transform(df)
scaledData.show()

+--------------------+--------+---------+-------------+--------------+-------+----------+-------------+--------------------+--------------------+
|            features|    type|   amount|oldbalanceOrg|newbalanceOrig|isFraud|type_Index|type_classVec|            features|      scaledFeatures|
+--------------------+--------+---------+-------------+--------------+-------+----------+-------------+--------------------+--------------------+
|[1.0,229133.94,15...|CASH_OUT|229133.94|      15325.0|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,229133.94,15...|[2.14815806388665...|
|[0.0,215310.3,705...|TRANSFER| 215310.3|        705.0|           0.0|      0|       1.0|    (1,[],[])|[0.0,215310.3,705...|[0.0,2.1049897565...|
|[1.0,110414.71,26...|CASH_OUT|110414.71|     26845.41|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,110414.71,26...|[2.14815806388665...|
|[1.0,56953.9,1942...|CASH_OUT|  56953.9|      1942.02|           0.0|      0|       0.0|(1,[0],[1.0])|[1.0,56953.9,1942...|

In [36]:
train , test = scaledData.randomSplit([0.8, 0.2])
print("There are %d training examples and %d test examples." % (train.count(), test.count()))

There are 1258652 training examples and 314794 test examples.


In [37]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [39]:

lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="isFraud", maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(train)
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))


Coefficients: [-0.024540749962310114,0.0,0.0,0.0]
Intercept: 0.03799101834054362


In [40]:
trainingSummary = lrModel.summary
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

+------------------+------------------+
|               FPR|               TPR|
+------------------+------------------+
|               0.0|               0.0|
|0.0871023054434169|0.5483311797779563|
|               1.0|               1.0|
|               1.0|               1.0|
+------------------+------------------+

areaUnderROC: 0.7306144371672697


In [42]:
prediction =  lrModel.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="isFraud", rawPredictionCol="prediction",
                                              metricName="areaUnderROC")
areaUnderROC = evaluator.evaluate(prediction)
print("Test set areaUnderROC = " + str(areaUnderROC))

Test set areaUnderROC = 0.7305698384010598


In [43]:
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

False positive rate by label:
label 0: 0.45166882022204374
label 1: 0.0871023054434169
True positive rate by label:
label 0: 0.9128976945565831
label 1: 0.5483311797779563
Precision by label:
label 0: 0.6685514326784412
label 1: 0.8631649966266024
Recall by label:
label 0: 0.9128976945565831
label 1: 0.5483311797779563
F-measure by label:
label 0: 0.771847859123638
label 1: 0.670636291979079
Accuracy: 0.7304290622030554
FPR: 0.26920018786851596
TPR: 0.7304290622030554
F-measure: 0.7211906114429533
Precision: 0.7659571718550899
Recall: 0.7304290622030554


In [44]:
prediction.select("prediction", "isFraud", "features").show(5)

+----------+-------+--------------------+
|prediction|isFraud|            features|
+----------+-------+--------------------+
|       1.0|      0|[0.0,158.78,10047...|
|       1.0|      0|[0.0,159.55,7249....|
|       1.0|      0|[0.0,1012.04,5033...|
|       1.0|      0|[0.0,1648.67,232....|
|       1.0|      0|[0.0,1663.71,1954...|
+----------+-------+--------------------+
only showing top 5 rows



In [45]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Accuracy = ", accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy =  0.730865264268061
Test Error = 0.269135


In [46]:

def model_(model):
    model_ = model.fit(train)
    predictions = model_.transform(test)
    evaluator = MulticlassClassificationEvaluator(
    labelCol="isFraud", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    
    print("Accuracy = ", accuracy)
    print("Test Error = %g" % (1.0 - accuracy))
  
    metric_list= ["truePositiveRateByLabel","falsePositiveRateByLabel","precisionByLabel" ,"recallByLabel"]
    for i in metric_list:
        print(i+'_1 = ',evaluator.evaluate(predictions, {evaluator.metricName: i, evaluator.metricLabel: 1.0}))
        print(i+'_0 = ',evaluator.evaluate(predictions, {evaluator.metricName: i, evaluator.metricLabel: 0.0}))
     
    print(predictions.select("prediction", "isFraud", "features").show(5))

In [47]:
dt = DecisionTreeClassifier(labelCol="isFraud", featuresCol="scaledFeatures")
model_(dt)


Accuracy =  0.9031588912113954
Test Error = 0.0968411
truePositiveRateByLabel_1 =  0.9123606373771828
truePositiveRateByLabel_0 =  0.8939866793529971
falsePositiveRateByLabel_1 =  0.10601332064700285
falsePositiveRateByLabel_0 =  0.08763936262281728
precisionByLabel_1 =  0.8955992129181373
precisionByLabel_0 =  0.9109812615943481
recallByLabel_1 =  0.9123606373771828
recallByLabel_0 =  0.8939866793529971
+----------+-------+--------------------+
|prediction|isFraud|            features|
+----------+-------+--------------------+
|       0.0|      0|[0.0,158.78,10047...|
|       0.0|      0|[0.0,159.55,7249....|
|       0.0|      0|[0.0,1012.04,5033...|
|       1.0|      0|[0.0,1648.67,232....|
|       0.0|      0|[0.0,1663.71,1954...|
+----------+-------+--------------------+
only showing top 5 rows

None


In [48]:
rf = RandomForestClassifier(labelCol="isFraud", featuresCol="scaledFeatures", numTrees=10)
model_(rf)

Accuracy =  0.9521401297356367
Test Error = 0.0478599
truePositiveRateByLabel_1 =  0.9851982894669857
truePositiveRateByLabel_0 =  0.9191880748493498
falsePositiveRateByLabel_1 =  0.08081192515065018
falsePositiveRateByLabel_0 =  0.014801710533014305
precisionByLabel_1 =  0.9239666264815766
precisionByLabel_0 =  0.9842022331495015
recallByLabel_1 =  0.9851982894669857
recallByLabel_0 =  0.9191880748493498
+----------+-------+--------------------+
|prediction|isFraud|            features|
+----------+-------+--------------------+
|       0.0|      0|[0.0,158.78,10047...|
|       0.0|      0|[0.0,159.55,7249....|
|       0.0|      0|[0.0,1012.04,5033...|
|       1.0|      0|[0.0,1648.67,232....|
|       0.0|      0|[0.0,1663.71,1954...|
+----------+-------+--------------------+
only showing top 5 rows

None


In [49]:
lsvc = LinearSVC(featuresCol="scaledFeatures", labelCol="isFraud",maxIter=10, regParam=0.1)
model_(lsvc)

Accuracy =  0.9348176902990527
Test Error = 0.0651823
truePositiveRateByLabel_1 =  0.9995609122842743
truePositiveRateByLabel_0 =  0.8702822708531557
falsePositiveRateByLabel_1 =  0.12971772914684426
falsePositiveRateByLabel_0 =  0.0004390877157257038
precisionByLabel_1 =  0.8848049570483031
precisionByLabel_0 =  0.9994973373449213
recallByLabel_1 =  0.9995609122842743
recallByLabel_0 =  0.8702822708531557
+----------+-------+--------------------+
|prediction|isFraud|            features|
+----------+-------+--------------------+
|       0.0|      0|[0.0,158.78,10047...|
|       0.0|      0|[0.0,159.55,7249....|
|       1.0|      0|[0.0,1012.04,5033...|
|       1.0|      0|[0.0,1648.67,232....|
|       0.0|      0|[0.0,1663.71,1954...|
+----------+-------+--------------------+
only showing top 5 rows

None
