In [0]:
# Load the three source CSVs. Each file has a header row, and column types
# are inferred by Spark rather than declared up front.
csv_options = dict(header=True, inferSchema=True)

user_df = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/varshinie.1006@gmail.com/user_dim.csv",
    **csv_options,
)
transaction_df = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/varshinie.1006@gmail.com/transaction_fact.csv",
    **csv_options,
)
fraud_df = spark.read.csv(
    "dbfs:/FileStore/shared_uploads/varshinie.1006@gmail.com/fraud_indicators.csv",
    **csv_options,
)


In [0]:
# Exploratory data analysis

# Summary statistics (count, mean, stddev, min, max) for each column.
summary_stats = transaction_df.describe()
summary_stats.show()

# Class balance: number of transactions per fraud_label value.
label_counts = transaction_df.groupBy("fraud_label").count()
label_counts.show()


+-------+------------------+------------------+------------------+--------------+--------+------------------+
|summary|    transaction_id|           user_id|transaction_amount|payment_method|location|       fraud_label|
+-------+------------------+------------------+------------------+--------------+--------+------------------+
|  count|                10|                10|                10|            10|      10|                10|
|   mean|             105.5|               3.0|           850.125|          null|    null|               0.2|
| stddev|3.0276503540974917|1.4907119849998596| 931.2506916477671|          null|    null|0.4216370213557839|
|    min|               101|                 1|              50.0|   Credit Card| Chicago|                 0|
|    max|               110|                 5|            3000.0|        PayPal| Phoenix|                 1|
+-------+------------------+------------------+------------------+--------------+--------+------------------+

+--------

In [0]:
# Feature engineering
from pyspark.sql.functions import avg, unix_timestamp

# Both per-user aggregates share the same grouping.
transactions_by_user = transaction_df.groupBy("user_id")

# Number of transactions made by each user.
freq_df = transactions_by_user.count().withColumnRenamed(
    "count", "transaction_frequency"
)
freq_df.show()

# Mean transaction amount for each user.
mean_amount = avg("transaction_amount").alias("avg_transaction_amount")
avg_amount_df = transactions_by_user.agg(mean_amount)
avg_amount_df.show()

# Numeric (Unix epoch seconds) versions of the date columns.
account_created_epoch = unix_timestamp("account_creation_date")
transaction_epoch = unix_timestamp("transaction_date")

user_df = user_df.withColumn("account_creation_timestamp", account_created_epoch)
transaction_df = transaction_df.withColumn("transaction_timestamp", transaction_epoch)


+-------+---------------------+
|user_id|transaction_frequency|
+-------+---------------------+
|      1|                    2|
|      3|                    2|
|      5|                    2|
|      4|                    2|
|      2|                    2|
+-------+---------------------+

+-------+----------------------+
|user_id|avg_transaction_amount|
+-------+----------------------+
|      1|                 975.0|
|      3|                 575.0|
|      5|                1525.0|
|      4|               1000.25|
|      2|               175.375|
+-------+----------------------+



In [0]:
# Data Preprocessing
from pyspark.ml.feature import StringIndexer

# Categorical columns and the numeric index columns derived from them.
categorical_cols = ["payment_method", "location"]
encoded_cols = ["payment_method_encoded", "location_encoded"]

indexer = StringIndexer(inputCols=categorical_cols, outputCols=encoded_cols)

# This cell overwrites transaction_df, so on a re-run the encoded columns are
# already present and StringIndexer refuses to write to existing output
# columns. Dropping them first (a no-op when they are absent) keeps the cell
# re-runnable without changing the result of the first run.
transaction_unencoded_df = transaction_df.drop(*encoded_cols)

# Keep the fitted model so the same label-to-index mapping can be reapplied
# to other data later instead of being thrown away after this transform.
indexer_model = indexer.fit(transaction_unencoded_df)
transaction_df = indexer_model.transform(transaction_unencoded_df)


In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single "features" vector consumed by the model.
# NOTE(review): transaction_id and user_id look like identifiers rather than
# predictive signals, and the *_encoded columns built earlier are not used
# here - confirm this feature set is intentional.
feature_cols = ['transaction_amount', 'transaction_id', 'user_id']
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Attach the feature vector to every transaction row.
labeled_data = assembler.transform(transaction_df)
labeled_data.show()

# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
train_df, test_df = labeled_data.randomSplit(split_weights, seed=42)
for split_df in (train_df, test_df):
    split_df.show()

+--------------+-------+------------------+-------------------+-------------------+--------------+-----------+-----------+---------------------+----------------------+----------------+------------------+
|transaction_id|user_id|transaction_amount|   transaction_date|   transaction_time|payment_method|   location|fraud_label|transaction_timestamp|payment_method_encoded|location_encoded|          features|
+--------------+-------+------------------+-------------------+-------------------+--------------+-----------+-----------+---------------------+----------------------+----------------+------------------+
|           101|      1|            1200.0|2023-09-01 00:00:00|2024-09-21 10:00:00|   Credit Card|   New York|          0|           1693526400|                   0.0|             3.0|[1200.0,101.0,1.0]|
|           102|      2|            250.75|2023-09-01 00:00:00|2024-09-21 11:00:00|    Debit Card|Los Angeles|          0|           1693526400|                   1.0|             2.0|

In [0]:
# Model training
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled feature vector, predicting fraud_label.
lr = LogisticRegression(labelCol='fraud_label', featuresCol='features')
lr_model = lr.fit(train_df)

# Score the held-out rows and show the predictions next to the true labels.
test_results = lr_model.transform(test_df)
comparison_cols = ["features", "fraud_label", "prediction"]
test_results.select(*comparison_cols).show()


+------------------+-----------+----------+
|          features|fraud_label|prediction|
+------------------+-----------+----------+
| [500.0,103.0,3.0]|          0|       0.0|
|[3000.0,107.0,5.0]|          1|       1.0|
| [200.0,109.0,4.0]|          0|       0.0|
+------------------+-----------+----------+



In [0]:
# Deploying the model

from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType,IntegerType,DoubleType

# Explicit schema for the transaction data: (column name, Spark type) pairs
# in file order. Every column is declared nullable.
transaction_columns = [
    ("transaction_id", IntegerType()),
    ("user_id", IntegerType()),
    ("transaction_amount", DoubleType()),
    ("transaction_date", StringType()),
    ("transaction_time", StringType()),
    ("payment_method", StringType()),
    ("location", StringType()),
    ("fraud_label", IntegerType()),
]
transaction_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in transaction_columns]
)

# Read transaction data as a stream of CSV rows using the explicit schema.
# NOTE(review): the path points at a single CSV file; Spark's file stream
# source is documented in terms of an input directory - confirm this path
# behaves as intended for newly arriving data.
streaming_df = (
    spark.readStream
    .schema(transaction_schema)
    .option("header", "true")
    .csv("dbfs:/FileStore/shared_uploads/varshinie.1006@gmail.com/transaction_fact.csv")
)

# Build the same feature vector that was used for training.
streaming_features = assembler.transform(streaming_df)

# Score each incoming row with the trained model.
predictions = lr_model.transform(streaming_features)

# Write results to the console sink. start() returns immediately and the
# query keeps running in the background, so keep the StreamingQuery handle:
# without it the query cannot be stopped (query.stop()), inspected
# (query.status, query.lastProgress) or awaited from later cells.
query = predictions.writeStream.format("console").start()
print("started...")

started...
