In [0]:
# Guided Capstone Step Three: End-of-Day (EOD) data load.
# The string below is the step's brief; as the cell's last expression it is
# echoed as the cell output.
'''
Step Three: End-of-Day (EOD) Data Load
Now that you’ve preprocessed the incoming data from the exchange, you need to create the
final data format to store on the cloud. The cloud will also store historic exchange data, so
Spring Capital can look up any trading day and easily find historic data.
This preprocessed data will be used in the following ETL process, as well as for adhoc user
queries.
At the end of the last step, you have created three partitions under output_dir. It’s easy to go
through them one by one and create corresponding datasets. Note that the target dataset
should have the specific schema required by the partition.
Learning Objectives:
By the end of this step, you will be able to…
● Create Spark DataFrames using Parquet files
● Perform data cleaning using Spark aggregation methods.
● Use cloud storage as output of Spark jobs.
Prerequisites:
- PySpark: read multiple Parquet files into a single DataFrame, transformations, write
DataFrame.
'''

'\nStep Three: End-of-Day (EOD) Data Load\nNow that you’ve preprocessed the incoming data from the exchange, you need to create the\nfinal data format to store on the cloud. The cloud will also store historic exchange data, so\nSpring Capital can look up any trading day and easily find historic data.\nThis preprocessed data will be used in the following ETL process, as well as for adhoc user\nqueries.\nAt the end of the last step, you have created three partitions under output_dir. It’s easy to go\nthrough them one by one and create corresponding datasets. Note that the target dataset\nshould have the specific schema required by the partition.\nLearning Objectives:\nBy the end of this step, you will be able to…\n● Create Spark DataFrames using Parquet files\n● Perform data cleaning using Spark aggregation methods.\n● Use cloud storage as output of Spark jobs.\nPrerequisites:\n- PySpark: read multiple Parquet files into a single DataFrame, transformations, write\nDataFrame.\n'

In [0]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Collecting pyspark
  Using cached pyspark-3.5.5-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.5
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import findspark
findspark.init()  # locate the Spark installation before pyspark is imported

import pyspark
from pyspark.sql import SparkSession
# NOTE: this star import shadows Python builtins such as max/min/sum/round;
# below, `max(...)` refers to Spark's aggregate, not the builtin.
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName("EOD Data Load")
    .getOrCreate()
)

In [0]:
# 3.1 Populate trade dataset
# 3.1.1 Read the trade partition dataset from its temporary location
import os

# Azure Blob Storage configuration. The account key is read from the
# environment so it is never stored in the notebook; export
# AZURE_STORAGE_ACCOUNT_KEY before running (an unset variable falls back to
# the previous empty-string key).
storage_account_name = "trial25"
storage_container_name = "equity-data"
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "")
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# Build the path from the settings above instead of repeating the account name.
storage_root = f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net"
trade_location = f"{storage_root}/output_dir/partition=T"
trade_common = spark.read.parquet(trade_location)

In [0]:
# 3.1.2 Select the necessary columns for trade records
# The temporary data shares one schema between trade and quote events. Trade
# and quote data are stored separately, so keep only the columns a trade
# record needs and save space.
trade_columns = [
    "trade_dt", "symbol", "exchange", "event_tm",
    "event_seq_nb", "arrival_tm", "trade_pr",
]
trade = trade_common.select(*trade_columns)

In [0]:
# Which arrival times occur in the trade data?
trade_arrival_times = trade.select("arrival_tm").distinct()
trade_arrival_times.show()

+-------------------+
|         arrival_tm|
+-------------------+
|2020-08-06 09:30:00|
|2020-08-05 09:30:00|
+-------------------+



In [0]:
# Peek at the last 30 trade rows (no explicit sort is applied first).
display(trade.tail(30))

