# Part 2: Streaming application using Spark Structured Streaming

1. Write code to create a SparkSession with the following requirements: 1) use four cores with a proper application name; 2) Melbourne timezone; 3) a checkpoint location has been set.


In [None]:
## Initiate spark session

# Pull in the Kafka connector jars. PYSPARK_SUBMIT_ARGS is only read when the JVM
# starts, so it must be set before the first SparkSession/SparkContext is created.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Setup Spark configuration: run locally on 4 cores with a descriptive application name
master = "local[4]"
app_name = "Assignment 2B"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

# Root directory for structured-streaming checkpoints. Streaming queries that do not
# pass their own "checkpointLocation" option keep their progress/state in a
# sub-directory of this path.
checkpoint_dir = "checkpoint"

# Create a SparkSession using Melbourne time as the session timezone and register the
# default streaming checkpoint location. ("spark.checkpoint.dir", used previously, is not
# the structured-streaming checkpoint setting, so no checkpoint location was configured.)
spark = SparkSession.builder.config(conf=spark_conf) \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .config("spark.sql.streaming.checkpointLocation", checkpoint_dir) \
    .getOrCreate()

# Create a SparkContext object using SparkSession
sc = spark.sparkContext

# Only show errors in the notebook output
sc.setLogLevel('ERROR')

In [None]:
## Import module
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from pyspark.sql.types import DateType
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import TimestampType
from pyspark.sql.types import DoubleType

from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import round as round_
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.functions import when
from pyspark.sql.functions import sum as sum_
from pyspark.sql.functions import window

from datetime import datetime, timedelta

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

from pyspark.ml.classification import GBTClassificationModel


2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (previous_application_static and value_dict) into data frames. (You can reuse your code from 2A.)


In [None]:
## Load previous_application_static into dataframe
## Reuse code from assignment 2A

# Target data type of every previous_application column, taken from the metadata file
columns_to_cast_previous_application = {
    "id": LongType(),
    "id_app": IntegerType(),
    "contract_type": IntegerType(),
    "hour_appr_process_start": IntegerType(),
    "channel_type": StringType(),
    "nflag_insured_on_approval": FloatType(),
    "days_decision": FloatType(),
    "days_first_drawing": FloatType(),
    "days_first_due": FloatType(),
    "days_last_due_1st_version": FloatType(),
    "days_last_due": FloatType(),
    "days_termination": FloatType(),
    "rate_down_payment": FloatType(),
    "rate_interest_primary": FloatType(),
    "rate_interest_privileged": FloatType(),
    "amt_annuity": FloatType(),
    "amt_application": FloatType(),
    "amt_credit": FloatType(),
    "amt_down_payment": FloatType(),
    "amt_goods_price": FloatType(),
    "name_cash_loan_purpose": StringType(),
    "name_contract_status": StringType(),
    "name_payment_type": StringType(),
    "code_rejection_reason": StringType(),
    "name_type_suite": StringType(),
    "name_client_type": StringType(),
    "name_goods_category": StringType(),
    "name_portfolio": StringType(),
    "name_product_type": StringType(),
    "sellerplace_area": IntegerType(),
    "name_seller_industry": StringType(),
    "cnt_payment": FloatType(),
    "name_yield_group": StringType(),
    "product_combination": StringType()
}

# Read the CSV using its header row; without a schema every column arrives as a string
df_previous_application = spark.read.option("header", True).csv("previous_application_static.csv")

# Cast all listed columns in one projection; replaced columns keep their original position
df_previous_application = df_previous_application.withColumns(
    {name: col(name).cast(dtype) for name, dtype in columns_to_cast_previous_application.items()}
)

# Print schema
df_previous_application.printSchema()

