# ETL-UnStacking tranforming rental data into high dimension.

This ETL process reads NSW Rental Bonds information publically available and does following
1. Cleans the Dataset by removing unwanted columns.
2. Removes rows which are NULL
3. For Categorical Fields like PremisesDwellingType and Premises_Suburb converts empty fields as UNKNOWN
4. Data cleaning, convert timestamp field as epoch or specific data types
5. Add new columns like MonthName, DayStatus (Early if it within 10 days or Late if it was after 10 days)
6. Added Fields like SchoolAdmit ie months when School Admission starts which drives Rental and Occpancy
7. However, instead of just storing the cleanized data for further analyzes. It does following
   7.1 Identifies Time Dimension, Categorical Variables, Response Variables and Contributing Factors.
   7.2 For Each Categorical Variables ie ("PremisesDwellingType", "NumberBedrooms", "Premises_Postcode", "Premises_Suburb") we pin the following :  
       a. Time Dimension (Time when the tenancy and bond application was lodged)
       b. Contributing Factors (Was it a holiday season or school admission period which triggered this process )
       c. Response Variable e.g for "BondAmount" it is amount Bond deposit.
8. Eventually all these unstacked data is stored back

This converts the dataset into a high dimensions so that analytical system can easily analyze.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

glueContext = GlueContext(SparkContext.getOrCreate())

In [None]:
nsw_rental_history = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://nsw_services/dept_finance/nsw_rental/fa13-2016-17.parquet')

df_rent = nsw_rental_history.toDF()

In [3]:
df_rent.printSchema()

root
 |-- DateTenancyCommenced: string (nullable = true)
 |-- DateLodgement: string (nullable = true)
 |-- BondAmount: string (nullable = true)
 |-- PremisesWeeklyRent: string (nullable = true)
 |-- PremisesDwellingType: string (nullable = true)
 |-- NumberBedrooms: string (nullable = true)
 |-- Premises_Postcode: string (nullable = true)
 |-- Premises_Suburb: string (nullable = true)
 |-- Released: string (nullable = true)



In [4]:
df_rent.select("Released").distinct().show()

+--------+
|Released|
+--------+
|    null|
+--------+



In [5]:
df_rent = df_rent.drop("Released")

In [6]:
df_rent.groupby("PremisesDwellingType").count().collect()

[Row(PremisesDwellingType='F', count=365256),
 Row(PremisesDwellingType='T', count=69664),
 Row(PremisesDwellingType='B', count=83),
 Row(PremisesDwellingType='O', count=26533),
 Row(PremisesDwellingType='C', count=1),
 Row(PremisesDwellingType='S', count=7),
 Row(PremisesDwellingType=' ', count=1935),
 Row(PremisesDwellingType='NULL', count=169715),
 Row(PremisesDwellingType='H', count=280548)]

In [7]:
from pyspark.sql.functions import udf
from datetime import datetime
from pyspark.sql import functions as func

In [8]:
# UDF
clean_f = udf(lambda x: float(x[1:].replace(",","")), FloatType())
unk_f = udf(lambda x: 'Unk' if len(x.strip()) == 0 else x)
epoch_f = udf(lambda x: int(datetime.strptime(x,"%d/%m/%Y").strftime("%s")), LongType())
int_f = udf(lambda x: int(x), IntegerType())
month_f = udf(lambda x: datetime.fromtimestamp(x).strftime("%b"))
day_f = udf(lambda x: "Early" if datetime.fromtimestamp(x).strftime("%d")[0] == "0" else "Late")
admit_f = udf(lambda x: 1 if datetime.fromtimestamp(x).strftime("%m") in ["11", "12", "01", "02", "03"] else 0, IntegerType() ) 
holiday_f = udf(lambda x: 1 if datetime.fromtimestamp(x).strftime("%m") in ["11", "12"] else 0, IntegerType() )

In [9]:
df_rent = df_rent.where((df_rent.DateTenancyCommenced != "NULL") & 
                        (df_rent.DateLodgement != "NULL") & (df_rent.BondAmount != "NULL") &
                        (df_rent.PremisesWeeklyRent != "NULL") & (df_rent.PremisesDwellingType != "NULL") &
                        (df_rent.NumberBedrooms != "NULL") & 
                        (df_rent.Premises_Postcode != "NULL") &
                        (df_rent.Premises_Suburb != "NULL")
                       )

In [10]:
df_rent.count()

642024

In [11]:
df_rent = df_rent.withColumn("BondAmount", clean_f(df_rent.BondAmount))\
                 .withColumn("PremisesWeeklyRent", clean_f(df_rent.PremisesWeeklyRent))\
                 .withColumn("Premises_Postcode", int_f(df_rent.Premises_Postcode))

