## Lakehouse 2: Stock Load to Lakehouse Dimensional Model
This notebook will process raw data from the eventstream into a dimensional model.
Configure the variable sourceTableName to match the source table where stock events are being saved.

In [None]:
from delta.tables import *
from pyspark.sql.functions import *

# Table that holds the raw stock events; change this to match your source table.
sourceTableName = "raw_stock_data"

# Both watermarks start out unset and are filled in by later cells.
highWaterMark = newHighWaterMark = None

In [None]:
# this function adds symbols to dim_symbol that may not exist in table
# this allows for new symbols to be added to feed over time

def dim_symbol_incremental_load(df_stocks, df_existing_symbols):
    """Add symbols found in df_stocks that are missing from dim_symbol.

    Args:
        df_stocks: DataFrame of stock rows to be ingested; only its Symbol
            column is read.
        df_existing_symbols: DataFrame with the current contents of the
            symbol dimension; its Symbol_SK and Symbol columns are read.

    Returns:
        df_existing_symbols unchanged when there are no new symbols,
        otherwise a fresh read of dim_symbol ordered by Symbol.
    """
    # imported locally so the function does not rely on imports made in other cells
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # determine max id of current symbols table; the aggregate over an empty
    # table is NULL (None), which is treated as 0
    maxId = df_existing_symbols.agg(F.max("Symbol_SK")).first()[0] or 0

    # for the new rows to be ingested, get a list of unique symbols.
    # a NULL symbol can never satisfy the merge condition below, so it would be
    # inserted again on every run; leave it out
    df_symbols = df_stocks.select("Symbol") \
                        .where(F.col("Symbol").isNotNull()) \
                        .distinct()

    # keep only the symbols that do not exist in the current symbols dimension
    df_symbols = df_symbols.join(
        df_existing_symbols,
        df_symbols.Symbol == df_existing_symbols.Symbol,
        "left_anti")

    # monotonically_increasing_id() is unique but not consecutive (the partition
    # id is stored in the upper bits), so keys could jump by billions.
    # row_number() over an ordered window yields 1, 2, 3, ... which continues
    # directly after maxId. the window has no partitioning, so all rows are
    # processed in a single partition; this is only the list of new symbols
    key_order = Window.orderBy("Symbol")
    df_symbols = df_symbols.withColumn(
        "Symbol_SK",
        (F.row_number().over(key_order) + F.lit(maxId)).cast("long"))

    # demo lookup of company names for the known symbols
    df_symbols = df_symbols.withColumn("Name", F.when(F.col("Symbol") == "BCUZ", "Company Because")
        .when(F.col("Symbol") == "IDGD", "Company IDontGiveADarn")
        .when(F.col("Symbol") == "IDK", "Company IDontKnow")
        .when(F.col("Symbol") == "TDY", "Company Today")
        .when(F.col("Symbol") == "TMRW", "Company Tomorrow")
        .when(F.col("Symbol") == "WHAT", "Company What")
        .when(F.col("Symbol") == "WHY", "Company Why")
        .when(F.col("Symbol") == "WHO", "Company Who")
        .otherwise("Company Unknown"))

    # the market is derived from the first letter of the symbol
    first_letter = F.substring("Symbol", 1, 1)
    df_symbols = df_symbols.withColumn("Market", F.when(first_letter.isin("B", "W"), "NASDAQ")
        .when(first_letter.isin("I", "T"), "NYSE")
        .otherwise("No Market"))
    df_symbols = df_symbols.select("Symbol_SK", "Symbol", "Name", "Market")

    # if the dataframe is empty, there are no missing symbols
    if df_symbols.first() is None:
        print("No new symbols.")
        return df_existing_symbols

    print("New Symbols:")
    df_symbols.show()

    dim_symbol_table = DeltaTable.forName(spark, "dim_symbol")

    # insert-only merge: symbols already present in the table are left untouched
    dim_symbol_table.alias('dim_symbol') \
    .merge(
        df_symbols.alias('updates'),
        'dim_symbol.Symbol = updates.Symbol'
    ) \
    .whenNotMatchedInsert(values =
        {
            "Symbol_SK": "updates.Symbol_SK"
            ,"Symbol": "updates.Symbol"
            ,"Name": "updates.Name"
            ,"Market": "updates.Market"
        }
    ) \
    .execute()

    return spark.sql("SELECT * FROM dim_symbol ORDER BY Symbol ASC")

In [None]:
# Read the stored high watermark, which tracks which rows have already been
# ingested. If the metadata table has no active record for the source table,
# stop with an error.

