# Part 2: Streaming application using Spark Structured Streaming

1. Write code to create a SparkSession with the following requirements: 1) use four cores with a proper application name; 2) Melbourne timezone; 3) a checkpoint location has been set.


In [29]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Set Spark application name and master
app_name = "Assignment 2B"
master = "local[4]"

# Create Spark configuration
spark_conf = SparkConf().setAppName(app_name).setMaster(master)\
                        .set("spark.sql.session.timeZone", "Australia/Melbourne")\
                        .set("spark.driver.memory", "8g")\
                        .set("spark.checkpoint.auto", "true")\
                        .set("checkpoint_location", "checkpoint")\

# Create SparkSession
spark = SparkSession.builder.config(conf = spark_conf)\
                            .config("spark.sql.streaming.statefulOperator.allowMultiple", "false") \
                            .getOrCreate()

2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets(previous_application_static and value_dict) into data frames. (You can reuse your code from 2A.)


In [30]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
#from pyspark.sql.functions import explode

pre_app_schema = StructType([
    StructField("id_app", IntegerType()),
    StructField("contract_type", IntegerType()),
    StructField("amt_annuity", FloatType()),
    StructField("amt_application", FloatType()),
    StructField("amt_credit", FloatType()),
    StructField("amt_down_payment", FloatType()),
    StructField("amt_goods_price", FloatType()),
    StructField("hour_appr_process_start", IntegerType()),
    StructField("rate_down_payment", FloatType()),
    StructField("rate_interest_primary", FloatType()),
    StructField("rate_interest_privileged", FloatType()),
    StructField("name_cash_loan_purpose", StringType()),
    StructField("name_contract_status", StringType()),
    StructField("days_decision", IntegerType()),
    StructField("name_payment_type", StringType()),
    StructField("code_reject_reason", StringType()),
    StructField("name_type_suite", StringType()),
    StructField("name_client_type", StringType()),
    StructField("name_goods_category", StringType()),
    StructField("name_portfolio", StringType()),
    StructField("name_product_type", StringType()),
    StructField("channel_type", StringType()),
    StructField("sellerplace_area", IntegerType()),
    StructField("name_seller_industry", StringType()),
    StructField("cnt_payment", FloatType()),
    StructField("name_yield_group", StringType()),
    StructField("product_combination", StringType()),
    StructField("days_first_drawing", FloatType()),
    StructField("days_first_due", FloatType()),
    StructField("days_last_due_1st_version", FloatType()),
    StructField("days_last_due", FloatType()),
    StructField("days_termination", FloatType()),
    StructField("nflag_insured_on_approval", FloatType()),
    StructField("id", LongType())
])

value_schema = StructType([
    StructField("id", IntegerType()),
    StructField("category", StringType()),
    StructField("key", StringType()),
    StructField("value", IntegerType()),
])

# Load CSV files
pre_app_df = spark.read.schema(pre_app_schema).csv('previous_application.csv', header = True)
value_df = spark.read.schema(value_schema).csv('value_dict.csv', header = True)

3. Using the Kafka topic from the producer in Task 1, read the streaming data with Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.



In [31]:
hostip = 'kafka'
topic = 'application_stream'

app_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest")\
    .load()

# Get value of the kafka message
app_lines = app_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [32]:
#query = app_lines\
        #.writeStream\
        #.outputMode('append')\
        #.format('console')\
        #.trigegr(processingTime = '10 seconds'))\
        #.start(truncate = False)

In [33]:
#query.stop()

4. Then, transform the streaming data format into proper types following the metadata file schema, similar to assignment 2A. Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  
b) If the data is late for more than 1 minute, discard it.




In [34]:
#from pyspark.sql.functions import explode, from_json, col

