In [1]:
from IPython.display import display, HTML
# Stretch the notebook container to the full browser width so wide tables fit.
full_width_css = HTML("<style>.container { width:100% !important; }</style>")
display(full_width_css)

In [2]:
import json
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, DecimalType
from typing import List


In [3]:
# Create (or reuse) a Spark session running on the local master under the name "app".
spark = (
    SparkSession.builder
    .master("local")
    .appName("app")
    .getOrCreate()
)


In [4]:
    # Read the CSV file as an RDD of raw text lines; parsing happens in a later cell.
    csv_raw = spark.sparkContext.textFile(name="tq.csv")

In [5]:
def parse_csv(line:str):
    """Convert one comma-separated line into a 13-element common event list.

    The returned list is ordered to match the notebook's ``schema``:
    trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb, arrival_tm,
    trade_pr, bid_pr, bid_size, ask_pr, ask_size, partition.

    The record type is read from the third comma-separated field:

    * ``"Q"`` (quote): fields 7-10 fill bid_pr, bid_size, ask_pr, ask_size and
      trade_pr is ``None``; the partition is ``"Q"``.
    * ``"T"`` (trade): field 7 fills trade_pr and field 8 is stored in the
      bid_size slot; the remaining price/size slots are ``None``; the
      partition is ``"T"``.

    Any other record type, a line with too few fields, or any other failure
    while parsing yields a dummy event of twelve ``None`` values tagged with
    partition ``"B"`` so a bad line never aborts the whole job.

    Args:
        line: One raw text line from the CSV file.

    Returns:
        A list of 13 values; every field is either a string or ``None``.
    """
    record_type_pos = 2
    # Dummy event for the "bad" partition: every data field empty.
    bad_event = [None] * 12 + ["B"]
    try:
        record = line.split(",")
        record_type = record[record_type_pos]
        if record_type == "Q":
            return [record[0], record[2], record[3], record[6], record[4], record[5],
                    record[1], None, record[7], record[8], record[9], record[10], "Q"]
        if record_type == "T":
            return [record[0], record[2], record[3], record[6], record[4], record[5],
                    record[1], record[7], None, record[8], None, None, "T"]
        # Unknown record type: previously fell through and returned None,
        # which cannot be loaded against the 13-column schema.
        return bad_event
    except Exception:
        # Deliberate best-effort: short or non-string lines go to the bad partition.
        return bad_event


In [6]:
    # Apply parse_csv to every raw line, giving an RDD of 13-element event lists.
    csv_parsed = csv_raw.map(parse_csv)


In [7]:
# Common event schema: thirteen nullable string columns, in the same order as
# the lists produced by the parsers. "partition" carries the Q/T/B tag.
_schema_columns = [
    "trade_dt",
    "rec_type",
    "symbol",
    "exchange",
    "event_tm",
    "event_seq_nb",
    "arrival_tm",
    "trade_pr",
    "bid_pr",
    "bid_size",
    "ask_pr",
    "ask_size",
    "partition",
]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in _schema_columns]
)


In [8]:
 # Turn the parsed CSV events into a DataFrame using the common event schema.
 csv_data = spark.createDataFrame(data=csv_parsed, schema=schema)


In [9]:
# Preview the first ten parsed CSV events.
csv_data.show(n=10)


+----------+--------+------+--------+--------------------+------------+--------------------+-----------------+-----------------+--------+-----------------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|          arrival_tm|         trade_pr|           bid_pr|bid_size|           ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+--------------------+-----------------+-----------------+--------+-----------------+--------+---------+
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:34:...|           1|2020-08-05 09:30:...|             null|75.30254839137037|     100|75.35916738004924|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:40:...|           2|2020-08-05 09:30:...|             null|77.20874619466693|     100|78.90918015646369|     100|        Q|
|2020-08-05|       Q|  SYMA|    NYSE|2020-08-05 09:50:...|           3|2020-08-05 09:30:...|             null|77.159732732512

