## Environment Setup

In [0]:
# Unity Catalog location for this project. The volume path is derived from the same
# catalog/schema constants so the two can no longer drift apart.
CATALOG_NAME = "cscie103_catalog"
SCHEMA_NAME = "final_project"

VOLUME_ROOT_PATH = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}"
VOLUME_DATA_DIR = f"{VOLUME_ROOT_PATH}/data"

# Make unqualified table names (e.g. "bronze_stores") resolve inside this catalog/schema.
spark.sql(f"USE {CATALOG_NAME}.{SCHEMA_NAME}")

class DataframeNames:
    """Base names of the project's datasets; DataTier adds the tier prefix to form table names."""

    HOLIDAYS, OIL, STORES, TEST, TRAIN, TRANSACTIONS, TRAINING = (
        "holidays",
        "oil",
        "stores",
        "test",
        "train",
        "transactions",
        "training",
    )

    # Every name above, in declaration order; iterated to set up per-dataset checkpoint folders.
    ALL = [HOLIDAYS, OIL, STORES, TEST, TRAIN, TRANSACTIONS, TRAINING]

class DataTier:
    """Data tier names (bronze/silver/gold) and helpers that build tier-prefixed table names."""

    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"

    @staticmethod
    def _prefixed(tier, tablename):
        """Return '<tier>_<tablename>'."""
        return tier + "_" + tablename

    # The helpers are static so they work both as DataTier.getXName(...) and on an instance;
    # without @staticmethod an instance call passes the instance as `tablename` and fails.
    @staticmethod
    def getBronzeName(tablename):
        """Return the bronze-tier table name for `tablename`, e.g. 'stores' -> 'bronze_stores'."""
        return DataTier._prefixed(DataTier.BRONZE, tablename)

    @staticmethod
    def getSilverName(tablename):
        """Return the silver-tier table name for `tablename`, e.g. 'stores' -> 'silver_stores'."""
        return DataTier._prefixed(DataTier.SILVER, tablename)

    @staticmethod
    def getGoldName(tablename):
        """Return the gold-tier table name for `tablename`, e.g. 'stores' -> 'gold_stores'."""
        return DataTier._prefixed(DataTier.GOLD, tablename)

In [0]:
def extractTransformLoad(bronze_tablename, silver_tablename, checkpoint_path, transform):
    """
    Stream a bronze Delta table through `transform` and append the result to a silver table.

    The write uses append output mode, a one-time trigger (trigger(once=True)) and the given
    checkpoint location.

    :param bronze_tablename: source UC table name, e.g. "bronze_stores"
    :param silver_tablename: target UC table name, e.g. "silver_stores"
    :param checkpoint_path: volume path for the stream checkpoint, e.g. /Volumes/...
    :param transform: function that takes the streaming DataFrame read from the bronze table
        and returns the DataFrame to write
    :return: the started streaming query (callers block on it with awaitTermination())
    """
    print("Reading from bronze table: " + bronze_tablename)
    source_stream = spark.readStream.format("delta").table(bronze_tablename)

    print("Applying transformation(s)...")
    result_stream = transform(source_stream)

    print("Writing to silver table: " + silver_tablename + "...")
    stream_writer = (
        result_stream.writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
    )
    return stream_writer.toTable(silver_tablename)

def extractTransformLoadStatic(bronze_tablename, silver_tablename, transform):
    """
    Batch ETL: read a bronze Delta table, apply `transform`, and overwrite the silver table.

    :param bronze_tablename: source UC table name, e.g. "bronze_oil"
    :param silver_tablename: target UC table name, e.g. "silver_oil"
    :param transform: function that takes the batch DataFrame read from the bronze table
        and returns the DataFrame to write
    :return: the transformed DataFrame (not cached)
    """
    source_df = spark.read.format("delta").table(bronze_tablename)
    transformed_df = transform(source_df)

    # overwriteSchema lets the silver table follow the transform's output schema. Without it,
    # the full overwrite fails as soon as a transform adds, drops or retypes a column.
    (
        transformed_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(silver_tablename)
    )

    return transformed_df

In [0]:
# set up checkpoint volume for streaming in Silver Tier
VOLUME_CHECKPOINTS_DIR = f"{VOLUME_DATA_DIR}/checkpoints"
for dataframe_name in DataframeNames.ALL:
    checkpoint_path = f"{VOLUME_CHECKPOINTS_DIR}/{dataframe_name}"
    # Reset the checkpoint so each silver stream starts from scratch on every run.
    dbutils.fs.rm(checkpoint_path, True)
    dbutils.fs.mkdirs(checkpoint_path)
    # An empty checkpoint makes the append-mode silver stream re-read the whole bronze table.
    # The matching silver table is therefore dropped as well; otherwise every re-run of the
    # notebook appends a second copy of all rows.
    spark.sql(f"DROP TABLE IF EXISTS {DataTier.getSilverName(dataframe_name)}")