root
 |-- id_app: integer (nullable = true)
 |-- contract_type: integer (nullable = true)
 |-- amt_annuity: float (nullable = true)
 |-- amt_application: float (nullable = true)
 |-- amt_credit: float (nullable = true)
 |-- amt_down_payment: float (nullable = true)
 |-- amt_goods_price: float (nullable = true)
 |-- hour_appr_process_start: integer (nullable = true)
 |-- rate_down_payment: float (nullable = true)
 |-- rate_interest_primary: float (nullable = true)
 |-- rate_interest_privileged: float (nullable = true)
 |-- name_cash_loan_purpose: string (nullable = true)
 |-- name_contract_status: string (nullable = true)
 |-- days_decision: float (nullable = true)
 |-- name_payment_type: string (nullable = true)
 |-- code_rejection_reason: string (nullable = true)
 |-- name_type_suite: string (nullable = true)
 |-- name_client_type: string (nullable = true)
 |-- name_goods_category: string (nullable = true)
 |-- name_portfolio: string (nullable = true)
 |-- name_product_type: string (n

In [None]:
## Load value_dict into dataframe
## Reuse code from assignment 2A

# value_dict is read without a schema, so its columns (id, category, key, value) stay strings
df_value_dict = spark.read.option("header", True).csv("value_dict.csv")

# Print schema
df_value_dict.printSchema()

root
 |-- id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



3. Using the Kafka topic from the producer in Task 1, read the streaming data with Spark Streaming. Assume all data arrives in String format, except for the 'ts' column, which you shall receive as an Int type.



In [None]:
## Connect to Kafka, subscribe to the producer's topic and read it as a stream

# Configuration
hostip = "kafka"
topic = "application_stream"

# Unbounded DataFrame of raw Kafka records (key/value arrive as binary)
kafka_options = {
    "kafka.bootstrap.servers": f"{hostip}:9092",
    "subscribe": topic,
}
df = spark.readStream.format("kafka").options(**kafka_options).load()

In [None]:
# Kafka delivers key and value as binary; expose both as string columns named key/value
df = df.select(col("key").cast("string").alias("key"),
               col("value").cast("string").alias("value"))

In [None]:
# Schema of one Kafka message: a JSON array of application records.
# Every field is received as a string except the event-time column 'ts' (int).
stream_string_fields = [
    "id_app", "target", "contract_type", "gender", "own_car", "own_property",
    "num_of_children", "income_total", "amt_credit", "amt_annuity", "amt_goods_price",
    "income_type", "education_type", "family_status", "housing_type",
    "region_population", "days_birth", "days_employed", "own_car_age",
    "flag_mobile", "flag_emp_phone", "flag_work_phone", "flag_cont_mobile",
    "flag_phone", "flag_email", "occupation_type", "cnt_fam_members",
    "weekday_app_process_start", "hour_app_process_start", "organization_type",
    "credit_score_1", "credit_score_2", "credit_score_3", "days_last_phone_change",
    "amt_credit_req_last_hour", "amt_credit_req_last_day", "amt_credit_req_last_week",
    "amt_credit_req_last_month", "amt_credit_req_last_quarter", "amt_credit_req_last_year",
]

record_fields = [StructField(field_name, StringType(), True) for field_name in stream_string_fields]
record_fields.append(StructField("ts", IntegerType(), True))

schema = ArrayType(StructType(record_fields))

In [None]:
# Parse each Kafka value (a JSON array) with the schema, then emit one row per array element
df = (df
      .select(F.from_json(F.col("value").cast("string"), schema).alias("parsed_value"))
      .select(F.explode("parsed_value").alias("unnested_value")))

In [None]:
# Flatten the exploded struct into top-level columns. Star-expansion keeps each field's
# name and the field order defined in `schema`, so every column is named after its field.
df_formatted = df.select("unnested_value.*")

In [None]:
# Inspect the flattened stream's schema; before the casts in the next section every
# field is still a string except 'ts'.
df_formatted.printSchema()

root
 |-- id_app: string (nullable = true)
 |-- target: string (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- num_of_children: string (nullable = true)
 |-- income_total: string (nullable = true)
 |-- amt_credit: string (nullable = true)
 |-- amt_annuity: string (nullable = true)
 |-- amt_goods_price: string (nullable = true)
 |-- income_type: string (nullable = true)
 |-- education_type: string (nullable = true)
 |-- family_status: string (nullable = true)
 |-- housing_type: string (nullable = true)
 |-- region_population: string (nullable = true)
 |-- days_birth: string (nullable = true)
 |-- days_employed: string (nullable = true)
 |-- own_car_age: string (nullable = true)
 |-- flag_mobile: string (nullable = true)
 |-- flag_emp_phone: string (nullable = true)
 |-- flag_work_phone: string (nullable = true)
 |-- flag_cont_mobile: string (nullab

4. Then, transform the streaming data format into proper types following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a) Convert the 'ts' column to the timestamp format; we will use it as the event time (event_time).  
b) If the data is more than 1 minute late, discard it.




In [None]:
## Transform stream data format according to metadata file schema
## Reuse code from assignment 2A

# Target type of every streamed column, following the metadata file.
# 'ts' is cast from int to TimestampType so it can be used as the event time
# (Spark reads the integer as seconds since the Unix epoch).
columns_to_cast_application = {
    "id_app": IntegerType(),
    "target": IntegerType(),
    "contract_type": IntegerType(),
    "gender": StringType(),
    "own_car": StringType(),
    "own_property": StringType(),
    "num_of_children": IntegerType(),
    "income_total": FloatType(),
    "amt_credit": FloatType(),
    "amt_annuity": FloatType(),
    "amt_goods_price": FloatType(),
    "income_type": IntegerType(),
    "education_type": IntegerType(),
    "family_status": IntegerType(),
    "housing_type": IntegerType(),
    "region_population": FloatType(),
    "days_birth": IntegerType(),
    "days_employed": IntegerType(),
    "own_car_age": FloatType(),
    "flag_mobile": IntegerType(),
    "flag_emp_phone": IntegerType(),
    "flag_work_phone": IntegerType(),
    "flag_cont_mobile": IntegerType(),
    "flag_phone": IntegerType(),
    "flag_email": IntegerType(),
    "occupation_type": IntegerType(),
    "cnt_fam_members": FloatType(),
    "weekday_app_process_start": StringType(),
    "hour_app_process_start": IntegerType(),
    "organization_type": IntegerType(),
    "credit_score_1": FloatType(),
    "credit_score_2": FloatType(),
    "credit_score_3": FloatType(),
    "days_last_phone_change": IntegerType(),
    "amt_credit_req_last_hour": FloatType(),
    "amt_credit_req_last_day": FloatType(),
    "amt_credit_req_last_week": FloatType(),
    "amt_credit_req_last_month": FloatType(),
    "amt_credit_req_last_quarter": FloatType(),
    "amt_credit_req_last_year": FloatType(),
    "ts": TimestampType()
}

# Apply every cast in a single projection; replaced columns keep their position
df_formatted = df_formatted.withColumns(
    {name: col(name).cast(dtype) for name, dtype in columns_to_cast_application.items()}
)

# Print schema
df_formatted.printSchema()

root
 |-- id_app: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- contract_type: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- num_of_children: integer (nullable = true)
 |-- income_total: float (nullable = true)
 |-- amt_credit: float (nullable = true)
 |-- amt_annuity: float (nullable = true)
 |-- amt_goods_price: float (nullable = true)
 |-- income_type: integer (nullable = true)
 |-- education_type: integer (nullable = true)
 |-- family_status: integer (nullable = true)
 |-- housing_type: integer (nullable = true)
 |-- region_population: float (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- own_car_age: float (nullable = true)
 |-- flag_mobile: integer (nullable = true)
 |-- flag_emp_phone: integer (nullable = true)
 |-- flag_work_phone: integer (nullable = true)
 |-- flag_cont_mobile: integer

In [None]:
## Handling late data for more than 1 minute late

# Declare 'ts' as the event-time column and allow up to 1 minute of lateness.
# NOTE(review): Spark only drops rows behind the watermark inside stateful operators
# (e.g. windowed aggregations in append/update mode). Stateless sinks that read df_final
# directly (console/parquet/foreachBatch) still receive late rows, and complete-mode
# aggregations keep all state. Confirm this meets the "discard late data" requirement.
df_final = df_formatted.withWatermark("ts", "1 minute")

5. Join the static data frames with the streaming data frame, perform data type/column conversion according to your ML model, and print out the schema. (Again, you can reuse code from 2A.)


In [None]:
## Join and transform dataframe by repeating assignment 2A

# Requested credit relative to total income, rounded to 4 decimal places
loan_to_income = col("amt_credit") / col("income_total")
df_application = df_final.withColumn("loan_to_income_ratio", round_(loan_to_income, 4))

## Reuse date_calculator and age_calculator function from assignment 1

# This function takes a base date and a relative number to calculate the target date
def date_calculator(base_date, relative_number):
    """Return the date that lies ``relative_number`` days away from ``base_date``.

    Parameters
    ----------
    base_date : str-like
        Reference date formatted as ``dd/mm/YYYY``; converted with ``str()`` first.
    relative_number : int-like or None
        Signed day offset (negative values move into the past). ``None`` means
        "no offset".

    Returns
    -------
    datetime.date
        The shifted date, or the base date itself when ``relative_number`` is None.
        The result is always a ``date``. The function is wrapped as a ``DateType``
        UDF, and the previous code returned the raw string for a None offset, which
        does not match that declared return type.

    Raises
    ------
    ValueError
        If ``base_date`` is not in ``dd/mm/YYYY`` format.
    """
    base_date_datetime = datetime.strptime(str(base_date), "%d/%m/%Y")
    if relative_number is None:
        return base_date_datetime.date()
    return (base_date_datetime + timedelta(days=int(relative_number))).date()

# This function takes birthdate and current_date to generate age
def age_calculator(birthdate, current_date):
    """Return the number of completed years between ``birthdate`` and ``current_date``.

    ``birthdate`` is a date-like object (exposing ``year``/``month``/``day``) or None;
    ``current_date`` is a ``dd/mm/YYYY`` string. Returns None when ``birthdate`` is None.
    """
    if birthdate is None:
        return None
    today = datetime.strptime(current_date, "%d/%m/%Y")
    # One year fewer if this year's birthday has not been reached yet
    birthday_reached = (today.month, today.day) >= (birthdate.month, birthdate.day)
    return today.year - birthdate.year - (0 if birthday_reached else 1)

# This function takes age and produce age bucket value
def age_bucket(age):
    """Map an age in years to a one-letter age bucket.

    Buckets (for integer ages): Y < 25, E 25-34, M 35-44, L 45-54, N 55-64, R >= 65.

    Parameters
    ----------
    age : int-like or None
        Age in years; converted with ``int()``, so fractions are truncated.

    Returns
    -------
    str or None
        The bucket letter, or None when ``age`` is None. ``age_calculator`` returns
        None for an unknown birth date, and the previous ``int(None)`` raised a
        TypeError inside the UDF.
    """
    if age is None:
        return None
    age = int(age)
    if age < 25:
        return "Y"
    if age < 35:
        return "E"
    if age < 45:
        return "M"
    if age < 55:
        return "L"
    if age < 65:
        return "N"
    return "R"

## Derive birthday, age and age_bucket from days_birth

# Reference date for the derived columns: today's date as a dd/mm/YYYY string.
# It is evaluated once on the driver when this cell runs.
current_date = datetime.today().strftime("%d/%m/%Y")

# Wrap the plain-Python helpers as Spark UDFs with explicit return types
date_calculator_udf = udf(date_calculator, DateType())
age_calculator_udf = udf(age_calculator, IntegerType())
age_bucket_udf = udf(age_bucket, StringType())

# birthday = reference date shifted by days_birth days, then age in completed years,
# then the age bucket letter
df_application = (df_application
                  .withColumn("birthday", date_calculator_udf(lit(current_date), col("days_birth")))
                  .withColumn("age", age_calculator_udf(col("birthday"), lit(current_date)))
                  .withColumn("age_bucket", age_bucket_udf(col("age"))))

# This function assigns credit worthiness bucket to avarage credit score
def credit_bucket(credit_average):
    """Classify an average credit score as "low" (< 0.4), "medium" (< 0.7) or "high".

    ``credit_average`` is converted with ``float()`` first, so numeric strings are
    accepted and None raises TypeError.
    """
    score = float(credit_average)
    for upper_bound, label in ((0.4, "low"), (0.7, "medium")):
        if score < upper_bound:
            return label
    return "high"

## Reuse code from assignment 1
# Impute missing credit scores with 0.5
for score_column in ["credit_score_1", "credit_score_2", "credit_score_3"]:
    df_application = df_application.withColumn(
        score_column, when(col(score_column).isNull(), 0.5).otherwise(col(score_column)))

# Mean of the three imputed scores, rounded to 4 decimal places
score_sum = col("credit_score_1") + col("credit_score_2") + col("credit_score_3")
df_application = df_application.withColumn("average_credit_score", round_(score_sum / 3, 4))

## Calculate credit_bucket from the average score
credit_bucket_udf = udf(credit_bucket, StringType())
df_application = df_application.withColumn("credit_bucket", credit_bucket_udf(col("average_credit_score")))

## num_of_prev_app, num_of_approved_app, total_credit and total_credit_to_income_ratio

# Summarise the static previous_application table once per id_app, so each micro-batch
# needs a single stream-static join instead of three separate aggregations and joins:
#   num_of_prev_app     - number of previous applications
#   num_of_approved_app - number of previous applications with status "Approved"
#   total_credit        - sum of amt_credit over approved previous applications, rounded to 1 dp
is_approved = col("name_contract_status") == "Approved"
df_prev_app_summary = (df_previous_application
                       .groupBy("id_app")
                       .agg(F.count(lit(1)).alias("num_of_prev_app"),
                            sum_(when(is_approved, 1).otherwise(0)).alias("num_of_approved_app"),
                            # Non-approved rows contribute null, which sum ignores
                            round_(sum_(when(is_approved, col("amt_credit"))), 1).alias("total_credit"))
                      )

# Left join keeps every streamed application; drop the duplicate key from the static side
df_application = (df_application
                  .join(df_prev_app_summary, df_application.id_app == df_prev_app_summary.id_app, "left_outer")
                  .drop(df_prev_app_summary["id_app"])
                 )

# Applicants with no (approved) history get 0 for each feature
for history_column in ["num_of_prev_app", "num_of_approved_app", "total_credit"]:
    df_application = df_application.withColumn(
        history_column, when(col(history_column).isNull(), 0).otherwise(col(history_column)))

## total_credit_to_income_ratio (total credit/income), rounded to 2 decimal places
df_application = df_application.withColumn(
    "total_credit_to_income_ratio", round_(col("total_credit") / col("income_total"), 2))

## education_type, occupation_type, income_type, family_status

def _value_dict_mapping(category):
    """Return a {value: key} dict built from every value_dict row of ``category``.

    Fields are looked up by name rather than by position, so the mapping does not
    depend on the column order of value_dict.csv. Both sides are strings because
    value_dict is read without a schema.
    """
    rows = (df_value_dict
            .filter(col("category") == category)
            .select("key", "value")
            .collect())
    return {row["value"]: row["key"] for row in rows}


# NOTE(review): the mapping direction (value -> key) is kept from the original code. It
# assumes the streamed columns carry value_dict's 'value' entries; confirm against the metadata.
education_type_dict = _value_dict_mapping("education_type")
occupation_type_dict = _value_dict_mapping("occupation_type")
income_type_dict = _value_dict_mapping("income_type")
family_status_dict = _value_dict_mapping("family_status")

# Cast each coded column to string so the string keys of its mapping can match,
# then replace the codes with their labels
for coded_column, mapping in [("education_type", education_type_dict),
                              ("occupation_type", occupation_type_dict),
                              ("income_type", income_type_dict),
                              ("family_status", family_status_dict)]:
    df_application = (df_application
                      .withColumn(coded_column, col(coded_column).cast(StringType()))
                      .replace(mapping, subset=[coded_column]))

## Create income bucket

# This function takes income and produce income bucket value
def income_bucket(income):
    """Map a total income to an integer bucket 1-5 (0 when income is None).

    Upper bounds (inclusive, applied after ``int()`` truncation):
    1 <= 18200, 2 <= 45000, 3 <= 120000, 4 <= 180000, otherwise 5.
    """
    if income is None:
        return 0
    whole_income = int(income)
    for bucket, upper_bound in ((1, 18200), (2, 45000), (3, 120000), (4, 180000)):
        if whole_income <= upper_bound:
            return bucket
    return 5


# Register income_bucket as a UDF and derive the bucket from income_total
income_bucket_udf = udf(income_bucket, IntegerType())
df_application = df_application.withColumn("income_bucket", income_bucket_udf(col("income_total")))

## Create flag_contact: sum of the individual contact flags
# (flag_cont_mobile is not part of the sum and stays as its own column)
flag_contact_expr = col("flag_mobile")
for flag_column in ["flag_emp_phone", "flag_work_phone", "flag_phone", "flag_email"]:
    flag_contact_expr = flag_contact_expr + col(flag_column)
df_application = df_application.withColumn("flag_contact", flag_contact_expr)

## Drop intermediate and unused columns
columns_to_drop = [
    "id_app", "age", "birthday", "days_birth", "average_credit_score",
    "credit_score_1", "credit_score_2", "credit_score_3", "income_total",
    "flag_mobile", "flag_emp_phone", "flag_work_phone", "flag_phone", "flag_email",
    "amt_credit_req_last_hour", "amt_credit_req_last_day", "amt_credit_req_last_week",
    "amt_credit_req_last_month", "amt_credit_req_last_quarter",
    "weekday_app_process_start", "hour_app_process_start", "region_population",
    "total_credit", "own_car",
]
df_application = df_application.drop(*columns_to_drop)

# Display the schema
df_application.printSchema()

root
 |-- target: integer (nullable = true)
 |-- contract_type: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- num_of_children: integer (nullable = true)
 |-- amt_credit: float (nullable = true)
 |-- amt_annuity: float (nullable = true)
 |-- amt_goods_price: float (nullable = true)
 |-- income_type: string (nullable = true)
 |-- education_type: string (nullable = true)
 |-- family_status: string (nullable = true)
 |-- housing_type: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- own_car_age: float (nullable = true)
 |-- flag_cont_mobile: integer (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- cnt_fam_members: float (nullable = true)
 |-- organization_type: integer (nullable = true)
 |-- days_last_phone_change: integer (nullable = true)
 |-- amt_credit_req_last_year: float (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- loan_to_income_ratio: double (nullable =

In [None]:
# Console sink over the watermarked stream, refreshed every 5 seconds, to inspect raw records
query = df_final.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
# Stop the raw-record console query (`query`)
query.stop()

6. Load your ML model, and use the model and Spark to perform the following:  
    a) Every 10 seconds, print the total number of applications and number of potential default applications (prediction = 1) in the last 1 minute.  
    b) Every 15 seconds, show the total requested credit (sum of credit where default=0) in the last 15 seconds.  
    c) Every 1 minute, show the top 10 largest loan applications with the potential of default.  

In [None]:
## Load the gradient-boosted tree classifier saved in assignment 2A

model_path = 'loan_default_prediction_model'
gbtModel = GBTClassificationModel.load(path=model_path)

**Explanation**

When completing assignment 2A, I forgot to save the transformation stages of the model pipeline and saved only the gradient-boosted tree model. I was unable to resolve this problem, so the rest of this assignment uses the `target` column of application_data_stream as a stand-in for the prediction.

In [None]:
# 6a
# Application counts per sliding 1-minute event-time window that advances every 10 seconds.
# 'target' stands in for the model prediction because no prediction column is produced in
# this notebook; target == 1 is treated as a potential default.
application_breakdown = (df_final
                         # Group on df_final's own 'ts' column. The previous df_application.ts
                         # reference resolved only through shared lineage with another DataFrame
                         # and fails once either DataFrame is rebuilt out of order.
                         .groupBy(window(col("ts"), "1 minute", "10 seconds"))
                         # count(lit(1)) counts every application, including rows with a null
                         # 'target' (count("target") skipped those).
                         .agg(F.count(F.lit(1)).alias("total_number_of_applications"),
                              F.count(F.when(F.col("target") == 1, True)).alias("number_of_potential_default_applications"))
                         .select("window", "total_number_of_applications", "number_of_potential_default_applications")
                        )

# SEND OUTPUT TO CONSOLE SINK every 10 seconds, ordered by window start.
# NOTE(review): sorting requires complete mode, and complete mode keeps all window state,
# so the 1-minute watermark does not evict old windows for this query.
query1 = (application_breakdown
          .orderBy(col("window").start)
          .writeStream.outputMode("complete")
          .format("console")
          .trigger(processingTime='10 seconds')
          .option("truncate","false")
          .start()
         )

In [None]:
# Stop the task 6a console query
query1.stop()

In [None]:
# 6b
# Total credit requested by non-defaulting applications (target == 0, standing in for
# prediction == 0) per tumbling 15-second event-time window.
total_credit_request = (df_final
                        # Group on df_final's own 'ts' column instead of df_application.ts,
                        # which belongs to a different DataFrame.
                        .groupBy(window(col("ts"), "15 seconds", "15 seconds"))
                        .agg(F.sum(F.when(F.col("target") == 0, F.col("amt_credit")).otherwise(0)).alias("total_credit_request"))
                        .select("window", "total_credit_request")
                       )

# SEND OUTPUT TO CONSOLE SINK every 15 seconds, ordered by window start
# (sorting requires complete output mode).
query2 = (total_credit_request
          .orderBy(col("window").start)
          .writeStream.outputMode("complete")
          .format("console")
          .trigger(processingTime='15 seconds')
          .option("truncate","false")
          .start()
         )

In [None]:
# Stop the task 6b console query
query2.stop()

In [None]:
# 6c
def process_batch(df, epoch_id):
    """Print the 10 largest potential-default applications in one micro-batch.

    Invoked by ``foreachBatch`` once per trigger (every minute here).

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Static DataFrame holding this micro-batch's rows of ``df_final``.
        NOTE(review): with the default append mode this is presumably only the
        rows new in this batch, not a running top 10 across the whole stream.
    epoch_id : int
        Micro-batch id supplied by Spark; unused.
    """
    top_10_potential_default = (df
                                .filter(F.col("target") == 1)
                                .orderBy(F.col("amt_credit").desc())
                                .limit(10)
                                .select("id_app", "amt_credit", "ts")
                               )

    # A writer-level "truncate" option has no effect on a foreachBatch sink,
    # so untruncated output has to be requested on show() itself.
    top_10_potential_default.show(truncate=False)


# foreachBatch replaces the sink, so a .format("console") call or console
# options on the writer would be ignored; process_batch does the printing.
query3 = (df_final
          .writeStream
          .foreachBatch(process_batch)
          .trigger(processingTime='1 minute')
          .start()
         )

## Reference: (n.d.). How to use foreach or foreachBatch in PySpark to write to database? Stack Overflow. https://stackoverflow.com/questions/58766638/how-to-use-foreach-or-foreachbatch-in-pyspark-to-write-to-database

In [None]:
# Stop the 6c foreachBatch query (query3).
query3.stop()

+------+----------+---+
|id_app|amt_credit| ts|
+------+----------+---+
+------+----------+---+



7. Write a Parquet file to store the following data:  
    a) Persist the raw data: Persist your prediction results along with application data and event_time in Parquet format; after that, read the Parquet file and show the first 10 records.  
    b) Persist the 15-second requested credit (6b) in another parquet file.


