# End to End Pure Streaming Data-Pipeline for Rent Table Using Spark Structured Streaming on Databricks

###### Description: In this notebook we read rent state rows from incoming csv files into a streaming DataFrame, transform (clean, cast, rename) the data, and add/update the latest state in a Databricks Delta table
###### Objective: (incoming csv files) --> "rent_streamingDF" --> "results_df" --> "rent_data"

In [0]:
import requests
import json
import optimus as op
import phonenumbers 
import re
import datetime
import time

from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.functions import rank, col

In [0]:
# Schemas for Rent

# Full layout of one incoming csv row (used by the streaming reader).
rent_schema = (
    StructType()
    .add("Rent_id", IntegerType(), False)
    .add("Rent_fee", StringType(), True)
    .add("Late_fee", StringType(), False)
    .add("Due_date", TimestampType(), True)
    .add("Lease_id", IntegerType(), True)
    .add("Pay_id", IntegerType(), True)
    .add("event_time", TimestampType(), True)
)

# Same layout without Rent_id: the element type of the array returned by the UDF.
rent_udf_schema = (
    StructType()
    .add("Rent_fee", StringType(), True)
    .add("Late_fee", StringType(), False)
    .add("Due_date", TimestampType(), True)
    .add("Lease_id", IntegerType(), True)
    .add("Pay_id", IntegerType(), True)
    .add("event_time", TimestampType(), True)
)

###### Description: Get rent csv files as a streaming "rent_streamingDF" and process it on the fly and get transformed stream "rent_df"
###### Objective: (incoming csv files) --> "rent_streamingDF" --> "rent_df"

In [0]:
# Build the rent streaming DataFrame from incoming csv files

# Streaming starts here: every file landing in this directory is read with rent_schema
rent_Path = "/FileStore/apartment/rent/inprogress/"
rent_streamingDF = (
  spark.readStream
    .format("csv")
    .schema(rent_schema)
    .options(maxFilesPerTrigger="1", header="true", multiLine="true")
    .load(rent_Path)
)
# Drop invalid rows: keep only records that carry a Rent_id
rent_df = rent_streamingDF.where(F.col("Rent_id").isNotNull())
# Wrap rent_df in an Optimus DataFrameTransformer
transformer = op.DataFrameTransformer(rent_df)
# Replace NA values with 0.0 (requested for every column)
transformer.replace_na(0.0, columns="*")
# Clear accents (requested for every column via columns='*')
transformer.clear_accents(columns='*')
# NOTE(review): the transformer's result is never assigned back to rent_df, while the
# cells below keep reading rent_df, so these two cleaning steps presumably do not
# reach the rest of the pipeline. Confirm the intent before relying on them.
# Remove special characters (currently disabled):
# transformer.remove_special_chars(columns=['rent_name', 'Address_line_1', 'City', 'Post_code', 'Region'])

##### This function zips the per-column lists collected for each Rent_id into a single array column holding one struct per row

In [0]:
def my_fun(Rent_fee, Late_fee, Due_date, Lease_id, Pay_id, event_time):
  """Zip six parallel per-column lists into a list of row tuples.

  Each argument is a list holding the values of one column; the i-th tuple of
  the result combines the i-th element of every list, in the argument order
  (Rent_fee, Late_fee, Due_date, Lease_id, Pay_id, event_time). As with
  ``zip``, the result is as long as the shortest input.

  Returns a concrete ``list`` (not a lazy ``zip`` iterator) so the value can be
  used directly as an array result. If any argument is ``None`` there are no
  complete rows to build, so an empty list is returned instead of raising.
  """
  columns = (Rent_fee, Late_fee, Due_date, Lease_id, Pay_id, event_time)
  # A missing (None) column would make zip() raise TypeError; treat it as "no rows".
  if any(column is None for column in columns):
    return []
  return list(zip(*columns))

# Register my_fun as a Spark UDF whose result is an array of rent_udf_schema structs.
udf_Fun = udf(
    my_fun,
    returnType=ArrayType(rent_udf_schema),
)

In [0]:
# For every Rent_id, gather all of its rows plus the newest event_time of the group,
# then explode back to one output row per input row:
#   Rent_id | data (struct of the six payload columns) | latest_event_time
#
# Each input row is collected as ONE struct. collect_list skips null values, so
# collecting the six columns as six independent lists can yield lists of different
# lengths/offsets whenever a field is null, and zipping them back together would mix
# values that belong to different rows. A struct is kept even when some of its fields
# are null, so the fields of a row always stay together.
intermediate_df = (
    rent_df
    .withWatermark("event_time", "10 seconds")
    .groupBy("Rent_id")
    .agg(
        F.collect_list(
            F.struct("Rent_fee", "Late_fee", "Due_date", "Lease_id", "Pay_id", "event_time")
        ).alias("rows"),
        F.max("event_time").alias("latest_event_time"),
    )
    .select(
        "Rent_id",
        F.explode("rows").alias("data"),
        "latest_event_time",
    )
)

##### Filter the data where event_time is latest

In [0]:
# Keep, for each Rent_id, only the exploded row whose event_time equals the newest
# event_time of its group, flatten the struct fields into top-level columns,
# and sort the result by Rent_id.
results_df = (
    intermediate_df
    .where(F.col("data.event_time") == F.col("latest_event_time"))
    .select(
        F.col("Rent_id"),
        F.col("data.Rent_fee"),
        F.col("data.Late_fee"),
        F.col("data.Due_date"),
        F.col("data.Lease_id"),
        F.col("data.Pay_id"),
        F.col("data.event_time"),
        F.col("latest_event_time"),
    )
    .orderBy("Rent_id")
)

