## Lakehouse 1: Import Data

This notebook downloads additional historical data and merges it into the raw_stock_data table. The additional data helps provide more interesting reports and data science exploration.

Before starting, the raw_stock_data table should be receiving data regularly from the Eventstream.

In [None]:
import datetime
from datetime import timedelta
from delta.tables import *
from pyspark.sql.functions import *

# change table name if not raw_stock_data
targetTableName = 'raw_stock_data'
# length, in days, of the historical window that is imported
daysToImport = 30

# placeholder only: replaced further down by the earliest timestamp found in the target table
minDateInRaw = datetime.datetime.utcnow() # will be recalculated as needed

## Download historical data

The cells below will download and unzip historical data to the lakehouse unmanaged files.

In [None]:
class HistoryData:
    """Describes one downloadable archive of historical stock data.

    Attributes are set directly from the constructor arguments:
    file_uri (where to download the archive from), filename (the name
    the archive is saved under) and year (the year the archive covers).
    """

    def __init__(self, file_uri, filename, year) -> None:
        # Plain value holder: the arguments are stored unchanged.
        self.file_uri, self.filename, self.year = file_uri, filename, year

def getDownloadInfo(year):
    """Look up the download details for one year of stock history.

    Args:
        year: the calendar year to look up.

    Returns:
        A HistoryData holding the archive URL, the archive file name and
        the requested year, or None when no archive is listed for that year.
    """
    base_uri = 'https://github.com/microsoft/fabricrealtimelab/raw/refs/heads/main/files/AbboCostStockHistory'

    # Archives are published per year and share one naming scheme.
    for known_year in (2023, 2024, 2025, 2026):
        if year == known_year:
            archive_name = f'stockhistory-{known_year}.tgz'
            return HistoryData(f'{base_uri}/{archive_name}', archive_name, year)

    return None



In [None]:
# class HistoryData:
#     def __init__(self, file_uri, filename, year) -> None:
#         self.file_uri = file_uri
#         self.filename = filename
#         self.year = year

# def getDownloadInfo(year):
#     if year==2023:
#         return HistoryData(
#             'https://fabricrealtimelab.blob.core.windows.net/public/AbboCost_Stock_History_v2/stockhistory-2023.tgz?sp=r&st=2024-01-01T17:00:00Z&se=2032-01-01T17:00:00Z&spr=https&sv=2022-11-02&sr=b&sig=ledWmONUdRKvcpDumZHpLPqkrTLWu%2B9GrF0gMh5QK2c%3D',
#             'stockhistory-2023.tgz',
#             year)
#     elif year==2024:
#         return HistoryData(
#             'https://fabricrealtimelab.blob.core.windows.net/public/AbboCost_Stock_History_v2/stockhistory-2024.tgz?sp=r&st=2024-01-01T17:00:00Z&se=2032-01-01T17:00:00Z&spr=https&sv=2022-11-02&sr=b&sig=TIFg2tvEww3rdTVNOKo5ef1xTx%2Bs0XAbdEARKGhOiX8%3D',
#             'stockhistory-2024.tgz',
#             year)
#     elif year==2025:
#         return HistoryData(
#             'https://fabricrealtimelab.blob.core.windows.net/public/AbboCost_Stock_History_v2/stockhistory-2025.tgz?sp=r&st=2024-01-01T17:00:00Z&se=2032-01-01T17:00:00Z&spr=https&sv=2022-11-02&sr=b&sig=UB4QhOmsfwhPC0rE14wRJQxeiXXutHxm%2BOVnFA3xDFQ%3D',
#             'stockhistory-2025.tgz',
#             year)
#     elif year==2026:
#         return HistoryData(
#             'https://fabricrealtimelab.blob.core.windows.net/public/AbboCost_Stock_History_v2/stockhistory-2026.tgz?sp=r&st=2024-01-01T17:00:00Z&se=2032-01-01T17:00:00Z&spr=https&sv=2022-11-02&sr=b&sig=l4tonO4SZfuCbHrheomO0WNkuYfyTTdfdNrcfu%2Fc7dU%3D',
#             'stockhistory-2026.tgz',
#             year)
#     else:
#         return None



In [None]:
import os
import datetime
from datetime import timedelta

# Mount point of the notebook's default lakehouse on the local file system.
LAKEHOUSE_FOLDER = "/lakehouse/default"
# Lakehouse-relative folder that holds the downloaded history files.
DATA_FOLDER = "Files/stockhistory/raw"

