###### Description: In this notebook we read building state rows from incoming csv files into a streaming dataframe, transform (clean, cast, rename) the data, and add/update the latest state in a static hive table
###### Objective: (incoming csv files) --> "building_streamingDF" --> "building_df" --> "building_data"

In [2]:
import requests
import json
import optimus as op
import phonenumbers 
import re
import datetime

from pyspark.sql.types import StringType, IntegerType, TimestampType, DateType, DoubleType, StructType, StructField
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.functions import rank, col
import time

In [3]:
# Schema applied to the incoming building CSV files: (column name, Spark type, nullable).
_building_fields = [
    ("Building_id", IntegerType(), False),
    ("Building_name", StringType(), True),
    ("Landlord_id", IntegerType(), False),
    ("Address_line_1", StringType(), False),
    ("City", StringType(), False),
    ("Post_code", StringType(), True),
    ("Region", StringType(), True),
    ("event_time", TimestampType(), True),
    # fetch_time is kept as a plain string; it is only compared for equality downstream.
    ("fetch_time", StringType(), True),
]
building_schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _building_fields]
)

In [4]:
# function validating Post Code
def validatePostCode(postCode):
  """Return postCode when it is a 5-digit or ZIP+4 (12345-6789) code, otherwise "".

  Parameters:
    postCode: the raw post code value; may be None when the source column is null.

  Returns:
    The unchanged postCode string if it is valid, or the empty string for anything
    else (including None and non-string values).
  """
  # Null cells reach a Python UDF as None; re.match would raise TypeError on them.
  if not isinstance(postCode, str):
    return ""
  # fullmatch (rather than match + "$") also rejects a value with a trailing newline.
  if re.fullmatch(r"[0-9]{5}(-[0-9]{4})?", postCode):
    return postCode
  return ""
  
# Spark UDF wrapper around validatePostCode; yields a string column.
udfValidatePostCode = udf(f=validatePostCode, returnType=StringType())

###### Description: Get building csv files as a streaming "building_streamingDF", process it on the fly and get the transformed stream "building_df"
###### Objective: (incoming csv files) --> "building_streamingDF" --> "building_df"

In [6]:
# Get Building Streaming DataFrame from csv files

# Streaming starts here: every CSV file that appears under building_Path is read with
# building_schema, one file per trigger (maxFilesPerTrigger=1), first line treated as header.
building_Path = "/FileStore/apartment/building/inprogress/"
building_streamingDF = (
  spark
    .readStream
    .schema(building_schema)
    .option("maxFilesPerTrigger", "1")
    .option("header", "true")
    .csv(building_Path)
)

# Batch-mode alternative kept for reference (reads one specific part file):
# building_streamingDF = spark.read.format("csv").option("header", "true").load("/FileStore/apartment/building/inprogress/part-00000-tid-7368714421418765704-83751609-b4eb-4e45-9ebb-c4a51c35008f-21-c000.csv")

# Blank out post codes that do not pass validatePostCode.
building_df = building_streamingDF.withColumn("Post_code", udfValidatePostCode("Post_code") )
# NOTE(review): nothing below is assigned back to building_df, and the later cells use
# building_df directly. Unless Optimus mutates the wrapped frame in place, the three
# transformer steps have no effect on the data that is streamed out -- confirm, and if
# needed read the transformed frame back from the transformer.
# Instantiation of the Optimus DataFrameTransformer on the validated stream:
transformer = op.DataFrameTransformer(building_df)
# Replace NA with 0.0 in all columns ("*")
transformer.replace_na(0.0, columns="*")
# Clear accents in all columns ("*")
transformer.clear_accents(columns='*')
# Remove special characters from the four free-text columns only
transformer.remove_special_chars(columns=['Building_name', 'Address_line_1', 'City', 'Region'])


In [7]:
# Show the building_df stream (post codes validated) as it arrives.
display(building_df)

- ###### Now "building_df" contains pre-processed building state rows
- ###### After this point we need comparison
- ###### Stream-Stream subtraction is not supported
- ###### So we dump the incoming data to a query result "building_datalake" which will give updated results upon request
- ###### "building_datalake" is not streaming but it will give updated results upon request
- ###### From "building_datalake" we filter out the unseen rows to "building_delta"