#Define the schema for the structured datastream received
schema = ArrayType(StructType([    
    StructField("id_app", StringType(), True),
    StructField("target", StringType(), True),
    StructField("contract_type", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("own_car", StringType(), True),
    StructField("own_property", StringType(), True),
    StructField("num_of_children", StringType(), True),
    StructField("income_total", StringType(), True),
    StructField("amt_credit", StringType(), True),
    StructField("amt_annuity", StringType(), True),
    StructField("amt_goods_price", StringType(), True),
    StructField("income_type", StringType(), True),
    StructField("education_type", StringType(), True),
    StructField("family_status", StringType(), True),
    StructField("housing_type", StringType(), True),
    StructField("region_population_relative", StringType(), True),
    StructField("days_birth", StringType(), True),
    StructField("days_employed", StringType(), True),
    StructField("own_car_age", StringType(), True),
    StructField("flag_mobile", StringType(), True),
    StructField("flag_emp_phone", StringType(), True),
    StructField("flag_work_phone", StringType(), True),
    StructField("flag_cont_mobile", StringType(), True),
    StructField("flag_phone", StringType(), True),
    StructField("flag_email", StringType(), True),
    StructField("occupation_type", StringType(), True),
    StructField("cnt_fam_members", StringType(), True),
    StructField("weekday_app_process_start", StringType(), True),
    StructField("hour_app_process_start", StringType(), True),
    StructField("organization_type", StringType(), True),
    StructField("credit_score_1", StringType(), True),
    StructField("credit_score_2", StringType(), True),
    StructField("credit_score_3", StringType(), True),
    StructField("days_last_phone_change", StringType(), True),
    StructField("amt_credit_req_last_hour", StringType(), True),
    StructField("amt_credit_req_last_day", StringType(), True),
    StructField("amt_credit_req_last_week", StringType(), True),
    StructField("amt_credit_req_last_month", StringType(), True),
    StructField("amt_credit_req_last_quarter", StringType(), True),
    StructField("amt_credit_req_last_year", StringType(), True),
    StructField("ts", TimestampType(), True)
]))


app_df_1 = app_lines.select(F.from_json(F.col("value").cast(StringType()), schema).alias('parsed_value'))
#app_df_1.printSchema()
app_df_2 = app_df_1.select(F.explode(F.col("parsed_value")).alias('unnested_value'))  
#app_df_2.printSchema()

app_df_formatted = app_df_2.select(
                    F.col("unnested_value.id_app").alias("id_app"),
                    F.col("unnested_value.target").alias("target"),
                    F.col("unnested_value.contract_type").alias("contract_type"),
                    F.col("unnested_value.gender").alias("gender"),
                    F.col("unnested_value.own_car").alias("own_car"),
                    F.col("unnested_value.own_property").alias("own_property"),
                    F.col("unnested_value.num_of_children").alias("num_of_children"),
                    F.col("unnested_value.income_total").alias("income_total"),
                    F.col("unnested_value.amt_credit").alias("amt_credit"),
                    F.col("unnested_value.amt_annuity").alias("amt_annuity"),
                    F.col("unnested_value.amt_goods_price").alias("amt_goods_price"),
                    F.col("unnested_value.income_type").alias("income_type"),
                    F.col("unnested_value.education_type").alias("education_type"),
                    F.col("unnested_value.family_status").alias("family_status"),
                    F.col("unnested_value.housing_type").alias("housing_type"),
                    F.col("unnested_value.region_population_relative").alias("region_population_relative"),
                    F.col("unnested_value.days_birth").alias("days_birth"),
                    F.col("unnested_value.days_employed").alias("days_employed"),
                    F.col("unnested_value.own_car_age").alias("own_car_age"),
                    F.col("unnested_value.flag_mobile").alias("flag_mobile"),
                    F.col("unnested_value.flag_emp_phone").alias("flag_emp_phone"),
                    F.col("unnested_value.flag_work_phone").alias("flag_work_phone"),
                    F.col("unnested_value.flag_cont_mobile").alias("flag_cont_mobile"),
                    F.col("unnested_value.flag_phone").alias("flag_phone"),
                    F.col("unnested_value.flag_email").alias("flag_email"),
                    F.col("unnested_value.occupation_type").alias("occupation_type"),
                    F.col("unnested_value.weekday_app_process_start").alias("weekday_appr_process_start"),
                    F.col("unnested_value.hour_app_process_start").alias("hour_app_process_start"),
                    F.col("unnested_value.organization_type").alias("organization_type"),
                    F.col("unnested_value.credit_score_1").alias("credit_score_1"),
                    F.col("unnested_value.credit_score_2").alias("credit_score_2"),
                    F.col("unnested_value.credit_score_3").alias("credit_score_3"),
                    F.col("unnested_value.days_last_phone_change").alias("days_last_phone_change"),
                    F.col("unnested_value.amt_credit_req_last_hour").alias("amt_credit_req_last_hour"),
                    F.col("unnested_value.amt_credit_req_last_day").alias("amt_credit_req_last_day"),
                    F.col("unnested_value.amt_credit_req_last_week").alias("amt_credit_req_last_week"),
                    F.col("unnested_value.amt_credit_req_last_month").alias("amt_credit_req_last_month"),
                    F.col("unnested_value.amt_credit_req_last_quarter").alias("amt_credit_req_last_quarter"),
                    F.col("unnested_value.amt_credit_req_last_year").alias("amt_credit_req_last_year"),
                    F.col("unnested_value.ts").alias("ts")
)

# Transform the datatype
app_df_formatted = app_df_formatted.withColumn("id_app", F.col("id_app").cast(IntegerType()))\
                                    .withColumn("target", F.col("target").cast(IntegerType()))\
                                    .withColumn("contract_type", F.col("contract_type").cast(IntegerType()))\
                                    .withColumn("num_of_children", F.col("num_of_children").cast(IntegerType()))\
                                    .withColumn("income_total", F.col("income_total").cast(FloatType()))\
                                    .withColumn("amt_credit", F.col("amt_credit").cast(FloatType()))\
                                    .withColumn("amt_annuity", F.col("amt_annuity").cast(FloatType()))\
                                    .withColumn("amt_goods_price", F.col("amt_goods_price").cast(FloatType()))\
                                    .withColumn("income_type", F.col("income_type").cast(IntegerType()))\
                                    .withColumn("education_type", F.col("education_type").cast(IntegerType()))\
                                    .withColumn("family_status", F.col("family_status").cast(IntegerType()))\
                                    .withColumn("housing_type", F.col("housing_type").cast(IntegerType()))\
                                    .withColumn("region_population_relative", F.col("region_population_relative").cast(FloatType()))\
                                    .withColumn("days_birth", F.col("days_birth").cast(IntegerType()))\
                                    .withColumn("days_employed", F.col("days_employed").cast(IntegerType()))\
                                    .withColumn("own_car_age", F.col("own_car_age").cast(FloatType()))\
                                    .withColumn("flag_mobile", F.col("flag_mobile").cast(IntegerType()))\
                                    .withColumn("flag_emp_phone", F.col("flag_emp_phone").cast(IntegerType()))\
                                    .withColumn("flag_work_phone", F.col("flag_work_phone").cast(IntegerType()))\
                                    .withColumn("flag_cont_mobile", F.col("flag_cont_mobile").cast(IntegerType()))\
                                    .withColumn("flag_phone", F.col("flag_phone").cast(IntegerType()))\
                                    .withColumn("flag_email", F.col("flag_email").cast(IntegerType()))\
                                    .withColumn("occupation_type", F.col("occupation_type").cast(IntegerType()))\
                                    .withColumn("hour_app_process_start", F.col("hour_app_process_start").cast(IntegerType()))\
                                    .withColumn("organization_type", F.col("organization_type").cast(IntegerType()))\
                                    .withColumn("credit_score_1", F.col("credit_score_1").cast(FloatType()))\
                                    .withColumn("credit_score_2", F.col("credit_score_2").cast(FloatType()))\
                                    .withColumn("credit_score_3", F.col("credit_score_3").cast(FloatType()))\
                                    .withColumn("days_last_phone_change", F.col("days_last_phone_change").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_hour", F.col("amt_credit_req_last_hour").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_day", F.col("amt_credit_req_last_day").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_week", F.col("amt_credit_req_last_week").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_month", F.col("amt_credit_req_last_month").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_quarter", F.col("amt_credit_req_last_quarter").cast(FloatType()))\
                                    .withColumn("amt_credit_req_last_year", F.col("amt_credit_req_last_year").cast(FloatType()))\
                                    .withColumn("ts", F.col("ts"))

app_df_formatted = app_df_formatted.withWatermark("ts", "1 minute")
#app_df.printSchema()

5. Join the static data frames with the streaming data frame, perform data type/column conversion according to your ML model and print out the Schema. (Again, you can reuse code from 2A).


In [35]:
import math
from datetime import datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.functions import when
from pyspark.sql.functions import expr

def age_calculate(days_birth):
    age = math.floor(days_birth/-365)
    return age

def classify_age(age):
    if age < 25:
        return 'Y'
    elif 25 <= age < 35:
        return 'E'
    elif 35 <= age < 45:
        return 'M'
    elif 45 <= age < 55:
        return 'L'
    elif 55 <= age < 65:
        return 'N'
    elif age >= 65:
        return 'R'

def set_credit_worthiness(avg_score):
    if avg_score >= 0.7:
        return 'high'
    elif 0.4 <= avg_score < 0.7:
        return 'medium'
    elif avg_score < 0.4:
        return 'low'

def workyear_calculate(days_employed):
    workyear = days_employed/-365
    return workyear

def classify_workyear(workyear):
    if 0 < workyear < 1:
        return '< 1'
    elif 1 <= workyear <= 10:
        return '1-10'
    elif 10 < workyear <= 20:
        return '11-20'
    elif 20 < workyear <= 30:
        return '21-30'
    elif 30 < workyear <= 40:
        return '31-40'
    elif 40 < workyear <= 50:
        return '41-50'
    elif 50 < workyear <= 60:
        return '51-60'
    elif workyear < 0 or workyear == 0:
        return '0'

def changephone_calculate(days_last_phone_change):
    if days_last_phone_change is not None:
        changeyear = days_last_phone_change / -365
        return changeyear
    else:
        return 0
    
def classify_changephone(changeyear):
    if 0 < changeyear < 1:
        return '< 1'
    elif 1 <= changeyear <= 5:
        return '1-5'
    elif 5 < changeyear <= 10:
        return '6-10'
    elif 10 < changeyear <= 15:
        return '11-15'
    elif 15 < changeyear <= 20:
        return '16-20'
    else:
        return '0'
    
def replace_value(df, column, value_df):  
    df = df.join(value_df, F.col(column) == value_df.value, how = 'inner')
    df = df.withColumn(column, F.col('key'))
    df = df.drop('value').drop('key')
    return df
        
# Register the functions as UDFs
age_calculate_udf = udf(age_calculate, IntegerType())
classify_age_udf = udf(classify_age, StringType())
set_credit_worthiness_udf = udf(set_credit_worthiness, StringType())
workyear_calculate_udf = udf(workyear_calculate, IntegerType())
classify_workyear_udf = udf(classify_workyear, StringType())
changephone_calculate_udf = udf(changephone_calculate, IntegerType())
classify_changephone_udf = udf(classify_changephone, StringType())

# Add age_bucket and loan_to_income_ratio columns
app_df_new = app_df_formatted.withColumn("age_bucket", classify_age_udf(age_calculate_udf(F.col('days_birth'))))\
                .withColumn('loan_to_income_ratio', F.col('amt_credit') / F.col('income_total'))
    
# replace null value with 0.5
columns = ['credit_score_1', 'credit_score_2', 'credit_score_3']
for col in columns:
    app_df_new = app_df_new.withColumn(col, F.coalesce(col, F.lit(0.5)))

# Add credit_worthiness column # Add work_year and changephone_year
app_df_new = app_df_new.withColumn('credit_worthiness', set_credit_worthiness_udf((F.col('credit_score_1') + F.col('credit_score_2') + F.col('credit_score_3')) / 3))\
                                .withColumn("work_year", classify_workyear_udf(workyear_calculate_udf(F.col('days_employed'))))\
                                .withColumn("changephone_year", classify_changephone_udf(changephone_calculate_udf(F.col('days_last_phone_change'))))

## Join app_df and pre_app_df
pre_app_df_renamed = pre_app_df.withColumnRenamed('id_app', 'id_app_1')\
                                .withColumnRenamed('amt_credit', 'amt_credit_1')

# join application_data with previous_application
joined_df = app_df_new.join(pre_app_df_renamed, app_df_new.id_app == pre_app_df_renamed.id_app_1, how = 'left_outer')

# Create 'num_of_prev_app' column
joined_df_count_pre = joined_df.groupby('id_app').agg(F.count('id').alias('num_of_prev_app'))

# Rename column to avoid duplicate column name
joined_df_count_pre = joined_df_count_pre.withColumnRenamed('id_app', 'id_app_2')

app_df_1 = app_df_new.join(joined_df_count_pre, expr("""id_app == id_app_2"""), how = 'inner')\
                    .drop('id_app_2')

# Create 'num_of_approved_app' column
joined_df_count_approved = joined_df.filter(F.col('name_contract_status') == 'Approved').groupby('id_app')\
                                    .agg(F.count('id').alias('num_of_approved_app'))

# Rename column to avoid duplicate column name
joined_df_count_approved = joined_df_count_approved.withColumnRenamed('id_app', 'id_app_2')

app_df_2 = app_df_1.join(joined_df_count_approved, expr("""id_app == id_app_2"""), how = 'inner')\
                    .drop('id_app_2')

# Replace null value with 0
app_df_2 = app_df_2.withColumn('num_of_approved_app', F.when(F.col('num_of_approved_app').isNull(), 0).otherwise(F.col('num_of_approved_app')))

# Create 'total_credit' column
joined_df_total = joined_df.filter(F.col('name_contract_status') == 'Approved').groupBy('id_app')\
                            .agg(F.sum('amt_credit_1').alias('total_credit'))

# Rename column to avoid duplicate column name
joined_df_total = joined_df_total.withColumnRenamed('id_app', 'id_app_2')

app_df_3 = app_df_2.join(joined_df_total, expr("""id_app == id_app_2"""), how = 'inner')\
                    .drop('id_app_2')

# Replace null value with 0
app_df_3 = app_df_3.withColumn('total_credit', F.when(F.col('total_credit').isNull(), 0).otherwise(F.col('total_credit')))

# Create 'total_credit_to_income_ratio' column
app_df_4 = app_df_3.withColumn('total_credit_to_income_ratio', F.col('total_credit')/F.col('income_total'))

# Replace education_type
value_df_edu = value_df.filter(F.col('category') == 'education_type').select('value', 'key')
app_df_edu = replace_value(app_df_4, 'education_type', value_df_edu)

# Replace occupation_type
value_df_occ = value_df.filter(F.col('category') == 'occupation_type').select('value', 'key')
app_df_occ = replace_value(app_df_edu, 'occupation_type', value_df_occ)

# Replace income_type
value_df_in = value_df.filter(F.col('category') == 'income_type').select('value', 'key')
app_df_in = replace_value(app_df_occ, 'income_type', value_df_in)

# Replace family_status
value_df_fam = value_df.filter(F.col('category') == 'family_status').select('value', 'key')
app_df_fam = replace_value(app_df_in, 'family_status', value_df_fam)   

# Replace housing_type
value_df_housing = value_df.filter(F.col('category') == 'housing_type').select('value', 'key')
app_df_housing = replace_value(app_df_fam, 'housing_type', value_df_housing)

# Fill null with 0 for "amt_credit_req_last_year"
app_df_amt = app_df_housing.fillna(0, subset=['amt_credit_req_last_year'])

cols = ['gender', 'own_car', 'own_property', 'amt_credit', 'income_type', 'education_type', 'family_status', 'housing_type', 
        'flag_emp_phone', 'occupation_type','amt_credit_req_last_year', 'age_bucket', 'loan_to_income_ratio', 
        'credit_worthiness', 'total_credit', 'total_credit_to_income_ratio', 'work_year', 'num_of_prev_app',
        'num_of_approved_app', 'changephone_year', 'ts']

app_df_final = app_df_amt.select([x for x in cols]).na.drop()
app_df_final.printSchema()

root
 |-- gender: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- amt_credit: float (nullable = true)
 |-- income_type: string (nullable = true)
 |-- education_type: string (nullable = true)
 |-- family_status: string (nullable = true)
 |-- housing_type: string (nullable = true)
 |-- flag_emp_phone: integer (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- amt_credit_req_last_year: float (nullable = false)
 |-- age_bucket: string (nullable = true)
 |-- loan_to_income_ratio: double (nullable = true)
 |-- credit_worthiness: string (nullable = true)
 |-- total_credit: double (nullable = true)
 |-- total_credit_to_income_ratio: double (nullable = true)
 |-- work_year: string (nullable = true)
 |-- num_of_prev_app: long (nullable = false)
 |-- num_of_approved_app: long (nullable = false)
 |-- changephone_year: string (nullable = true)
 |-- ts: timestamp (nullable = true)



6. Load your ML model, and use the model and Spark to perform the following:  
    a) Every 10 seconds, print the total number of applications and number of potential default applications (prediction = 1) in the last 1 minute.  
    b) Every 15 seconds, show the total requested credit (sum of credit where default=0) in the last 15 seconds.  
    c) Every 1 minute, show the top 10 largest loan applications with the potential of default.  

