### Reading Trade partition dataset from its temporary location

In [1]:
# Load all parquet part files of the trade ("T") partition from the staging directory.
trade_common = (
    spark.read
    .parquet("/HdiNotebooks/output_dir_csv_json/partition=T/*.parquet")
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1613489928550_0006,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


### Selecting the Necessary columns for Trade Records

In [2]:
# Keep only the columns that are meaningful for trade records.
trade = trade_common.select(
    "trade_dt",
    "symbol",
    "exchange",
    "event_tm",
    "event_seq_nb",
    "arrival_tm",
    "trade_pr",
)

### Applying Data Correction

In [3]:
from pyspark.sql.window import Window
from datetime import date
import pyspark.sql.functions as F 


# A record is uniquely identified by the combination of
# (trade_dt, symbol, exchange, event_tm, event_seq_nb). The exchange may correct
# a previously submitted record by sending a new record with the same key;
# such a correction carries a later arrival_tm.
#
# To accept only the latest version of each record, rows are numbered within
# each key, ordered by arrival_tm descending, and only row 1 is kept. The
# helper "row_number" column is dropped afterwards.
trade_corrected = (
    trade
    .withColumn(
        "row_number",
        F.row_number().over(
            Window
            .partitionBy("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb")
            .orderBy(F.col("arrival_tm").desc())
        ),
    )
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

### Writing the Trade Dataset back to Azure Storage

In [4]:
# Business date used to name the output directory (trade_dt=<date>).
trade_date = "2020-08-05"

# Collapse to a single partition, then write the corrected trades as parquet
# under the trade/ prefix of the Azure blob container.
(
    trade_corrected
    .coalesce(1)
    .write
    .parquet(
        "wasbs://guidedsparkpro-2021-02-16t15-01-46-464z@guidedsparkprhdistorage.blob.core.windows.net/trade/trade_dt={}".format(trade_date)
    )
)

### Reading the Quote partition dataset from its temporary location

In [5]:
# Load all parquet part files of the quote ("Q") partition from the staging directory.
quote_common = (
    spark.read
    .parquet("/HdiNotebooks/output_dir_csv_json/partition=Q/*.parquet")
)

### Selecting the Necessary columns for Quote Records

In [7]:
# Keep only the columns that are meaningful for quote records.
quote = quote_common.select(
    "trade_dt",
    "symbol",
    "exchange",
    "event_tm",
    "event_seq_nb",
    "arrival_tm",
    "bid_pr",
    "ask_pr",
)

In [9]:
from pyspark.sql.window import Window
from datetime import date
import pyspark.sql.functions as F 

# A record is uniquely identified by the combination of
# (trade_dt, symbol, exchange, event_tm, event_seq_nb). The exchange may correct
# a previously submitted record by sending a new record with the same key;
# such a correction carries a later arrival_tm.
#
# To accept only the latest version of each record, rows are numbered within
# each key, ordered by arrival_tm descending, and only row 1 is kept. The
# helper "row_number" column is dropped afterwards.
quote_corrected = (
    quote
    .withColumn(
        "row_number",
        F.row_number().over(
            Window
            .partitionBy("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb")
            .orderBy(F.col("arrival_tm").desc())
        ),
    )
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

### Writing the Quote Dataset back to Azure Storage

In [10]:
# Business date used to name the output directory (trade_dt=<date>).
trade_date = "2020-08-05"

# Persist the *corrected* quotes. The previous version of this cell wrote
# `quote` (the raw projection), which silently discarded the data correction
# computed in `quote_corrected` and stored superseded records alongside their
# corrections.
# Collapse to a single partition, then write as parquet under the quote/
# prefix of the Azure blob container.
quote_corrected.coalesce(1).write.parquet("wasbs://guidedsparkpro-2021-02-16t15-01-46-464z@guidedsparkprhdistorage.blob.core.windows.net/quote/trade_dt={}".format(trade_date))