trade_dt,symbol,exchange,event_tm,event_seq_nb,arrival_tm,trade_pr
2020-08-05,SYMA,NYSE,2020-08-05T10:37:21.581Z,10,2020-08-05T09:30:00Z,79.19
2020-08-05,SYMA,NYSE,2020-08-05T11:56:13.086Z,20,2020-08-05T09:30:00Z,76.49
2020-08-05,SYMA,NYSE,2020-08-05T13:09:12.493Z,30,2020-08-05T09:30:00Z,75.05
2020-08-05,SYMA,NYSE,2020-08-05T14:24:34.33Z,40,2020-08-05T09:30:00Z,78.43
2020-08-05,SYMA,NYSE,2020-08-05T15:31:56.932Z,50,2020-08-05T09:30:00Z,78.15
2020-08-05,SYMA,NYSE,2020-08-05T16:37:06.676Z,60,2020-08-05T09:30:00Z,79.19
2020-08-05,SYMA,NYSE,2020-08-05T17:49:22.23Z,70,2020-08-05T09:30:00Z,77.07
2020-08-05,SYMA,NYSE,2020-08-05T19:04:45.677Z,80,2020-08-05T09:30:00Z,75.48
2020-08-05,SYMA,NYSE,2020-08-05T20:21:09.752Z,90,2020-08-05T09:30:00Z,74.6
2020-08-05,SYMA,NYSE,2020-08-05T21:30:19.773Z,100,2020-08-05T09:30:00Z,77.79


In [0]:
# Find trade records whose unique ID occurs more than once, i.e. records the
# exchange has re-sent (corrected) with a different arrival_tm.
trade_key_cols = ["trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"]

# Count rows per unique ID. GroupedData.count() counts every row; the
# previous count("arrival_tm") skipped rows with a null arrival_tm and could
# therefore hide a duplicate.
duplicate_counts = trade.groupBy(*trade_key_cols).count()

# Keep only identifiers that have multiple occurrences (more than 1)
duplicates_only = duplicate_counts.filter(col("count") > 1).drop("count")

# Join back to the original DataFrame to list every version of those IDs.
# NOTE(review): an equi-join never matches null keys, so IDs with a null key
# column will not be listed here even if duplicated.
trade_duplicates = (
    trade.join(duplicates_only, on=trade_key_cols, how="inner")
    .orderBy(*trade_key_cols, "arrival_tm")
)

# Show duplicate rows with different arrival_tm
trade_duplicates.show(truncate=False)

+--------+------+--------+--------+------------+----------+--------+
|trade_dt|symbol|exchange|event_tm|event_seq_nb|arrival_tm|trade_pr|
+--------+------+--------+--------+------------+----------+--------+
+--------+------+--------+--------+------------+----------+--------+



In [0]:
'''
3.1.3 Apply Data Correction
In the exchange dataset, you can uniquely identify a record by the combination of trade_dt,
symbol, exchange, event_tm, event_seq_nb. However, the exchange may correct an error in
any submitted record by sending a new record with the same uniqueID. Such records will come
with later arrival_tm. You must ensure you only accept the one with the most recent arrival_tm.
This operation requires aggregation in order to group the dataset by their unique ID, so that a
single function can be applied to a certain group entirely. In Spark, we use groupBy operation to
achieve this.
'''
def applyLatest_groupby(df):
    """Keep only the most recent version of every record.

    A record's unique ID is (trade_dt, symbol, exchange, event_tm,
    event_seq_nb). Corrections re-use the ID with a later arrival_tm, so for
    each ID only the row with the greatest arrival_tm is kept.

    Implementation: a single groupBy that takes ``max`` over a struct of all
    non-key columns, with arrival_tm as the struct's first field. Compared with
    joining the per-ID maximum back onto ``df`` and filtering on equality, this
      * returns exactly one row per ID even when two versions share the same
        latest arrival_tm (ties are broken deterministically by the remaining
        columns instead of keeping both rows),
      * keeps IDs that have a null key column (an equi-join never matches
        null keys, so such rows were silently dropped), and
      * keeps IDs whose arrival_tm is null (max() was null and
        ``null == null`` filtered them out).

    Args:
        df: Spark DataFrame with the five key columns and arrival_tm.
            Assumes all other columns are orderable types (no map columns),
            as required for comparing structs.

    Returns:
        DataFrame with the same columns, in the same order, as ``df`` and a
        single row per unique ID.
    """
    # Local import so the function does not depend on the notebook-wide star
    # import and Spark's aggregate is not confused with the builtin max.
    from pyspark.sql.functions import struct, max as spark_max

    key_cols = ["trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"]
    # arrival_tm must be the first struct field so it decides the ordering;
    # the other columns only matter when arrival_tm values are equal.
    value_cols = ["arrival_tm"] + [
        c for c in df.columns if c not in key_cols and c != "arrival_tm"
    ]

    latest = df.groupBy(*key_cols).agg(
        spark_max(struct(*value_cols)).alias("latest")
    )
    # Unpack the winning struct and restore the input column order.
    return latest.select(*key_cols, "latest.*").select(*df.columns)

