In [0]:
# Show what is currently mounted in DBFS before (re-)mounting /mnt/data below.
current_mounts = dbutils.fs.mounts()
current_mounts

In [0]:
# Re-mount the Azure Blob container at /mnt/data so later cells can read the
# NYSE / NASDAQ inputs and write outputs through DBFS paths.
storageAccountName = "blobcontainershilsdemo"
blobContainerName = "democontainer1"
mountPoint = "/mnt/data"

# The SAS token is a credential: never hard-code it in the notebook (it ends up in
# version control and in rendered outputs). Read it from a Databricks secret scope.
# TODO: create this scope/key and store a non-expired SAS token generated in
# Azure portal >> Storage Account >> Shared Access Signature.
sasToken = dbutils.secrets.get(scope="azure-blob", key="sas-token")

# dbutils.fs.unmount raises when nothing is mounted at mountPoint, so only
# unmount an existing mount.
if any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mountPoint)
    print('unmount completed')
print('begin re-mount')
print(mountPoint)
try:
    dbutils.fs.mount(
        source="wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
        mount_point=mountPoint,
        extra_configs={
            "fs.azure.sas.{}.{}.blob.core.windows.net".format(blobContainerName, storageAccountName): sasToken
        },
    )
    print("mount succeeded!")
except Exception as e:
    # Best-effort: report and continue; later cells will fail if the mount is missing.
    print("mount exception", e)
print('remount completed')

In [0]:
# List the input folders in the mounted Azure blob container. A notebook cell only
# renders its last expression, so each listing is displayed explicitly.
for folder in ("/mnt/data/NYSE", "/mnt/data/NASDAQ", "/mnt/data/NASDAQ1"):
    print(folder)
    display(dbutils.fs.ls(folder))

In [0]:
from pyspark.sql import SparkSession 
# getOrCreate() returns the session Databricks already provides. The other builders
# in this notebook do not pin a master, and forcing .master('local') would confine
# the job to a single local process outside Databricks.
spark = SparkSession.builder.appName('guided-capstone').getOrCreate()
# No header/schema options are set, so columns come back as _c0, _c1, ... strings,
# which is what parse_csv reads.
raw = spark.read.format("csv").load("dbfs:/mnt/data/NYSE/*.txt")

In [0]:
from pyspark.sql import SparkSession 
from typing import List
from pyspark.sql.types import StructType, StructField, DateType, StringType, TimestampType, IntegerType, DecimalType
from datetime import datetime
from decimal import Decimal
import json


# Schema shared by trade ("T"), quote ("Q") and bad ("B") events, in the same column
# order that the parse functions emit their values. event_tm / arrival_tm are kept
# as strings for now (TimestampType was tried and disabled); prices are DECIMAL(10,2).
_common_event_columns = [
    ("trade_dt", DateType()),
    ("event_tm", StringType()),
    ("rec_type", StringType()),
    ("symbol", StringType()),
    ("arrival_tm", StringType()),
    ("event_seq_nb", IntegerType()),
    ("exchange", StringType()),
    ("trade_pr", DecimalType(10, 2)),
    ("trade_size", IntegerType()),
    ("bid_pr", DecimalType(10, 2)),
    ("bid_size", IntegerType()),
    ("ask_pr", DecimalType(10, 2)),
    ("ask_size", IntegerType()),
    ("ext_rec_type", StringType()),
    ("flag", StringType()),
]
# Every column is nullable: bad records are all-null apart from ext_rec_type / flag.
common_event = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _common_event_columns]
)

print(common_event)

In [0]:
from datetime import datetime
from decimal import Decimal