# LAKEHOUSE_FOLDER is already absolute, so no extra leading "/" is added here
# (the previous form produced paths starting with "//").
# Both paths keep a trailing "/" because file names are appended directly to them.
TAR_FILE_PATH = f"{LAKEHOUSE_FOLDER}/{DATA_FOLDER}/tar/"
CSV_FILE_PATH = f"{LAKEHOUSE_FOLDER}/{DATA_FOLDER}/csv/"

def _runCommand(args):
    """Run an external command without a shell; return True only on exit status 0."""
    import subprocess

    try:
        return subprocess.run(args).returncode == 0
    except OSError as e:
        # e.g. the executable is not installed
        print(f'Unable to run {args[0]}: {e}')
        return False


def _removeIfExists(path):
    """Delete path if it is present, so a later run does not mistake it for a finished download."""
    if os.path.exists(path):
        os.remove(path)


def downloadHistoryIfNotExists():
    """Download and extract the stock-history archives that are not yet in the lakehouse.

    The current UTC year and the year before it are checked. For each year
    that getDownloadInfo knows about, the archive is downloaded into
    TAR_FILE_PATH and extracted into CSV_FILE_PATH, unless an archive with
    that name is already present.

    A failed download or extraction is reported and the archive is removed,
    so that running the function again retries that year instead of
    reporting "File already exists" for a broken file.

    Raises:
        FileNotFoundError: if LAKEHOUSE_FOLDER does not exist.
    """
    currYear = datetime.datetime.now(datetime.timezone.utc).year

    if not os.path.exists(LAKEHOUSE_FOLDER):
        # add a lakehouse if the notebook has no default lakehouse
        # a new notebook will not link to any lakehouse by default
        raise FileNotFoundError(
            "Lakehouse not found, please add a lakehouse for the notebook."
        )

    for year in range(currYear, currYear-2, -1):
        fileInfo = getDownloadInfo(year)

        if fileInfo is None:
            print(f'No file exists for {year}')
            continue

        tarFile = f"{TAR_FILE_PATH}{fileInfo.filename}"

        # an archive that is already present is taken to be downloaded and extracted
        if os.path.exists(tarFile):
            print(f'File already exists: {fileInfo.filename}')
            continue

        print(f'Downloading {fileInfo.filename}')
        os.makedirs(TAR_FILE_PATH, exist_ok=True)
        downloaded = _runCommand(["wget", fileInfo.file_uri, "-O", tarFile])

        # wget -O creates the output file even when the transfer fails,
        # so an empty or missing file also counts as a failure
        if not downloaded or not os.path.exists(tarFile) or os.path.getsize(tarFile) == 0:
            _removeIfExists(tarFile)
            print(f'Download failed: {fileInfo.filename}')
            continue

        os.makedirs(CSV_FILE_PATH, exist_ok=True)
        print(f'Extracting {fileInfo.filename}')
        if not _runCommand(["tar", "-zxvf", tarFile, "-C", CSV_FILE_PATH]):
            # drop the archive so the next run downloads and extracts it again
            _removeIfExists(tarFile)
            print(f'Extraction failed: {fileInfo.filename}')

# run the download now so the extracted CSV files are in place for the cells below
downloadHistoryIfNotExists()

## Verify/Create Table, find earliest date

Create the target table if it doesn't exist; find minimum date in the table to act as a cut-off point.

In [None]:
def create_raw_table_if_needed():
    """Create the Delta table named by targetTableName unless it already exists.

    The table has three columns: timestamp (TIMESTAMP), price (DOUBLE)
    and Symbol (STRING).
    """
    create_statement = f"""
        CREATE TABLE IF NOT EXISTS {targetTableName} (
            timestamp TIMESTAMP
            ,price DOUBLE 
            ,Symbol STRING
            )
        USING DELTA
        """
    spark.sql(create_statement)

# make sure the target table is there before it is queried below
create_raw_table_if_needed()

In [None]:
# Earliest timestamp already in the target table; the query falls back to
# current_date() when the table has no rows.
min_date_query = f"SELECT coalesce(min(to_timestamp(timestamp)),current_date()) as minDate FROM {targetTableName}"
df_min_date = spark.sql(min_date_query)
minDateInRaw = df_min_date.first()["minDate"]
print(f"Min date in raw table: {minDateInRaw}")

