In [1]:
from pyspark.sql import *
from pyspark.sql.types import *

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1666202487840_0004,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
import os
import json
import pandas as pd
from pathlib import Path
import decimal
import logging

In [3]:
# A set of functions

def parse_csv(line):
    """
   This function will parse ".txt" file from blob storage.

   since we are working with comma-separated values file so we want to return event object

   :param line: A line of ".txt" file in CSV format.
   :return: Common event object.
   """
    try:
        record_type_pos = 2 # filter for "Q" or "T" value
        record = line.split(",")
        # Filter by record_type (T = Trade or Q = Quote):
        if record[record_type_pos] == "T":
            # Create event object by using values from record object and performing data type conversion
            event = [record[0], # trade_dt
                     record[1], # file_tm
                     record[2], # event_type
                     record[3], # symbol
                     record[4], # event_tm
                     int(record[5]), # event_seq_nb
                     record[6],   # exchange
                     decimal.Decimal(record[7]), # bid_pr
                     int(record[8]), # bid_size
                     None, None,
                     "T"]
            return event
        elif record[record_type_pos] == "Q":
            event = [record[0], 
                     record[1], 
                     record[2], 
                     record[3], 
                     record[4], 
                     int(record[5]), 
                     record[6],
                     decimal.Decimal(record[7]), 
                     int(record[8]), 
                     decimal.Decimal(record[9]), # ask_pr
                     int(record[10]),            # ask_size
                     "Q"]
            return event
    except Exception as e:
        # Return exception as "Bad record" and convert values to None preceding record_type == "B"
        event = [None, None, None, None, None, None, None, None, None, None, None, "B"]
        logging.error("Bad record", e)
        # print(f"Bad record: {e}")
        return event
    
def parse_json(line):
    """
    This function will parse through each line in the JSON formatted ".txt" file stored from blob storage.

    :param line: Each line of ".txt" file in JSON format.
    :return: common_event() object
    """
    try:
        record = json.loads(line)
        record_type = record["event_type"]
        # Parse records for each type and convert data type as necessary
        # Filter by record_type (T = Trade or Q = Quote):
        if record_type == "T":
            # Create event object based and
            event = [record["trade_dt"], 
                     record["file_tm"], 
                     record["event_type"], 
                     record["symbol"],
                     record["event_tm"], 
                     int(record["event_seq_nb"]), 
                     record["exchange"],
                     decimal.Decimal(record["bid_pr"]), 
                     int(record["bid_size"]), 
                     None, None, # Try place None values
                     "T"]
            return event
        elif record_type == "Q":
            event = [record["trade_dt"], 
                     record["file_tm"], 
                     record["event_type"], 
                     record["symbol"],
                     record["event_tm"], 
                     int(record["event_seq_nb"]), 
                     record["exchange"],
                     decimal.Decimal(record["bid_pr"]), 
                     int(record["bid_size"]), 
                     decimal.Decimal(record["ask_pr"]),
                     int(record["ask_size"]), 
                     "Q"]
            return event
    except Exception as e:
        # Return exception as "Bad record" and convert values to None preceding record_type == "B"
        event = [None, None, None, None, None, None, None, None, None, None, None, "B"]
        logging.error("Bad record", e)
        # print(f"Bad record: {e}")
        return event

In [4]:
# Initialize schema

commonEventSchema = StructType([
            StructField("trade_dt", StringType(), True),
            StructField("file_tm", StringType(), True),
            StructField("record_type", StringType(), True),
            StructField("symbol", StringType(), True),
            StructField("event_tm", StringType(), True),
            StructField("event_seq_nb", IntegerType(), True),
            StructField("exchange", StringType(), True),
            StructField("bid_pr", DecimalType(), True),
            StructField("bid_size", IntegerType(), True),
            StructField("ask_pr", DecimalType(), True),
            StructField("ask_size", IntegerType(), True),
            StructField("partition", StringType(), True)

])

In [5]:
key = 'xDxCsSmD37UlT2gVRhO1+KQ77zbiHlr4BdbAGMeITp/6YIPZeV+pNF/0OYYG8z05cTAvBPGOmia9+AStxVaM/A'
storage_name = 'rupuhdistorage'
container_name = 'rupu-2022-10-19t17-52-17-362z'
csv_dir_1 = '/HdiNotebooks/part-00000-5e4ced0a-66e2-442a-b020-347d0df4df8f-c000.txt'
csv_dir_2 = '/HdiNotebooks/part-00000-214fff0a-f408-466c-bb15-095cd8b648dc-c000.txt'
json_dir_1 = '/HdiNotebooks/part-00000-c6c48831-3d45-4887-ba5f-82060885fc6c-c000.txt'
json_dir_2 = '/HdiNotebooks/part-00000-092ec1db-39ab-4079-9580-f7c7b516a283-c000.txt'

In [6]:
# Create Spark Session
spark = SparkSession.builder.master('local').appName('app').getOrCreate()

spark.conf.set(
        "fs.azure.account.key.%s.blob.core.windows.net" % (storage_name), 
         key
)