In [None]:
# 7a
# Persist every df_final row (application data, the `ts` event time, and
# `target` standing in for the prediction) as parquet files under
# parquet/application_df, appending each micro-batch as it completes.
# NOTE(review): the checkpoint directory is nested inside the output path,
# whereas the Kafka writers below use a separate checkpoint/ root. Consider
# aligning them, but only together with a fresh output directory.
query_file_sink_a = df_final.writeStream.start(
    path="parquet/application_df",
    format="parquet",
    outputMode="append",
    checkpointLocation="parquet/application_df/checkpoint",
)

## Reference: (2024, January 29). FIT5202 - Week11 - 1. Spark Streaming Watermarking DEMO [Lab Resources].

In [None]:
# Stop the 7a parquet file-sink query (query_file_sink_a).
# NOTE(review): only batches committed before this runs end up in
# parquet/application_df - let the sink run long enough before stopping.
query_file_sink_a.stop()

In [None]:
# Batch-read what the 7a sink wrote and display the first 10 rows.
query_file_sink_a_df = spark.read.format("parquet").load("parquet/application_df")
query_file_sink_a_df.show(n=10)

+------+------+-------------+------+-------+------------+---------------+------------+----------+-----------+---------------+-----------+--------------+-------------+------------+-----------------+----------+-------------+-----------+-----------+--------------+---------------+----------------+----------+----------+---------------+---------------+-------------------------+----------------------+-----------------+--------------+--------------+--------------+----------------------+------------------------+-----------------------+------------------------+-------------------------+---------------------------+------------------------+-------------------+
|id_app|target|contract_type|gender|own_car|own_property|num_of_children|income_total|amt_credit|amt_annuity|amt_goods_price|income_type|education_type|family_status|housing_type|region_population|days_birth|days_employed|own_car_age|flag_mobile|flag_emp_phone|flag_work_phone|flag_cont_mobile|flag_phone|flag_email|occupation_type|cnt_fam_mem