In [12]:
df_rent = df_rent.withColumn("NumberBedrooms", int_f(df_rent.NumberBedrooms))

In [13]:
df_rent = df_rent.withColumn("DateTenancyCommenced", epoch_f(df_rent.DateTenancyCommenced))

df_rent = df_rent.withColumn("DateLodgement", epoch_f(df_rent.DateLodgement))

df_rent = df_rent.withColumn("LengthOccupancy", (df_rent.DateLodgement - df_rent.DateTenancyCommenced))

df_rent = df_rent.withColumn("MonthName", month_f(df_rent.DateLodgement))

df_rent = df_rent.withColumn("DayStatus", day_f(df_rent.DateLodgement))

df_rent = df_rent.withColumn("SchoolAdmit", admit_f(df_rent.DateLodgement))

df_rent = df_rent.withColumn("Holiday", holiday_f(df_rent.DateLodgement))

df_rent = df_rent.withColumn("PremisesDwellingType", unk_f(df_rent.PremisesDwellingType)).withColumn("Premises_Suburb", unk_f(df_rent.Premises_Suburb))

In [28]:
df_rent.printSchema()

root
 |-- DateTenancyCommenced: long (nullable = true)
 |-- DateLodgement: long (nullable = true)
 |-- BondAmount: float (nullable = true)
 |-- PremisesWeeklyRent: float (nullable = true)
 |-- PremisesDwellingType: string (nullable = true)
 |-- NumberBedrooms: integer (nullable = true)
 |-- Premises_Postcode: integer (nullable = true)
 |-- Premises_Suburb: string (nullable = true)
 |-- LengthOccupancy: long (nullable = true)
 |-- MonthName: string (nullable = true)
 |-- DayStatus: string (nullable = true)
 |-- SchoolAdmit: integer (nullable = true)
 |-- Holiday: integer (nullable = true)



In [15]:
df_rent.show(n=10)

+--------------------+-------------+----------+------------------+--------------------+--------------+-----------------+--------------------+---------------+---------+---------+-----------+-------+
|DateTenancyCommenced|DateLodgement|BondAmount|PremisesWeeklyRent|PremisesDwellingType|NumberBedrooms|Premises_Postcode|     Premises_Suburb|LengthOccupancy|MonthName|DayStatus|SchoolAdmit|Holiday|
+--------------------+-------------+----------+------------------+--------------------+--------------+-----------------+--------------------+---------------+---------+---------+-----------+-------+
|          1369008000|   1371427200|    1920.0|             480.0|                   H|             4|             2489|    POTTSVILLE BEACH|        2419200|      Jun|     Late|          0|      0|
|          1369180800|   1371427200|    2080.0|             520.0|                   H|             3|             2487|           CASUARINA|        2246400|      Jun|     Late|          0|      0|
|         

In [16]:
df_rent.groupby("PremisesDwellingType").count().collect()

[Row(PremisesDwellingType='F', count=313463),
 Row(PremisesDwellingType='Unk', count=582),
 Row(PremisesDwellingType='T', count=62793),
 Row(PremisesDwellingType='B', count=1),
 Row(PremisesDwellingType='O', count=19777),
 Row(PremisesDwellingType='C', count=1),
 Row(PremisesDwellingType='S', count=7),
 Row(PremisesDwellingType='H', count=245400)]

In [17]:
cat_vars = ["PremisesDwellingType", "NumberBedrooms", "Premises_Postcode", "Premises_Suburb"]
time_vars = ["DateTenancyCommenced", "DateLodgement"]
contrib_vars = ["MonthName", "DayStatus", "SchoolAdmit"]
response_vars = ["BondAmount", "PremisesWeeklyRent" , "LengthOccupancy"]

In [23]:
def datapoints(cat, df_rent, interested_coln):
    hmap_df = {}
    df_dwelling_ = df_rent.groupBy(cat).agg(*[f.collect_list(c).alias(c) for c in interested_coln])
    for c in interested_coln:
        print("processing ..." + c)
        df_dwelling_1 = df_dwelling_.select(c, cat).withColumn(c, f.explode(c)).withColumn("id", f.monotonically_increasing_id())
        hmap_df.setdefault(c, df_dwelling_1)
    
    df_dwell = None
    for c in hmap_df:
        if df_dwell == None:
            df_dwell = hmap_df[c]
            continue
        df_dwell = df_dwell.join(hmap_df[c].drop(cat), "id", "outer")
        
    
    df_dwell = df_dwell.withColumn("FactorName", f.lit(cat)).withColumnRenamed(cat, "FactorValue")
    return df_dwell