## Building Out Silver Data

### Stores

In [0]:
# Stores Dataframe: stream bronze -> silver, dropping any row with a null in any column.
bronze_tablename_stores, silver_tablename_stores = (
    DataTier.getBronzeName(DataframeNames.STORES),
    DataTier.getSilverName(DataframeNames.STORES),
)
checkpoint_path_stores = VOLUME_CHECKPOINTS_DIR + "/" + DataframeNames.STORES
transform = lambda stores_df: stores_df.dropna()

stores_streaming_query = extractTransformLoad(
    bronze_tablename=bronze_tablename_stores,
    silver_tablename=silver_tablename_stores,
    checkpoint_path=checkpoint_path_stores,
    transform=transform,
)
stores_streaming_query.awaitTermination()
print("Stores silver table written.")

### Transactions

In [0]:
# Transactions Dataframe: stream bronze -> silver, dropping any row with a null in any column.
bronze_tablename_transactions, silver_tablename_transactions = (
    DataTier.getBronzeName(DataframeNames.TRANSACTIONS),
    DataTier.getSilverName(DataframeNames.TRANSACTIONS),
)
checkpoint_path_transactions = VOLUME_CHECKPOINTS_DIR + "/" + DataframeNames.TRANSACTIONS
transform = lambda transactions_df: transactions_df.dropna()

transactions_streaming_query = extractTransformLoad(
    bronze_tablename=bronze_tablename_transactions,
    silver_tablename=silver_tablename_transactions,
    checkpoint_path=checkpoint_path_transactions,
    transform=transform,
)
transactions_streaming_query.awaitTermination()
print("Transactions silver table written.")

### Holidays

In [0]:
# Holidays Dataframe: bronze source and silver target table names.
bronze_tablename_holidays, silver_tablename_holidays = (
    DataTier.getBronzeName(DataframeNames.HOLIDAYS),
    DataTier.getSilverName(DataframeNames.HOLIDAYS),
)

def transform(holidays_events_df):
    """
    Turn the bronze holidays table into a per-(date, state) holiday indicator.

    Steps:
    1. Drop rows whose `transferred` flag is true. Such a row is the original date of a holiday
       that was moved to another day; the day it is actually observed has its own row.
    2. Expand nationwide holidays (locale_name == 'Ecuador') into one row per state found in the
       bronze stores table. Every other row keeps its locale_name as 'state'. All resulting rows
       get is_holiday = 1.
    3. Deduplicate to one row per (date, state).
    4. Keep only dates that appear in the bronze train table, then complete the
       (train date x state) grid with is_holiday = 0 wherever no holiday applies.
       Joining on (date, state) instead of on date alone ensures that a date with a holiday in
       only some states still gets is_holiday = 0 rows for all the other states.

    :param holidays_events_df: bronze holidays DataFrame with at least the columns 'date',
        'locale_name' and 'transferred'
    :return: DataFrame with columns 'date', 'state', 'is_holiday'
    """
    import pyspark.sql.functions as F

    # 1. Drop days whose holiday was transferred away. The flag is compared as lower-cased text
    #    so this works whether bronze stored it as a boolean or as the strings "True"/"False".
    #    (The previous filter compared locale_name to 'Transfer', which never matches a place
    #    name, so transferred days were kept as holidays.)
    was_transferred = F.lower(F.col('transferred').cast('string')).eqNullSafe('true')
    holidays_events_df = holidays_events_df.where(~was_transferred)

    # 2. Expand nationwide holidays to every state that has stores.
    stores_df = spark.read.format("delta").table(DataTier.getBronzeName(DataframeNames.STORES))
    states_df = stores_df.select('state').distinct()
    ecuador_states = [row['state'] for row in states_df.collect()]
    all_states = F.array([F.lit(s) for s in ecuador_states])

    # NOTE(review): for local holidays locale_name may be a city rather than a state, so such
    # rows would not line up with store states -- confirm whether they should be mapped through
    # the stores city -> state columns.
    holidays_events_df = holidays_events_df.select(
        'date',
        F.explode(
            F.when(F.col('locale_name') == 'Ecuador', all_states)
            .otherwise(F.array(F.col('locale_name')))
        ).alias('state'),
        F.lit(1).alias('is_holiday'),
    )

    # 3. One row per (date, state).
    holidays_events_df = holidays_events_df.dropDuplicates(['date', 'state'])

    # 4. Restrict to train dates and fill in the non-holiday (date, state) combinations.
    train_df = spark.read.format("delta").table(DataTier.getBronzeName(DataframeNames.TRAIN))
    train_dates = train_df.select('date').distinct()
    date_state_grid = train_dates.crossJoin(states_df)

    # Same date scope as before: holiday rows on dates outside train are dropped.
    holidays_on_train_dates = holidays_events_df.join(train_dates, on='date', how='left_semi')
    holidays_events_df = (
        holidays_on_train_dates
        .join(date_state_grid, on=['date', 'state'], how='full_outer')
        .fillna(0, subset=['is_holiday'])
    )

    return holidays_events_df.select('date', 'state', 'is_holiday')