In [36]:
app_df_formatted = app_df_formatted.withColumnRenamed('target', 'prediction')

In [50]:
# 6a
from pyspark.sql.window import Window
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import expr

def show_result(df, epoch_id):
    df.show(truncate = False)
    
window_count_6a = app_df_formatted \
    .withWatermark('ts', '1 minute') \
    .groupBy(F.window('ts', '1 minute', '10 seconds')) \
    .agg(F.count('*').alias('total_number_of_applications'), 
        F.count(expr("CASE WHEN prediction = 1 THEN 1 END")).alias('number_of_potential_default_applications'))\
    .filter((F.col('window.end') >= current_timestamp() - F.expr('INTERVAL 1 MINUTE')) & 
            (F.col('window.end') <= current_timestamp())) \
    .orderBy('window')
    
query_6a = window_count_6a \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(show_result)\
    .trigger(processingTime = '10 seconds') \
    .start()

+------+----------------------------+----------------------------------------+
|window|total_number_of_applications|number_of_potential_default_applications|
+------+----------------------------+----------------------------------------+
+------+----------------------------+----------------------------------------+

+------------------------------------------+----------------------------+----------------------------------------+
|window                                    |total_number_of_applications|number_of_potential_default_applications|
+------------------------------------------+----------------------------+----------------------------------------+
|{2024-02-07 20:19:20, 2024-02-07 20:20:20}|116                         |6                                       |
+------------------------------------------+----------------------------+----------------------------------------+

