In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import rand

import pyspark.sql.types as tp

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("CC Fraud Detection via GNB") \
    .getOrCreate()

25/02/17 18:55:23 WARN Utils: Your hostname, Tyrens-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.68.108 instead (on interface en0)
25/02/17 18:55:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/17 18:55:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Schema to change datatype to float to take less memory

In [3]:
my_schema = tp.StructType([
    tp.StructField(name = "distance_from_home", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "distance_from_last_transaction", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "ratio_to_median_purchase_price", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "repeat_retailer", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "used_chip", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "used_pin_number", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "online_order", dataType=tp.FloatType(), nullable=True),
    tp.StructField(name = "fraud", dataType=tp.FloatType(), nullable=True)
])

In [4]:
df = spark.read.csv('card_transdata.csv',schema= my_schema,header= True)
df.printSchema()
df.show(1)
print("# of columns:",len(df.columns))
print("# of rows:", df.count())

root
 |-- distance_from_home: float (nullable = true)
 |-- distance_from_last_transaction: float (nullable = true)
 |-- ratio_to_median_purchase_price: float (nullable = true)
 |-- repeat_retailer: float (nullable = true)
 |-- used_chip: float (nullable = true)
 |-- used_pin_number: float (nullable = true)
 |-- online_order: float (nullable = true)
 |-- fraud: float (nullable = true)

+------------------+------------------------------+------------------------------+---------------+---------+---------------+------------+-----+
|distance_from_home|distance_from_last_transaction|ratio_to_median_purchase_price|repeat_retailer|used_chip|used_pin_number|online_order|fraud|
+------------------+------------------------------+------------------------------+---------------+---------+---------------+------------+-----+
|         57.877857|                       0.31114|                       1.94594|            1.0|      1.0|            0.0|         0.0|  0.0|
+------------------+----------------

In [5]:
df.createOrReplaceTempView("card_data")

Check if there are any rows with NULL values

In [6]:
query = "SELECT * \
         FROM card_data \
         WHERE distance_from_home IS NULL \
            OR distance_from_last_transaction IS NULL \
            OR ratio_to_median_purchase_price IS NULL \
            OR repeat_retailer IS NULL \
            OR used_chip IS NULL \
            OR used_pin_number IS NULL \
            OR online_order IS NULL"
missing_data = spark.sql(query)

missing_data.show()

+------------------+------------------------------+------------------------------+---------------+---------+---------------+------------+-----+
|distance_from_home|distance_from_last_transaction|ratio_to_median_purchase_price|repeat_retailer|used_chip|used_pin_number|online_order|fraud|
+------------------+------------------------------+------------------------------+---------------+---------+---------------+------------+-----+
+------------------+------------------------------+------------------------------+---------------+---------+---------------+------------+-----+



Some explanatory data analysis

In [7]:
query = "SELECT * \
         FROM card_data \
         WHERE fraud = 1.0"

print("# of rows with fraud", spark.sql(query).count())
print(f"Fraud percentage: {spark.sql(query).count()/df.count()*100}%")

query = "SELECT * \
         FROM card_data \
         WHERE fraud = 0.0"
print("# of rows without fraud:",spark.sql(query).count())

query = "SELECT * \
         FROM card_data \
         WHERE fraud = 1.0 \
         AND online_order = 1.0"
print("# of rows with fraud and is an online order:",spark.sql(query).count())

query = "SELECT * \
         FROM card_data \
         WHERE fraud = 1.0 \
         AND used_chip = 1.0"
print("# of rows with fraud and chip is used:",spark.sql(query).count())

query = "SELECT * \
         FROM card_data \
         WHERE fraud = 1.0 \
         AND repeat_retailer = 1.0"
print("# of rows with fraud and is a repeated retailer:",spark.sql(query).count())

# of rows with fraud 87403
Fraud percentage: 8.7403%
# of rows without fraud: 912597
# of rows with fraud and is an online order: 82711
# of rows with fraud and chip is used: 22410
# of rows with fraud and is a repeated retailer: 76925


Shuffle and split dataset

In [9]:
df = df.orderBy(rand(seed = 42))
train_data, test_data = df.randomSplit([.8, .2], seed=42)
print("Total # of rows:", df.count())
print("Training # of rows:", train_data.count())
print("Testing # of rows:", test_data.count())

Total # of rows: 1000000


                                                                                

Training # of rows: 800221


                                                                                

Testing # of rows: 199706


Fraud column is the label col, while the rest are feature cols
PySpark ML algorithms expect input data in a single vector 

In [10]:
feature = ["distance_from_home", "distance_from_last_transaction",
                "ratio_to_median_purchase_price", "repeat_retailer",
                "used_chip", "used_pin_number", "online_order"]
vectorAssembler = VectorAssembler(inputCols=feature, outputCol="features")

Training data and testing data into a single vector

In [11]:
train_data = vectorAssembler.transform(train_data).select("features", "fraud")
test_data = vectorAssembler.transform(test_data).select("features", "fraud")


In [12]:
sample_row = train_data.select("features", "fraud").first()
print(sample_row)
sample_row = test_data.select("features", "fraud").first()
print(sample_row)

                                                                                

Row(features=DenseVector([0.0049, 0.1981, 0.9981, 0.0, 0.0, 0.0, 1.0]), fraud=0.0)


                                                                                

Row(features=DenseVector([0.0314, 6.0155, 2.5488, 0.0, 0.0, 0.0, 1.0]), fraud=0.0)


Train Model

In [13]:
gnb = NaiveBayes(featuresCol="features", labelCol="fraud")
gnb_model = gnb.fit(train_data)


                                                                                

In [14]:
train_pred = gnb_model.transform(train_data)
test_pred = gnb_model.transform(test_data)

Accuracy

In [15]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="fraud", \
    predictionCol="prediction", \
    metricName="accuracy"
)

train_acc = evaluator.evaluate(train_pred)
test_acc = evaluator.evaluate(test_pred)

print(f"Train Accuracy: {train_acc}")
print(f"Test Accuracy: {test_acc}")

25/02/17 18:57:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
[Stage 133:>                                                        (0 + 8) / 8]

Train Accuracy: 0.9200651305422505
Test Accuracy: 0.9206018680735617


                                                                                

In [16]:
spark.stop()