spark.conf.set("fs.azure.account.key.pipehdistorage.blob.core.windows.net","UfE04fjAVTlo5NtelKegLyczL21tpz8AQmaY0Mo3UoCj5vTPmuIBzMIrJ9G7jGePhcmUHkRT5BiV+ASti/8VVg" )


# Initilize spark context
sc = spark.sparkContext

# Raw text files
raw_csv_1 = sc.textFile("wasbs://%s@%s.blob.core.windows.net%s" %( container_name, storage_name, csv_dir_1))

raw_csv_2 = sc.textFile( "wasbs://%s@%s.blob.core.windows.net%s" %( container_name, storage_name, csv_dir_2))

raw_json_1 = sc.textFile( "wasbs://%s@%s.blob.core.windows.net%s" %( container_name, storage_name, json_dir_1))

raw_json_2 =  sc.textFile( "wasbs://%s@%s.blob.core.windows.net%s" %( container_name, storage_name, json_dir_2))

# Parsed files
parsed_csv1 = raw_csv_1.map(lambda line: parse_csv(line))
parsed_csv2 = raw_csv_2.map(lambda line: parse_csv(line))

parsed_json1= raw_json_1.map(lambda line: parse_json(line))
parsed_json2= raw_json_2.map(lambda line: parse_json(line))

# Create Data Frames
spark_df1 = spark.createDataFrame(parsed_csv1, commonEventSchema)
spark_df2 = spark.createDataFrame(parsed_csv2, commonEventSchema)
spark_df3 = spark.createDataFrame(parsed_json1, commonEventSchema)
spark_df4 = spark.createDataFrame(parsed_json2, commonEventSchema)

In [7]:
union_df = spark_df1.union(spark_df2)\
                    .union(spark_df3)\
                    .union(spark_df4)

In [14]:
union_df.write.partitionBy("partition").mode("overwrite").format("parquet").save("/HdiNotebooks/output_dir")

In [15]:
trade_common = spark.read.parquet("output_dir/partition=T")

In [17]:
quote_common = spark.read.parquet("output_dir/partition=Q")

In [18]:
trade = trade_common.select("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb", "file_tm", "bid_pr")
quote = quote_common.select("trade_dt", "symbol", "exchange", "event_tm", "event_seq_nb", "file_tm", "bid_pr")

In [19]:
trade.show(30)

+----------+------+--------+--------------------+------------+--------------------+------+
|  trade_dt|symbol|exchange|            event_tm|event_seq_nb|             file_tm|bid_pr|
+----------+------+--------+--------------------+------------+--------------------+------+
|2020-08-06|  SYMB|    NYSE|2020-08-06 16:57:...|          60|2020-08-06 09:30:...|    33|
|2020-08-06|  SYMB|    NYSE|2020-08-06 18:06:...|          70|2020-08-06 09:30:...|    34|
|2020-08-06|  SYMB|    NYSE|2020-08-06 19:21:...|          80|2020-08-06 09:30:...|    33|
|2020-08-06|  SYMB|    NYSE|2020-08-06 20:36:...|          90|2020-08-06 09:30:...|    33|
|2020-08-06|  SYMB|    NYSE|2020-08-06 21:46:...|         100|2020-08-06 09:30:...|    36|
|2020-08-06|  SYMC|    NYSE|2020-08-06 10:42:...|          10|2020-08-06 09:30:...|   157|
|2020-08-06|  SYMC|    NYSE|2020-08-06 11:52:...|          20|2020-08-06 09:30:...|   160|
|2020-08-06|  SYMC|    NYSE|2020-08-06 13:01:...|          30|2020-08-06 09:30:...|   158|

In [20]:
quote.show(30)

+----------+------+--------+--------------------+------------+--------------------+------+
|  trade_dt|symbol|exchange|            event_tm|event_seq_nb|             file_tm|bid_pr|
+----------+------+--------+--------------------+------------+--------------------+------+
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:38:...|           1|2020-08-06 09:30:...|    78|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:46:...|           2|2020-08-06 09:30:...|    77|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:52:...|           3|2020-08-06 09:30:...|    79|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 09:58:...|           4|2020-08-06 09:30:...|    76|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:07:...|           5|2020-08-06 09:30:...|    77|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:15:...|           6|2020-08-06 09:30:...|    79|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:22:...|           7|2020-08-06 09:30:...|    78|
|2020-08-06|  SYMA|  NASDAQ|2020-08-06 10:29:...|           8|2020-08-06 09:30:...|    76|

In [56]:
from pyspark.sql import functions as func
def applyLatest(df):
    
    #grouping the dataset by trade date and then aggregating it by maximum arrival time
    correct_df = df.groupBy("trade_dt").agg(func.max("event_tm").alias("max_arrival_time"))
    # return only the latest arrival time
    return correct_df.select("trade_dt").collect()[0][0]

In [57]:
trade_corrected = applyLatest(trade)
quote_corrected = applyLatest(quote)

In [58]:
trade_corrected

'2020-08-06'

In [59]:
quote_corrected

'2020-08-06'

In [67]:
trade.write.parquet("/HdiNotebooks/trade/trade_dt={}".format(trade_corrected)) #formatting with corrected schema 

In [68]:
quote.write.parquet("/HdiNotebooks/quote/trade_dt={}".format(quote_corrected))