##### Display final result
###### This result shows the latest state of all the unique rent_id

In [0]:
# Render results_df (the latest-state row per Rent_id) in the notebook.
display(results_df)

Rent_id,Rent_fee,Late_fee,Due_date,Lease_id,Pay_id,event_time,latest_event_time
1,$711.98,$1055.86,2017-10-21T02:21:09.000+0000,476,804,2018-06-10T17:43:39.000+0000,2018-06-10T17:43:39.000+0000
2,$556.18,$2126.44,2017-03-28T16:15:16.000+0000,494,306,2017-09-17T11:12:56.000+0000,2017-09-17T11:12:56.000+0000
3,$583.19,$2161.72,2017-03-04T00:27:23.000+0000,74,14,2017-01-30T17:42:08.000+0000,2017-01-30T17:42:08.000+0000
4,$535.95,$1096.05,2017-09-21T04:08:54.000+0000,720,718,2017-09-16T05:34:44.000+0000,2017-09-16T05:34:44.000+0000
5,$886.52,$1999.51,2017-06-15T03:54:13.000+0000,644,109,2017-10-06T22:53:17.000+0000,2017-10-06T22:53:17.000+0000
6,$920.80,$1001.71,2017-01-30T21:20:24.000+0000,456,534,2017-10-10T05:17:06.000+0000,2017-10-10T05:17:06.000+0000
7,$500.64,$1496.66,2017-08-14T07:12:30.000+0000,288,875,2017-09-02T20:48:38.000+0000,2017-09-02T20:48:38.000+0000
8,$613.34,$1899.32,2017-09-29T00:45:25.000+0000,763,28,2017-09-18T21:17:26.000+0000,2017-09-18T21:17:26.000+0000
9,$557.64,$2245.11,2017-07-08T00:27:54.000+0000,191,519,2017-01-07T23:35:28.000+0000,2017-01-07T23:35:28.000+0000
10,$981.88,$1293.81,2017-01-10T10:01:12.000+0000,282,676,2017-08-14T03:18:25.000+0000,2017-08-14T03:18:25.000+0000


##### Below cells are optional if external functionality or storage is needed

###### Write the stream to a Databricks Delta table for storage

In [0]:
# Start the streaming write of results_df into the Delta table at /delta/apartment/rent_data.
# The query runs in "complete" output mode and keeps its progress in the checkpoint directory.
streaming_query = (
    results_df.writeStream
    .format("delta")
    .outputMode("complete")
    .options(
        mergeSchema="true",
        checkpointLocation="/delta/apartment/rent/_checkpoints/streaming-agg",
    )
    .start(path="/delta/apartment/rent_data")
)

#### Read the Delta Table as a Static or Streaming DataFrame
#### This dataframe will always be Up-To-Date

In [0]:
# Static (batch) read of the Delta table written by the streaming query, sorted by Rent_id.
rent_data = (
    spark.read
    .load("/delta/apartment/rent_data", format="delta")
    .sort("Rent_id")
)

In [0]:
# Render the contents of the Delta table (rent_data) in the notebook.
display(rent_data)

Rent_id,Rent_fee,Late_fee,Due_date,Lease_id,Pay_id,event_time,latest_event_time
1,$711.98,$1055.86,2017-10-21T02:21:09.000+0000,476,804,2018-06-10T17:43:39.000+0000,2018-06-10T17:43:39.000+0000
2,$556.18,$2126.44,2017-03-28T16:15:16.000+0000,494,306,2017-09-17T11:12:56.000+0000,2017-09-17T11:12:56.000+0000
3,$583.19,$2161.72,2017-03-04T00:27:23.000+0000,74,14,2017-01-30T17:42:08.000+0000,2017-01-30T17:42:08.000+0000
4,$535.95,$1096.05,2017-09-21T04:08:54.000+0000,720,718,2017-09-16T05:34:44.000+0000,2017-09-16T05:34:44.000+0000
5,$886.52,$1999.51,2017-06-15T03:54:13.000+0000,644,109,2017-10-06T22:53:17.000+0000,2017-10-06T22:53:17.000+0000
6,$920.80,$1001.71,2017-01-30T21:20:24.000+0000,456,534,2017-10-10T05:17:06.000+0000,2017-10-10T05:17:06.000+0000
7,$500.64,$1496.66,2017-08-14T07:12:30.000+0000,288,875,2017-09-02T20:48:38.000+0000,2017-09-02T20:48:38.000+0000
8,$613.34,$1899.32,2017-09-29T00:45:25.000+0000,763,28,2017-09-18T21:17:26.000+0000,2017-09-18T21:17:26.000+0000
9,$557.64,$2245.11,2017-07-08T00:27:54.000+0000,191,519,2017-01-07T23:35:28.000+0000,2017-01-07T23:35:28.000+0000
10,$981.88,$1293.81,2017-01-10T10:01:12.000+0000,282,676,2017-08-14T03:18:25.000+0000,2017-08-14T03:18:25.000+0000


### Do Some Live Streaming Graphs

In [0]:
# Open the same Delta table as a streaming source for the live graphs.
# NOTE(review): the table is written in "complete" output mode; confirm that streaming
# from a table that is rewritten rather than appended to behaves as intended here.
rent_data_stream = spark.readStream.load(
    "/delta/apartment/rent_data",
    format="delta",
)

In [0]:
# Live view: number of rent rows per Lease_id.
display(
    rent_data_stream
    .groupBy("Lease_id")
    .agg(F.count("*").alias("count"))
)

Lease_id,count
471,1
626,1
984,1
973,1
476,1
140,1
912,1
442,1
94,1
676,1
