### Streaming application using Spark Structured Streaming (55%)

#### Write code to create a SparkSession from a SparkConf object that uses two local cores, a proper application name, and UTC as the timezone.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark import SparkConf # Spark
from pyspark.sql import SparkSession

# setting up spark session
spark_conf = SparkConf()\
            .setMaster("local[2]")\
            .setAppName("Spark Streaming")\
            .set("spark.sql.session.timeZone", "UTC")


spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()


#### Using the same topic names as the Kafka producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data arrives in String format.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

# read the customer data published by the Task 1 producer
df_cust = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", "customer_data") \
    .load()

In [3]:
# read the bureau data published by the Task 1 producer
df_bur = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", "bureau_data") \
    .load()


#### Next, transform the streaming data into the proper formats following the metadata file schema, as in assignment 2A, and use the 'ts' column as the watermark with a delay threshold of 5 seconds.

In [4]:
# Define the schema of the received structured data stream (ref. class notes, week 10)
cust_schema = ArrayType(StructType([    
    StructField("ID", IntegerType(), True),
    StructField("Frequency", StringType(), True),
    StructField("InstlmentMode", StringType(), True),
    StructField("LoanStatus", StringType(), True),
    StructField("PaymentMode", StringType(), True),
    StructField("BranchID", StringType(), True),
    StructField("Area", StringType(), True),
    StructField("Tenure", IntegerType(), True),
    StructField("AssetCost", IntegerType(), True),
    StructField("AmountFinance", DoubleType(), True),
    StructField("DisbursalAmount", DoubleType(), True),
    StructField("EMI", DoubleType(), True),
    StructField("DisbursalDate", StringType(), True),
    StructField("MaturityDAte", StringType(), True),
    StructField("AuthDate", StringType(), True),
    StructField("AssetID", StringType(), True),
    StructField("ManufacturerID", StringType(), True),
    StructField("SupplierID", StringType(), True),
    StructField("LTV", DoubleType(), True),
    StructField("SEX", StringType(), True),
    StructField("AGE", IntegerType(), True),
    StructField("MonthlyIncome", DoubleType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("ZiPCODE", IntegerType(), True),
    StructField("Top-up Month", StringType(), True),
    StructField('ts', TimestampType(), True)          
]))
# schema of the bureau data stream
bur_schema = ArrayType(StructType([    
    StructField("ID", IntegerType(), True),
    StructField("SELF-INDICATOR", StringType(), True),
    StructField("MATCH-TYPE", StringType(), True),
    StructField("ACCT-TYPE", StringType(), True),
    StructField("CONTRIBUTOR-TYPE", StringType(), True),
    StructField("DATE-REPORTED", StringType(), True),
    StructField("OWNERSHIP-IND", StringType(), True),
    StructField("ACCOUNT-STATUS", StringType(), True),
    StructField("DISBURSED-DT", StringType(), True),
    StructField("CLOSE-DT", StringType(), True),
    StructField("LAST-PAYMENT-DATE", StringType(), True),
    StructField("CREDIT-LIMIT/SANC AMT", StringType(), True),
    StructField("DISBURSED-AMT/HIGH CREDIT", StringType(), True),
    StructField("INSTALLMENT-AMT", StringType(), True),
    StructField("CURRENT-BAL", StringType(), True),
    StructField("INSTALLMENT-FREQUENCY", StringType(), True),
    StructField("OVERDUE-AMT", StringType(), True),
    StructField("WRITE-OFF-AMT", IntegerType(), True),
    StructField("ASSET_CLASS", StringType(), True),
    StructField("REPORTED DATE - HIST", StringType(), True),
    StructField("DPD - HIST", StringType(), True),
    StructField("CUR BAL - HIST", StringType(), True),
    StructField("AMT OVERDUE - HIST", StringType(), True),
    StructField("AMT PAID - HIST", StringType(), True),
    StructField("TENURE", IntegerType(), True),
    StructField('ts', TimestampType(), True)            
]))

In [5]:
# cast the binary Kafka 'value' to string and parse the JSON with the schemas above
df_cust = df_cust.select(F.from_json(F.col("value").cast("string"), cust_schema).alias('parsed_value'))
df_bur = df_bur.select(F.from_json(F.col("value").cast("string"), bur_schema).alias('parsed_value'))


22/10/23 05:48:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
df_cust.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: integer (nullable = true)
 |    |    |-- Frequency: string (nullable = true)
 |    |    |-- InstlmentMode: string (nullable = true)
 |    |    |-- LoanStatus: string (nullable = true)
 |    |    |-- PaymentMode: string (nullable = true)
 |    |    |-- BranchID: string (nullable = true)
 |    |    |-- Area: string (nullable = true)
 |    |    |-- Tenure: integer (nullable = true)
 |    |    |-- AssetCost: integer (nullable = true)
 |    |    |-- AmountFinance: double (nullable = true)
 |    |    |-- DisbursalAmount: double (nullable = true)
 |    |    |-- EMI: double (nullable = true)
 |    |    |-- DisbursalDate: string (nullable = true)
 |    |    |-- MaturityDAte: string (nullable = true)
 |    |    |-- AuthDate: string (nullable = true)
 |    |    |-- AssetID: string (nullable = true)
 |    |    |-- ManufacturerID: string (nullable = true)
 |    |    |-- SupplierID: str

In [7]:
df_bur.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: integer (nullable = true)
 |    |    |-- SELF-INDICATOR: string (nullable = true)
 |    |    |-- MATCH-TYPE: string (nullable = true)
 |    |    |-- ACCT-TYPE: string (nullable = true)
 |    |    |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |    |    |-- DATE-REPORTED: string (nullable = true)
 |    |    |-- OWNERSHIP-IND: string (nullable = true)
 |    |    |-- ACCOUNT-STATUS: string (nullable = true)
 |    |    |-- DISBURSED-DT: string (nullable = true)
 |    |    |-- CLOSE-DT: string (nullable = true)
 |    |    |-- LAST-PAYMENT-DATE: string (nullable = true)
 |    |    |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |    |    |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |    |    |-- INSTALLMENT-AMT: string (nullable = true)
 |    |    |-- CURRENT-BAL: string (nullable = true)
 |    |    |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |    |   

In [8]:
# each parsed value is an array of records, so explode it into one row per record
df_cust = df_cust.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
df_bur = df_bur.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
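Conceptually, the string cast, `from_json`, and `explode` steps turn one Kafka message into one row per record. The plain-Python sketch below mimics that on a single message. It assumes, as the `ArrayType` schemas above imply, that the Task 1 producer sends each message value as a UTF-8 JSON array of records; the sample message is hypothetical.

```python
import json

def explode_message(value_bytes):
    """Mimic CAST(value AS STRING) -> from_json(ArrayType schema) -> explode."""
    text = value_bytes.decode("utf-8")  # CAST(value AS STRING)
    parsed = json.loads(text)           # from_json with an ArrayType schema
    return list(parsed)                 # explode: one output row per array element

# hypothetical message carrying two records
message = b'[{"ID": 1, "State": "A"}, {"ID": 2, "State": "B"}]'
rows = explode_message(message)
print(len(rows))  # 2
```

An empty array produces no rows, just as `explode` emits nothing for an empty array.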


In [9]:
df_cust.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: integer (nullable = true)
 |    |-- Frequency: string (nullable = true)
 |    |-- InstlmentMode: string (nullable = true)
 |    |-- LoanStatus: string (nullable = true)
 |    |-- PaymentMode: string (nullable = true)
 |    |-- BranchID: string (nullable = true)
 |    |-- Area: string (nullable = true)
 |    |-- Tenure: integer (nullable = true)
 |    |-- AssetCost: integer (nullable = true)
 |    |-- AmountFinance: double (nullable = true)
 |    |-- DisbursalAmount: double (nullable = true)
 |    |-- EMI: double (nullable = true)
 |    |-- DisbursalDate: string (nullable = true)
 |    |-- MaturityDAte: string (nullable = true)
 |    |-- AuthDate: string (nullable = true)
 |    |-- AssetID: string (nullable = true)
 |    |-- ManufacturerID: string (nullable = true)
 |    |-- SupplierID: string (nullable = true)
 |    |-- LTV: double (nullable = true)
 |    |-- SEX: string (nullable = true)
 |    |-- AGE: integer (nullable 

In [10]:
# promote the nested struct fields to top-level columns
df_cust_formatted = df_cust.select(
                    F.col("unnested_value.ID").alias("ID"),
                    F.col("unnested_value.Frequency").alias("Frequency"),
                    F.col("unnested_value.InstlmentMode").alias("InstlmentMode"),
                    F.col("unnested_value.LoanStatus").alias("LoanStatus"),
                    F.col("unnested_value.PaymentMode").alias("PaymentMode"),
                    F.col("unnested_value.BranchID").alias("BranchID"),
                    F.col("unnested_value.Area").alias("Area"),
                    F.col("unnested_value.Tenure").alias("Tenure"),
                    F.col("unnested_value.AssetCost").alias("AssetCost"),
                    F.col("unnested_value.AmountFinance").alias("AmountFinance"),
                    F.col("unnested_value.DisbursalAmount").alias("DisbursalAmount"),
                    F.col("unnested_value.EMI").alias("EMI"),
                    F.col("unnested_value.DisbursalDate").alias("DisbursalDate"),
                    F.col("unnested_value.MaturityDAte").alias("MaturityDAte"),
                    F.col("unnested_value.AuthDate").alias("AuthDate"),
                    F.col("unnested_value.AssetID").alias("AssetID"),
                    F.col("unnested_value.ManufacturerID").alias("ManufacturerID"),
                    F.col("unnested_value.SupplierID").alias("SupplierID"),
                    F.col("unnested_value.LTV").alias("LTV"),
                    F.col("unnested_value.SEX").alias("SEX"),
                    F.col("unnested_value.AGE").alias("AGE"),
                    F.col("unnested_value.MonthlyIncome").alias("MonthlyIncome"),
                    F.col("unnested_value.City").alias("City"),
                    F.col("unnested_value.State").alias("State"),
                    F.col("unnested_value.ZiPCODE").alias("ZiPCODE"),
                    F.col("unnested_value.Top-up Month").alias("Top-up Month"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [11]:
# formatted dataframe
df_cust_formatted.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Frequency: string (nullable = true)
 |-- InstlmentMode: string (nullable = true)
 |-- LoanStatus: string (nullable = true)
 |-- PaymentMode: string (nullable = true)
 |-- BranchID: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- AssetCost: integer (nullable = true)
 |-- AmountFinance: double (nullable = true)
 |-- DisbursalAmount: double (nullable = true)
 |-- EMI: double (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MaturityDAte: string (nullable = true)
 |-- AuthDate: string (nullable = true)
 |-- AssetID: string (nullable = true)
 |-- ManufacturerID: string (nullable = true)
 |-- SupplierID: string (nullable = true)
 |-- LTV: double (nullable = true)
 |-- SEX: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MonthlyIncome: double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZiPCODE: integer (nu

In [12]:
# query_cust_sink = df_cust_formatted.writeStream.outputMode("append").format("console").trigger(processingTime='10 seconds').start()


In [13]:
df_bur.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: integer (nullable = true)
 |    |-- SELF-INDICATOR: string (nullable = true)
 |    |-- MATCH-TYPE: string (nullable = true)
 |    |-- ACCT-TYPE: string (nullable = true)
 |    |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |    |-- DATE-REPORTED: string (nullable = true)
 |    |-- OWNERSHIP-IND: string (nullable = true)
 |    |-- ACCOUNT-STATUS: string (nullable = true)
 |    |-- DISBURSED-DT: string (nullable = true)
 |    |-- CLOSE-DT: string (nullable = true)
 |    |-- LAST-PAYMENT-DATE: string (nullable = true)
 |    |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |    |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |    |-- INSTALLMENT-AMT: string (nullable = true)
 |    |-- CURRENT-BAL: string (nullable = true)
 |    |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |    |-- OVERDUE-AMT: string (nullable = true)
 |    |-- WRITE-OFF-AMT: integer (nullable = true)
 |    |-- ASSET_CLASS: string (nulla

In [14]:
# promote the nested struct fields to top-level columns
df_bur_formatted = df_bur.select(
                    F.col("unnested_value.ID").alias("ID"),
                    F.col("unnested_value.SELF-INDICATOR").alias("SELF-INDICATOR"),
                    F.col("unnested_value.MATCH-TYPE").alias("MATCH-TYPE"),
                    F.col("unnested_value.ACCT-TYPE").alias("ACCT-TYPE"),
                    F.col("unnested_value.CONTRIBUTOR-TYPE").alias("CONTRIBUTOR-TYPE"),
                    F.col("unnested_value.DATE-REPORTED").alias("DATE-REPORTED"),
                    F.col("unnested_value.OWNERSHIP-IND").alias("OWNERSHIP-IND"),
                    F.col("unnested_value.ACCOUNT-STATUS").alias("ACCOUNT-STATUS"),
                    F.col("unnested_value.DISBURSED-DT").alias("DISBURSED-DT"),
                    F.col("unnested_value.CLOSE-DT").alias("CLOSE-DT"),
                    F.col("unnested_value.LAST-PAYMENT-DATE").alias("LAST-PAYMENT-DATE"),
                    F.col("unnested_value.CREDIT-LIMIT/SANC AMT").alias("CREDIT-LIMIT/SANC AMT"),
    
                    F.col("unnested_value.DISBURSED-AMT/HIGH CREDIT").alias("DISBURSED-AMT/HIGH CREDIT"),
                    F.col("unnested_value.INSTALLMENT-AMT").alias("INSTALLMENT-AMT"),
                    F.col("unnested_value.CURRENT-BAL").alias("CURRENT-BAL"),
                    F.col("unnested_value.INSTALLMENT-FREQUENCY").alias("INSTALLMENT-FREQUENCY"),
                    F.col("unnested_value.OVERDUE-AMT").alias("OVERDUE-AMT"),
                    F.col("unnested_value.WRITE-OFF-AMT").alias("WRITE-OFF-AMT"),
                    F.col("unnested_value.ASSET_CLASS").alias("ASSET_CLASS"),
                    F.col("unnested_value.REPORTED DATE - HIST").alias("REPORTED DATE - HIST"),
                    F.col("unnested_value.DPD - HIST").alias("DPD - HIST"),
                    F.col("unnested_value.CUR BAL - HIST").alias("CUR BAL - HIST"),
                    F.col("unnested_value.AMT OVERDUE - HIST").alias("AMT OVERDUE - HIST"),
                    F.col("unnested_value.AMT PAID - HIST").alias("AMT PAID - HIST"),
                    F.col("unnested_value.TENURE").alias("TENURE"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [15]:
df_bur_formatted.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- SELF-INDICATOR: string (nullable = true)
 |-- MATCH-TYPE: string (nullable = true)
 |-- ACCT-TYPE: string (nullable = true)
 |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |-- DATE-REPORTED: string (nullable = true)
 |-- OWNERSHIP-IND: string (nullable = true)
 |-- ACCOUNT-STATUS: string (nullable = true)
 |-- DISBURSED-DT: string (nullable = true)
 |-- CLOSE-DT: string (nullable = true)
 |-- LAST-PAYMENT-DATE: string (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |-- INSTALLMENT-AMT: string (nullable = true)
 |-- CURRENT-BAL: string (nullable = true)
 |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |-- OVERDUE-AMT: string (nullable = true)
 |-- WRITE-OFF-AMT: integer (nullable = true)
 |-- ASSET_CLASS: string (nullable = true)
 |-- REPORTED DATE - HIST: string (nullable = true)
 |-- DPD - HIST: string (nullable = true)
 |-- CUR BAL - HIST: string (nullab

In [16]:
# watermark on ts: records older than (max ts seen so far - 5 seconds) may be dropped by stateful operations
df_bur_formatted_watermark = df_bur_formatted.withWatermark("ts","5 seconds")
df_cust_formatted_watermark = df_cust_formatted.withWatermark("ts","5 seconds")
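The watermark semantics can be illustrated without Spark. In this simplified model the watermark is the maximum event time seen so far minus the 5-second delay, and an event older than the current watermark counts as late. Two caveats: Spark advances the watermark between micro-batches rather than per event, and it only guarantees that data within the threshold is kept; later data may or may not be dropped. The timestamps are hypothetical.

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=5)  # mirrors withWatermark("ts", "5 seconds")

def filter_late(events):
    """Keep events that are not older than the running watermark (simplified model)."""
    max_ts = None
    kept = []
    for ts in events:
        watermark = max_ts - DELAY if max_ts is not None else None
        if watermark is not None and ts < watermark:
            continue  # late event: a stateful operator may discard it
        kept.append(ts)
        max_ts = ts if max_ts is None else max(max_ts, ts)
    return kept

t0 = datetime(2022, 10, 23, 5, 48, 0)
events = [t0, t0 + timedelta(seconds=10), t0 + timedelta(seconds=7), t0 + timedelta(seconds=3)]
print(filter_late(events))
```

The event at t0+7s survives because it is only 3 seconds behind the maximum, while t0+3s falls 7 seconds behind and is treated as late.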

#### Group the bureau stream by ID with a 30-second window duration, as in assignment 2A (same rules for sum and dist).
- Transform the “SELF-INDICATOR” column’s values: if the value is true, convert it to 1; if it is false, convert it to 0.
- Sum the rows of numeric columns, count the distinct values of columns with other data types, and rename the results with the postfix '_sum' or '_dist'. (For example, summing the 'HIGH CREDIT' column produces a new column named 'HIGH CREDIT_sum'.)
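The aggregation rule above can be sketched in plain Python for a single group. This is an illustration under assumptions, not the notebook's code: rows are plain dicts, the column names and values are hypothetical, and the distinct count here is exact, whereas the Spark code uses `approx_count_distinct` because exact distinct aggregations are not supported on streaming DataFrames.

```python
from collections import defaultdict

def self_indicator_to_int(value):
    """1 when the value reads as true, else 0 (compared case-insensitively here)."""
    return 1 if str(value).strip().lower() == "true" else 0

def aggregate(rows, numeric_cols, other_cols):
    """Group rows by (window_start, ID); '_sum' numeric columns, '_dist' the rest."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["window_start"], row["ID"])].append(row)

    result = {}
    for key, members in groups.items():
        out = {}
        for c in numeric_cols:
            values = [m[c] for m in members if m[c] is not None]
            # SQL SUM ignores nulls and is null when every value is null
            out[c + "_sum"] = sum(values) if values else None
        for c in other_cols:
            # distinct counts ignore nulls as well
            out[c + "_dist"] = len({m[c] for m in members if m[c] is not None})
        result[key] = out
    return result

# hypothetical bureau rows for customer ID 3 in the window starting at t=0
rows = [
    {"window_start": 0, "ID": 3, "SELF-INDICATOR": self_indicator_to_int("TRUE"),
     "HIGH CREDIT": 100, "ACCT-TYPE": "Auto Loan"},
    {"window_start": 0, "ID": 3, "SELF-INDICATOR": self_indicator_to_int("FALSE"),
     "HIGH CREDIT": 250, "ACCT-TYPE": "Auto Loan"},
    {"window_start": 0, "ID": 3, "SELF-INDICATOR": self_indicator_to_int("true"),
     "HIGH CREDIT": None, "ACCT-TYPE": "Gold Loan"},
]
print(aggregate(rows, ["SELF-INDICATOR", "HIGH CREDIT"], ["ACCT-TYPE"]))
```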

In [17]:
# map SELF-INDICATOR to 1 when it is true (compared case-insensitively), otherwise 0
from pyspark.sql.functions import when
df_bur_update = df_bur_formatted_watermark \
                            .withColumn("SELF-INDICATOR", when(F.lower(df_bur_formatted_watermark["SELF-INDICATOR"]) == "true", 1) \
                            .otherwise(0))

In [18]:
df_bur_update.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- SELF-INDICATOR: integer (nullable = false)
 |-- MATCH-TYPE: string (nullable = true)
 |-- ACCT-TYPE: string (nullable = true)
 |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |-- DATE-REPORTED: string (nullable = true)
 |-- OWNERSHIP-IND: string (nullable = true)
 |-- ACCOUNT-STATUS: string (nullable = true)
 |-- DISBURSED-DT: string (nullable = true)
 |-- CLOSE-DT: string (nullable = true)
 |-- LAST-PAYMENT-DATE: string (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |-- INSTALLMENT-AMT: string (nullable = true)
 |-- CURRENT-BAL: string (nullable = true)
 |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |-- OVERDUE-AMT: string (nullable = true)
 |-- WRITE-OFF-AMT: integer (nullable = true)
 |-- ASSET_CLASS: string (nullable = true)
 |-- REPORTED DATE - HIST: string (nullable = true)
 |-- DPD - HIST: string (nullable = true)
 |-- CUR BAL - HIST: string (null

In [19]:

from pyspark.sql.functions import col, split
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import regexp_replace

# strip the ',' thousands separators and cast to integer; for INSTALLMENT-AMT keep only the part before '/'
df_bur_update = df_bur_update.withColumn("OVERDUE-AMT", regexp_replace('OVERDUE-AMT', ',', '').cast(IntegerType())) \
                            .withColumn("CURRENT-BAL", regexp_replace('CURRENT-BAL', ',', '').cast(IntegerType())) \
                            .withColumn("DISBURSED-AMT/HIGH CREDIT", regexp_replace('DISBURSED-AMT/HIGH CREDIT', ',', '').cast(IntegerType())) \
                            .withColumn("CREDIT-LIMIT/SANC AMT", regexp_replace('CREDIT-LIMIT/SANC AMT', ',', '').cast(IntegerType())) \
                            .withColumn("INSTALLMENT-AMT", regexp_replace('INSTALLMENT-AMT', ',', '')) \
                            .withColumn("INSTALLMENT-AMT", split(col("INSTALLMENT-AMT"), "/").getItem(0).cast(IntegerType()))
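The cleaning rule for the amount columns can be checked in isolation. This plain-Python sketch mirrors `regexp_replace(col, ',', '')` followed by an integer cast, and for INSTALLMENT-AMT keeps only the text before the first '/'. The sample value '1,405/Monthly' is an assumed format, not taken from the data. The sketch also simplifies Spark's cast: unparseable strings become None here, as they become null under Spark's default non-ANSI cast, but edge cases such as decimal strings or integer overflow are not modelled.

```python
def to_int_amount(text):
    """Remove ',' separators and cast to int; unparseable input becomes None."""
    if text is None:
        return None
    cleaned = text.replace(",", "")
    try:
        return int(cleaned)
    except ValueError:
        return None  # stands in for Spark's null result on a failed cast

def installment_amount(text):
    """Keep only the amount before '/', e.g. a hypothetical '1,405/Monthly' -> 1405."""
    if text is None:
        return None
    return to_int_amount(text.replace(",", "").split("/")[0])

print(to_int_amount("1,23,000"), installment_amount("1,405/Monthly"))
```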

In [20]:
from pyspark.sql.types import IntegerType
# split the columns (except ID) into integer columns to sum and other columns to count distinct
numeric_col = []
other_col = []
for itr in df_bur_update.schema.fields:
    if (itr.name != "ID"):
        if (isinstance(itr.dataType, IntegerType)):
            numeric_col.append(itr.name)
        else:
            other_col.append(itr.name)
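# sum the integer columns and count distinct values of the rest per 30-second window and ID;
# approx_count_distinct is used because exact distinct aggregations are not supported on streaming DataFrames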
df_bur_formatted = df_bur_update.groupBy(F.window(df_bur_update.ts, "30 seconds"), "ID").agg(
    *[F.sum(num).alias(num + "_sum") for num in numeric_col],
    *[F.approx_count_distinct(othr).alias(othr + "_dist") for othr in other_col])


In [21]:
df_bur_formatted.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- ID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- INST

In [22]:
# printing to console
#query_cust_sink = df_cust_formatted.writeStream.outputMode("append").format("console").trigger(processingTime='10 seconds').start()


In [23]:
#query_bur_sink = df_bur_formatted.writeStream.outputMode("append").format("console").trigger(processingTime='10 seconds').start()

#### Create new columns named 'window_start' and 'window_end' holding the window's start and end times from 2.4. Then inner join the two streams on 'ID', accepting only customer data received within the window. For example, if customer data with ID '3' is received at 10:00, it is accepted only when the window of the corresponding bureau data contains 10:00 (e.g. window start 9:59, end 10:00).
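The window bounds and the acceptance rule can be sketched in plain Python. `F.window(ts, "30 seconds")` assigns each event to a tumbling window aligned to the Unix epoch, with an inclusive start and exclusive end; the join condition below then accepts a customer record when the IDs match and `window_start <= ts <= window_end`. The timestamps are hypothetical and treated as naive UTC. Because the condition includes `window_end`, a customer record that lands exactly on a boundary can match two consecutive windows.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)  # mirrors F.window(ts, "30 seconds")
EPOCH = datetime(1970, 1, 1)

def tumbling_window(ts):
    """Return (window_start, window_end) of the 30-second window containing ts."""
    offset = (ts - EPOCH) % WINDOW  # windows are aligned to the epoch
    start = ts - offset
    return start, start + WINDOW

def accepted(bureau_id, window_start, window_end, cust_id, cust_ts):
    """Join condition used later: same ID and window_start <= ts <= window_end."""
    return bureau_id == cust_id and window_start <= cust_ts <= window_end

start, end = tumbling_window(datetime(2022, 10, 23, 10, 0, 12))
print(start, end)  # 2022-10-23 10:00:00 2022-10-23 10:00:30
print(accepted(3, start, end, 3, datetime(2022, 10, 23, 10, 0, 30)))  # True
```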

In [24]:
# expose the window bounds as top-level columns
df_bur_formatted = df_bur_formatted.withColumn("window_start",F.col("window.start")) \
                                            .withColumn("window_end",F.col("window.end"))


In [25]:
df_bur_formatted.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- ID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- INST

In [26]:
# changing column name to avoid ambiguity
df_bur_New = df_bur_formatted.withColumnRenamed("ID","burID")

In [27]:
from pyspark.sql.functions import expr
# inner join between the two data streams: same ID, and customer ts within the bureau window
join_data = df_bur_New.join(
  df_cust_formatted,
  expr("""
    ID == burID AND
    ts >= window_start AND
    ts <= window_end
    """),
  "inner"                 
)

In [28]:
join_data.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- burID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- I

#### Persist the above result in Parquet format. (Before saving, rename “Top-up Month” to “Top-up_Month”, and keep only the columns “ID”, “window_start”, “window_end”, “ts”, and “Top-up_Month”.) The “Top-up Month” rename applies only to this saved output.
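A likely reason for the rename: as we understand it, Spark 3.0's Parquet writer rejects column names that contain any of the characters ` ,;{}()\n\t=` (a space among them), so “Top-up Month” cannot be written as-is. The plain-Python sketch below reproduces that check to show which names need renaming; it would also flag bureau aggregates such as 'CREDIT-LIMIT/SANC AMT_sum', which is one more reason to keep only the selected columns. Newer Spark versions may have relaxed this restriction.

```python
# characters that Spark 3.0's Parquet writer rejects in column names (our understanding)
INVALID_PARQUET_CHARS = set(" ,;{}()\n\t=")

def invalid_parquet_columns(columns):
    """Return the column names containing a character the Parquet writer rejects."""
    return [name for name in columns if any(ch in INVALID_PARQUET_CHARS for ch in name)]

cols = ["ID", "window_start", "window_end", "ts", "Top-up Month"]
print(invalid_parquet_columns(cols))  # ['Top-up Month']
```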

In [29]:
# changing column name
parquet_df = join_data.withColumnRenamed("Top-up Month","Top-up_Month")

In [30]:
parquet_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- burID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- I

In [31]:
parquet_df = parquet_df.select("ID", "window_start", "window_end", "ts", "Top-up_Month")

In [32]:
parquet_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Top-up_Month: string (nullable = true)



In [33]:
# Write into parquet files
query_file_sink = parquet_df.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/performance_df")\
        .option("checkpointLocation", "parquet/performance_df/checkpoint")\
        .start()

22/10/23 05:48:54 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;

In [34]:
#Stop the file_sink query
query_file_sink.stop()

#### Load the given machine learning model and use it to predict whether users will join the top-up service. Save the results in Parquet format. (Before saving, rename “Top-up Month” to “Top-up_Month”, and keep only the columns “ID”, “window_start”, “window_end”, “ts”, “prediction”, and “Top-up_Month”.) The “Top-up Month” rename applies to this question as well.

In [35]:
# load the saved pipeline model with the Spark ML Pipeline API
from pyspark.ml.pipeline import PipelineModel

topup_ml_model = PipelineModel.load("topup_pipeline_model")
# let the VectorAssembler (second-to-last stage) emit NaN for null/NaN features instead of raising an error
topup_ml_model.stages[-2].setHandleInvalid("keep")


                                                                                

VectorAssembler_ac48f700b95d
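`setHandleInvalid("keep")` changes how the VectorAssembler treats null or NaN inputs. Per the Spark ML documentation, 'error' throws, 'skip' filters the row out, and 'keep' outputs NaN in the assembled vector. The plain-Python model below is a simplification for numeric inputs only (the real assembler also accepts vector columns) and is meant only to show how the three modes differ.

```python
import math

def assemble(values, handle_invalid="error"):
    """Simplified model of VectorAssembler.handleInvalid for numeric inputs.

    Returns the assembled feature list, or None when the row is skipped.
    """
    has_invalid = any(v is None or (isinstance(v, float) and math.isnan(v)) for v in values)
    if not has_invalid:
        return [float(v) for v in values]
    if handle_invalid == "error":
        raise ValueError("null/NaN value in features")
    if handle_invalid == "skip":
        return None  # the row is filtered out
    if handle_invalid == "keep":
        return [float("nan") if v is None else float(v) for v in values]
    raise ValueError("unknown handleInvalid mode: " + handle_invalid)

print(assemble([1, None, 2.0], "keep"))  # [1.0, nan, 2.0]
```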

In [36]:
# predict
predictionsDF = topup_ml_model.transform(join_data)

In [37]:
# schema after prediction
predictionsDF.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- burID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- I

In [38]:
# renaming column
prediction_parquet = predictionsDF.withColumnRenamed("Top-up Month","Top-up_Month")

In [39]:
prediction_parquet.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- burID: integer (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- I

In [40]:
# selecting required columns
prediction_df = prediction_parquet.select("ID","window_start", 
                                  "window_end", "ts", 
                                  "prediction", "Top-up_Month")

In [41]:
# Write into parquet files
prediction_file_sink = prediction_df.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/prediction_df")\
        .option("checkpointLocation", "parquet/prediction_df/checkpoint")\
        .start()

22/10/23 05:49:02 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;
Project [window#400-T5000ms, burID#4102, SELF-INDICATOR_sum#428L, CREDIT-LIMIT/SANC AMT_sum#430L, DISBURSED-AMT/HIGH CREDIT_sum#432L, INSTALLMENT-AMT_sum#434L, CURRENT-BAL_sum#436L, OVERDUE-AMT_sum#438L, WRITE-OFF-AMT_sum#440L, TENURE_sum#442L, MATCH-TYPE_dist#548L, ACCT-TYPE_dist#654L, CONTRIBUTOR-TYPE_dist#760L, DATE-REPORTED_dist#866L, OWNERSHIP-IND_dist#972L, ACCOUNT-STATUS_dist#1078L, DISBURSED-DT_dist#1184L, CLOSE-DT_dist#1290L, LAST-PAYMENT-DATE_dist#1396L, INSTALLMENT-FREQUENCY_dist#1502L, ASSET_CLASS_dist#1608L, REPORTED DATE - HIST_dist#1714L, DPD - HIST_dist#1820L, CUR BAL - HIST_dist#1
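Spark stores a streaming query's offsets and state in its checkpoint directory, so two different queries should not point at the same `checkpointLocation`. A minimal sketch of keeping the checkpoint next to each sink's output path (mirroring the `parquet/<name>/checkpoint` layout used in this notebook) and of detecting a checkpoint that two queries share. The helper names and the example query names are hypothetical.

```python
import os
from collections import defaultdict


def sink_locations(name, base_dir="parquet"):
    """Return (output_path, checkpoint_path) for a file sink called `name`."""
    output_path = os.path.join(base_dir, name)
    checkpoint_path = os.path.join(output_path, "checkpoint")
    return output_path, checkpoint_path


def find_shared_checkpoints(queries):
    """Return checkpoint directories claimed by more than one query.

    queries: mapping of query name -> checkpoint directory.
    """
    owners = defaultdict(list)
    for query_name, checkpoint in queries.items():
        owners[os.path.normpath(checkpoint)].append(query_name)
    return {cp: names for cp, names in owners.items() if len(names) > 1}
```

With `sink_locations("prediction_df")`, the prediction sink gets its own checkpoint directory instead of reusing the one under `performance_df`.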

In [42]:
# Stop the parquet file sink query
prediction_file_sink.stop()

#### 8
Only keep the customers predicted to be our target customers (willing to join the top-up service). Normally, we should only keep “Top-up=1”. But due to the limited performance of our VM, if your process is extremely slow, you can abandon the filter and keep all of the data. Then, for each batch, show the epoch id and the count of the dataframe.
If the dataframe is not empty, transform the data into the following key/value format, where the key is the 'window_end' column and the value is the number of top-up service customers in each state (in JSON format). Then send it to Kafka with a proper topic. These data will be used for the real-time monitoring in Task 3.
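To make the target key/value format concrete, here is a plain-Python sketch (no Spark needed) of the per-batch transformation described above: count customers per `(window_end, State)` pair and emit one record per `window_end`, whose value is a compact JSON array of `{"State": ..., "count": ...}` objects. The function name and the sample timestamps and state names are hypothetical; the output is sorted for determinism, whereas Spark's `collect_set` makes no ordering guarantee.

```python
import json
from collections import Counter


def build_top_up_messages(rows):
    """Build (key, value) string pairs from (window_end, state) tuples.

    rows: one tuple per customer predicted to join the top-up service.
    key   -> the window_end value as a string
    value -> JSON array of {"State": ..., "count": ...} objects
    """
    counts = Counter(rows)  # (window_end, state) -> number of customers
    per_window = {}
    for (window_end, state), n in counts.items():
        per_window.setdefault(window_end, []).append({"State": state, "count": n})

    messages = []
    for window_end in sorted(per_window):
        states = sorted(per_window[window_end], key=lambda d: d["State"])
        # compact separators, similar to the JSON produced by Spark's to_json
        messages.append((str(window_end), json.dumps(states, separators=(",", ":"))))
    return messages


rows = [
    ("2022-10-23 05:50:00", "NSW"),
    ("2022-10-23 05:50:00", "NSW"),
    ("2022-10-23 05:50:00", "VIC"),
]
print(build_top_up_messages(rows))
# → [('2022-10-23 05:50:00', '[{"State":"NSW","count":2},{"State":"VIC","count":1}]')]
```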

In [43]:
# Function applied to every micro-batch through foreachBatch
# referred from Chapter 8 (Structured Streaming) of Learning Spark
from pyspark.sql.functions import col, collect_set, struct, to_json

def foreach_batch_function(df, epoch_id):
    # keep only the customers predicted to join the top-up service
    filtered = df.filter(col("prediction") == 1.0)

    # show the epoch id and the number of rows kept in this batch
    print(f"Epoch id: {epoch_id} and count of the dataframe is {filtered.count()}")

    # PySpark only added DataFrame.isEmpty() in 3.3, so check the underlying RDD
    if filtered.rdd.isEmpty():
        print("data frame is empty")
    else:
        # count customers per window_end and State, then pack the per-state
        # counts into one JSON array per window_end
        transform_df = filtered.groupBy("window_end", "State").count()\
                        .groupBy("window_end")\
                        .agg(to_json(collect_set(struct(col("State"), col("count")))).alias("value"))\
                        .select(col("window_end").cast("string").alias("key"), col("value"))

        # the Kafka sink needs string or binary "key" and "value" columns
        transform_df.write\
                .format("kafka")\
                .option("kafka.bootstrap.servers", "localhost:9092")\
                .option("topic", "top_up_customer")\
                .save()

In [44]:
# using foreachBatch to process each micro-batch
query1 = (
    predictionsDF.writeStream.outputMode("append")
        .foreachBatch(foreach_batch_function)
        .start()
)

22/10/23 05:49:02 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-dfa8362f-baaf-497d-a444-6f2609130a28. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/10/23 05:49:02 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;
Project [window#400-T5000ms, burID#4102, SELF-INDICATOR_sum#428L, CREDIT-LIMIT/SANC AMT_sum#430L, DISBURSED-AMT/HIGH CREDIT_sum#432L, INSTALLMENT-AMT_sum#434L, CURRENT-BAL_sum#436L, OVERDUE-AMT_sum#438L, WRITE-OFF-AMT_

In [45]:
query1.stop()

22/10/23 05:49:02 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0-1, groupId=spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
22/10/23 05:49:02 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0-1, groupId=spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
22/10/23 05:49:02 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0-1, groupId=spark-kafka-source-4c061b94-a7ca-468d-b53d-001ed2c84507-1480152181-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
22/10/23 05:49:02 WARN NetworkClient:
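Queries are stopped one at a time in the cells above. When several queries have been started across earlier cells, a cleanup sketch can instead iterate over `spark.streams.active`, the list of running `StreamingQuery` objects held by the session's `StreamingQueryManager`, and stop each one. The helper name `stop_all_queries` is hypothetical; it only relies on the `active` property, `StreamingQuery.stop()` and `StreamingQuery.name`.

```python
def stop_all_queries(spark):
    """Stop every streaming query still running on this SparkSession.

    Returns the names of the stopped queries (None for unnamed queries).
    """
    stopped = []
    for query in spark.streams.active:
        query.stop()
        stopped.append(query.name)
    return stopped
```

Calling `stop_all_queries(spark)` at the end of the notebook would, for instance, stop both the parquet sink and the `foreachBatch` query if either were still running.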

### References
For setting the timezone to UTC
https://stackoverflow.com/questions/49644232/how-to-set-timezone-to-utc-in-apache-spark

For watermarks
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.withWatermark.html

For sum aggregation
https://www.anycodings.com/1questions/762375/pyspark-loop-in-groupby-aggregate-function

For joins
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

For loading the ML model
https://stackoverflow.com/questions/57038445/load-model-pyspark

For collecting set values
https://sparkbyexamples.com/spark/spark-collect-list-and-collect-set-functions/

For JSON
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html