In [1]:
from IPython.display import HTML, display

# Stretch the Jupyter cell container to the full browser width so wide Spark tables fit.
display(
    HTML("<style>.container { width:100% !important; }</style>")
)

In [2]:
import os
import json
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, TimestampType, IntegerType, FloatType

# Single local Spark session used by every ingestion cell below.
spark = (
    SparkSession.builder
    .appName('app_ingestion')
    .master('local')
    .getOrCreate()
)

In [3]:
# Common schema shared by the CSV and JSON parsers; every column is nullable so that
# bad ("B") records made entirely of None values still fit.
schema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('trade_dt', DateType()),
        ('rec_type', StringType()),
        ('symbol', StringType()),
        ('exchange', StringType()),
        ('event_tm', TimestampType()),
        ('event_seq_nb', IntegerType()),
        ('arrival_tm', TimestampType()),
        ('trade_pr', FloatType()),
        ('bid_pr', FloatType()),
        ('bid_size', IntegerType()),
        ('ask_pr', FloatType()),
        ('ask_size', IntegerType()),
        ('partition', StringType()),
    )
])

In [4]:
# parse csv file
def parse_csv(line:str) -> tuple:
    """Parse one comma-separated exchange line into a tuple matching ``schema``.

    Columns are read by position: 0 trade date, 1 arrival time, 2 record type
    ('Q' quote / 'T' trade), 3 symbol, 4 event time, 5 event sequence number,
    6 exchange, then type-specific fields from position 7 onward.

    Args:
        line: one raw text line (e.g. from ``SparkContext.textFile``).

    Returns:
        A 13-tuple in ``schema`` column order whose last element is the
        partition ('Q', 'T' or 'B'). Quote rows fill the bid/ask columns.
        Trade rows put the price in ``trade_pr`` and the size in ``bid_size``.
        A line that fails to parse, or whose record type is neither 'Q' nor
        'T', becomes a bad record: twelve ``None`` values followed by "B".
    """
    bad_record = (None,) * 12 + ("B",)
    record = line.split(',')
    try:
        rec_type = record[2]
        if rec_type not in ('Q', 'T'):
            # Emit a bad record instead of None so every row still fits the schema.
            return bad_record
        # Fields shared by quotes and trades, in schema order up to arrival_tm.
        common = (
            datetime.strptime(record[0], '%Y-%m-%d'),
            rec_type,
            record[3],
            record[6],
            datetime.strptime(record[4], '%Y-%m-%d %H:%M:%S.%f'),
            int(record[5]),
            datetime.strptime(record[1], '%Y-%m-%d %H:%M:%S.%f'),
        )
        if rec_type == 'Q':
            return common + (None,
                             float(record[7]),
                             int(record[8]),
                             float(record[9]),
                             int(record[10]),
                             'Q')
        # Trade: the schema has no dedicated trade-size column, so the size goes in bid_size.
        return common + (float(record[7]), None, int(record[8]), None, None, 'T')
    except Exception:
        print("parse_csv exception:"+line)
        return bad_record

In [5]:
# parse json file
def parse_json(line:str) -> tuple:
    """Parse one JSON exchange line into a tuple matching ``schema``.

    The JSON object must carry ``event_type`` ('Q' quote / 'T' trade) plus
    ``trade_dt``, ``symbol``, ``exchange``, ``event_tm``, ``event_seq_nb`` and
    ``file_tm``. Quotes also need ``bid_pr``/``bid_size``/``ask_pr``/``ask_size``.
    Trades also need ``price``/``size``.

    Args:
        line: one raw text line (e.g. from ``SparkContext.textFile``).

    Returns:
        A 13-tuple in ``schema`` column order whose last element is the
        partition ('Q', 'T' or 'B'). ``file_tm`` fills ``arrival_tm``. Trade
        size goes in ``bid_size``, mirroring ``parse_csv``. Malformed JSON,
        missing or unparseable fields, and unknown event types all become a
        bad record: twelve ``None`` values followed by "B".
    """
    bad_record = (None,) * 12 + ("B",)
    try:
        # Decode inside the try so one malformed line becomes a "B" row instead of failing the job.
        record = json.loads(line)
        event_type = record['event_type']
        if event_type not in ('Q', 'T'):
            return bad_record
        # Fields shared by quotes and trades, in schema order up to arrival_tm.
        common = (
            datetime.strptime(record['trade_dt'], '%Y-%m-%d'),
            event_type,
            record['symbol'],
            record['exchange'],
            datetime.strptime(record['event_tm'], '%Y-%m-%d %H:%M:%S.%f'),
            int(record['event_seq_nb']),
            datetime.strptime(record['file_tm'], '%Y-%m-%d %H:%M:%S.%f'),
        )
        if event_type == 'Q':
            return common + (None,
                             float(record['bid_pr']),
                             int(record['bid_size']),
                             float(record['ask_pr']),
                             int(record['ask_size']),
                             "Q")
        # Trade: the schema has no dedicated trade-size column, so the size goes in bid_size.
        return common + (float(record['price']), None, int(record['size']), None, None, "T")
    except Exception:
        print("parse_json exception"+line)
        return bad_record

