In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from delta import *
import logging

In [2]:
# Notebook parameter: the city whose listings this run reads, cleans and writes
city = "bern"

In [9]:
# Delta table locations: the silver table is the input, the golden table is the output.
# Both live under the same data-lake root, so the root is spelled out only once.
DATA_LAKE_ROOT = '/opt/data_lake'
SILVER_TABLE_PATH = f'{DATA_LAKE_ROOT}/silver/house_prices_merged'
GOLDEN_TABLE_PATH = f'{DATA_LAKE_ROOT}/golden/house_prices'

In [10]:
# Configure root logging: INFO and above, timestamped records, one stream handler
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
console_handler = logging.StreamHandler()
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, handlers=[console_handler])

# Logger used by the remaining cells of this notebook
logger = logging.getLogger(__name__)

In [6]:
# Announce the start of the silver -> golden cleaning run.
# The message has no placeholders, so it is a plain string rather than an f-string.
logger.info("Cleaning silver dataset to golden dataset ...")

2022-07-09 23:23:19,651 [INFO] Cleaning silver dataset to golden dataset ...


In [7]:
# Build the Spark session with the Delta Lake SQL extension and catalog enabled
DELTA_SESSION_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.appName("real-estate-etl")
for conf_key, conf_value in DELTA_SESSION_CONF.items():
    builder = builder.config(conf_key, conf_value)

# configure_spark_with_delta_pip comes from the `delta` package imported at the top
spark = configure_spark_with_delta_pip(builder).getOrCreate()



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1b15a8df-060f-4a74-a7b5-a54677519f36;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 3331ms :: artifacts dl 180ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-r

In [11]:
# Read the silver Delta table and keep only the rows of the selected city
silver_table = spark.read.format('delta').load(SILVER_TABLE_PATH)
silver_data = silver_table.filter(F.col('city') == city)

# count() runs a Spark job; the number is logged as the "before" baseline
silver_row_count = silver_data.count()
logger.info(f"{silver_row_count} rows before cleaninig the data")

2022-07-09 23:25:36,052 [INFO] 48 rows before cleaninig the data    (2 + 2) / 4]
                                                                                

In [12]:
# Clean the silver rows into the golden dataset:
#   1. drop listings that have no price,
#   2. keep only the selected columns,
#   3. turn the attribute flags into integers (a missing flag counts as False),
#   4. fill each missing surface from the other two surface columns,
#   5. fill what is still missing in the numeric columns with the column mean.

SELECTED_COLUMNS = ['property_id', 'city', 'attributes_inside_attic', 'attributes_inside_cellar', 'attributes_technology_dishwasher',
                    'attributes_technology_cable_tv', 'attributes_outside_balcony', 'attributes_outside_playground',
                    'attributes_outside_parking', 'attributes_outside_garage', 'number_of_rooms', 'surface_property',
                    'surface_usable', 'surface_living', 'normalized_price']

NUMERIC_COLUMNS = ['number_of_rooms', 'surface_property', 'surface_usable', 'surface_living']

# The eight attribute flags, in their SELECTED_COLUMNS order.
# Presumably boolean in the silver table (they are coalesced with a boolean
# literal before the integer cast) -- TODO confirm against the silver schema.
FLAG_COLUMNS = [name for name in SELECTED_COLUMNS if name.startswith('attributes_')]

golden_data = (
    silver_data
    .filter(F.col('normalized_price').isNotNull())
    .select(SELECTED_COLUMNS)
)

# Same transformation for every flag: null -> False, then cast to integer
for flag_column in FLAG_COLUMNS:
    golden_data = golden_data.withColumn(
        flag_column,
        F.coalesce(F.col(flag_column), F.lit(False)).cast(IntegerType()),
    )

# Cross-fill the surfaces: each one falls back to the other two, in the order listed
golden_data = (
    golden_data
    .withColumn('surface_property', F.coalesce(F.col('surface_property'), F.col('surface_usable'), F.col('surface_living')))
    .withColumn('surface_usable', F.coalesce(F.col('surface_usable'), F.col('surface_property'), F.col('surface_living')))
    .withColumn('surface_living', F.coalesce(F.col('surface_living'), F.col('surface_usable'), F.col('surface_property')))
)

# Compute every column mean in a single aggregation (one Spark job and one
# collect) instead of one job per column. The means are cast to integer.
# NOTE(review): the means are taken over silver_data, i.e. before the price
# filter and the surface cross-fill above -- confirm that this is intended.
column_means = silver_data.agg(
    *[F.mean(name).astype(IntegerType()).alias(name) for name in NUMERIC_COLUMNS]
).collect()[0]

# A mean of None (column entirely null) leaves the remaining nulls untouched
for numeric_column in NUMERIC_COLUMNS:
    golden_data = golden_data.withColumn(
        numeric_column,
        F.coalesce(F.col(numeric_column), F.lit(column_means[numeric_column])),
    )

logger.info(f"{golden_data.count()} rows remained after cleaninig the data")

2022-07-09 23:25:49,984 [INFO] 48 rows remained after cleaninig the data        


In [13]:
# Write the cleaned rows to the golden Delta table, partitioned by city.
# mode('overwrite') combined with replaceWhere limits the overwrite to the rows
# matching the predicate, i.e. the city processed in this run.
city_partition_filter = f"city=='{city}'"

golden_writer = (
    golden_data
    .write
    .format('delta')
    .partitionBy(['city'])
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .option('replaceWhere', city_partition_filter)
)
golden_writer.save(GOLDEN_TABLE_PATH)

22/07/09 23:25:50 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [57]:
# Vacuum table (disabled -- uncomment the two lines below to run it)
# delta_table = DeltaTable.forPath(spark, GOLDEN_TABLE_PATH)
# delta_table.vacuum()