def parse_csv(row):
    """Convert one raw NYSE CSV row into a list matching the ``common_event`` schema.

    ``row`` is a Row from a headerless ``spark.read.format("csv")`` load, so its
    fields are positional (_c0, _c1, ...). Only the first 11 positions are read:
    0 trade_dt (YYYY-MM-DD), 1 event_tm, 2 rec_type, 3 symbol, 4 arrival_tm,
    5 event_seq_nb, 6 exchange, then 7-8 trade price/size for "T" records or
    7-10 bid price/size and ask price/size for "Q" records.

    Returns:
        A 15-element list in ``common_event`` column order. Any row that cannot be
        parsed (unknown rec_type, malformed date or number) becomes a "bad" record:
        13 Nones followed by ext_rec_type "B" and an empty flag.
    """
    expected_fields = 11
    try:
        # Read positionally and pad with None. The CSV reader may not create
        # _c9/_c10 at all (e.g. when the lines are short trade records), and
        # accessing row._c10 on such a Row would raise and abort the whole job
        # instead of yielding a "B" record.
        record = list(row)[:expected_fields]
        record += [None] * (expected_fields - len(record))

        # DateType expects a date, so drop the (always midnight) time part.
        trade_dt = datetime.strptime(record[0], "%Y-%m-%d").date() if record[0] else None
        event_tm = record[1]
        rec_type = record[2]
        symbol = record[3]
        arrival_tm = record[4]
        event_seq_nb = int(record[5]) if record[5] else None
        exchange = record[6]
        common_fields = [trade_dt, event_tm, rec_type, symbol, arrival_tm, event_seq_nb, exchange]

        if rec_type == "T":
            trade_pr = Decimal(record[7]) if record[7] else None
            trade_size = int(record[8]) if record[8] else None
            return common_fields + [trade_pr, trade_size, None, None, None, None, "T", ""]
        if rec_type == "Q":
            bid_pr = Decimal(record[7]) if record[7] else None
            bid_size = int(record[8]) if record[8] else None
            ask_pr = Decimal(record[9]) if record[9] else None
            ask_size = int(record[10]) if record[10] else None
            return common_fields + [None, None, bid_pr, bid_size, ask_pr, ask_size, "Q", ""]
        raise ValueError("Invalid record type")
    except Exception:
        # Deliberately broad: one malformed line must not fail the Spark job; it is
        # routed to the bad ("B") records instead.
        return [None] * 13 + ["B", ""]

# Turn every NYSE row into a common-event list (lazy: runs when an action is called).
nyse_rdd = raw.rdd
parsed = nyse_rdd.map(parse_csv)

In [0]:
# Apply the common_event schema to the parsed NYSE rows and preview the result.
df = spark.createDataFrame(data=parsed, schema=common_event)
df.show()

In [0]:
# Despite the folder name, the NYSE (CSV-sourced) events are written as parquet,
# one sub-folder per rec_type value.
output_dir = "/mnt/data/output_csv"
dbutils.fs.mkdirs("dbfs:/mnt/data/output_csv")
(df.write
   .mode("overwrite")
   .partitionBy("rec_type")
   .parquet(output_dir))

In [0]:
# Print a sample of parsed records. collect() would pull the entire dataset onto the
# driver; take() fetches only the first PREVIEW_ROWS records.
PREVIEW_ROWS = 20
for row in parsed.take(PREVIEW_ROWS):
    print(row)

In [0]:
from pyspark.sql import SparkSession

# Reuse (or create) the Spark session and load the NASDAQ JSON files.
spark = SparkSession.builder.appName("guided-capstone").getOrCreate()
nasdaq_json_path = "dbfs:/mnt/data/NASDAQ/*.txt"
df = spark.read.json(nasdaq_json_path)

In [0]:
from pyspark.sql import SparkSession

# Reuse (or create) the Spark session, load the NASDAQ JSON files and preview them.
spark = SparkSession.builder.appName("guided-capstone").getOrCreate()
nasdaq_json_path = "dbfs:/mnt/data/NASDAQ/*.txt"
df = spark.read.json(nasdaq_json_path)
df.show()
def parse_json(row):
    """Convert one NASDAQ JSON line into a list matching the ``common_event`` schema.

    ``row`` is the raw text (str or bytes) of one JSON object, e.g. an element of
    ``spark.sparkContext.textFile(...)``. Keys read: trade_dt (assumed YYYY-MM-DD,
    like the CSV feed), event_tm, event_type, symbol, file_tm (used as arrival_tm,
    as in the EOD load), event_seq_nb, exchange, plus price/size for "T" events or
    bid_pr/bid_size/ask_pr/ask_size for "Q" events.

    Runs inside an RDD map, so it must not touch driver-side objects such as a
    DataFrame or its writer.

    Returns:
        A 15-element list in ``common_event`` column order. Anything unparseable
        (invalid JSON, unknown event_type, malformed date or number) becomes a "bad"
        record: 13 Nones followed by ext_rec_type "B" and an empty flag.
    """
    # Local imports keep the function self-contained when shipped to executors.
    from datetime import datetime
    from decimal import Decimal
    import json

    def _to_decimal(value):
        # str() first so a JSON float such as 10.1 becomes Decimal("10.1") rather
        # than its binary floating-point expansion.
        return Decimal(str(value)) if value is not None else None

    def _to_int(value):
        return int(value) if value is not None else None

    try:
        record = json.loads(row)
        trade_dt = datetime.strptime(record["trade_dt"], "%Y-%m-%d").date() if record.get("trade_dt") else None
        event_type = record.get("event_type")
        common_fields = [
            trade_dt,
            record.get("event_tm"),
            event_type,
            record.get("symbol"),
            record.get("file_tm"),
            _to_int(record.get("event_seq_nb")),
            record.get("exchange"),
        ]
        if event_type == "T":
            return common_fields + [
                _to_decimal(record.get("price")), _to_int(record.get("size")),
                None, None, None, None, "T", "",
            ]
        if event_type == "Q":
            return common_fields + [
                None, None,
                _to_decimal(record.get("bid_pr")), _to_int(record.get("bid_size")),
                _to_decimal(record.get("ask_pr")), _to_int(record.get("ask_size")),
                "Q", "",
            ]
        raise ValueError("Invalid record type")
    except Exception:
        # Deliberately broad: one malformed line must not fail the Spark job; it is
        # routed to the bad ("B") records instead.
        return [None] * 13 + ["B", ""]