In [9]:
# Sink the stream into the in-memory table "building_datalake" so batch SQL can query it.
building_datalake_query = (
  building_df
    .writeStream
    .format("memory")
    .queryName("building_datalake")
    .start()
)

## Enter batch mode

###### Take a snapshot of the "building_datalake" rows that have not been seen yet
###### Add their fetch_time values to the hive table "building_tracker"

In [12]:
def getDelta_df(entity):
  """Return the rows of "<entity>_datalake" whose fetch_time has not been tracked yet.

  Side effects (all tables are named after `entity`):
    * "<entity>_temp"    -- overwritten with a snapshot of "<entity>_datalake".
    * "<entity>_delta"   -- overwritten with the snapshot rows that are not yet tracked.
    * "<entity>_tracker" -- created on the first call, afterwards appended to, with the
                            distinct fetch_time values of the delta (column "sequence").
    * "temp_data"        -- scratch table, dropped again before returning.

  Parameters:
    entity: table-name prefix, e.g. "building".

  Returns:
    A DataFrame reading "<entity>_delta".
  """
  temp_table = entity + "_temp"
  delta_table = entity + "_delta"
  tracker_table = entity + "_tracker"

  # Save a snapshot of the datalake into a hive table so every step below sees the same rows.
  spark.sql("select * from " + entity + "_datalake").write.mode("overwrite").saveAsTable(temp_table)
  datalake_snapshot = spark.sql("select * from " + temp_table)

  tracker_exists = len(spark.sql("show tables like '" + tracker_table + "'").collect()) == 1
  if tracker_exists:
    seq_tracker = spark.sql("select * from " + tracker_table)

    # Materialise the snapshot rows whose fetch_time is already tracked.
    # mode("overwrite") keeps this step re-runnable if an earlier run died before the drop below.
    (datalake_snapshot
       .join(seq_tracker, seq_tracker.sequence == datalake_snapshot.fetch_time)
       .drop("sequence")
       .write.mode("overwrite").saveAsTable("temp_data"))
    spark.sql("refresh table temp_data")

    # Whatever is in the snapshot but not among the already-seen rows is the delta.
    delta_df = datalake_snapshot.subtract(spark.sql("select * from temp_data"))
    delta_df.write.mode("overwrite").saveAsTable(delta_table)

    # Remember the newly processed fetch_time values for the next call.
    delta_df.select(col("fetch_time").alias("sequence")).distinct().write.insertInto(tracker_table)

    spark.sql("drop table if exists temp_data")
  else:
    # First run: nothing has been tracked, so the whole snapshot is the delta.
    # Overwrite so a stale delta table without a tracker does not abort the run.
    datalake_snapshot.write.mode("overwrite").saveAsTable(delta_table)
    datalake_snapshot.select(col("fetch_time").alias("sequence")).distinct().write.saveAsTable(tracker_table)

  return spark.sql("select * from " + delta_table)

In [13]:
def resetTrackingData(entity):
  """Drop the "<entity>_delta", "<entity>_temp" and "<entity>_tracker" tables if they exist.

  Parameters:
    entity: table-name prefix, e.g. "building".
  """
  # "if exists" makes each drop a no-op for a missing table, replacing the separate
  # "show tables" lookup that preceded every drop.
  for suffix in ("_delta", "_temp", "_tracker"):
    spark.sql("drop table if exists " + entity + suffix)

In [14]:
# Start from a clean slate: drop the building delta/temp/tracker tables if present.
resetTrackingData("building")

In [15]:
# Drop the target table as well; "if exists" keeps this cell runnable on a fresh workspace.
spark.sql("drop table if exists building_data")

In [16]:
def getLastBuildingState_df():
  """Return, for every Building_id in the building delta, the row(s) with its latest event_time.

  The delta comes from getDelta_df("building") with the fetch_time column removed.
  Rows are matched back on (Building_id, max event_time), so rows that tie on the
  latest event_time are all returned.
  """
  unseen_rows = getDelta_df("building").drop("fetch_time")

  # Latest event_time per building, under suffixed names so the join-back is unambiguous.
  latest_per_building = (
    unseen_rows
      .groupBy("Building_id")
      .agg(F.max(unseen_rows.event_time).alias("event_time1"))
      .withColumnRenamed("Building_id", "Building_id1")
  )

  is_latest_row = (
    (unseen_rows.Building_id == latest_per_building.Building_id1)
    & (unseen_rows.event_time == latest_per_building.event_time1)
  )

  # Keep only the matching rows and discard the helper columns again.
  return (
    unseen_rows
      .join(latest_per_building, is_latest_row)
      .drop("Building_id1", "event_time1")
  )