df_temp = spark.sql(
    f"SELECT WaterMark FROM etl_ingestsourceinfo WHERE IsActiveFlag = 'Y' and ObjectName = '{sourceTableName}'"
)

# guard: nothing to do without an active ingestion source
if df_temp.rdd.isEmpty():
    msg = f"No valid ingestion source: {sourceTableName}"
    print(msg)
    raise SystemExit(msg)

highWaterMark = df_temp.first()["WaterMark"]
print(f"High watermark: {highWaterMark}")

In [None]:
# Select the stock rows that are newer than the watermark, oldest first.
# The row limit is arbitrary and exists primarily for demo purposes.

df_stocks = spark.sql(
    f"SELECT symbol, price, timestamp FROM {sourceTableName} "
    f"    WHERE timestamp > '{highWaterMark}' "
    "    ORDER BY timestamp ASC LIMIT 4000000"
)
df_stocks.show()

In [None]:
# load the date dimension for later joins
# (the whole dim_date table is read; a later cell joins on its DateKey column)

df_date = spark.sql("SELECT * FROM dim_date")
# print a preview of the dimension rows
df_date.show()

In [None]:
# load the symbols dimension

# creating the symbols incremental load in this way allows new symbols
# to be added over time dynamically. if new symbols are found in the
# new stock data, they will be imported into the symbol dimension
# before continuing

# current contents of the dimension, ordered by symbol
df_symbol = spark.sql("SELECT * FROM dim_symbol ORDER BY Symbol ASC")
print("Current Symbols:")
df_symbol.show()

# load any new symbols into dimension; df_symbol is replaced by the function's
# return value so that later cells use the post-merge dimension
df_symbol = dim_symbol_incremental_load(df_stocks, df_symbol)

print("Symbols After Merge:")
df_symbol.show()

In [None]:
# calculate the new watermark

# only the latest timestamp is needed; the alias gives the result column a
# fixed name instead of relying on the generated "max(timestamp)"
df_temp = df_stocks.agg(max(df_stocks["timestamp"]).alias("maxTimestamp"))
newHighWaterMark = df_temp.first()["maxTimestamp"]

# the aggregate over zero rows is NULL (None). keep the current watermark in
# that case so the update cell does not store the string 'None'
if newHighWaterMark is None:
    newHighWaterMark = highWaterMark
print(f"New watermark: {newHighWaterMark}")

Aggregations and transformations are easily accomplished in Data Wrangler. With a notebook running, any Spark or pandas dataframe can be loaded by Data Wrangler, allowing you to perform various processing and transformation steps. When complete, Data Wrangler will automatically generate the code to process the data in a new dataframe, as seen below. 

In this case, a new datestamp column is added to the table that represents the current date without a time component for grouping purposes.

Next, the data is grouped by the symbol and datestamp, and then the min, max, and current close prices are calculated (for simplicity, the close price is always taken to be the last price of any given day). 

Extra data is removed, and the results sorted. 

In [None]:
# Code generated by Data Wrangler for PySpark DataFrame

from pyspark.sql import functions as F

def clean_data(df_stocks):
    """Aggregate stock rows into one row per symbol and calendar day.

    Originally generated by Data Wrangler; the close-price aggregation has
    been adjusted by hand.

    Args:
        df_stocks: DataFrame with symbol, price and timestamp columns.

    Returns:
        DataFrame with columns symbol, datestamp, newMinPrice, newMaxPrice
        and newClosePrice, sorted by symbol and datestamp. Rows containing a
        NULL in any column are dropped.
    """
    # date without the time component, used as the grouping key
    df_stocks = df_stocks.withColumn('datestamp', F.to_date(df_stocks['timestamp']))

    # F.last() returns whichever row happens to come last, and row order is not
    # guaranteed after the shuffle performed by groupBy. structs are compared
    # field by field, so the max of (timestamp, price) belongs to the latest
    # timestamp; rows with a NULL price produce a NULL struct, which max() skips
    latest_priced = F.max(F.when(F.col('price').isNotNull(), F.struct('timestamp', 'price')))

    df_stocks = df_stocks.groupBy('symbol', 'datestamp').agg(
        F.min('price').alias('newMinPrice'),
        F.max('price').alias('newMaxPrice'),
        latest_priced.getField('price').alias('newClosePrice'))
    df_stocks = df_stocks.dropna()
    df_stocks = df_stocks.sort(df_stocks['symbol'].asc(), df_stocks['datestamp'].asc())
    return df_stocks

