In [7]:
import os

# Set PYSPARK_SUBMIT_ARGS to the bare `pyspark-shell` value (no extra flags).
# NOTE(review): this notebook reads from and writes to Kafka; that presumably
# needs the spark-sql-kafka connector supplied some other way (or added here
# via `--packages ...`) — confirm for your Spark/Scala version.
os.environ.update(PYSPARK_SUBMIT_ARGS='pyspark-shell')
# If PySpark is missing from the kernel's environment: %pip install pyspark

In [1]:
import json

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
def create_spark_session():
    """Return the SparkSession for the "SimpleFraudDetection" application.

    Uses ``getOrCreate()``, so an already-active session is reused rather
    than a second one being started.
    """
    session_builder = SparkSession.builder.appName("SimpleFraudDetection")
    return session_builder.getOrCreate()

In [None]:
# Fallback layout of the JSON transaction payload, as a DDL string for from_json.
# Only the fields process_stream itself reads are listed.
# NOTE(review): the types are assumptions — customer_id as STRING, timestamp as
# epoch seconds (it is subtracted from unix_timestamp()); confirm against the
# actual producer.
_DEFAULT_TRANSACTION_SCHEMA = "customer_id STRING, amount DOUBLE, timestamp LONG"


def process_stream(
    spark,
    kafka_topic,
    schema=None,
    bootstrap_servers="localhost:9092",
    model_path="fraud_model",
    risk_scores_path="customer_risk_scores",
    alert_topic="fraud_alerts",
    checkpoint_location="checkpoints/fraud_alerts",
):
    """Score a Kafka stream of transactions and publish the flagged ones.

    Pipeline: read JSON transactions from ``kafka_topic`` -> join with the
    static customer risk scores -> derive features -> apply the saved model
    -> write rows with ``prediction == 1`` to ``alert_topic``.

    This call blocks until the streaming query terminates.

    Args:
        spark: Active SparkSession.
        kafka_topic: Kafka topic to subscribe to for incoming transactions.
        schema: Schema of the JSON payload (StructType or DDL string).
            Defaults to ``_DEFAULT_TRANSACTION_SCHEMA``.
        bootstrap_servers: Kafka bootstrap servers for both source and sink.
        model_path: Location of the saved, fitted logistic regression model.
        risk_scores_path: Parquet dataset holding per-customer risk scores;
            it must contain a ``customer_id`` column (used as the join key).
        alert_topic: Kafka topic that receives the fraudulent transactions.
        checkpoint_location: Checkpoint directory for the streaming sink.
    """
    if schema is None:
        schema = _DEFAULT_TRANSACTION_SCHEMA

    raw_events = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", kafka_topic)
        .option("startingOffsets", "earliest")
        .load()
    )

    # Kafka delivers the payload as bytes: cast it to a string, parse the JSON,
    # then flatten the parsed struct so its fields become top-level columns
    # (the join and the feature expressions below address them by bare name).
    transactions = (
        raw_events
        .selectExpr("CAST(value AS STRING) AS value")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )

    # Stream-static join with the customer risk scores stored as parquet.
    customer_risk_scores = spark.read.parquet(risk_scores_path)
    enriched = transactions.join(customer_risk_scores, "customer_id")

    # Feature engineering sample.
    features = (
        enriched
        .withColumn("amount_bin", col("amount").cast("int") / 100)
        .withColumn("time_diff", unix_timestamp() - col("timestamp"))
    )

    # Load the fitted model. `LogisticRegression` is the unfitted estimator and
    # has no transform(); the saved, fitted artefact is a LogisticRegressionModel.
    # NOTE(review): the model's input (features) column must already exist on
    # `features`; nothing here assembles one — confirm how the model was trained.
    model = LogisticRegressionModel.load(model_path)
    predictions = model.transform(features)

    # Keep only the transactions the model flags as fraudulent.
    fraud_transactions = predictions.filter(col("prediction") == 1)

    # Update customer risk scores (batch process)
    # my batch process here

    # The Kafka sink needs a `value` column, so each row is serialised to JSON.
    alerts = fraud_transactions.selectExpr("to_json(struct(*)) AS value")

    query = (
        alerts.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", alert_topic)
        .option("checkpointLocation", checkpoint_location)
        .start()
    )

    # awaitTermination lives on the StreamingQuery returned by start(),
    # not on the DataFrame.
    query.awaitTermination()



In [None]:
# Entry point: build the session, then run the fraud-detection stream over the
# "transactions" topic.
if __name__ == "__main__":
    spark = create_spark_session()
    process_stream(spark=spark, kafka_topic="transactions")
