In [0]:
!pip install findspark
!pip install pyspark

In [0]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType, DecimalType

# Define the schema using StructType
schema = StructType([
    StructField("trade_dt", DateType(), True),
    StructField("rec_type", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("exchange", StringType(), True),
    StructField("event_tm", TimestampType(), True),
    StructField("event_seq_nb", IntegerType(), True),
    StructField("arrival_tm", TimestampType(), True),
    StructField("trade_pr", DecimalType(10, 2), True),  # DecimalType(10, 2) is an example; adjust precision and scale as needed
    StructField("bid_pr", DecimalType(10, 2), True),  # Adjust precision and scale as needed
    StructField("bid_size", IntegerType(), True),
    StructField("ask_pr", DecimalType(10, 2), True),  # Adjust precision and scale as needed
    StructField("ask_size", IntegerType(), True),
    StructField("partition", StringType(), True)
])



In [0]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master('local').appName('app').getOrCreate()
# spark.conf.set(
# "fs.azure.account.key.dbstorage7ezax62buslq2.blob.core.windows.net",
# "<your-storage-account-access-key>"
# )
# raw =
# spark.textFile("wasbs://<container-name>@dbstorage7ezax62buslq2.blob.core.w
# indows.net/<path_in_container>")
# parsed = raw.map(lambda line: parse_json(line))
csv_1 = "dbfs:/Volumes/data_engineering/default/data/part-00000-5e4ced0a-66e2-442a-b020-347d0df4df8f-c000.txt"
#data = spark.createDataFrame(parsed)
data = spark.read.csv(csv_1, header=True, inferSchema=True)

In [0]:
json_1 = "dbfs:/Volumes/data_engineering/default/data/part-00000-c6c48831-3d45-4887-ba5f-82060885fc6c-c000.txt"
nyc_data = spark.read.json(json_1)

In [0]:
nyc_data.show(5)

+-----------------+--------+-----------------+--------+------------+--------------------+----------+--------+------------+--------------------+-----+----+------+----------+
|           ask_pr|ask_size|           bid_pr|bid_size|event_seq_nb|            event_tm|event_type|exchange|execution_id|             file_tm|price|size|symbol|  trade_dt|
+-----------------+--------+-----------------+--------+------------+--------------------+----------+--------+------------+--------------------+-----+----+------+----------+
| 77.9647975908747|     100|76.10016521142818|     100|           1|2020-08-05 09:36:...|         Q|  NASDAQ|        NULL|2020-08-05 09:30:...| NULL|NULL|  SYMA|2020-08-05|
|75.94452858561046|     100|75.44372945251948|     100|           2|2020-08-05 09:42:...|         Q|  NASDAQ|        NULL|2020-08-05 09:30:...| NULL|NULL|  SYMA|2020-08-05|
|80.69114407667608|     100|78.84798564828422|     100|           3|2020-08-05 09:48:...|         Q|  NASDAQ|        NULL|2020-08-05 09

In [0]:
from typing import List
from pyspark.sql import Row

def parse_csv(line:str):
    record_type_pos = 2
    record = line.split(",")
    try:
        # [logic to parse records]
        if record[record_type_pos] == "T":
            fields = record
            return Row(date=fields[0], datetime=fields[1], record_type=fields[2], symbol=fields[3], timestamp=fields[4], id=int(fields[5]), exchange=fields[6], price1=float(fields[7]), qty1=int(fields[8]), price2=float(fields[9]), qty2=int(fields[10]))
            #event = common_event(col1_val, col2_val, ..., "T","")
            #return event
        elif record[record_type_pos] == "Q":
            fields = record
            return Row(date=fields[0], datetime=fields[1], record_type=fields[2], symbol=fields[3], timestamp=fields[4], id=int(fields[5]), exchange=fields[6], price1=float(fields[7]), qty1=int(fields[8]), price2=float(fields[9]), qty2=int(fields[10]))
            #event = common_event(col1_val, col2_val, … , "Q","")
            #return event
    except Exception as e:
        # [save record to dummy event in bad partition]
        # [fill in the fields as None or empty string]
        #return common_event(,,,....,,,,,"B",line)
        fields = [None]*11
        return Row(date=fields[0], datetime=fields[1], record_type=fields[2], symbol=fields[3], timestamp=fields[4], id=int(fields[5]), exchange=fields[6], price1=float(fields[7]), qty1=int(fields[8]), price2=float(fields[9]), qty2=int(fields[10]))

In [0]:
sc = spark.sparkContext
raw = sc.textFile("dbfs:/Volumes/data_engineering/default/data/part-00000-5e4ced0a-66e2-442a-b020-347d0df4df8f-c000.txt")
print(raw.take(5))
parsed = raw.map(lambda line: parse_csv(line))
print(parsed.take(5)) 

data = spark.createDataFrame(parsed)
print(data.show(5))


['2020-08-05,2020-08-05 09:30:00.0,Q,SYMA,2020-08-05 09:34:51.505,1,NYSE,75.30254839137037,100,75.35916738004924,100', '2020-08-05,2020-08-05 09:30:00.0,Q,SYMA,2020-08-05 09:40:52.586,2,NYSE,77.20874619466693,100,78.90918015646369,100', '2020-08-05,2020-08-05 09:30:00.0,Q,SYMA,2020-08-05 09:50:04.681,3,NYSE,77.15973273251218,100,77.3320469411047,100', '2020-08-05,2020-08-05 09:30:00.0,Q,SYMA,2020-08-05 09:57:46.343,4,NYSE,79.29977331004093,100,80.08399307353596,100', '2020-08-05,2020-08-05 09:30:00.0,Q,SYMA,2020-08-05 10:06:50.886,5,NYSE,77.8634951217078,100,78.30821537434917,100']
[Row(date='2020-08-05', datetime='2020-08-05 09:30:00.0', record_type='Q', symbol='SYMA', timestamp='2020-08-05 09:34:51.505', id=1, exchange='NYSE', price1=75.30254839137037, qty1=100, price2=75.35916738004924, qty2=100), Row(date='2020-08-05', datetime='2020-08-05 09:30:00.0', record_type='Q', symbol='SYMA', timestamp='2020-08-05 09:40:52.586', id=2, exchange='NYSE', price1=77.20874619466693, qty1=100, pri

In [0]:
import json
from pyspark.sql import Row


schema = StructType([
    StructField("trade_dt", DateType(), True),
    StructField("rec_type", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("exchange", StringType(), True),
    StructField("event_tm", TimestampType(), True),
    StructField("event_seq_nb", IntegerType(), True),
    StructField("arrival_tm", TimestampType(), True),
    StructField("trade_pr", DecimalType(10, 2), True),  # DecimalType(10, 2) is an example; adjust precision and scale as needed
    StructField("bid_pr", DecimalType(10, 2), True),  # Adjust precision and scale as needed
    StructField("bid_size", IntegerType(), True),
    StructField("ask_pr", DecimalType(10, 2), True),  # Adjust precision and scale as needed
    StructField("ask_size", IntegerType(), True),
    StructField("partition", StringType(), True)
])

# Reorder this common event function with the data types
def common_event(trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb, arrival_tm, trade_pr, bid_pr, bid_size, ask_pr, ask_size, partition):
    """
    Example common_event function to return a Row object.
    Adjust this function as needed to match your requirements.
    """
    return (trade_dt, rec_type, symbol, exchange, event_tm, event_seq_nb, arrival_tm, trade_pr, bid_pr, bid_size, ask_pr, ask_size, partition)

def parse_json(line: str):
    try:
        # Parse the JSON line into a Python dictionary
        record = json.loads(line)
        record_type = record.get('event_type', '')

        # Logic to parse records based on their type
        if record_type == "Q":
            # Get the applicable field values from the JSON record
            trade_dt = record.get('trade_dt', None)
            rec_type = record.get('rec_type', None)
            symbol = record.get('symbol', None)
            exchange = record.get('exchange', None)
            event_tm = record.get('event_tm', None)
            event_seq_nb = record.get('event_seq_nb', None)
            arrival_tm = record.get('arrival_tm', None)
            trade_pr = record.get('trade_pr', None)
            bid_pr = record.get('bid_pr', None)
            bid_size = record.get('bid_size', None)
            ask_pr = record.get('ask_pr', None)
            ask_size = record.get('ask_size', None)
            partition = record.get('partition', None)

            if not isdigit(ask_size):
                ask_size = 0

            # Check if any key fields are empty
            if not all([trade_dt, file_tm, symbol, event_tm, event_seq_nb, exchange, bid_pr, bid_size, ask_pr, ask_size]):
                event = common_event(trade_dt, file_tm, "Q", symbol, event_tm, event_seq_nb, exchange, bid_pr, bid_size, ask_pr, ask_size, "MISSING_FIELDS", line)
            else:
                event = common_event(trade_dt, file_tm, "Q", symbol, event_tm, event_seq_nb, exchange, bid_pr, bid_size, ask_pr, ask_size, "VALID", "")
            return event

        # Handle other event types like "T" if needed
        elif record_type == "T":
            # Assuming similar fields for "T" type; adjust as necessary
            trade_dt = record.get('trade_dt', None)
            file_tm = record.get('file_tm', None)
            # Add other fields extraction logic here for type "T"

            # Check if any key fields are empty
            if not all([trade_dt, file_tm]):  # Adjust based on required fields
                event = common_event(trade_dt, file_tm, "T", None, None, None, None, None, None, None, None, "MISSING_FIELDS", line)
            else:
                event = common_event(trade_dt, file_tm, "T", None, None, None, None, None, None, None, None, "VALID", "")
            return event

    except Exception as e:
        # Handle parsing exceptions by saving the record to a dummy event in bad partition
        print(f"Error parsing line: {line} - {str(e)}")
        # Fill in the fields as None or empty string
        return common_event(None, None, "B", None, None, None, None, None, None, None, None, "ERROR", line)



In [0]:
def parse_json(line:str):
    
    record = json.loads(line)
    record_type = record.get('event_type', '')
    try:
        date = record.get('trade_dt', None)
        file_tm = record.get('file_tm', None)
        symbol = record.get('symbol', None)
        event_tm = record.get('event_tm', None)
        event_seq_nb = record.get('event_seq_nb', None)
        exchange = record.get('exchange', None)
        bid_pr = record.get('bid_pr', None)
        bid_size = record.get('bid_size', None)
        ask_pr = record.get('ask_pr', None)
        ask_size = record.get('ask_size', None)
        
        # [logic to parse records]
        if record_type == "T":
            
            return Row(date=date, datetime=file_tm, record_type=record_type, symbol=symbol, timestamp=event_tm, id=event_seq_nb, exchange=exchange, price1=bid_pr, qty1=bid_size, price2=ask_pr, qty2=ask_size)
            #event = common_event(col1_val, col2_val, ..., "T","")
            #return event
        elif record_type == "Q":
            
            return Row(date=date, datetime=file_tm, record_type=record_type, symbol=symbol, timestamp=event_tm, id=event_seq_nb, exchange=exchange, price1=bid_pr, qty1=bid_size, price2=ask_pr, qty2=ask_size)
            #event = common_event(col1_val, col2_val, … , "Q","")
            #return event
    except Exception as e:
        # [save record to dummy event in bad partition]
        # [fill in the fields as None or empty string]
        #return common_event(,,,....,,,,,"B",line)
        fields = [None]*11
        return Row(date=fields[0], datetime=fields[1], record_type=fields[2], symbol=fields[3], timestamp=fields[4], id=fields[5], exchange=fields[6], price1=fields[7], qty1=fields[8], price2=fields[9], qty2=fields[10])

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

json_1 = "dbfs:/Volumes/data_engineering/default/data/part-00000-c6c48831-3d45-4887-ba5f-82060885fc6c-c000.txt"
json_raw = sc.textFile(json_1)
print(json_raw.take(5))
parsed = json_raw.map(lambda line: parse_json(line))
print(parsed)
data = spark.createDataFrame(parsed)
print(data.show(5))

['{"trade_dt":"2020-08-05","file_tm":"2020-08-05 09:30:00.000","event_type":"Q","symbol":"SYMA","event_tm":"2020-08-05 09:36:55.284","event_seq_nb":1,"exchange":"NASDAQ","bid_pr":76.10016521142818,"bid_size":100,"ask_pr":77.9647975908747,"ask_size":100}', '{"trade_dt":"2020-08-05","file_tm":"2020-08-05 09:30:00.000","event_type":"Q","symbol":"SYMA","event_tm":"2020-08-05 09:42:32.247","event_seq_nb":2,"exchange":"NASDAQ","bid_pr":75.44372945251948,"bid_size":100,"ask_pr":75.94452858561046,"ask_size":100}', '{"trade_dt":"2020-08-05","file_tm":"2020-08-05 09:30:00.000","event_type":"Q","symbol":"SYMA","event_tm":"2020-08-05 09:48:06.767","event_seq_nb":3,"exchange":"NASDAQ","bid_pr":78.84798564828422,"bid_size":100,"ask_pr":80.69114407667608,"ask_size":100}', '{"trade_dt":"2020-08-05","file_tm":"2020-08-05 09:30:00.000","event_type":"Q","symbol":"SYMA","event_tm":"2020-08-05 09:53:09.803","event_seq_nb":4,"exchange":"NASDAQ","bid_pr":74.98336890552693,"bid_size":100,"ask_pr":76.162565308

In [0]:
data.dtypes

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
event_type_counts = data.groupBy("event_type").count()
display(event_type_counts)

In [0]:
#data.write.partitionBy("partition").mode("overwrite").parquet("output_dir")
data.write.partitionBy("record_type").mode("overwrite").parquet("output_dir")
#output_dir/partition=T/
#output_dir/partition=Q/
#output_dir/partition=B/

In [0]:
output_dir/record_type=='T'

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3739635544351247>, line 1[0m
[0;32m----> 1[0m output_dir[38;5;241m/[39mrecord_type[38;5;241m==[39m[38;5;124m'[39m[38;5;124mT[39m[38;5;124m'[39m

[0;31mNameError[0m: name 'output_dir' is not defined