+------------------------------------------+----------------------------+----------------------------------

In [51]:
query_6a.stop()

In [52]:
# 6b
from pyspark.sql.functions import sum, format_string
    
window_count_6b = app_df_formatted \
    .withWatermark('ts', '15 seconds') \
    .filter(F.col('prediction') == 0)\
    .groupBy(F.window('ts', '15 seconds')) \
    .agg(F.format_string('%.0f', F.sum(F.col('amt_credit'))).alias('total_requested_credit')) \
    .filter(F.col('window.end') >= current_timestamp() - F.expr('INTERVAL 15 SECONDS')) \
    .orderBy('window')

query_6b = window_count_6b \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(show_result)\
    .trigger(processingTime = '15 seconds') \
    .start()

+------+----------------------+
|window|total_requested_credit|
+------+----------------------+
+------+----------------------+

+------------------------------------------+----------------------+
|window                                    |total_requested_credit|
+------------------------------------------+----------------------+
|{2024-02-07 20:21:45, 2024-02-07 20:22:00}|182616683             |
+------------------------------------------+----------------------+

+------------------------------------------+----------------------+
|window                                    |total_requested_credit|
+------------------------------------------+----------------------+
|{2024-02-07 20:22:00, 2024-02-07 20:22:15}|287908250             |
+------------------------------------------+----------------------+