In [None]:
# 7b
# Persist the 6b 15-second credit totals (total_credit_request) as parquet.
# NOTE(review): append mode on a streaming aggregation requires a watermark on
# the event-time column; this presumably relies on one defined upstream of
# df_final (not visible here) - confirm.
query_file_sink_b = total_credit_request.writeStream.start(
    path="parquet/total_credit_request",
    format="parquet",
    outputMode="append",
    checkpointLocation="parquet/total_credit_request/checkpoint",
)

In [None]:
# Batch-read what the 7b sink has written so far and display the first 10 rows.
query_file_sink_b_df = spark.read.format("parquet").load("parquet/total_credit_request")

query_file_sink_b_df.show(n=10)

+--------------------+--------------------+
|              window|total_credit_request|
+--------------------+--------------------+
|{2024-02-15 16:51...|       7.342874685E8|
|{2024-02-15 16:51...|       4.158635895E8|
|{2024-02-15 16:52...|       4.926838995E8|
|{2024-02-15 17:10...|        4.62045924E8|
|{2024-02-15 17:16...|        5.34052908E8|
|{2024-02-15 17:10...|       4.448438235E8|
|{2024-02-15 16:53...|       4.791040245E8|
|{2024-02-15 17:15...|        2.05130475E8|
|{2024-02-15 16:53...|        3.48389037E8|
|{2024-02-15 16:52...|       7.349625315E8|
+--------------------+--------------------+
only showing top 10 rows



8. Read the two parquet files from 7a and 7b as a data stream, and send the records to two topics with appropriate names.  
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet files. The parquet files serve as an intermediate storage in this use case.)

In [None]:
## Keep the intermediate parquet files updating for the Kafka streams below

# The 7b credit totals are already being written by query_file_sink_b to
# "parquet/total_credit_request".

# Also persist the 6a windowed breakdown (application_breakdown) to parquet,
# so Stream 1 below can read it back as a streaming source.
query_file_sink_c = application_breakdown.writeStream.start(
    path="parquet/application_breakdown",
    format="parquet",
    outputMode="append",
    checkpointLocation="parquet/application_breakdown/checkpoint",
)

In [None]:
# Batch-read the breakdown parquet files to spot-check the first 10 records.
query_file_sink_c_df = spark.read.format("parquet").load("parquet/application_breakdown")

query_file_sink_c_df.show(n=10)

+--------------------+----------------------------+----------------------------------------+
|              window|total_number_of_applications|number_of_potential_default_applications|
+--------------------+----------------------------+----------------------------------------+
|{2024-02-15 17:15...|                        3417|                                     289|
|{2024-02-15 17:09...|                        1553|                                     145|
|{2024-02-15 17:11...|                        3570|                                     289|
|{2024-02-15 17:14...|                         649|                                      59|
|{2024-02-15 17:11...|                        3744|                                     299|
|{2024-02-15 17:14...|                         150|                                      12|
|{2024-02-15 17:10...|                        3236|                                     277|
|{2024-02-15 17:11...|                        3050|                   

In [None]:
# Stream 1: windowed application breakdown (parquet) -> Kafka
# NOTE(review): task 8 asks for the 7a file (parquet/application_df); this
# streams the 6a aggregation written by query_file_sink_c instead - confirm.

# Streaming file sources need an explicit schema; the nested `window` struct
# mirrors the start/end timestamps produced by window().
schema1 = (StructType()
           .add("window",
                StructType([StructField("start", TimestampType(), True),
                            StructField("end", TimestampType(), True)]),
                True)
           .add("total_number_of_applications", LongType(), True)
           .add("number_of_potential_default_applications", LongType(), True))

# Treat the parquet directory as a stream: new files become new rows.
stream_application_breakdown = (spark.readStream
                                .schema(schema1)
                                .format("parquet")
                                .load("parquet/application_breakdown"))

# Publish each row as a JSON value with a null key.
# NOTE(review): "visulisation" is misspelled but kept byte-for-byte, since
# consumers presumably subscribe to this exact topic name.
query_a = (stream_application_breakdown
           .selectExpr("CAST(null AS STRING) AS key", "to_json(struct(*)) AS value")
           .writeStream
           .format("kafka")
           .outputMode("append")
           .options(**{
               "kafka.bootstrap.servers": f"{hostip}:9092",
               "topic": "application_visulisation_1",
               "checkpointLocation": "checkpoint/application_breakdown",
           })
           .start()
          )

## Reference: (2024, January 29). FIT5202 - Week11 - 1. Spark Streaming Watermarking DEMO [Lab Resources].
## Reference: (n.d.). PySpark : Write Spark Dataframe to Kafka Topic. Stack Overflow. https://stackoverflow.com/questions/62370231/pyspark-write-spark-dataframe-to-kafka-topic
## Reference: (n.d.). Spark: How to define a nested schema? Stack Overflow. https://stackoverflow.com/questions/67379564/spark-how-to-define-a-nested-schema

In [None]:
# Stream 2: 15-second credit totals (parquet) -> Kafka

# Streaming file sources need an explicit schema; it matches the columns
# written by query_file_sink_b.
schema2 = (StructType()
           .add("window",
                StructType([StructField("start", TimestampType(), True),
                            StructField("end", TimestampType(), True)]),
                True)
           .add("total_credit_request", DoubleType(), True))

# Treat the parquet directory as a stream: new files become new rows.
stream_total_credit_request = (spark.readStream
                               .schema(schema2)
                               .format("parquet")
                               .load("parquet/total_credit_request"))

# Publish each row as a JSON value with a null key.
query_b = (stream_total_credit_request
           .selectExpr("CAST(null AS STRING) AS key", "to_json(struct(*)) AS value")
           .writeStream
           .format("kafka")
           .outputMode("append")
           .options(**{
               "kafka.bootstrap.servers": f"{hostip}:9092",
               "topic": "application_visulisation_2",
               "checkpointLocation": "checkpoint/total_credit_request",
           })
           .start()
          )

In [None]:
## Use the following code to validate streamed data

# df_test = spark.read.parquet('parquet/total_credit_request/part-00022-6e089cd7-d523-4bef-91ee-864ddfa674aa-c000.snappy.parquet')
# df_test.show()

+--------------------+--------------------+
|              window|total_credit_request|
+--------------------+--------------------+
|{2024-02-16 16:34...|         3.6846594E7|
+--------------------+--------------------+



In [None]:
## Use the following code to inspect streaming status

# Display query_b's current status dict (message, isDataAvailable, isTriggerActive).
query_b.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [None]:
## Stop the queries
# Stop the two remaining parquet sinks, then the two Kafka writers, in that
# order (query_file_sink_a was already stopped after 7a).
for _running_query in (query_file_sink_b, query_file_sink_c, query_a, query_b):
    _running_query.stop()
del _running_query