In [1]:
#1
#import libraries
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import countDistinct
# Run Spark locally with two worker threads.
master = "local[2]"
app_name = "Assignment2B"

# Spark configuration: master URL, application name and a UTC SQL session time zone
# (the zone Spark SQL uses when parsing/rendering timestamps without an explicit offset).
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
    .set("spark.sql.session.timeZone", "UTC")
)
sc = SparkContext.getOrCreate(conf=spark_conf)
spark = SparkSession(sparkContext=sc)
# Only surface ERROR-level log output in the notebook.
sc.setLogLevel('ERROR')

In [2]:
#2
# Subscribe to the same Kafka topics that were produced in Task 1.
KAFKA_BOOTSTRAP_SERVERS = "127.0.0.1:9092"


def read_kafka_topic(topic, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS):
    """Return a streaming DataFrame subscribed to a single Kafka topic.

    Parameters
    ----------
    topic : str
        Name of the Kafka topic to subscribe to.
    bootstrap_servers : str
        Value for the Kafka ``bootstrap.servers`` option.

    Returns
    -------
    pyspark.sql.DataFrame
        Raw rows from the Kafka source; ``key``/``value`` still need to be
        cast to string before parsing.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )


df_bureau = read_kafka_topic("bureau")
df_customer = read_kafka_topic("customer")

In [3]:
# Kafka delivers key/value as bytes; expose both as string columns (same names) for JSON parsing.
df_bureau = df_bureau.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
)
df_customer = df_customer.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
)

In [4]:
#3
# Each Kafka message value is parsed as a JSON array of records. Every field is read
# as a nullable string (selected numeric columns are cast later), except the trailing
# nullable event-time field 'ts', which is parsed as a timestamp.
def _json_array_schema(string_field_names):
    """Build ArrayType(StructType) of nullable string fields followed by a nullable 'ts' timestamp."""
    fields = [StructField(name, StringType(), True) for name in string_field_names]
    fields.append(StructField('ts', TimestampType(), True))
    return ArrayType(StructType(fields))


schema_bureau = _json_array_schema([
    'ID',
    'SELF-INDICATOR',
    'MATCH-TYPE',
    'ACCT-TYPE',
    'CONTRIBUTOR-TYPE',
    'DATE-REPORTED',
    'OWNERSHIP-IND',
    'ACCOUNT-STATUS',
    'DISBURSED-DT',
    'CLOSE-DT',
    'LAST-PAYMENT-DATE',
    'CREDIT-LIMIT/SANC AMT',
    'DISBURSED-AMT/HIGH CREDIT',
    'INSTALLMENT-AMT',
    'CURRENT-BAL',
    'INSTALLMENT-FREQUENCY',
    'OVERDUE-AMT',
    'WRITE-OFF-AMT',
    'ASSET_CLASS',
    'REPORTED DATE - HIST',
    'DPD - HIST',
    'CUR BAL - HIST',
    'AMT OVERDUE - HIST',
    'AMT PAID - HIST',
    'TENURE',
])

schema_customer = _json_array_schema([
    'ID',
    'InstlmentMode',
    'LoanStatus',
    'PaymentMode',
    'BranchID',
    'Area',
    'Tenure',
    'AssetCost',
    'AmountFinance',
    'DisbursalAmount',
    'EMI',
    'DisbursalDate',
    'MaturityDAte',
    'AuthDate',
    'AssetID',
    'ManufacturerID',
    'SupplierID',
    'LTV',
    'SEX',
    'AGE',
    'MonthlyIncome',
    'City',
    'State',
    'ZiPCODE',
    'Top-up Month',
])

In [5]:
# Parse each message value as a JSON array and emit one row per array element.
def _explode_json_array(kafka_df, array_schema):
    """Parse `value` with `array_schema` and explode it into an `unnested_value` struct column."""
    parsed = kafka_df.select(
        F.from_json(F.col("value").cast("string"), array_schema).alias('parsed_value')
    )
    return parsed.select(F.explode(F.col("parsed_value")).alias('unnested_value'))


df_bureau = _explode_json_array(df_bureau, schema_bureau)
df_customer = _explode_json_array(df_customer, schema_customer)

In [6]:
# Flatten the exploded struct into top-level columns (one per schema field, in schema order).
# The bureau ID becomes ID1 and the customer event time becomes 'time' so both remain
# distinguishable from the other stream's columns in the join condition.
df_formatted_bureau = (
    df_bureau
    .select("unnested_value.*")
    .withColumnRenamed("ID", "ID1")
)

df_formatted_customer = (
    df_customer
    .select("unnested_value.*")
    .withColumnRenamed("ts", "time")
)

In [7]:
# Assign a 5-second event-time watermark to each stream (bureau on 'ts', customer on 'time').
df_formatted_bureau = df_formatted_bureau.withWatermark("ts", "5 seconds")
df_formatted_customer = df_formatted_customer.withWatermark("time", "5 seconds")

# Map the textual SELF-INDICATOR flag to '0'/'1' so it can later be cast to an integer.
# The '(?i)' flag makes the match case-insensitive. Case-sensitive 'FALSE'/'TRUE' would
# leave values such as 'False'/'false' untouched, and those become null on the integer cast.
# NOTE(review): confirm the exact spelling emitted by the Task 1 producer.
df_formatted_bureau = (
    df_formatted_bureau
    .withColumn('SELF-INDICATOR', regexp_replace('SELF-INDICATOR', '(?i)false', '0'))
    .withColumn('SELF-INDICATOR', regexp_replace('SELF-INDICATOR', '(?i)true', '1'))
)

In [8]:
# Cast the selected numeric columns (parsed as strings) to IntegerType.
# NOTE(review): 'DISBURSED-AMT/HIGH CREDIT' is not in this list but is summed later,
# so Spark sums it via an implicit cast — confirm this is intended.
def _cast_to_int(frame, column_names):
    """Return `frame` with each named column replaced by its IntegerType cast, in list order."""
    for name in column_names:
        frame = frame.withColumn(name, col(name).cast(IntegerType()))
    return frame


df_formatted_bureau = _cast_to_int(df_formatted_bureau, [
    "CREDIT-LIMIT/SANC AMT",
    "INSTALLMENT-AMT",
    "SELF-INDICATOR",
    "CURRENT-BAL",
    "OVERDUE-AMT",
    "WRITE-OFF-AMT",
    "TENURE",
])

df_formatted_customer = _cast_to_int(df_formatted_customer, [
    "AssetCost",
    "DisbursalAmount",
    "EMI",
    "MonthlyIncome",
])

In [9]:
#4 Group the bureau stream per ID over 30-second tumbling event-time windows.
# SELF-INDICATOR has already been mapped to 0/1 and cast to IntegerType earlier.
# Do NOT apply regexp_replace to it again here: that would turn the integer column
# back into a string, and the sum below would then run on implicitly-cast strings.

# Columns summed per window (output alias: '<name>_sum').
_BUREAU_SUM_COLUMNS = [
    "CREDIT-LIMIT/SANC AMT",
    "INSTALLMENT-AMT",
    "DISBURSED-AMT/HIGH CREDIT",  # still a string column; Spark casts it implicitly for sum
    "CURRENT-BAL",
    "OVERDUE-AMT",
    "TENURE",
    "SELF-INDICATOR",
]

# Columns counted per window (output alias: '<name>_dist').
# F.count counts NON-NULL values; despite the '_dist' suffix these are not distinct
# counts (distinct aggregations are not supported on streaming DataFrames). The aliases
# are kept as-is because downstream consumers (e.g. the loaded pipeline) may depend on them.
_BUREAU_COUNT_COLUMNS = [
    'MATCH-TYPE',
    'ACCT-TYPE',
    'CONTRIBUTOR-TYPE',
    'DATE-REPORTED',
    'OWNERSHIP-IND',
    'ACCOUNT-STATUS',
    'DISBURSED-DT',
    'CLOSE-DT',
    'LAST-PAYMENT-DATE',
    'INSTALLMENT-FREQUENCY',
    'ASSET_CLASS',
    'REPORTED DATE - HIST',
    'DPD - HIST',
    'CUR BAL - HIST',
    'AMT OVERDUE - HIST',
    'AMT PAID - HIST',
]

_bureau_aggregations = (
    [F.sum(name).alias(f"{name}_sum") for name in _BUREAU_SUM_COLUMNS]
    + [F.count(name).alias(f"{name}_dist") for name in _BUREAU_COUNT_COLUMNS]
)

# Flatten the window struct into 'start'/'end' columns and drop the struct itself.
windowedCounts = (
    df_formatted_bureau
    .groupBy(window(df_formatted_bureau.ts, "30 seconds"), df_formatted_bureau.ID1)
    .agg(*_bureau_aggregations)
    .select('window.start', 'window.end', '*')
    .drop('window')
)

In [10]:
#5 Inner-join the customer stream with the windowed bureau aggregates: a customer row
# matches a bureau window when the IDs agree and the customer's event time falls inside
# that window.
# Spark time windows are half-open, [start, end): an event at exactly `end` belongs to
# the NEXT window. Using `time < end` keeps each customer row in at most one window per
# ID; `time <= end` could match two consecutive windows and duplicate the row.
# NOTE(review): the task may call for a 1-hour delay tolerance; no extra time slack is
# applied in this condition — TODO confirm the intended bound.
joined = df_formatted_customer.join(
    windowedCounts,
    expr("""
    ID1 = ID AND
    time >= start AND
    time < end
    """),
    "inner",
)

In [11]:
# Rename join output columns: window bounds -> window_start/window_end, the customer
# event time back to 'ts', and 'Top-up Month' -> 'Top-up_Month' (no space).
_JOINED_RENAMES = {
    "start": "window_start",
    "end": "window_end",
    "time": "ts",
    "Top-up Month": "Top-up_Month",
}
for _old_name, _new_name in _JOINED_RENAMES.items():
    joined = joined.withColumnRenamed(_old_name, _new_name)

In [12]:
# Project the joined stream down to the ID, bureau window bounds, event time and label.
_JOINED_OUTPUT_COLUMNS = ['ID', 'window_start', 'window_end', 'ts', 'Top-up_Month']
joined_s = joined.select(*_JOINED_OUTPUT_COLUMNS)

In [13]:
# Testing: print newly appended joined rows to the console every 20 seconds,
# without truncating long values.
query = (
    joined_s.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .trigger(processingTime='20 seconds')
    .start()
)

In [14]:
# Stop the console test query (`query`) once its output has been inspected.
query.stop()

In [15]:
#6 Persist the joined stream (append mode) as Parquet files.
# NOTE(review): the checkpoint directory is nested inside the sink's output path;
# a sibling directory would keep checkpoint files separate from the Parquet output.
query_file_sink = (
    joined_s.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path="parquet/joined_df", checkpointLocation="parquet/joined_df/checkpoint")
    .start()
)

In [16]:
#7 Loading the model
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
# Load the saved PipelineModel from a directory relative to the working directory.
_TOPUP_MODEL_DIR = 'topup_pipeline_model'
pipelineModel = PipelineModel.load(_TOPUP_MODEL_DIR)

In [17]:
# Replace nulls with 0 before scoring. With an int value, fillna only touches numeric
# columns; string columns keep their nulls.
# NOTE(review): presumably this avoids the pipeline failing on null feature values
# (e.g. a VectorAssembler with handleInvalid='error') — confirm against the saved model.
_PREDICTION_OUTPUT_COLUMNS = ['ID', 'window_start', 'window_end', 'ts', 'prediction', 'Top-up_Month']
joined = joined.fillna(value=0)
predDF = pipelineModel.transform(dataset=joined)
joined_model = predDF.select(*_PREDICTION_OUTPUT_COLUMNS)

In [18]:
# Testing: print newly appended prediction rows to the console every 20 seconds,
# without truncating long values.
query1 = (
    joined_model.writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .trigger(processingTime='20 seconds')
    .start()
)

In [19]:
# Stop the console test query for the prediction stream (`query1`).
query1.stop()

In [20]:
# Persist the prediction stream (append mode) as Parquet files.
# NOTE(review): as with the first sink, the checkpoint directory is nested inside
# the output path; consider a sibling directory instead.
query_file_sink2 = (
    joined_model.writeStream
    .format("parquet")
    .outputMode("append")
    .options(path="parquet/joined_model", checkpointLocation="parquet/joined_model/checkpoint")
    .start()
)