# Initialisation and Dataset reading

In [None]:
from pyspark.sql import SparkSession
from flask import Flask, request, jsonify
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, TimestampType, DecimalType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import to_timestamp, to_date, regexp_replace, col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CSVReader") \
    .getOrCreate()

# Initialize Flask app
app = Flask(__name__)

Aug 09, 2024 4:41:03 PM org.apache.spark.launcher.Log4jHotPatchOption staticJavaAgentOption

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
file_path = 'Dataset/fraud_data.csv'


# Define the schema with trans_date_trans_time as StringType initially
schema = StructType([
    StructField("trans_date_trans_time", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amt", DecimalType(10,2), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("city_pop", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("trans_num", StringType(), True),
    StructField("merch_lat", DoubleType(), True),
    StructField("merch_long", DoubleType(), True),
    StructField("is_fraud", StringType(), True)
])

def read_dataset(file_path, spark):
    # Read the CSV file into a Spark DataFrame
    df = spark.read.option("header", "true") \
        .option("delimiter", ",") \
        .option("quote", "\"") \
        .option("escape", "\"") \
        .schema(schema) \
        .csv(file_path)
    return df

df = read_dataset(file_path, spark)
# Show the first few rows of the DataFrame
df.show(truncate=False)
df.printSchema()

+---------------------+---------------------------------+--------------+-------+--------+-----+-------+---------+--------+--------------------------+----------+--------------------------------+---------+-----------+--------+
|trans_date_trans_time|merchant                         |category      |amt    |city    |state|lat    |long     |city_pop|job                       |dob       |trans_num                       |merch_lat|merch_long |is_fraud|
+---------------------+---------------------------------+--------------+-------+--------+-----+-------+---------+--------+--------------------------+----------+--------------------------------+---------+-----------+--------+
|04-01-2019 00:58     |"Stokes, Christiansen and Sipes" |grocery_net   |14.37  |Wales   |AK   |64.7556|-165.6723|145     |"Administrator, education"|09-11-1939|a3806e984cec6ac0096d8184c64ad3a1|65.654142|-164.722603|1       |
|04-01-2019 15:06     |Predovic Inc                     |shopping_net  |966.11 |Wales   |AK   |64.75

# Data transformation

In [8]:
# Change schema for trans_date_trans_time & dob. Remove '" "' from merchant and job

def transform_data(df):
    # Update the DataFrame
    df_dedup = df.dropDuplicates()
    
    fix_fraud_df = df_dedup.withColumn("is_fraud", when(length(col("is_fraud")) > 2, concat(substring(col("is_fraud"), 1, 1))).otherwise(col("is_fraud")))
    change_class_df = fix_fraud_df.withColumn("is_fraud", when(col("is_fraud") == '1', "fraud").otherwise("not fraud"))
    # Transform the df 
    transform_df = change_class_df.withColumn("trans_date_trans_time", to_timestamp(col("trans_date_trans_time"), "dd-MM-yyyy HH:mm")) \
                     .withColumn("dob", to_date(col("dob"), "dd-MM-yyyy")) \
                     .withColumn("merchant", regexp_replace(col("merchant"), '""|\"', '')) \
                     .withColumn("job", regexp_replace(col("job"), '""|\"', ''))
                    
    return transform_df

transform_df = transform_data(df)
transform_df.printSchema()
transform_df.show()
transform_df.count()

root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: decimal(10,2) (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)

+---------------------+--------------------+--------------+-------+--------+-----+-------+---------+--------+--------------------+----------+--------------------+---------+-----------+--------+
|trans_date_trans_time|            merchant|      category|    amt|    city|state|    lat|     long|city_pop|                 job|       dob|           trans_num|merch_lat| merch_long|is_fraud|
+---------

In [None]:
transform_df.write.parquet("api_dataset/cc_transactions_detail.parquet", mode='overwrite', compression="gzip")