# Correct the trade records (latest arrival per unique ID wins) and inspect them
trade_corrected = trade.transform(applyLatest_groupby)
trade_corrected.show(truncate=False)

+----------+------+--------+-----------------------+------------+-------------------+--------+
|trade_dt  |symbol|exchange|event_tm               |event_seq_nb|arrival_tm         |trade_pr|
+----------+------+--------+-----------------------+------------+-------------------+--------+
|2020-08-06|SYMA  |NYSE    |2020-08-06 10:49:37.345|10          |2020-08-06 09:30:00|74.49   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 12:00:11.545|20          |2020-08-06 09:30:00|76.16   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 13:11:57.308|30          |2020-08-06 09:30:00|76.90   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 14:27:13.014|40          |2020-08-06 09:30:00|77.12   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 15:39:08.521|50          |2020-08-06 09:30:00|76.37   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 16:58:58.633|60          |2020-08-06 09:30:00|78.32   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 18:14:28.899|70          |2020-08-06 09:30:00|78.23   |
|2020-08-06|SYMA  |NYSE    |2020-08-06 19:28:22.62

In [0]:

# Alternative groupBy + explicit-join implementation, kept for reference only.
# The whole cell is one string literal, so nothing in it runs. If it were
# un-quoted it would redefine applyLatest_groupby and rebind trade_corrected.
# It joins on all five key columns AND arrival_tm == latest_arrival_tm in a
# single join condition, then keeps only the original ("d") columns.
'''
# Define the function using groupBy()
def applyLatest_groupby(df):
    # Step 1: Get the latest arrival_tm for each unique record
    latest_arrival = df.groupBy(
        "trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"
    ).agg(max("arrival_tm").alias("latest_arrival_tm"))

    # Step 2: Join back to the original dataset to keep only the latest records
    df_latest = (
        df.alias("d")
        .join(
            latest_arrival.alias("l"),
            (col("d.trade_dt") == col("l.trade_dt")) &
            (col("d.symbol") == col("l.symbol")) &
            (col("d.exchange") == col("l.exchange")) &
            (col("d.event_tm") == col("l.event_tm")) &
            (col("d.event_seq_nb") == col("l.event_seq_nb")) &
            (col("d.arrival_tm") == col("l.latest_arrival_tm")),
            "inner"
        )
        .select("d.*")  # Select only columns from the original DataFrame
    )

    return df_latest
# Assuming `trade` is already a DataFrame
trade_corrected = applyLatest_groupby(trade)


# Show results
#trade_corrected.show(truncate=False)
'''

In [0]:
# Window-function variant of the latest-arrival correction, kept for reference only.
# The cell is a single string literal, so none of it runs and
# trade_corrected_partitionby is NOT defined by this cell.
# It tags each row with the max arrival_tm over its unique-ID window (the
# column is named "rank" but holds that max, not a rank) and keeps rows equal
# to it, so rows tied on the latest arrival_tm would all be kept.
'''from pyspark.sql.functions import col, max
from pyspark.sql.window import Window 

# [implement “applyLatest” method to dataframe trade]
def applyLatest(df):
    return (
        df.withColumn("rank", max("arrival_tm").over(
            Window.partitionBy("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb")
        ))
        .filter(col("rank") == col("arrival_tm"))
        .drop("rank")
    )

# Apply correction
trade_corrected_partitionby = applyLatest(trade)
'''