In [6]:
# rdd for csv file of date 0805
path = 'C:/sb/equity-market-data-analysis/data/csv/2020-08-05/NYSE/'
name = os.listdir(path)
print(name)
# os.listdir returns entries in arbitrary order, so select Spark's part-* data files by name
# rather than by position; the .crc checksum and _SUCCESS marker files are skipped.
part_files = sorted(f for f in name if f.startswith('part-'))
assert part_files, 'no part-* data files found in ' + path
raw_csv = spark.sparkContext.textFile(','.join(path + f for f in part_files))
csv0805_rdd = raw_csv.map(parse_csv)

['.part-00000-5e4ced0a-66e2-442a-b020-347d0df4df8f-c000.txt.crc', '._SUCCESS.crc', 'part-00000-5e4ced0a-66e2-442a-b020-347d0df4df8f-c000.txt', '_SUCCESS']


In [7]:
# rdd for csv file of date 0806
path = 'C:/sb/equity-market-data-analysis/data/csv/2020-08-06/NYSE/'
name = os.listdir(path)
# os.listdir returns entries in arbitrary order, so select Spark's part-* data files by name
# rather than by position; the .crc checksum and _SUCCESS marker files are skipped.
part_files = sorted(f for f in name if f.startswith('part-'))
assert part_files, 'no part-* data files found in ' + path
raw_csv = spark.sparkContext.textFile(','.join(path + f for f in part_files))
csv0806_rdd = raw_csv.map(parse_csv)

In [8]:
# rdd for json file of date 0805
path = 'C:/sb/equity-market-data-analysis/data/json/2020-08-05/NASDAQ/'
name = os.listdir(path)
# os.listdir returns entries in arbitrary order, so select Spark's part-* data files by name
# rather than by position; the .crc checksum and _SUCCESS marker files are skipped.
part_files = sorted(f for f in name if f.startswith('part-'))
assert part_files, 'no part-* data files found in ' + path
raw_json = spark.sparkContext.textFile(','.join(path + f for f in part_files))
json0805_rdd = raw_json.map(parse_json)

In [9]:
# rdd for json file of date 0806
path = 'C:/sb/equity-market-data-analysis/data/json/2020-08-06/NASDAQ/'
name = os.listdir(path)
# os.listdir returns entries in arbitrary order, so select Spark's part-* data files by name
# rather than by position; the .crc checksum and _SUCCESS marker files are skipped.
part_files = sorted(f for f in name if f.startswith('part-'))
assert part_files, 'no part-* data files found in ' + path
raw_json = spark.sparkContext.textFile(','.join(path + f for f in part_files))
json0806_rdd = raw_json.map(parse_json)

In [10]:
# Combine the NYSE (CSV) and NASDAQ (JSON) records into one RDD per trade date.
rdd0805, rdd0806 = (
    csv0805_rdd.union(json0805_rdd),
    csv0806_rdd.union(json0806_rdd),
)

In [11]:
# For each trade date: build a typed DataFrame, preview it, and write it as Parquet
# partitioned by record type (Q / T / B). `df` ends up holding the 2020-08-06 frame.
for out_date, day_rdd in (('2020-08-05', rdd0805), ('2020-08-06', rdd0806)):
    df = spark.createDataFrame(day_rdd, schema=schema)
    df.show()
    (df.write
       .partitionBy("partition")
       .mode("overwrite")
       .parquet("c:/sb/equity-market-data-analysis/output_dir/" + out_date))


+----------+--------+------+--------+--------------------+------------+-------------------+---------+---------+--------+---------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|         arrival_tm| trade_pr|   bid_pr|bid_size|   ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+-------------------+---------+---------+--------+---------+--------+---------+
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:34:...|           1|2020-08-05 09:30:00|     null| 75.30255|     100| 75.35917|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:40:...|           2|2020-08-05 09:30:00|     null| 77.20875|     100| 78.90918|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:50:...|           3|2020-08-05 09:30:00|     null| 77.15973|     100| 77.33205|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:57:...|           4|2020-08-05 09:30:00|     null