extractTransformLoadStatic(
    bronze_tablename=bronze_tablename_holidays,
    silver_tablename=silver_tablename_holidays,
    transform=transform,
)
print("Holidays silver table written.")

### Oil

In [0]:
# Oil Dataframe: bronze source and silver target table names.
bronze_tablename_oil, silver_tablename_oil = (
    DataTier.getBronzeName(DataframeNames.OIL),
    DataTier.getSilverName(DataframeNames.OIL),
)

def transform(oil_df):
    """
    Clean the bronze oil-price table into one row per calendar day.

    1. Parse 'date' as a date and 'dcoilwtico' as a double, then drop rows where either is null.
    2. Generate every calendar day from the first to the last remaining price date, and
       forward-fill the most recent known price into days that had no price.

    Previously the nulls were dropped before the forward fill and no dates were added, so the
    fill was a no-op and gaps between price dates stayed missing. Writing to the silver table
    is done by the caller, not here.

    :param oil_df: bronze oil DataFrame with 'date' and 'dcoilwtico' columns
    :return: DataFrame with one row per day between the first and last priced date,
        columns 'date' (date) and 'dcoilwtico' (double), plus any other bronze columns
        (null on the added days)
    """
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window

    # Cast before dropping nulls so values that fail to parse are dropped too.
    priced_days = (
        oil_df
        .withColumn("date", F.to_date("date"))
        .withColumn("dcoilwtico", F.col("dcoilwtico").cast("double"))
        .dropna(subset=["date", "dcoilwtico"])
    )

    # Full daily calendar spanning the priced range. For date bounds, sequence() steps one day
    # at a time. An empty input gives null bounds, so explode() yields no calendar rows.
    calendar = (
        priced_days
        .agg(F.min("date").alias("start_date"), F.max("date").alias("end_date"))
        .select(F.explode(F.sequence("start_date", "end_date")).alias("date"))
    )

    # Days with no price in the source come back from the left join with a null price.
    oil_daily = calendar.join(priced_days, on="date", how="left")

    # Forward fill: carry the latest non-null price onto each following day. The window has no
    # partitionBy, so Spark evaluates it in a single partition (presumably fine for this small
    # table -- confirm if the source grows).
    window_ffill = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    oil_daily = oil_daily.withColumn(
        "dcoilwtico",
        F.last("dcoilwtico", ignorenulls=True).over(window_ffill),
    )

    return oil_daily

extractTransformLoadStatic(
    bronze_tablename=bronze_tablename_oil,
    silver_tablename=silver_tablename_oil,
    transform=transform,
)
print("Oil silver table written.")

### Train

In [0]:
# Train Dataframe: stream bronze -> silver, dropping any row with a null in any column.
bronze_tablename_train = DataTier.getBronzeName(DataframeNames.TRAIN)
silver_tablename_train = DataTier.getSilverName(DataframeNames.TRAIN)
# Named per dataset like the other cells, rather than reassigning the generic `checkpoint_path`
# global that the checkpoint-setup loop uses as its loop variable.
checkpoint_path_train = f"{VOLUME_CHECKPOINTS_DIR}/{DataframeNames.TRAIN}"
transform = lambda df: df.dropna()

train_streaming_query = extractTransformLoad(
    bronze_tablename_train,
    silver_tablename_train,
    checkpoint_path_train,
    transform
)

train_streaming_query.awaitTermination()
print("Train silver table written.")

### Training (WIP)

In [0]:
# Training Dataframe
def trainingETL():
    """Placeholder for the Training-table ETL (work in progress); currently does nothing and returns None."""