# aggregate the new stock rows per symbol and day, then show the result
df_stocks_agg = clean_data(df_stocks)
display(df_stocks_agg)

In [None]:
# Match each aggregated row to the date dimension: the row's datestamp must
# equal the dimension's DateKey. Rows without a match are not kept.

df_join = df_stocks_agg.join(
    df_date,
    on=df_stocks_agg["datestamp"] == df_date["DateKey"],
    how="inner",
)
display(df_join)

In [None]:
# Extend the date-joined rows with the symbol dimension, matching on the
# symbol text. Rows without a matching symbol are not kept.

df_join = df_join.join(
    df_symbol,
    on=df_join["symbol"] == df_symbol["Symbol"],
    how="inner",
)
display(df_join)

In [None]:
# Project the joined data down to the columns needed for the fact merge,
# renaming them with a "new" prefix for processing ease.

df_final_view = df_join.select(
    col("datekey").alias("newPriceDateKey"),
    # the symbol columns are qualified with dim_symbol
    col("dim_symbol.Symbol").alias("newSymbol"),
    col("dim_symbol.Symbol_SK").alias("newSymbol_SK"),
    "newMinPrice",
    "newMaxPrice",
    "newClosePrice",
)

df_final_view.show()

In [None]:
# to insert the new data, we'll merge the dataframe with the fact table.
# for existing records, update the high/low/close price of the stock
# for new records, insert a new row with the current high/low/close

from delta.tables import *

fact_stock_prices_table = DeltaTable.forName(spark, "fact_stocks_daily_prices")

# A fact row is identified by its date key and symbol surrogate key.
# Matched rows keep the lower of the two min prices and the higher of the two
# max prices, and take the incoming close price; unmatched rows are inserted.
(
    fact_stock_prices_table.alias('fact')
    .merge(
        df_final_view.alias('updates'),
        'fact.PriceDateKey = updates.newPriceDateKey and fact.Symbol_SK = updates.newSymbol_SK',
    )
    .whenMatchedUpdate(
        set={
            "MinPrice": "CASE WHEN fact.MinPrice < updates.newMinPrice THEN fact.MinPrice ELSE updates.newMinPrice END",
            "MaxPrice": "CASE WHEN fact.MaxPrice > updates.newMaxPrice THEN fact.MaxPrice ELSE updates.newMaxPrice END",
            "ClosePrice": "updates.newClosePrice",
        }
    )
    .whenNotMatchedInsert(
        values={
            "Symbol_SK": "updates.newSymbol_SK",
            "PriceDateKey": "updates.newPriceDateKey",
            "MinPrice": "updates.newMinPrice",
            "MaxPrice": "updates.newMaxPrice",
            "ClosePrice": "updates.newClosePrice",
        }
    )
    .execute()
)


In [None]:
# update the watermark for next run

# newHighWaterMark is None when no new rows were read; formatting it into the
# statement would store the literal string 'None' as the watermark, so the
# stored value is left alone in that case
if newHighWaterMark is None:
    print("No new watermark; keeping the existing value.")
else:
    spark.sql(f"UPDATE etl_ingestsourceinfo SET WaterMark = '{newHighWaterMark}' WHERE IsActiveFlag = 'Y' and ObjectName = '{sourceTableName}'")

# show the metadata table so the stored watermark can be checked
spark.sql("SELECT * FROM etl_ingestsourceinfo LIMIT 1000").show()


The code below is for observing the output and comparing results. After running, use the Freeze option of the cell that contains the output (with the comment 'run 1 results') to prevent it from running in the future; you can then run the notebook again, and compare the results between runs.

Note that while this notebook can be scheduled and run periodically, it's written primarily for interactive use to illustrate the processing steps. Some of these steps can be shortened or removed when running automatically (such as the cells below).

In [None]:
# function that gets the latest fact data.

def get_latest_fact():
    """Return up to 100 daily price rows with their symbol, latest dates first."""
    # the text must stay exactly as it is, including the run of spaces at the
    # start of each continuation piece
    query = (
        "SELECT dim.Symbol, fact.Symbol_SK, PriceDateKey, MinPrice, MaxPrice, ClosePrice "
        "        FROM fact_stocks_daily_prices fact "
        "        INNER JOIN dim_symbol dim on fact.Symbol_SK = dim.Symbol_SK "
        "        ORDER BY PriceDateKey DESC, fact.Symbol_SK ASC LIMIT 100"
    )
    return spark.sql(query)

In [None]:
# run 1 results:
# shows the rows returned by get_latest_fact() at the time this cell is run
display(get_latest_fact())