In [10]:
def parse_json(line:str):
    """Convert one JSON-encoded line into a 13-element common event list.

    The returned list is ordered to match the notebook's ``schema``:
    trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb, arrival_tm,
    trade_pr, bid_pr, bid_size, ask_pr, ask_size, partition.

    The record's ``event_type`` selects the layout:

    * ``"T"`` (trade): requires every key in ``trade_fields``; ``price`` fills
      trade_pr and ``size`` is stored in the bid_size slot; partition ``"T"``.
    * ``"Q"`` (quote): requires every key in ``quote_fields``; the bid/ask
      price and size keys fill their slots and trade_pr is ``None``;
      partition ``"Q"``.

    A record with missing keys, an unknown or absent ``event_type``, text that
    is not valid JSON, or any other failure while parsing yields a dummy event
    of twelve ``None`` values tagged with partition ``"B"``.

    Args:
        line: One raw text line holding a single JSON object.

    Returns:
        A list of 13 values; data fields keep whatever type the JSON held,
        or ``None`` where the slot does not apply.
    """
    # Dummy event for the "bad" partition: every data field empty.
    bad_event = [None] * 12 + ["B"]
    trade_fields = ('execution_id', 'trade_dt', 'event_type', 'symbol', 'exchange',
                    'event_tm', 'event_seq_nb', 'file_tm', 'price', 'size')
    quote_fields = ('trade_dt', 'event_type', 'symbol', 'exchange', 'event_tm',
                    'event_seq_nb', 'file_tm', 'bid_pr', 'bid_size', 'ask_pr', 'ask_size')
    try:
        # Decoding and the event_type lookup live inside the try so malformed
        # lines are routed to the bad partition instead of failing the job.
        record = json.loads(line)
        record_type = record['event_type']
        if record_type == "T":
            if all(field in record for field in trade_fields):
                return [record['trade_dt'], record['event_type'], record['symbol'],
                        record['exchange'], record['event_tm'], record['event_seq_nb'],
                        record['file_tm'], record['price'], None, record['size'],
                        None, None, "T"]
        elif record_type == "Q":
            if all(field in record for field in quote_fields):
                return [record['trade_dt'], record['event_type'], record['symbol'],
                        record['exchange'], record['event_tm'], record['event_seq_nb'],
                        record['file_tm'], None, record['bid_pr'], record['bid_size'],
                        record['ask_pr'], record['ask_size'], "Q"]
        # Missing required keys or an unrecognised event type.
        return bad_event
    except Exception:
        # Deliberate best-effort: invalid JSON, non-object payloads and
        # missing event_type all land in the bad partition.
        return bad_event


In [11]:
# Load the JSON file as raw text lines, parse each line into a common event,
# build a DataFrame with the shared schema, and preview the first ten rows.
json_raw = spark.sparkContext.textFile(name="tq.json")
json_parsed = json_raw.map(parse_json)
json_data = spark.createDataFrame(data=json_parsed, schema=schema)
json_data.show(n=10)


+----------+--------+------+--------+--------------------+------------+--------------------+-----------------+-----------------+--------+-----------------+--------+---------+
|  trade_dt|rec_type|symbol|exchange|            event_tm|event_seq_nb|          arrival_tm|         trade_pr|           bid_pr|bid_size|           ask_pr|ask_size|partition|
+----------+--------+------+--------+--------------------+------------+--------------------+-----------------+-----------------+--------+-----------------+--------+---------+
|2020-08-05|       Q|  SYMA|  NASDAQ|2020-08-05 09:36:...|           1|2020-08-05 09:30:...|             null|76.10016521142818|     100| 77.9647975908747|     100|        Q|
|2020-08-05|       Q|  SYMA|  NASDAQ|2020-08-05 09:42:...|           2|2020-08-05 09:30:...|             null|75.44372945251948|     100|75.94452858561046|     100|        Q|
|2020-08-05|       Q|  SYMA|  NASDAQ|2020-08-05 09:48:...|           3|2020-08-05 09:30:...|             null|78.847985648284

In [12]:
    # Save output: both DataFrames go to the same directory, partitioned by the
    # Q/T/B tag in the "partition" column.
    # The first write uses "overwrite" so re-running the cell starts clean.
    csv_data.write.partitionBy("partition").mode("overwrite").parquet("output_dir")
    # The second write must append: "overwrite" here would delete the CSV
    # events that were just written to the same directory.
    json_data.write.partitionBy("partition").mode("append").parquet("output_dir")