In [0]:
# (column name, Spark SQL type) pairs of the corrected trade data
[(field.name, field.dataType.simpleString()) for field in trade_corrected.schema.fields]

[('trade_dt', 'date'),
 ('symbol', 'string'),
 ('exchange', 'string'),
 ('event_tm', 'timestamp'),
 ('event_seq_nb', 'int'),
 ('arrival_tm', 'timestamp'),
 ('trade_pr', 'decimal(10,2)')]

In [0]:
# Disabled check (a string literal, not executed) that compared two
# implementations with a two-sided exceptAll, i.e. equality as multisets of rows.
# It references trade_corrected_partitionby, which only exists inside another
# quoted cell, so un-quoting this cell alone would raise NameError.
# NOTE(review): the "True" output shown below is presumably from an earlier
# run when this code was active.
'''
def compare_dataframes(df1, df2):
    return df1.exceptAll(df2).count() == 0 and df2.exceptAll(df1).count() == 0

# Example Usage
are_equal = compare_dataframes(trade_corrected, trade_corrected_partitionby)

print(f"Are DataFrames Identical? {are_equal}")
'''

Are DataFrames Identical? True


In [0]:
# Number of trade rows before correction
trade_raw_count = trade.count()
trade_raw_count

60

In [0]:
# Number of trade rows after correction, plus an invariant check: the
# correction must leave exactly one row per unique ID
# (trade_dt, symbol, exchange, event_tm, event_seq_nb).
trade_corrected_count = trade_corrected.count()
trade_corrected_unique_ids = (
    trade_corrected
    .select("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb")
    .distinct()
    .count()
)
assert trade_corrected_count == trade_corrected_unique_ids, (
    f"{trade_corrected_count - trade_corrected_unique_ids} duplicate unique IDs remain after correction"
)
trade_corrected_count

60

In [0]:
# 3.1.4 Write the trade dataset back to Parquet on Azure Blob Storage
# Layout: one directory per date, .../trade/trade_dt=YYYY-MM-DD
trade_output_root = f"wasbs://equity-data@{storage_account_name}.blob.core.windows.net/trade"

# Take the dates from the corrected data itself so no date is silently skipped.
# Null dates are excluded because they would produce a "trade_dt=None" path.
trade_dates = sorted(
    str(row.trade_dt)
    for row in trade_corrected.select("trade_dt").where(col("trade_dt").isNotNull()).distinct().collect()
)

# Overwrite (not append) each date's directory so re-running the notebook
# replaces that day's data instead of writing a duplicate copy of it.
for trade_date in trade_dates:
    (
        trade_corrected
        .filter(col("trade_dt") == trade_date)
        .write.mode("overwrite")
        .parquet(f"{trade_output_root}/trade_dt={trade_date}")
    )

print("Parquet files written successfully for trade dates:", trade_dates)

Parquet files written successfully for trade dates: ['2020-08-06', '2020-08-05']


In [0]:
# 3.2 Populate quote dataset using the same method
# 3.2.1 Read the quote partition dataset from its temporary location.
# Build the path from storage_account_name (set when configuring storage)
# rather than repeating the account literal.
quote_location = f"wasbs://equity-data@{storage_account_name}.blob.core.windows.net/output_dir/partition=Q"
quote_common = spark.read.parquet(quote_location)

In [0]:
# 3.2.2 Select the necessary columns for quote records
# Keep the identifying columns plus the bid/ask fields; any other columns of
# the common schema are dropped.
quote_columns = [
    "trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb", "arrival_tm",
    "bid_pr", "bid_size", "ask_pr", "ask_size",
]
quote = quote_common.select(*quote_columns)

In [0]:
# Which arrival times occur in the quote data?
quote_arrival_times = quote.select("arrival_tm").distinct()
quote_arrival_times.show()

+-------------------+
|         arrival_tm|
+-------------------+
|2020-08-06 09:30:00|
|2020-08-05 09:30:00|
+-------------------+