In [17]:
def replaceBuildingRows(update_df):
  """Replace the building_data rows whose Building_id occurs in update_df.

  Existing rows with those ids are deleted from building_data, then all rows of
  update_df are appended.

  Parameters:
    update_df: DataFrame with a Building_id column and the building_data columns.

  NOTE(review): "delete from" presumably needs a table format that supports row-level
  deletes (e.g. Delta) -- confirm how building_data is stored before enabling this.
  """
  # Collect only the id column; null ids cannot match any row and are skipped.
  building_ids = sorted({int(row.Building_id)
                         for row in update_df.select("Building_id").collect()
                         if row.Building_id is not None})

  # One delete for all ids (against building_data, not landlord_data) instead of one per row.
  if building_ids:
    id_list = ", ".join(str(building_id) for building_id in building_ids)
    spark.sql("delete from building_data where Building_id in (" + id_list + ")")

  update_df.write.mode("append").saveAsTable("building_data")

In [18]:
# Inspect the building_df DataFrame object (cell output shows its representation).
building_df

In [19]:
def updateBuilding(new_State_df):
  """Insert the rows of new_State_df into the hive table building_data (created if missing).

  Parameters:
    new_State_df: DataFrame with the columns Building_id, Building_name, Landlord_id,
      Address_line_1, City, Post_code, Region, event_time, in that order (insertInto
      resolves columns by position).

  NOTE(review): the join that builds update_rows_df is an outer join, so every
  Building_id of new_State_df appears in update_rows_df and new_ids_df appears to
  always come out empty; the outer join in new_rows_df then keeps every row of
  new_State_df. The net effect is an append of the distinct new_State_df rows, and
  existing rows are never replaced (the replaceBuildingRows call is commented out).
  Confirm the intended upsert semantics before changing the join types.
  """
  # Every column except the key; each one is merged "newest event_time wins".
  state_columns = ["Building_name", "Landlord_id", "Address_line_1", "City",
                   "Post_code", "Region", "event_time"]

  spark.sql("create table if not exists building_data (Building_id int, Building_name string, Landlord_id int, Address_line_1 string, City string, Post_code string, Region string, event_time timestamp)")

  # Current table content with every column suffixed by "1" to avoid name clashes in the join.
  building_data_df = (spark.sql("select * from building_data")
                      .select([col(name).alias(name + "1")
                               for name in ["Building_id"] + state_columns]))

  incoming_is_newer = new_State_df.event_time > building_data_df.event_time1

  # Join against the parameter (not the notebook-global state_df) and keep each column's own name.
  update_rows_df = (building_data_df
                    .join(new_State_df, (new_State_df.Building_id == building_data_df.Building_id1), 'outer')
                    .select(new_State_df.Building_id,
                            *[F.when(incoming_is_newer, new_State_df[name])
                               .otherwise(building_data_df[name + "1"])
                               .alias(name)
                              for name in state_columns]))

  new_ids_df = (new_State_df.select("Building_id").subtract(update_rows_df.select("Building_id"))
                .distinct().select(col("Building_id").alias("Building_id1")))

  new_rows_df = (new_State_df.join(new_ids_df, (new_State_df.Building_id == new_ids_df.Building_id1), "outer")
                 .drop("Building_id1")
                 .distinct())

  new_rows_df.write.insertInto("building_data")

#   replaceBuildingRows(update_rows_df)

In [20]:
# Latest state row(s) per building among the not-yet-processed datalake rows.
state_df = getLastBuildingState_df()

In [21]:
# Show the latest-state rows ordered by Building_id.
display(state_df.orderBy("Building_id"))

In [22]:
# Write the latest-state rows into the building_data hive table.
updateBuilding(state_df)

In [23]:
# Show the resulting building_data table ordered by Building_id.
display(spark.sql("select * from building_data").orderBy("Building_id"))