In [1]:
import os

import findspark

# Make the local Spark installation's pyspark importable before it is used.
findspark.init()

# Extra jars for the Kafka source/sink. PYSPARK_SUBMIT_ARGS is read when the JVM
# is launched (i.e. when the SparkSession is first created) and must end with
# "pyspark-shell".
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--jars',
    ','.join([
        './libs/spark-sql-kafka-0-10_2.11-2.4.5.jar',
        './libs/kafka-clients-2.4.1.jar',
    ]),
    'pyspark-shell',
])

In [2]:
from pyspark.sql import SparkSession

## Create a spark session
# Local-mode session using all available cores. getOrCreate() hands back the
# already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkDemo")
    .getOrCreate()
)
    

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

## Kafka configs
# Source: JSON transaction events from the `input` topic. "latest" only applies
# when the query starts without a checkpoint; restarts resume from the checkpoint.
kafka_input_config = {
    "kafka.bootstrap.servers" : "kafka:9092",
    "subscribe" : "input",
    "startingOffsets" : "latest",
    "failOnDataLoss" : "false"
}
# Sink: eligible cashback records go to the `output` topic.
# NOTE: despite the ".txt" name, Spark uses checkpointLocation as a directory
# (offset/commit logs). The path is kept as-is so an existing checkpoint is reused.
kafka_output_config = {
    "kafka.bootstrap.servers" : "kafka:9092",
    "topic" : "output",
    "checkpointLocation" : "./check.txt"
}

## Cashback rule parameters
MIN_ELIGIBLE_AMOUNT = 15            # inclusive lower bound on `amount`
CASHBACK_WEEKDAY = 6                # F.dayofweek: 1 = Sunday ... 7 = Saturday, so 6 = Friday
ELIGIBLE_MERCHANT_ID = "MerchantX"
CASHBACK_RATE = 0.15                # cashback = amount (as double) * rate

## Input Schema
# Fields that cannot be parsed come back null. A null amount, timestamp or
# merchant_id makes the filter condition non-true, so such records are dropped.
df_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("amount", IntegerType(), True),
    StructField("transaction_timestamp", TimestampType(), True),
    StructField("merchant_id", StringType(), True)
])


## Read Stream
# Kafka delivers the payload as a binary `value`. Cast it to a string, parse it
# against df_schema, and flatten the parsed struct into top-level columns.
transactions_df = (
    spark
    .readStream
    .format("kafka")
    .options(**kafka_input_config)
    .load()
    .select(F.from_json(F.col("value").cast("string"), df_schema).alias("json_data"))
    .select("json_data.*")
)


## Filter for eligible transactions and compute the cashback
# NOTE(review): dayofweek is evaluated in the Spark session time zone
# (spark.sql.session.timeZone), so events near midnight may land on a different
# weekday depending on the host's zone. Confirm whether "Friday" is meant in UTC.
df = (
    transactions_df
    .filter(
        (F.col("amount") >= MIN_ELIGIBLE_AMOUNT)
        & (F.dayofweek(F.col("transaction_timestamp")) == CASHBACK_WEEKDAY)
        & (F.col("merchant_id") == ELIGIBLE_MERCHANT_ID)
    )
    .withColumn("cashback", F.col("amount").cast("double") * CASHBACK_RATE)
    # NOTE(review): transaction_id is not forwarded, so consumers of `output`
    # cannot de-duplicate replayed records. Confirm this is intended.
    .select(
        F.col("customer_id"),
        F.col("amount"),
        F.col("transaction_timestamp"),
        F.col("merchant_id"),
        F.col("cashback"),
    )
)


## Create an output and produce to kafka target
# The Kafka sink writes the `value` column; each eligible row becomes one JSON object.
output_df = df.select(F.to_json(F.struct(*df.columns)).alias("value"))

write = (
    output_df
    .writeStream
    .format("kafka")
    .options(**kafka_output_config)
    .start()
)

# Block this cell until the query ends. Interrupting the kernel only interrupts
# the wait, not the streaming query itself, so stop it explicitly. Otherwise
# re-running this cell fails because a query on the same checkpoint is still active.
try:
    write.awaitTermination()
finally:
    write.stop()




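## Sample input messages for the `input` topic (one JSON object per message).
## Expected with a UTC session time zone: #1 and #4 are eligible; #2 is rejected (MerchantY), #3 is rejected (2023-04-22 is a Saturday).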
#{"transaction_id": "1", "customer_id": "123", "amount": 20, "transaction_timestamp": "2023-04-21T09:30:00Z", "merchant_id": "MerchantX"}
#{"transaction_id": "2", "customer_id": "123", "amount": 20, "transaction_timestamp": "2023-04-21T09:30:00Z", "merchant_id": "MerchantY"}
#{"transaction_id": "3", "customer_id": "123", "amount": 20, "transaction_timestamp": "2023-04-22T09:30:00Z", "merchant_id": "MerchantX"}
#{"transaction_id": "4", "customer_id": "321", "amount": 667, "transaction_timestamp": "2023-04-28T09:30:00Z", "merchant_id": "MerchantX"}