+------------------------------------------+----------------------+
|window                                    |total_requested_credit|
+------------------------------------------+---------

In [53]:
query_6b.stop()

In [54]:
window_spec = Window().orderBy(F.col("amt_credit").desc())

# The function ranks amt_credit
def rank_loan(df, epoch_id):
    sorted_df = df.withColumn('rank', F.rank().over(window_spec))\
                    .show(10)
                    
window_rank_6c = app_df_formatted\
    .withWatermark('ts', '1 minute')\
    .filter(F.col('prediction') == 1)\
    .select('ts', 'id_app', 'amt_credit')
  
query_6c = window_rank_6c\
    .writeStream\
    .outputMode("append")\
    .foreachBatch(rank_loan)\
    .trigger(processingTime = '1 minute')\
    .option("truncate", "False")\
    .start()

+---+------+----------+----+
| ts|id_app|amt_credit|rank|
+---+------+----------+----+
+---+------+----------+----+

+-------------------+------+----------+----+
|                 ts|id_app|amt_credit|rank|
+-------------------+------+----------+----+
|2024-02-07 20:23:30|378362| 2281500.0|   1|
|2024-02-07 20:23:10|376463| 1800000.0|   2|
|2024-02-07 20:23:05|375998| 1762110.0|   3|
|2024-02-07 20:23:50|380052| 1585224.0|   4|
|2024-02-07 20:23:35|378857| 1546020.0|   5|
|2024-02-07 20:23:55|380454| 1546020.0|   5|
|2024-02-07 20:23:20|377497| 1512796.5|   7|
|2024-02-07 20:23:50|379954| 1436850.0|   8|
|2024-02-07 20:23:35|378797| 1423584.0|   9|
|2024-02-07 20:23:25|377856| 1314000.0|  10|
+-------------------+------+----------+----+
only showing top 10 rows

