# Importing libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Exploring the dataset

In [2]:
spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('bank_fraud_detection')\
                    .getOrCreate()

In [3]:
bank_data = spark.read.csv('data/Base.csv', header=True, inferSchema=True, nullValue='NA')
bank_data.printSchema()

root
 |-- fraud_bool: integer (nullable = true)
 |-- income: double (nullable = true)
 |-- name_email_similarity: double (nullable = true)
 |-- prev_address_months_count: integer (nullable = true)
 |-- current_address_months_count: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- days_since_request: double (nullable = true)
 |-- intended_balcon_amount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- zip_count_4w: integer (nullable = true)
 |-- velocity_6h: double (nullable = true)
 |-- velocity_24h: double (nullable = true)
 |-- velocity_4w: double (nullable = true)
 |-- bank_branch_count_8w: integer (nullable = true)
 |-- date_of_birth_distinct_emails_4w: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- credit_risk_score: integer (nullable = true)
 |-- email_is_free: integer (nullable = true)
 |-- housing_status: string (nullable = true)
 |-- phone_home_valid: integer (nullable = true)
 |-- phone_mobil

In [4]:
bank_data.show(5)

+----------+------+---------------------+-------------------------+----------------------------+------------+--------------------+----------------------+------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|  days_since_request|intended_balcon_amount|payment_type|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_ca

In [5]:
bank_data = bank_data.withColumn("label", bank_data.fraud_bool.cast('float')).drop('fraud_bool')

In [6]:
bank_data = bank_data.dropna()

In [55]:
indexers = StringIndexer(inputCols=['payment_type', 'employment_status', 'housing_status', 'source', 'device_os'], outputCols=['payment_type_index', 'employment_status_index', 'housing_status_index', 'source_index', 'device_os_index'])
bank_data_indexed = indexers.fit(bank_data).transform(bank_data).drop(*['payment_type', 'employment_status', 'housing_status', 'source', 'device_os'])
bank_data_indexed.show(5)

+------+---------------------+-------------------------+----------------------------+------------+--------------------+----------------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-------------+----------------+------------------+-----------------+---------------+---------------------+---------------+-------------------------+------------------+-------------------------+------------------+-----+-----+------------------+-----------------------+--------------------+------------+---------------+
|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|  days_since_request|intended_balcon_amount|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|credit_risk_score|email_is_free|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed_credit_limit|foreign_

In [56]:
assembler = VectorAssembler(inputCols=['income', 'name_email_similarity', 'prev_address_months_count', 'current_address_months_count', 
                                       'customer_age', 'days_since_request', 'intended_balcon_amount', 'zip_count_4w', 'velocity_6h',
                                       'velocity_24h', 'velocity_4w', 'bank_branch_count_8w', 'date_of_birth_distinct_emails_4w', 
                                       'credit_risk_score', 'email_is_free', 'phone_home_valid', 'phone_mobile_valid', 'bank_months_count', 
                                       'has_other_cards', 'proposed_credit_limit', 'foreign_request', 'session_length_in_minutes', 
                                       'keep_alive_session', 'device_distinct_emails_8w', 'device_fraud_count', 'month', 'payment_type_index',
                                       'employment_status_index', 'housing_status_index', 'source_index', 'device_os_index'], outputCol='features')
bank_data_indexed = assembler.transform(bank_data_indexed)
bank_data_indexed.show(5)

+------+---------------------+-------------------------+----------------------------+------------+--------------------+----------------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-------------+----------------+------------------+-----------------+---------------+---------------------+---------------+-------------------------+------------------+-------------------------+------------------+-----+-----+------------------+-----------------------+--------------------+------------+---------------+--------------------+
|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|  days_since_request|intended_balcon_amount|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|credit_risk_score|email_is_free|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed_

In [7]:
bank_data_train, bank_data_test = bank_data_indexed.randomSplit([0.8, 0.2], seed=42)
#bank_data_train.count(), bank_data_test.count()

NameError: name 'bank_data_indexed' is not defined

In [58]:
bank_data_train.show(5)

+------+---------------------+-------------------------+----------------------------+------------+--------------------+----------------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-------------+----------------+------------------+-----------------+---------------+---------------------+---------------+-------------------------+------------------+-------------------------+------------------+-----+-----+------------------+-----------------------+--------------------+------------+---------------+--------------------+
|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|  days_since_request|intended_balcon_amount|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|credit_risk_score|email_is_free|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed_

In [59]:
model = DecisionTreeClassifier().fit(bank_data_train)

In [60]:
predictions = model.transform(bank_data_test)

+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  1.0|       0.0|  2241|
|  0.0|       0.0|197474|
|  1.0|       1.0|     2|
|  0.0|       1.0|    12|
+-----+----------+------+



In [68]:
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

0.9831811947456215

# Creating a pipeline

In [8]:
spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('bank_fraud_detection')\
                    .getOrCreate()

bank_data = spark.read.csv('data/Base.csv', header=True, inferSchema=True, nullValue='NA')

bank_data = bank_data.withColumn("label", bank_data.fraud_bool.cast('float')).drop('fraud_bool')

bank_data = bank_data.dropna()

bank_data_train, bank_data_test = bank_data.randomSplit([0.8, 0.2], seed=42)

In [9]:
indexers = StringIndexer(inputCols=['payment_type', 'employment_status', 'housing_status', 'source', 'device_os'], outputCols=['payment_type_index', 'employment_status_index', 'housing_status_index', 'source_index', 'device_os_index'])
assembler = VectorAssembler(inputCols=['income', 'name_email_similarity', 'prev_address_months_count', 'current_address_months_count', 
                                       'customer_age', 'days_since_request', 'intended_balcon_amount', 'zip_count_4w', 'velocity_6h',
                                       'velocity_24h', 'velocity_4w', 'bank_branch_count_8w', 'date_of_birth_distinct_emails_4w', 
                                       'credit_risk_score', 'email_is_free', 'phone_home_valid', 'phone_mobile_valid', 'bank_months_count', 
                                       'has_other_cards', 'proposed_credit_limit', 'foreign_request', 'session_length_in_minutes', 
                                       'keep_alive_session', 'device_distinct_emails_8w', 'device_fraud_count', 'month', 'payment_type_index',
                                       'employment_status_index', 'housing_status_index', 'source_index', 'device_os_index'], outputCol='features')
model = DecisionTreeClassifier()

pipeline = Pipeline(stages=[indexers, assembler, model])

pipeline = pipeline.fit(bank_data_train)

In [11]:
prediction = pipeline.transform(bank_data_test)
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(prediction, {evaluator.metricName: "f1"})

0.9831613483152265