In [1]:
# Locate the Spark installation and make pyspark importable; this has to run
# before the pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf,lit
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
import pickle
import pandas as pd
import numpy as np
from transform import *

In [3]:
# Build (or reuse) a local SparkSession; "local[*]" runs Spark in-process
# with one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Spark-201-project')
    .getOrCreate()
)

# Underlying SparkContext, used later for binaryFiles() and broadcast().
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Read train and test data

In [4]:
%%time
# Both files have a header row. No schema is supplied and inferSchema is not
# enabled, so Spark's CSV reader loads every column as a string.
train_df = spark.read.option("header", True).csv('data/fraudTrain.csv')
test_df = spark.read.option("header", True).csv('data/fraudTest.csv')

CPU times: user 2.84 ms, sys: 4 ms, total: 6.84 ms
Wall time: 3.49 s


### Transform data using the transformation pipeline

In [5]:
# Custom stages come from the local `transform` module (star-imported above).
# Derived columns: "age" from "dob" and "time" from "trans_date_trans_time".
get_age_fn = get_age_from_dob(input_col="dob", output_col="age")
get_time_fn = get_time(input_col="trans_date_trans_time", output_col="time")

# The three cast stages overwrite their input column in place.
amt_int_fn, city_pop_int_fn, is_fraud_int_fn = (
    cast_int(input_col=column_name, output_col=column_name)
    for column_name in ("amt", "city_pop", "is_fraud")
)

transformer_pipeline = Pipeline(
    stages=[
        get_age_fn,
        get_time_fn,
        amt_int_fn,
        city_pop_int_fn,
        is_fraud_int_fn,
    ]
)

In [6]:
%%time
# Fit the pipeline on the training frame only, then apply the fitted model to
# both frames. NOTE: train_df/test_df are overwritten here, so re-running this
# cell on its own feeds already-transformed frames back into the pipeline.
transformer_model = transformer_pipeline.fit(train_df)
train_df, test_df = (
    transformer_model.transform(frame) for frame in (train_df, test_df)
)

CPU times: user 27.2 ms, sys: 40.1 ms, total: 67.2 ms
Wall time: 298 ms


In [7]:
# Keep only the four model inputs plus the label column.
selected_columns = ["time", "amt", "city_pop", "age", "is_fraud"]
train_data = train_df.select(*selected_columns)
test_data = test_df.select(*selected_columns)

In [8]:
# Preview the first rows and check the selected columns and their types.
train_data.show()
train_data.printSchema()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

+----+---+--------+---+--------+
|time|amt|city_pop|age|is_fraud|
+----+---+--------+---+--------+
|   0|  4|    3495| 33|       0|
|   0|107|     149| 43|       0|
|   0|220|    4154| 59|       0|
|   0| 45|    1939| 54|       0|
|   0| 41|      99| 35|       0|
|   0| 94|    2158| 60|       0|
|   0| 44|    2691| 28|       0|
|   0| 71|    6018| 74|       0|
|   0|  4|    1472| 80|       0|
|   0|198|  151785| 47|       0|
|   0| 24|    7297| 31|       0|
|   0|  7|    1925| 55|       0|
|   0| 71|  341043| 32|       0|
|   0| 96|     589| 76|       0|
|   0|  7|     899| 54|       0|
|   0|  3|    4664| 56|       0|
|   0|327|    1078| 69|       0|
|   0|341|    4081| 83|       0|
|   0| 63|    2518| 75|       0|
|   0| 44|  124967| 41|       0|
+----+---+--------+---+--------+
only showing top 20 rows