+-------------------+------+----------+----+
|                 ts|id_app|amt_credit|rank|
+-------------------+------+----------+----+
|2024-02-07 20:24:46|384200| 1724688.0|   1|
|2024-02-07 20:24:20|382272| 1575000.0|   2|
|2

In [55]:
query_6c.stop()

In [None]:
spark.stop()

7. Write a Parquet file to store the following data:  
    a) Persist the raw data: Persist your prediction results along with application data and event_time in Parquet format; after that, read the Parquet file and show the first 10 records.  
    b) Persist the 15-second requested credit (6b) in another parquet file.


In [74]:
# 7a
query_app = app_df_formatted\
        .writeStream\
        .format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/app_formatted_df")\
        .option("checkpointLocation", "parquet/app_formatted_df/checkpoint")\
        .start()

In [76]:
query_app.stop()

In [58]:
query_app = spark.read.parquet("parquet/app_formatted_df")
query_app.printSchema()
query_app.show(10)

root
 |-- id_app: integer (nullable = true)
 |-- prediction: integer (nullable = true)
 |-- contract_type: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- own_property: string (nullable = true)
 |-- num_of_children: integer (nullable = true)
 |-- income_total: float (nullable = true)
 |-- amt_credit: float (nullable = true)
 |-- amt_annuity: float (nullable = true)
 |-- amt_goods_price: float (nullable = true)
 |-- income_type: integer (nullable = true)
 |-- education_type: integer (nullable = true)
 |-- family_status: integer (nullable = true)
 |-- housing_type: integer (nullable = true)
 |-- region_population_relative: float (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- own_car_age: float (nullable = true)
 |-- flag_mobile: integer (nullable = true)
 |-- flag_emp_phone: integer (nullable = true)
 |-- flag_work_phone: integer (nullable = true)
 |-- flag_cont_mo

In [78]:
# 7b
def write_df(df, epoch_id):
    df.write.mode("append").parquet("parquet/total_credit")
    
query_7b = window_count_6b\
        .writeStream\
        .foreachBatch(write_df)\
        .outputMode("complete")\
        .option("checkpointLocation", "parquet/total_credit/checkpoint")\
        .start()

In [81]:
query_7b.stop()

In [72]:
query_7b = spark.read.parquet("parquet/total_credit")
query_7b.printSchema()
query_7b.show(10, truncate = False)

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- total_requested_credit: string (nullable = true)

+------------------------------------------+----------------------+
|window                                    |total_requested_credit|
+------------------------------------------+----------------------+
|{2024-02-07 20:28:45, 2024-02-07 20:29:00}|685636929             |
|{2024-02-07 20:28:15, 2024-02-07 20:28:30}|627555038             |
|{2024-02-07 20:29:00, 2024-02-07 20:29:15}|102343950             |
|{2024-02-07 20:28:00, 2024-02-07 20:28:15}|422220402             |
|{2024-02-07 20:27:30, 2024-02-07 20:27:45}|419525330             |
|{2024-02-07 20:28:15, 2024-02-07 20:28:30}|627555038             |
|{2024-02-07 20:28:30, 2024-02-07 20:28:45}|275397813             |
|{2024-02-07 20:27:15, 2024-02-07 20:27:30}|615114050             |
|{2024-02-07 20:27:15, 2024-02-07 20:27:30}|161934777             

8. Read the two parquet files from 7a and 7b as a data stream, and send the records to two topics with appropriate names.  
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet files. The parquet files serve as an intermediate storage in this use case.)

