In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Kafka connection settings shared by the streaming cells below.
KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"     # broker address handed to the Kafka source
KAFKA_TOPIC_NAME_CONS = "transactionrequest"        # topic the readStream cell subscribes to
KAFKA_OUTPUT_TOPIC_NAME_CONS = "transactiondetail"  # NOTE(review): not referenced by any cell in this notebook

In [2]:
# Start (or reuse) a local Spark session with the Kafka connector jars on its classpath.
# No `if __name__ == "__main__":` guard: inside a notebook kernel it is always true,
# and `spark` must be a notebook-level global because every later cell uses it.
# NOTE(review): the jar file names appear to target Scala 2.11 / Spark 2.4.0 and are
# bare relative paths -- confirm they exist where the kernel runs and match the installed PySpark.
print("PySpark Structured Streaming with Kafka Demo Application Started ...")

spark = (
    SparkSession.builder
    .appName("PySpark Structured Streaming with Kafka Demo")
    .master("local[*]")
    .config("spark.jars", "spark-sql-kafka-0-10_2.11-2.4.0.jar,kafka-clients-1.1.0.jar")
    .getOrCreate()
)

# Only ERROR-level logs, so console-sink batches are not buried in Spark log noise.
spark.sparkContext.setLogLevel("ERROR")

PySpark Structured Streaming with Kafka Demo Application Started ...


In [3]:
# Open the input Kafka topic as an unbounded streaming DataFrame.
# startingOffsets="latest": a newly started query reads only records that arrive after it starts.
transaction_detail_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
    .option("subscribe", KAFKA_TOPIC_NAME_CONS)
    .option("startingOffsets", "latest")
    .load()
)

In [4]:
# Show the fixed schema of the raw Kafka source: binary key/value plus topic/partition/offset/timestamp metadata.
transaction_detail_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Decode the binary Kafka payload into a string column (still named `value`),
# keeping the broker-assigned timestamp alongside it.
transaction_detail_df1 = transaction_detail_df.select(
    col("value").cast("string"),
    col("timestamp"),
)

In [6]:
# Display the decoded stream; for a streaming DataFrame this shows only the column names and types.
transaction_detail_df1

DataFrame[value: string, timestamp: timestamp]

In [7]:
   # Schema of the JSON document carried in each Kafka message value.
   # Every field is declared as a string; user_id, card_number and amount are
   # cast to numeric types in a later cell.
   # Parenthesized chain (no trailing `\` continuation) so a following line can
   # never be spliced into this statement.
   transaction_detail_schema = (
       StructType()
       .add("transaction_id", StringType())
       .add("user_id", StringType())
       .add("card_number", StringType())
       .add("amount", StringType())
       .add("description", StringType())
       .add("transaction_type", StringType())
       .add("vendor", StringType())
   )

In [8]:
# Parse each JSON payload into a single struct column using the declared schema,
# carrying the Kafka timestamp through unchanged.
transaction_detail_df2 = transaction_detail_df1.select(
    from_json(col("value"), transaction_detail_schema).alias("transaction_detail"),
    col("timestamp"),
)

In [9]:
# Inspect the struct column produced by from_json (field names and types only).
transaction_detail_df2

DataFrame[transaction_detail: struct<transaction_id:string,user_id:string,card_number:string,amount:string,description:string,transaction_type:string,vendor:string>, timestamp: timestamp]

In [10]:
# Promote the parsed struct's fields to top-level columns, keeping the Kafka timestamp last.
transaction_detail_df3 = transaction_detail_df2.select(
    col("transaction_detail.*"),
    col("timestamp"),
)

In [11]:
# Cast the numeric fields from their JSON string form and add a placeholder `status` column.
# NOTE(review): the placeholder is the literal string "None", not SQL NULL.
transaction_detail_df3 = (
    transaction_detail_df3
    .withColumn("user_id", col("user_id").cast("int"))
    .withColumn("card_number", col("card_number").cast("long"))
    .withColumn("amount", col("amount").cast("float"))
    .withColumn("status", lit("None"))
)

In [12]:
# Inspect the typed columns.
# NOTE(review): the saved output below has no `status` column even though the
# previous cell adds one, so it looks stale (cells run out of order); re-run to refresh.
transaction_detail_df3


DataFrame[transaction_id: string, user_id: int, card_number: bigint, amount: float, description: string, transaction_type: string, vendor: string, timestamp: timestamp]

In [13]:
# Amounts up to and including this limit are approved; everything else is denied.
# NOTE(review): confirm whether an amount of exactly 70 should be approved (<=) or denied (<).
AMOUNT_APPROVAL_LIMIT = 70

# Derive each transaction's status. DataFrames are immutable, so the result of
# withColumn has to be assigned back (it was previously discarded). A null amount
# (e.g. a value that failed the float cast) does not satisfy the comparison and
# falls through to DENIED.
transaction_detail_df3 = transaction_detail_df3.withColumn(
    "status",
    when(col("amount") <= AMOUNT_APPROVAL_LIMIT, "APPROVED").otherwise("DENIED"),
)

# Expose the stream to spark.sql under the same name. createOrReplaceTempView
# returns None, so it is called only for its side effect; assigning its result
# back to transaction_detail_df3 would replace the DataFrame with None.
transaction_detail_df3.createOrReplaceTempView("transaction_detail_df3")

In [14]:
    # Stream transactions to the console sink, one micro-batch per second.
    # start() returns a StreamingQuery that keeps running in the background.
    # NOTE(review): despite the variable name, no filter is applied here, so every
    # transaction is printed -- not only approved ones.
    trans_detail_write_stream_approved = (
        transaction_detail_df3.writeStream
        .format("console")
        .outputMode("update")
        .option("truncate", "false")
        .trigger(processingTime='1 seconds')
        .start()
    )

In [15]:
# NOTE(review): unused scratch cell -- `td3` is not defined in any cell above, so this
# cannot run as written if uncommented. Consider deleting it.
# td3.writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .start()