root
 |-- time: integer (nullable = true)
 |-- amt: integer (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_fraud: integer (nul

## PySpark Model

### Create features for training the model

In [9]:
# Model inputs, in the order they are packed into the feature vector (the same
# list is later passed to the pickled-model UDF).
feature_columns = ['time', 'amt', 'city_pop', 'age']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# This cell overwrites train_data/test_data, so on a re-run they already carry
# a 'features' column and VectorAssembler would refuse to write its output
# column. Dropping the output column first keeps the cell idempotent:
# DataFrame.drop() is a no-op when the column is absent, so the first run is
# unchanged.
train_data = assembler.transform(train_data.drop(assembler.getOutputCol()))
test_data = assembler.transform(test_data.drop(assembler.getOutputCol()))

### Train

In [10]:
# Small, shallow forest; the fixed seed makes repeated fits comparable.
rf = RandomForestClassifier(
    labelCol="is_fraud",
    featuresCol='features',
    numTrees=10,
    maxDepth=3,
    seed=6,
)

In [11]:
%%time
# Train on the assembled (not yet cached) training data; fit() runs the Spark
# jobs eagerly, so the cell's wall time is the training time.
rf_model_spark = rf.fit(train_data)

                                                                                

CPU times: user 31.9 ms, sys: 51.1 ms, total: 83 ms
Wall time: 14.1 s


### Predictions

In [12]:
%%time
# transform() is lazy: this only builds the plan that adds the prediction
# columns. The work happens when an action (the counts below) runs.
predictions = rf_model_spark.transform(test_data)

CPU times: user 10 ms, sys: 14.5 ms, total: 24.5 ms
Wall time: 122 ms


In [13]:
%%time
# Accuracy = rows whose predicted label equals the true label / all rows.
# Each count() is a separate Spark action.
correct = predictions.filter(col('prediction') == col('is_fraud')).count()
total = predictions.count()

accuracy = correct / total
print("Accuracy: ", accuracy)

                                                                                

Accuracy:  0.9961401355721147
CPU times: user 8.63 ms, sys: 19 ms, total: 27.6 ms
Wall time: 2.16 s


## Load and broadcast a previously trained RF model

In [14]:
# binaryFiles() yields (path, content) pairs; collect() brings the single
# pickle file's bytes back to the driver.
model_data = sc.binaryFiles("rf_model.pkl").collect()
pickled_model_bytes = model_data[0][1]

# SECURITY: pickle.loads can execute arbitrary code. Only load model files
# that come from a trusted source.
rf_model = pickle.loads(pickled_model_bytes)

# Broadcast the deserialized model so the UDF reads it through
# rf_model_bc.value instead of capturing the model object directly.
rf_model_bc = sc.broadcast(rf_model)

def predict_fn(*cols):
    """Classify a single row with the broadcast model.

    Args:
        *cols: The feature values of one row, passed positionally. They are
            packed into a 1 x n array in the order given.

    Returns:
        int: 1 if the probability in column 1 of ``predict_proba`` is strictly
        greater than 0.5, otherwise 0.
        (Assumes column 1 is the fraud class - TODO confirm against the
        model's class order.)
    """
    feature_row = np.array(cols).reshape((1, -1))
    class_probabilities = rf_model_bc.value.predict_proba(feature_row)
    positive_probability = float(class_probabilities[0, 1])
    return 1 if positive_probability > 0.5 else 0

# Wrap the row-level scorer as a Spark UDF that returns an integer column.
predict = udf(predict_fn, returnType=IntegerType())

### Predictions

In [15]:
%%time
# Add the pickled model's 0/1 prediction as a "score" column, feeding the UDF
# the feature columns in training order. show() runs a Spark job to produce
# the rows it displays.
score_column = predict(*[col(name) for name in feature_columns])
df = test_data.withColumn("score", score_column)
df.show()

[Stage 23:>                                                         (0 + 1) / 1]

+----+---+--------+---+--------+--------------------+-----+
|time|amt|city_pop|age|is_fraud|            features|score|
+----+---+--------+---+--------+--------------------+-----+
|  12|  2|  333497| 53|       0|[12.0,2.0,333497....|    0|
|  12| 29|     302| 31|       0|[12.0,29.0,302.0,...|    0|
|  12| 41|   34496| 51|       0|[12.0,41.0,34496....|    0|
|  12| 60|   54767| 34|       0|[12.0,60.0,54767....|    0|
|  12|  3|    1126| 66|       0|[12.0,3.0,1126.0,...|    0|
|  12| 19|     520| 30|       0|[12.0,19.0,520.0,...|    0|
|  12|133|    1139| 70|       0|[12.0,133.0,1139....|    0|
|  12| 10|     343| 49|       0|[12.0,10.0,343.0,...|    0|
|  12|  4|    3688| 48|       0|[12.0,4.0,3688.0,...|    0|
|  12| 66|     263| 65|       0|[12.0,66.0,263.0,...|    0|
|  12|  7|     564| 25|       0|[12.0,7.0,564.0,2...|    0|
|  12| 42|    1645| 45|       0|[12.0,42.0,1645.0...|    0|
|  12|  2|   26551| 44|       0|[12.0,2.0,26551.0...|    0|
|  12|  7|    2258| 84|       0|[12.0,7.

                                                                                

In [16]:
%%time
# Accuracy of the pickled model's UDF scores against the true labels.
correct = df.where(col('score') == col('is_fraud')).count()
total = df.count()

accuracy = correct / total
print(accuracy)

                                                                                

0.9970524671641603
CPU times: user 14.3 ms, sys: 13.6 ms, total: 28 ms
Wall time: 1min 43s


## Caching


In [17]:
# Mark both assembled DataFrames for caching. cache() is lazy: the data is
# stored the first time an action computes it, and later actions reuse it.
train_data.cache()
test_data.cache()

DataFrame[time: int, amt: int, city_pop: int, age: int, is_fraud: int, features: vector]

### Re-train PySpark model

In [18]:
%%time
# Same estimator and data as the first fit, but train_data is now marked for
# caching; compare this cell's wall time with the uncached run above.
rf_model_spark = rf.fit(train_data)

                                                                                

CPU times: user 27.1 ms, sys: 50.2 ms, total: 77.3 ms
Wall time: 5.73 s


### Predictions

In [19]:
%%time
# Lazily attach predictions from the re-trained model to the cached test data;
# nothing is computed until the counts in the next cell.
predictions = rf_model_spark.transform(test_data)

CPU times: user 8.48 ms, sys: 13.1 ms, total: 21.6 ms
Wall time: 64.4 ms


In [20]:
%%time
# Accuracy of the re-trained Spark model on the cached test data.
correct = predictions.filter(col('prediction') == col('is_fraud')).count()
total = predictions.count()

accuracy = correct / total
print("Accuracy: ", accuracy)

[Stage 43:===>                                                    (1 + 15) / 16]

Accuracy:  0.9961401355721147
CPU times: user 11.4 ms, sys: 27.4 ms, total: 38.8 ms
Wall time: 1.99 s


                                                                                

### Predictions for the broadcast model

In [21]:
%%time
# Score the (now cached) test data with the broadcast pickled model through
# the same UDF as before, then display the first rows.
score_column = predict(*[col(name) for name in feature_columns])
df = test_data.withColumn("score", score_column)
df.show()

[Stage 49:>                                                         (0 + 1) / 1]

+----+---+--------+---+--------+--------------------+-----+
|time|amt|city_pop|age|is_fraud|            features|score|
+----+---+--------+---+--------+--------------------+-----+
|  12|  2|  333497| 53|       0|[12.0,2.0,333497....|    0|
|  12| 29|     302| 31|       0|[12.0,29.0,302.0,...|    0|
|  12| 41|   34496| 51|       0|[12.0,41.0,34496....|    0|
|  12| 60|   54767| 34|       0|[12.0,60.0,54767....|    0|
|  12|  3|    1126| 66|       0|[12.0,3.0,1126.0,...|    0|
|  12| 19|     520| 30|       0|[12.0,19.0,520.0,...|    0|
|  12|133|    1139| 70|       0|[12.0,133.0,1139....|    0|
|  12| 10|     343| 49|       0|[12.0,10.0,343.0,...|    0|
|  12|  4|    3688| 48|       0|[12.0,4.0,3688.0,...|    0|
|  12| 66|     263| 65|       0|[12.0,66.0,263.0,...|    0|
|  12|  7|     564| 25|       0|[12.0,7.0,564.0,2...|    0|
|  12| 42|    1645| 45|       0|[12.0,42.0,1645.0...|    0|
|  12|  2|   26551| 44|       0|[12.0,2.0,26551.0...|    0|
|  12|  7|    2258| 84|       0|[12.0,7.

                                                                                

In [22]:
%%time
# Accuracy of the UDF scores on the cached test data.
correct = df.where(col('score') == col('is_fraud')).count()
total = df.count()

accuracy = correct / total
print(accuracy)



0.9970524671641603
CPU times: user 17.3 ms, sys: 26.2 ms, total: 43.5 ms
Wall time: 1min 33s


                                                                                

### Caching the previously trained model and making predictions on cached data

In [23]:
# Load the same pickle again, this time marking the RDD that holds the file
# contents for caching before collecting it.
# NOTE(review): the RDD is collected only once in this cell, so cache() may
# have no measurable effect here - confirm this is the intended experiment.
model_rdd_pkl = sc.binaryFiles("rf_model.pkl").cache()
model_rdd_data = model_rdd_pkl.collect()

# binaryFiles() yields (path, content) pairs; take the bytes of the first file.
# SECURITY: pickle.loads can execute arbitrary code. Only load model files
# that come from a trusted source.
pickled_model_bytes = model_rdd_data[0][1]
creditcardfrauddetection_model = pickle.loads(pickled_model_bytes)
broadcast_creditcardfrauddetection_model = sc.broadcast(creditcardfrauddetection_model)

def predict_fn(*cols):
    """Classify a single row with the re-broadcast model.

    This redefinition replaces the earlier ``predict_fn``; the only difference
    is that it reads ``broadcast_creditcardfrauddetection_model``.

    Args:
        *cols: The feature values of one row, passed positionally. They are
            packed into a 1 x n array in the order given.

    Returns:
        int: 1 if the probability in column 1 of ``predict_proba`` is strictly
        greater than 0.5, otherwise 0.
        (Assumes column 1 is the fraud class - TODO confirm against the
        model's class order.)
    """
    feature_row = np.array(cols).reshape((1, -1))
    model = broadcast_creditcardfrauddetection_model.value
    class_probabilities = model.predict_proba(feature_row)
    positive_probability = float(class_probabilities[0, 1])
    return 1 if positive_probability > 0.5 else 0

# Re-register the UDF so it uses the predict_fn defined just above.
predict = udf(predict_fn, returnType=IntegerType())

In [24]:
%%time
# Score the cached test data with the re-registered UDF and display the
# first rows.
score_column = predict(*[col(name) for name in feature_columns])
df = test_data.withColumn("score", score_column)
df.show()

[Stage 57:>                                                         (0 + 1) / 1]

+----+---+--------+---+--------+--------------------+-----+
|time|amt|city_pop|age|is_fraud|            features|score|
+----+---+--------+---+--------+--------------------+-----+
|  12|  2|  333497| 53|       0|[12.0,2.0,333497....|    0|
|  12| 29|     302| 31|       0|[12.0,29.0,302.0,...|    0|
|  12| 41|   34496| 51|       0|[12.0,41.0,34496....|    0|
|  12| 60|   54767| 34|       0|[12.0,60.0,54767....|    0|
|  12|  3|    1126| 66|       0|[12.0,3.0,1126.0,...|    0|
|  12| 19|     520| 30|       0|[12.0,19.0,520.0,...|    0|
|  12|133|    1139| 70|       0|[12.0,133.0,1139....|    0|
|  12| 10|     343| 49|       0|[12.0,10.0,343.0,...|    0|
|  12|  4|    3688| 48|       0|[12.0,4.0,3688.0,...|    0|
|  12| 66|     263| 65|       0|[12.0,66.0,263.0,...|    0|
|  12|  7|     564| 25|       0|[12.0,7.0,564.0,2...|    0|
|  12| 42|    1645| 45|       0|[12.0,42.0,1645.0...|    0|
|  12|  2|   26551| 44|       0|[12.0,2.0,26551.0...|    0|
|  12|  7|    2258| 84|       0|[12.0,7.

                                                                                

In [25]:
%%time
# Accuracy of the re-registered UDF's scores on the cached test data.
correct = df.where(col('score') == col('is_fraud')).count()
total = df.count()

accuracy = correct / total
print(accuracy)



0.9970524671641603
CPU times: user 14.2 ms, sys: 15.5 ms, total: 29.7 ms
Wall time: 1min 40s


                                                                                

In [26]:
# Release the cached train/test data now that the timing comparisons are done.
train_data.unpersist()
test_data.unpersist()

DataFrame[time: int, amt: int, city_pop: int, age: int, is_fraud: int, features: vector]