# The import window ends about one hour before the existing data begins
# (truncated to whole seconds) and reaches back daysToImport days from there.
one_hour = datetime.timedelta(hours=1)
import_window = datetime.timedelta(days=daysToImport)
historicalEndDate = (minDateInRaw - one_hour).replace(microsecond=0)
historicalBeginDate = historicalEndDate - import_window

# # alternative: import data before current day
# historicalEndDate = minDateInRaw.replace(hour=0, minute=0, second=0, microsecond=0)
# historicalBeginDate = historicalEndDate + datetime.timedelta(days=-daysToImport)

print(f'Historical import begin date: {historicalBeginDate}')
print(f'Historical import end date: {historicalEndDate}')

In [None]:
# verify csv files are available

import time

path_to_check = f'{DATA_FOLDER}/csv'
max_checks = 10
files_found = False
check_count = 0

# Poll the folder until it lists at least one entry. Both failure modes --
# the listing raising an error and the listing coming back empty -- are
# retried once per second and share the same retry limit, so the loop
# always terminates.
while not files_found:
    check_count += 1
    last_error = None
    try:
        files = mssparkutils.fs.ls(path_to_check)
    except Exception as e:
        # keep a reference: the name bound by "except ... as" is cleared after the handler
        last_error = e
        files = []

    if len(files) > 0:
        files_found = True
        print(f'Found {len(files)} CSV folders.')
        break

    if check_count > max_checks:
        print('Unable to verify CSV files. Please restart session and verify files are downloading and extracting.')
        if last_error is not None:
            raise last_error
        raise FileNotFoundError(f'No CSV folders found in {path_to_check}')

    print('Checking for files...')
    time.sleep(1)


## Read the data into a dataframe

Loads the data and filters out data outside the timeframe window.

In [None]:
# read the CSV files, {year}/{month}/{day}.csv

from pyspark.sql import types as T

# Explicit schema, applied to the files instead of letting Spark infer types.
schema = (
    T.StructType()
    .add("price", T.DoubleType())
    .add("Symbol", T.StringType())
    .add("timestamp", T.TimestampType())
)

# The first line of every file is treated as a header row.
csv_reader = spark.read.format("csv").schema(schema).option("header", "true")
df_stocks = csv_reader.load(f"{DATA_FOLDER}/csv/*/*/*.csv")

# show the last few rows as a quick check
df_stocks.tail(8)

In [None]:
# keep only rows inside the import window: begin date inclusive, end date exclusive

date_window = f'timestamp >= "{historicalBeginDate}" and timestamp < "{historicalEndDate}"'
df_stocks = df_stocks.filter(date_window).orderBy("timestamp")

df_stocks.tail(8)

## Write to Delta table

Append is the fastest method, but the cell below uses a merge so that rows already in the table are updated rather than duplicated.

In [None]:
 # a merge offers more flexibility to update the table, but risks a concurrency error
 # because the table is being updated by the eventstream

 import time

 from delta.tables import *
 # Imported by name: "from delta.tables import *" is not relied on to bind
 # the top-level name "delta", which the except clause below used to need.
 from delta.exceptions import ConcurrentAppendException

 def importIntoRaw(df):
     """Merge df into the Delta table named by targetTableName.

     Rows are matched on timestamp and symbol. A matching row has its
     price overwritten with the value from df; a row with no match is
     inserted.

     Args:
         df: DataFrame providing the symbol, price and timestamp columns.
     """
     raw_table = DeltaTable.forName(spark, targetTableName)

     (
         raw_table.alias('raw')
         .merge(
             df.alias('history'),
             'raw.timestamp = history.timestamp and raw.symbol = history.symbol'
         )
         .whenMatchedUpdate(set =
             {
                 "price": "history.price"
             }
         )
         .whenNotMatchedInsert(values =
             {
                 "symbol": "history.symbol"
                 ,"price": "history.price"
                 ,"timestamp": "history.timestamp"
             }
         )
         .execute()
     )

 Retries = 3
 IsSuccess = False

 # Retry the merge a few times when it conflicts with a concurrent append.
 for i in range(Retries):
     try:
         importIntoRaw(df_stocks)
         IsSuccess = True
         print(f"Completed merge")
         break
     except ConcurrentAppendException as e:
         print(f"Concurrency error - please wait and try again: {e}")
         time.sleep(1)
         continue

 if not IsSuccess:
     msg = f"Failed to merge after {Retries} retries"
     raise SystemExit(msg)