# parse_json expects the raw text of one JSON object per element, so read the NASDAQ
# files as plain text lines. `raw` holds the NYSE CSV rows and is the wrong input here.
parsed = spark.sparkContext.textFile("dbfs:/mnt/data/NASDAQ/*.txt").map(parse_json)
# data = spark.createDataFrame(parsed, common_event)

In [0]:
output_dir = "/mnt/data/output_json"
dbutils.fs.mkdirs("dbfs:/mnt/data/output_json")

# Persist the NASDAQ events as parquet, one sub-folder per event_type value
# (the EOD load below reads event_type=T and event_type=Q).
(df.write
   .mode("overwrite")
   .partitionBy("event_type")
   .parquet(output_dir))

In [0]:
### Code for Guided Capstone Step 3 starts here ###
### End-of-Day (EOD) Data Load ###

# Trade ("T") partitions written by the CSV and JSON parsing steps.
trade_common_csv = spark.read.parquet("/mnt/data/output_csv/rec_type=T")
trade_common_json = spark.read.parquet("/mnt/data/output_json/event_type=T")

In [0]:
# Keep only the trade-related columns from each source.
trade1 = trade_common_csv.select("trade_dt", "event_tm", "symbol", "arrival_tm", "event_seq_nb",
                                 "exchange", "trade_pr", "trade_size", "ext_rec_type")

trade2 = trade_common_json.select("event_tm", "event_seq_nb", "exchange", "execution_id", "file_tm",
                                  "price", "size", "symbol", "trade_dt")

# The JSON feed calls the arrival time "file_tm". Rename it by name rather than by
# position (columns[4]) so a change to the select list cannot rename the wrong column.
trade_json_with_col_renamed = trade2.withColumnRenamed("file_tm", "arrival_tm")
trade_json_with_col_renamed.show()

In [0]:
from pyspark.sql.functions import col, desc