In [75]:
# Stream 1
from pyspark.sql.functions import to_json

hostip = 'kafka'
topic = 'application_pred_stream'

parquet_schema = StructType([
    StructField("id_app", IntegerType(), True),
    StructField("prediction", IntegerType(), True),
    StructField("contract_type", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("own_car", StringType(), True),
    StructField("own_property", StringType(), True),
    StructField("num_of_children", IntegerType(), True),
    StructField("income_total", FloatType(), True),
    StructField("amt_credit", FloatType(), True),
    StructField("amt_annuity", FloatType(), True),
    StructField("amt_goods_price", FloatType(), True),
    StructField("income_type", IntegerType(), True),
    StructField("education_type", IntegerType(), True),
    StructField("family_status", IntegerType(), True),
    StructField("housing_type", IntegerType(), True),
    StructField("region_population_relative", FloatType(), True),
    StructField("days_birth", IntegerType(), True),
    StructField("days_employed", IntegerType(), True),
    StructField("own_car_age", FloatType(), True),
    StructField("flag_mobile", IntegerType(), True),
    StructField("flag_emp_phone", IntegerType(), True),
    StructField("flag_work_phone", IntegerType(), True),
    StructField("flag_cont_mobile", IntegerType(), True),
    StructField("flag_phone", IntegerType(), True),
    StructField("flag_email", IntegerType(), True),
    StructField("occupation_type", IntegerType(), True),
    StructField("weekday_appr_process_start", StringType(), True),
    StructField("hour_app_process_start", IntegerType(), True),
    StructField("organization_type", IntegerType(), True),
    StructField("credit_score_1", FloatType(), True),
    StructField("credit_score_2", FloatType(), True),
    StructField("credit_score_3", FloatType(), True),
    StructField("days_last_phone_change", FloatType(), True),
    StructField("amt_credit_req_last_hour", FloatType(), True),
    StructField("amt_credit_req_last_day", FloatType(), True),
    StructField("amt_credit_req_last_week", FloatType(), True),
    StructField("amt_credit_req_last_month", FloatType(), True),
    StructField("amt_credit_req_last_quarter", FloatType(), True),
    StructField("amt_credit_req_last_year", FloatType(), True),
    StructField("ts", TimestampType(), True)
])

app_pred_df = spark \
    .readStream \
    .schema(parquet_schema)\
    .format("parquet")\
    .option("path", "parquet/app_formatted_df")\
    .option("maxFilesPerTrigger", 1) \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest")\
    .load()      

query_8a = app_pred_df \
    .select(to_json(F.struct("*")).alias('value')) \
    .writeStream \
    .format("kafka")\
    .outputMode("append") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("topic", topic) \
    .trigger(processingTime = '15 seconds') \
    .option("checkpointLocation", "checkpoint/app_formatted_df") \
    .start()

In [77]:
query_8a.stop()

In [79]:
# Stream 2

hostip = 'kafka'
topic = 'total_credit'

parquet_schema = StructType([
    StructField("window", StructType([
        StructField("start",  TimestampType(), True),
        StructField("end",  TimestampType(), True)
    ]), True),
    StructField("total_requested_credit", StringType(), True),
])

total_credit = spark \
    .readStream\
    .schema(parquet_schema) \
    .format("parquet") \
    .option("path", "parquet/total_credit") \
    .option("maxFilesPerTrigger", 1) \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

query_8b = total_credit \
    .select(to_json(F.struct("*")).alias('value')) \
    .writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("topic", topic) \
    .trigger(processingTime='15 seconds') \
    .option("checkpointLocation", "checkpoint/total_credit") \
    .start()

In [80]:
query_8b.stop()

In [None]:
spark.stop()