In [1]:
import os

import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql.functions import col, substring, round

In [3]:
def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the straight-line (Euclidean) distance between two coordinate pairs.

    The distance is measured directly in coordinate units (the same units as the
    inputs); it is not a great-circle distance.

    The arguments only need to support ``-``, ``+`` and ``**``, so plain numbers
    work, and so do the Spark ``Column`` expressions passed by the Kafka consumer.

    Args:
        lat1, lon1: Coordinates of the first point.
        lat2, lon2: Coordinates of the second point.

    Returns:
        sqrt((lat2 - lat1)**2 + (lon2 - lon1)**2), as a number or a Column.
    """
    # The exponent must be written as 0.5: ``x**1/2`` parses as ``(x**1)/2`` and
    # would return half the squared distance instead of the square root.
    # NOTE(review): the precomputed `distance` column in the training table may have
    # been produced with the earlier formula -- confirm, and regenerate it if so,
    # so training and scoring use the same feature definition.
    return ((lat2 - lat1) ** 2 + (lon2 - lon1) ** 2) ** 0.5

In [4]:
from pyspark.sql import SparkSession

# Create SparkSession
# The connector jar path is a raw string: in a normal literal the backslashes in
# "\s", "\j" and "\m" are invalid escape sequences, which newer Python versions
# warn about. The resulting path value is unchanged.
spark = (
    SparkSession.builder
    .appName('fraud_detection')
    .config("spark.jars", r"E:\spark-3.5.0-bin-hadoop3\jars\mysql-connector-j-8.1.0.jar")
    .getOrCreate()
)

# Model training

In [5]:
# Read from MySQL Table
# Loads the `transaction` table of the local `creditcard` database over JDBC.
# Credentials can be supplied through CREDITCARD_DB_USER / CREDITCARD_DB_PASSWORD;
# the previous hard-coded values remain as defaults so existing setups keep working.
transDF = (
    spark.read
    .format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/creditcard")
    .option("dbtable", "transaction")
    .option("user", os.environ.get("CREDITCARD_DB_USER", "root"))
    .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "khang"))
    .load()
)

In [6]:
# Preview the first five rows of the transaction table loaded over JDBC.
transDF.show(5)

+----------------+--------------------+-------------------+-------------------+--------------+---+--------------------+---------+----------+---+--------+--------+
|          cc_num|           trans_num|         trans_date|         trans_time|      category|amt|            merchant|merch_lat|merch_long|age|distance|is_fraud|
+----------------+--------------------+-------------------+-------------------+--------------+---+--------------------+---------+----------+---+--------+--------+
|4641003399120410|a2de9ff6c581157f6...|2016-01-05 01:30:00|2024-01-23 20:54:01|      misc_pos|140|            Wiza LLC|42.168126|-92.149976| 39|  1.5E-4|       0|
|4738555317386146|6938e448d7a51744c...|2012-01-16 01:30:00|2024-01-23 14:46:57|      misc_pos|230|         Block Group|34.899095|-87.758065| 30|  3.0E-4|       0|
|    675985166411|7aa4ecc3c69a223d4...|2016-07-31 01:30:00|2024-01-23 22:13:18|health_fitness| 58|          Heller PLC|44.771892|-75.124947| 41|  1.4E-4|       0|
|5264302655249852|738f

In [7]:
# Load the test set from a local CSV file, treating the first line as the header.
# No schema is given, so column types are whatever the CSV reader assigns by default.
testDF = spark.read.load("./test.csv", format="csv", header='True')

In [8]:
# Preview the first five rows of the CSV test set.
testDF.show(5)

+----------------+---------+------+--------------------+--------------------+----------+----------+-------------+--------------------+---+---------+-----------+--------+
|          cc_num|    first|  last|           trans_num|          trans_date|trans_time| unix_time|     category|            merchant|amt|merch_lat| merch_long|is_fraud|
+----------------+---------+------+--------------------+--------------------+----------+----------+-------------+--------------------+---+---------+-----------+--------+
|5157595343543285|Stephanie|Martin|65685514ff0d1c8c1...|2016-12-25T00:00:...|  19:44:51|1482695091|gas_transport|Schaefer, Maggio ...| 75|27.416029| -98.836205|       0|
|   4483018920250|    Helen|Strong|a6563d05c87fb4e4a...|2017-01-11T00:00:...|  21:13:34|1484169214|  food_dining|Lesch, DAmore and...| 56|36.954036|-108.192545|       0|
|4361646620879135| Kimberly|Hudson|19b76c72c860aa2a5...|2016-12-01T00:00:...|  23:16:52|1480634212|         home|Durgan, Gislason ...|449|40.097713| -

In [9]:
# Load the `customer` table of the local `creditcard` database over JDBC.
# Credentials can be supplied through CREDITCARD_DB_USER / CREDITCARD_DB_PASSWORD;
# the previous hard-coded values remain as defaults so existing setups keep working.
custDF = (
    spark.read
    .format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/creditcard")
    .option("dbtable", "customer")
    .option("user", os.environ.get("CREDITCARD_DB_USER", "root"))
    .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "khang"))
    .load()
)

In [10]:
# Preview the first five rows of the customer table loaded over JDBC.
custDF.show(5)

+---------------+----------+--------+------+--------------------+----------+-----+-----+-------+---------+--------------------+-------------------+
|         cc_num|     first|    last|gender|              street|      city|state|  zip|    lat|     long|                 job|                dob|
+---------------+----------+--------+------+--------------------+----------+-----+-----+-------+---------+--------------------+-------------------+
|101331974127885|Georgeanna| D'Aulby|     F|     6 Stuart Circle|Clearwater|   FL|34620|27.9139| -82.7157| Electrical Engineer|1946-04-03 00:00:00|
|103644429587006|     Davie|Chestnut|     M| 02 Crest Line Trail|   Visalia|   CA|93291|36.3551| -119.301| Marketing Assistant|1986-02-03 00:00:00|
|119517748522380|   Randall|  d' Eye|     M|    415 Towne Center|     Boise|   ID|83727|43.4599| -116.244|    Registered Nurse|1988-05-15 00:00:00|
|121937491260937|  Shepherd|  Dobble|     M|47668 Cottonwood ...| Las Vegas|   NV|89105| 36.086|-115.1471|Compen

In [11]:
import pandas as pd 

from pyspark.ml.classification import LinearSVC, DecisionTreeClassifier

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

from pyspark.sql.functions import isnan, when, count, col, length, expr
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer, VectorIndexer, StringIndexerModel, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


In [12]:
# Reduce the two timestamp columns to plain strings:
#   trans_date -> first 10 characters of the timestamp text (the date part)
#   trans_time -> 8 characters starting at position 12 (the time-of-day part)
# Column.substr takes (start, length), not (start, end); the length is therefore 8,
# so a timestamp rendered with extra trailing characters cannot leak into the value.
# The cast to string is done before slicing so the conversion is explicit.
# `transDF` is still reassigned here, as before, with the shortened trans_date.
transDF = transDF.withColumn("trans_date", col("trans_date").cast("string").substr(1, 10))
transDF1 = (
    transDF
    .withColumn("trans_time", col("trans_time").cast("string").substr(12, 8))
    # Merchant name and coordinates are not used as model inputs below.
    .drop("merchant", "merch_lat", "merch_long")
)

In [13]:
# Preview the reshaped frame: date and time as strings, merchant columns dropped.
transDF1.show(5)

+----------------+--------------------+----------+----------+--------------+---+---+--------+--------+
|          cc_num|           trans_num|trans_date|trans_time|      category|amt|age|distance|is_fraud|
+----------------+--------------------+----------+----------+--------------+---+---+--------+--------+
|4641003399120410|a2de9ff6c581157f6...|2016-01-05|  20:54:01|      misc_pos|140| 39|  1.5E-4|       0|
|4738555317386146|6938e448d7a51744c...|2012-01-16|  14:46:57|      misc_pos|230| 30|  3.0E-4|       0|
|    675985166411|7aa4ecc3c69a223d4...|2016-07-31|  22:13:18|health_fitness| 58| 41|  1.4E-4|       0|
|5264302655249852|738fe82839d73f5a0...|2012-01-08|  03:40:06|      misc_net|811| 88| 0.52205|       1|
|4641003399120410|3dba76e37a738bd38...|2016-04-23|  18:35:59|   food_dining| 65| 39|  2.0E-5|       0|
+----------------+--------------------+----------+----------+--------------+---+---+--------+--------+
only showing top 5 rows



In [14]:
# For every column, count the rows whose value is NaN or NULL, and show the
# totals as a single-row table with one field per column.
missing_value_counts = []
for column_name in transDF1.columns:
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_value_counts.append(count(when(is_missing, column_name)).alias(column_name))
transDF1.select(missing_value_counts).show()

+------+---------+----------+----------+--------+---+---+--------+--------+
|cc_num|trans_num|trans_date|trans_time|category|amt|age|distance|is_fraud|
+------+---------+----------+----------+--------+---+---+--------+--------+
|     0|        0|         0|         0|       0|  0|  0|       0|       0|
+------+---------+----------+----------+--------+---+---+--------+--------+



In [15]:
def _string_indexer(column):
    """Build a StringIndexer that maps `column` to `indexed<column>`.

    handleInvalid="keep" is set on every indexer so that values not seen while
    fitting are kept rather than raising or being dropped at transform time.
    """
    return StringIndexer(
        inputCol=column,
        outputCol="indexed" + column,
        handleInvalid="keep",
    )


# One indexer per string-valued column that feeds the model.
trans_dateIndexer = _string_indexer("trans_date")
trans_timeIndexer = _string_indexer("trans_time")
categoryIndexer = _string_indexer("category")

In [16]:
# Fit each indexer on the running frame and append its indexed column,
# in the order date -> time -> category.
trainDF = transDF1
for string_indexer in (trans_dateIndexer, trans_timeIndexer, categoryIndexer):
    fitted_indexer = string_indexer.fit(trainDF)
    trainDF = fitted_indexer.transform(trainDF)

In [17]:
# Preview the frame with the three indexed columns appended.
trainDF.show(5)

+----------------+--------------------+----------+----------+--------------+---+---+--------+--------+-----------------+-----------------+---------------+
|          cc_num|           trans_num|trans_date|trans_time|      category|amt|age|distance|is_fraud|indexedtrans_date|indexedtrans_time|indexedcategory|
+----------------+--------------------+----------+----------+--------------+---+---+--------+--------+-----------------+-----------------+---------------+
|4641003399120410|a2de9ff6c581157f6...|2016-01-05|  20:54:01|      misc_pos|140| 39|  1.5E-4|       0|            354.0|           9457.0|            7.0|
|4738555317386146|6938e448d7a51744c...|2012-01-16|  14:46:57|      misc_pos|230| 30|  3.0E-4|       0|             42.0|           5981.0|            7.0|
|    675985166411|7aa4ecc3c69a223d4...|2016-07-31|  22:13:18|health_fitness| 58| 41|  1.4E-4|       0|             91.0|          10207.0|           10.0|
|5264302655249852|738fe82839d73f5a0...|2012-01-08|  03:40:06|      mis

In [18]:
# Columns combined into the model's feature vector: three numeric columns
# followed by the three indexed string columns.
inputFeatures = [
    'amt', 'age', 'distance',
    'indexedtrans_date', 'indexedtrans_time', 'indexedcategory',
]

# Pack the listed columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=inputFeatures)
df1 = assembler.transform(trainDF)
df1.show(2)

+----------------+--------------------+----------+----------+--------+---+---+--------+--------+-----------------+-----------------+---------------+--------------------+
|          cc_num|           trans_num|trans_date|trans_time|category|amt|age|distance|is_fraud|indexedtrans_date|indexedtrans_time|indexedcategory|            features|
+----------------+--------------------+----------+----------+--------+---+---+--------+--------+-----------------+-----------------+---------------+--------------------+
|4641003399120410|a2de9ff6c581157f6...|2016-01-05|  20:54:01|misc_pos|140| 39|  1.5E-4|       0|            354.0|           9457.0|            7.0|[140.0,39.0,1.5E-...|
|4738555317386146|6938e448d7a51744c...|2012-01-16|  14:46:57|misc_pos|230| 30|  3.0E-4|       0|             42.0|           5981.0|            7.0|[230.0,30.0,3.0E-...|
+----------------+--------------------+----------+----------+--------+---+---+--------+--------+-----------------+-----------------+---------------+--

In [19]:
# Keep only what the classifier needs: the feature vector and the label.
df2 = df1.select('features', 'is_fraud')
df2.show(2)

+--------------------+--------+
|            features|is_fraud|
+--------------------+--------+
|[140.0,39.0,1.5E-...|       0|
|[230.0,30.0,3.0E-...|       0|
+--------------------+--------+
only showing top 2 rows



In [20]:
# svm = LinearSVC(featuresCol="features", labelCol="is_fraud", maxIter=1000, regParam=0.3)
# svm_model = svm.fit(df2)

In [21]:
# Decision tree on the assembled features, with `is_fraud` as the label.
# NOTE(review): maxBins=12473 is presumably sized to the number of distinct values
# of the largest indexed column (Spark requires maxBins to be at least the number
# of categories of any categorical feature) -- confirm against the data.
dt = DecisionTreeClassifier(
    maxBins=12473,
    labelCol="is_fraud",
    featuresCol="features",
)
dt_model = dt.fit(df2)

In [22]:
# pipeline = Pipeline(stages = [trans_dateIndexer, trans_timeIndexer, categoryIndexer, assembler, svm])

In [23]:
# End-to-end pipeline: index the three string columns, assemble the feature
# vector, then apply the decision tree.
pipeline1_stages = [
    trans_dateIndexer,
    trans_timeIndexer,
    categoryIndexer,
    assembler,
    dt,
]
pipeline1 = Pipeline(stages=pipeline1_stages)

In [24]:
# Print the column names and types of the frame the pipeline is fitted on.
transDF1.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- trans_time: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [25]:
# pipeline_model = pipeline.fit(transDF1)

In [26]:
# Fit all pipeline stages (three string indexers, the assembler, the decision tree) on transDF1.
pipeline1_model = pipeline1.fit(transDF1)

# Model Testing

In [27]:
# testDF = testDF.join(custDF, on=['cc_num', 'first', 'last'], how='inner')
# testDF = testDF.withColumn("age", substring(col("trans_date"), 1, 4).cast("int") - substring(col("dob"), 1, 4).cast("int"))
# testDF = testDF.withColumn("distance", round(calculate_distance(col("merch_lat").cast("double"), col("merch_long").cast("double"), col("lat").cast("double"), col("long").cast("double")), 5))
# testDF = testDF.withColumn("amt", col("amt").cast("int"))
# testDF = testDF.withColumn("cc_num", col("cc_num").cast("long"))
# testDF = testDF.withColumn("is_fraud", col("is_fraud").cast("int"))
# test_predict = pipeline1_model.transform(testDF)

In [28]:
# test_predict.show(5)

In [29]:
# evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderPR")
# auc_pr = evaluator.evaluate(test_predict)
# auc_pr

In [30]:
# evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderROC")
# auc = evaluator.evaluate(test_predict)
# auc

In [31]:
# evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(test_predict)
# accuracy

In [None]:
# Persist the fitted pipeline to a local directory.
# A plain save() raises when the target path already exists, which makes a second
# run of the notebook fail; going through the writer with overwrite() replaces
# any previously saved model instead.
pipeline1_model.write().overwrite().save('./pipeline1_model')

# Consumer

In [None]:
import pandas as pd
def json_to_dataframe(json_data):
    """Convert one JSON document into a single-row Spark DataFrame.

    Args:
        json_data: JSON text (str) describing one record.

    Returns:
        A Spark DataFrame built from a one-row pandas DataFrame holding the
        decoded record.

    Raises:
        json.JSONDecodeError: if `json_data` is not valid JSON.
    """
    # Imported locally so this helper does not depend on a later notebook cell
    # having already imported `json` into the global namespace.
    import json

    record = json.loads(json_data)
    pd_df = pd.DataFrame([record])
    spark_df = spark.createDataFrame(pd_df)
    return spark_df

In [None]:
from kafka import KafkaConsumer
import json

KAFKA_TOPIC_NAME = 'Fraud-detection'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# Score every message arriving on the topic with the fitted pipeline and append
# the scored row to the `new_transaction` table.
consumer = KafkaConsumer(KAFKA_TOPIC_NAME, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
print("Kafka Consumer Application Started ... ")
try:
    # The interrupt handler wraps the whole loop: the notebook spends most of its
    # time blocked waiting for the next message, and an interrupt raised there
    # would not be caught by a handler placed only around the loop body.
    for msg in consumer:
        spark_df = json_to_dataframe(msg.value.decode('utf-8'))
        cc_no = spark_df.toPandas()['cc_num'][0]

        # Filter with a typed Column comparison instead of interpolating the
        # message value into a SQL string, so message content can never be
        # parsed as part of the filter expression.
        query_df = custDF.where(col("cc_num") == int(cc_no))
        joinedDF = spark_df.join(query_df, on=['cc_num'], how='inner')

        # Derive the same inputs the pipeline was fitted on.
        joinedDF = joinedDF.withColumn("trans_date", substring(col("trans_date"), 1, 10).cast("string"))
        # Age = transaction year minus birth year.
        joinedDF = joinedDF.withColumn("age", substring(col("trans_date"), 1, 4).cast("int") - substring(col("dob"), 1, 4).cast("int"))
        # Merchant-to-customer distance, rounded to 5 decimals.
        joinedDF = joinedDF.withColumn("distance", round(calculate_distance(col("merch_lat").cast("double"), col("merch_long").cast("double"), col("lat").cast("double"), col("long").cast("double")), 5))
        joinedDF = joinedDF.withColumn("amt", col("amt").cast("int"))
        joinedDF = joinedDF.withColumn("cc_num", col("cc_num").cast("long"))

        spark_df = joinedDF.select("cc_num", "trans_num", "trans_date", "trans_time", "category", "amt", "age", "distance")
        result_df = pipeline1_model.transform(spark_df)
        # Store the model's prediction under the `is_fraud` column name.
        result_df = result_df.select("cc_num", "trans_num", "trans_date", "trans_time", "category", "amt", "age", "distance", col("prediction").alias("is_fraud"))
        print(result_df.toPandas().to_string(index=False, header=False))

        # Same driver class as the JDBC reads above (the Connector/J 8 name).
        # Credentials can be supplied through CREDITCARD_DB_USER /
        # CREDITCARD_DB_PASSWORD; the previous values remain as defaults.
        result_df.write \
            .format("jdbc") \
            .option("driver", "com.mysql.cj.jdbc.Driver") \
            .option("url", "jdbc:mysql://localhost:3306/creditcard") \
            .option("dbtable", "new_transaction") \
            .option("user", os.environ.get("CREDITCARD_DB_USER", "root")) \
            .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "khang")) \
            .mode("append") \
            .save()
except KeyboardInterrupt:
    print('break')
finally:
    # Always release the broker connection, whether the loop ended by
    # interrupt or by an error while processing a message.
    consumer.close()
print("Kafka Consumer Application Completed. ")