# Data correction function
def applyLatest(df):
    """Keep only the most recent version of each trade record.

    Rows sharing (trade_dt, symbol, exchange, event_tm, event_seq_nb) are treated
    as versions of the same event, and the version with the greatest arrival_tm
    wins. arrival_tm is compared as stored; it is a string in the CSV schema, so the
    ordering is lexicographic there (NOTE(review): confirm the JSON file_tm format
    sorts the same way). If several versions share the greatest arrival_tm, one of
    them is kept arbitrarily. Rows whose arrival_tm is null sort last.

    Args:
        df: DataFrame containing the five key columns and ``arrival_tm``.

    Returns:
        DataFrame with exactly the columns of ``df`` and one row per unique key.
        The previous implementation returned only the grouped key columns plus
        max(arrival_tm), discarding the corrected record's values.
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number

    # Unique column fields that identify a trade record.
    unique_combo = ["trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"]
    latest_first = Window.partitionBy(*unique_combo).orderBy(col("arrival_tm").desc())
    return (
        df.withColumn("_latest_rank", row_number().over(latest_first))
          .filter(col("_latest_rank") == 1)
          .drop("_latest_rank")
    )
# Run the applyLatest correction on both trade sources, then preview the JSON result.
TradeCorrected_csv = applyLatest(trade1)
TradeCorrected_json = applyLatest(trade_json_with_col_renamed)
TradeCorrected_json.show()

In [0]:
trade_date = "2020-08-05"
output_path = "/mnt/data/EOD_trade_data"
partition_path = "{}/trade_dt={}".format(output_path, trade_date)

# Append the corrected JSON trades for trade_date into their trade_dt=... folder.
# mode("append") is not idempotent: re-running this cell writes the rows again.
df_write = TradeCorrected_json.filter(TradeCorrected_json.trade_dt == trade_date)
df_write.write.mode("append").parquet(partition_path)


In [0]:
# Quote ("Q") partitions written by the CSV and JSON parsing steps.
quote_common_csv = spark.read.parquet("/mnt/data/output_csv/rec_type=Q")
quote_common_json = spark.read.parquet("/mnt/data/output_json/event_type=Q")

# Column layouts of the two sources:
# quote_common_csv:  trade_dt|event_tm|symbol|arrival_tm|event_seq_nb|exchange|trade_pr|trade_size|bid_pr|bid_size|ask_pr|ask_size|ext_rec_type|flag
# quote_common_json: ask_pr|ask_size|bid_pr|bid_size|event_seq_nb|event_tm|exchange|execution_id|file_tm|price|size|symbol|trade_dt


In [0]:
# Keep only the quote-related columns from each source.
Quote1 = quote_common_csv.select("trade_dt", "event_tm", "symbol", "arrival_tm", "event_seq_nb", "exchange",
                                 "bid_pr", "bid_size", "ask_pr", "ask_size", "ext_rec_type")

Quote2 = quote_common_json.select("ask_pr", "ask_size", "bid_pr", "bid_size", "event_seq_nb", "event_tm",
                                  "exchange", "execution_id", "file_tm", "price", "size", "symbol", "trade_dt")

# The JSON feed calls the arrival time "file_tm". Rename it by name rather than by
# position (columns[8]) so a change to the select list cannot rename the wrong column.
quote_json_with_col_renamed = Quote2.withColumnRenamed("file_tm", "arrival_tm")
quote_json_with_col_renamed.show()

In [0]:
from pyspark.sql.functions import col, desc

# Data correction function (same logic as the trade version of applyLatest above)
def applyLatest(df):
    """Keep only the most recent version of each quote record.

    Rows sharing (trade_dt, symbol, exchange, event_tm, event_seq_nb) are treated
    as versions of the same event, and the version with the greatest arrival_tm
    wins. arrival_tm is compared as stored; it is a string in the CSV schema, so the
    ordering is lexicographic there (NOTE(review): confirm the JSON file_tm format
    sorts the same way). If several versions share the greatest arrival_tm, one of
    them is kept arbitrarily. Rows whose arrival_tm is null sort last.

    Args:
        df: DataFrame containing the five key columns and ``arrival_tm``.

    Returns:
        DataFrame with exactly the columns of ``df`` and one row per unique key.
        The previous implementation returned only the grouped key columns plus
        max(arrival_tm), discarding the corrected record's values.
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number

    # Unique column fields that identify a quote record.
    unique_combo = ["trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"]
    latest_first = Window.partitionBy(*unique_combo).orderBy(col("arrival_tm").desc())
    return (
        df.withColumn("_latest_rank", row_number().over(latest_first))
          .filter(col("_latest_rank") == 1)
          .drop("_latest_rank")
    )
# Run the applyLatest correction on both quote sources, then preview the JSON result.
QuoteCorrected_csv = applyLatest(Quote1)
QuoteCorrected_json = applyLatest(quote_json_with_col_renamed)
QuoteCorrected_json.show()


In [0]:
trade_date = "2020-08-05"
output_path = "/mnt/data/EOD_quote_data"
partition_path = "{}/trade_dt={}".format(output_path, trade_date)

# Load both corrected quote sources (NYSE CSV, then NASDAQ JSON) for trade_date in a
# single run. The cell no longer has to be edited by hand between runs, which made
# it easy to load only one source. mode("append") is not idempotent: re-running this
# cell writes the same rows again. If the two sources' columns differ, read the
# folder back with mergeSchema enabled.
for quote_corrected in (QuoteCorrected_csv, QuoteCorrected_json):
    df_write = quote_corrected.filter(quote_corrected.trade_dt == trade_date)
    df_write.write.mode("append").parquet(partition_path)