def response(cat, cresp, df_rent, df_dwell):
    df_dwelling_ = df_rent.groupBy(cat).agg(f.collect_list(cresp).alias(cresp))
    df_dwelling_1 = df_dwelling_.select(cresp).withColumn(cresp, f.explode(cresp)).withColumn("id", f.monotonically_increasing_id())
    df_dwell_1 = df_dwell.join(df_dwelling_1, "id", "outer")
    df_dwell_1 = df_dwell_1.withColumn("ResponseName", f.lit(cresp)).withColumnRenamed(cresp, "ResponseValue")
    
    return df_dwell_1

In [19]:
df_dwell = datapoints("PremisesDwellingType", df_rent, 
                      ["DateTenancyCommenced", "DateLodgement", "MonthName", "DayStatus", "SchoolAdmit"])

processing ...DateTenancyCommenced
processing ...DateLodgement
processing ...MonthName
processing ...DayStatus
processing ...SchoolAdmit


In [20]:
df_dwell.show(n=10)

+------------+--------------------+-----------+-------------+---------+---------+-----------+--------------------+
|          id|DateTenancyCommenced|FactorValue|DateLodgement|MonthName|DayStatus|SchoolAdmit|          FactorName|
+------------+--------------------+-----------+-------------+---------+---------+-----------+--------------------+
|171798692310|          1370649600|          F|   1371513600|      Jun|     Late|          0|PremisesDwellingType|
|171798692537|          1371254400|          F|   1371513600|      Jun|     Late|          0|PremisesDwellingType|
|171798692967|          1369958400|          F|   1371686400|      Jun|     Late|          0|PremisesDwellingType|
|171798693847|          1370649600|          F|   1372032000|      Jun|     Late|          0|PremisesDwellingType|
|171798693936|          1371254400|          F|   1372118400|      Jun|     Late|          0|PremisesDwellingType|
|171798694097|          1371859200|          F|   1372118400|      Jun|     Late

In [21]:
df_dwell_bond = response("PremisesDwellingType", "BondAmount", df_rent, df_dwell)
df_dwell_bond.show(n=10)

+------------+--------------------+-----------+-------------+---------+---------+-----------+--------------------+-------------+------------+
|          id|DateTenancyCommenced|FactorValue|DateLodgement|MonthName|DayStatus|SchoolAdmit|          FactorName|ResponseValue|ResponseName|
+------------+--------------------+-----------+-------------+---------+---------+-----------+--------------------+-------------+------------+
|171798692310|          1370649600|          F|   1371513600|      Jun|     Late|          0|PremisesDwellingType|       2400.0|  BondAmount|
|171798692537|          1371254400|          F|   1371513600|      Jun|     Late|          0|PremisesDwellingType|       1040.0|  BondAmount|
|171798692967|          1369958400|          F|   1371686400|      Jun|     Late|          0|PremisesDwellingType|       1120.0|  BondAmount|
|171798693847|          1370649600|          F|   1372032000|      Jun|     Late|          0|PremisesDwellingType|       1940.0|  BondAmount|
|17179

In [24]:
stack_df_rent  = {}
keep_coln = ["DateTenancyCommenced", "DateLodgement", "MonthName", "DayStatus", "SchoolAdmit"]
for c in ["PremisesDwellingType", "NumberBedrooms", "Premises_Postcode", "Premises_Suburb"]:
    df_dwell = datapoints(c, df_rent, keep_coln)
    df_rent_resp_full = None
    for r in ["BondAmount", "PremisesWeeklyRent" , "LengthOccupancy"]:
        df_rent_resp = response(c, r, df_rent, df_dwell)
        if df_rent_resp_full == None:
            df_rent_resp_full = df_rent_resp
        else:
            df_rent_resp_full = df_rent_resp_full.union(df_rent_resp)
    stack_df_rent.setdefault(c, df_rent_resp_full)

processing ...DateTenancyCommenced
processing ...DateLodgement
processing ...MonthName
processing ...DayStatus
processing ...SchoolAdmit
processing ...DateTenancyCommenced
processing ...DateLodgement
processing ...MonthName
processing ...DayStatus
processing ...SchoolAdmit
processing ...DateTenancyCommenced
processing ...DateLodgement
processing ...MonthName
processing ...DayStatus
processing ...SchoolAdmit
processing ...DateTenancyCommenced
processing ...DateLodgement
processing ...MonthName
processing ...DayStatus
processing ...SchoolAdmit


In [25]:
for cat_name in stack_df_rent:
    stack_df_rent[cat_name] = stack_df_rent[cat_name].drop("id")
    glue_df_rent_frame = DynamicFrame.fromDF(stack_df_rent[cat_name], glueContext, "nested")
    glueContext.write_dynamic_frame.from_options(
       frame = glue_df_rent_frame,
       connection_type = "s3",
       connection_options = {"path": "s3://etl-rental/"+cat_name+"/" + cat_name+"_"+"rental_parquet"},
       format = "parquet")