In [0]:
# Find quote records whose unique ID occurs more than once, i.e. records the
# exchange has re-sent (corrected) with a different arrival_tm.
quote_key_cols = ["trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb"]

# Count rows per unique ID. GroupedData.count() counts every row; the
# previous count("arrival_tm") skipped rows with a null arrival_tm and could
# therefore hide a duplicate.
duplicate_counts = quote.groupBy(*quote_key_cols).count()

# Keep only identifiers that have multiple occurrences (more than 1)
duplicates_only = duplicate_counts.filter(col("count") > 1).drop("count")

# Join back to the original DataFrame to list every version of those IDs.
# NOTE(review): an equi-join never matches null keys, so IDs with a null key
# column will not be listed here even if duplicated.
quote_duplicates = (
    quote.join(duplicates_only, on=quote_key_cols, how="inner")
    .orderBy(*quote_key_cols, "arrival_tm")
)

# Show duplicate rows with different arrival_tm
quote_duplicates.show(truncate=False)

+--------+------+--------+--------+------------+----------+------+--------+------+--------+
|trade_dt|symbol|exchange|event_tm|event_seq_nb|arrival_tm|bid_pr|bid_size|ask_pr|ask_size|
+--------+------+--------+--------+------------+----------+------+--------+------+--------+
+--------+------+--------+--------+------------+----------+------+--------+------+--------+



In [0]:
# Correct the quote records (latest arrival per unique ID wins) and inspect them
quote_corrected = quote.transform(applyLatest_groupby)
quote_corrected.show(truncate=False)

+----------+------+--------+-----------------------+------------+-------------------+------+--------+------+--------+
|trade_dt  |symbol|exchange|event_tm               |event_seq_nb|arrival_tm         |bid_pr|bid_size|ask_pr|ask_size|
+----------+------+--------+-----------------------+------------+-------------------+------+--------+------+--------+
|2020-08-06|SYMB  |NYSE    |2020-08-06 21:31:19.68 |98          |2020-08-06 09:30:00|36.36 |100     |37.98 |100     |
|2020-08-06|SYMB  |NYSE    |2020-08-06 21:36:50.686|99          |2020-08-06 09:30:00|36.18 |100     |37.41 |100     |
|2020-08-06|SYMB  |NYSE    |2020-08-06 14:24:39.65 |39          |2020-08-06 09:30:00|31.95 |100     |33.67 |100     |
|2020-08-06|SYMB  |NYSE    |2020-08-06 12:09:32.442|21          |2020-08-06 09:30:00|34.83 |100     |36.23 |100     |
|2020-08-06|SYMC  |NYSE    |2020-08-06 11:34:20.858|17          |2020-08-06 09:30:00|161.31|100     |163.21|100     |
|2020-08-06|SYMA  |NYSE    |2020-08-06 16:33:46.316|57  

In [0]:
# 3.2.4 Write the quote dataset back to Parquet on Azure Blob Storage
# Layout: one directory per date, .../quote/quote_dt=YYYY-MM-DD.
# The quote DataFrame has no quote_dt column; its date column is trade_dt,
# so the filter must use trade_dt (filtering on quote_dt fails because the
# column does not exist). The directory name keeps the quote_dt= prefix.
# NOTE(review): trade output uses trade_dt= directories; consider aligning.
quote_output_root = f"wasbs://equity-data@{storage_account_name}.blob.core.windows.net/quote"

# Take the dates from the corrected data itself so no date is silently skipped.
# Null dates are excluded because they would produce a "quote_dt=None" path.
quote_dates = sorted(
    str(row.trade_dt)
    for row in quote_corrected.select("trade_dt").where(col("trade_dt").isNotNull()).distinct().collect()
)

# Overwrite (not append) each date's directory so re-running the notebook
# replaces that day's data instead of writing a duplicate copy of it.
for quote_date in quote_dates:
    (
        quote_corrected
        .filter(col("trade_dt") == quote_date)
        .write.mode("overwrite")
        .parquet(f"{quote_output_root}/quote_dt={quote_date}")
    )

print("Parquet files written successfully for quote dates:", quote_dates)