In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *

import os

In [None]:

# Create the SparkSession used by every cell below, or reuse one that is
# already running in this kernel. "local[*]" runs Spark in-process on the
# local machine instead of connecting to a cluster.
spark = (
    SparkSession.builder
    .appName("reader")
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Explicit column layout for the order data: (column name, Spark type).
# Every field is declared nullable.
#
# NOTE(review): none of the readers in the visible cells is given this schema,
# so it currently has no effect on how the data is loaded. A CSV header only
# supplies column names; column types are inferred only when the reader is
# asked to (inferSchema) or when a schema such as this one is passed in.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("order_id", IntegerType()),
        ("symbol", StringType()),
        ("order_side", StringType()),
        ("size", DoubleType()),
        ("price", DoubleType()),
        ("status", StringType()),
        ("created_at", IntegerType()),
        ("total", DoubleType()),
        ("cum_sum", DoubleType()),
    ]
])

In [None]:
# Resolve both input locations relative to the kernel's working directory.
# Despite its name, `this_script_dir` is the current working directory, so the
# notebook must be started from its own folder for these paths to resolve.
this_script_dir = os.getcwd()
parent_dir = os.path.dirname(this_script_dir)

# Parquet output for a single, hard-coded date partition. Note that the
# leading '..' climbs one more level above `parent_dir`, i.e. two levels
# above the working directory.
output_path_parts = (
    '..',
    'src',
    'main',
    'scala',
    'org',
    'pintu',
    'output',
    'date_partition=2023-11-06',
)
output_dir_path_string = os.path.join(parent_dir, *output_path_parts)

# Source CSV; it sits one level above the working directory.
source_dir_path_string = os.path.join(parent_dir, 'producer', 'order_book_mockup.csv')

In [None]:
# Load the parquet output for the selected date partition.
df = spark.read.format("parquet").load(output_dir_path_string)

# Load the source CSV, taking column names from its header row.
# NOTE(review): no schema or inferSchema option is supplied, so Spark reads
# every CSV column as a string; later arithmetic, comparisons and ordering on
# these columns rely on implicit casts / string ordering.
source_df = spark.read.option("header", True).csv(source_dir_path_string)


In [None]:
# Print the column names and types Spark assigned to the source CSV.
source_df.printSchema()

In [None]:
# Print the column names and types stored in the parquet output.
df.printSchema()

In [None]:
# Preview the first rows of the parquet output (show() prints a limited
# number of rows, not the whole DataFrame).
df.show()

In [None]:
# Total number of rows in the parquet output.
# The figures below are counts the author recorded from earlier runs
# (presumably before and after a filtering change upstream — TODO confirm).
df.count()
# Before: 7825
# After Filtered: 5985

In [None]:
# Total number of rows in the source CSV.
# The figures below are counts the author recorded from earlier runs; the
# source count is the same in both.
source_df.count()
# Before: 7826
# After: 7826

In [None]:
# Count the parquet rows whose status is CLOSED.
status_is_closed = col('status') == "CLOSED"
filtered_status_df = df.where(status_is_closed)
filtered_status_df.count()
# Counts recorded by the author from earlier runs:
# Before: 30
# After: 0

In [None]:
# Preview and count the parquet rows on the BUY side.
side_is_buy = col('side') == "BUY"
filtered_order_side_buy_df = df.where(side_is_buy)
filtered_order_side_buy_df.show()
filtered_order_side_buy_df.count()
# Counts recorded by the author from an earlier run:
# SELL: 2990
# BUY: 2995
# Total: 5985

In [None]:
# Preview and count the parquet rows on the SELL side.
side_is_sell = col('side') == "SELL"
filtered_order_side_sell_df = df.where(side_is_sell)
filtered_order_side_sell_df.show()
filtered_order_side_sell_df.count()

In [None]:
# Inspect every source row recorded for a single order (order_id 110).
is_order_110 = col('order_id') == 110
filtered_df = source_df.where(is_order_110)
filtered_df.show()

# Timestamps noted by the author while inspecting this order
# (presumably decoded created_at values — TODO confirm which row is which):
# Wed, 06 Sep 2023 07:08:28 GMT
# Wednesday, September 6, 2023 6:38:28 AM

In [None]:
# Count the source rows whose status is CLOSED.
source_status_is_closed = col('status') == "CLOSED"
source_filtered_status_df = source_df.where(source_status_is_closed)
source_filtered_status_df.count()
# Counts recorded by the author from earlier runs:
# Before: 1826
# After: -

In [None]:
# Count the source rows that have no order_side value.
order_side_is_missing = col('order_side').isNull()
source_filtered_order_side_df = source_df.where(order_side_is_missing)
source_filtered_order_side_df.count()
# Per-value counts recorded by the author from earlier runs:
# SELL: 2994
# BUY: 3006
# NULL: 1826
# Total: 7826

In [None]:
# Window over all rows of one order, sorted by created_at. The frame spans the
# whole partition, so last() sees the final row of the order from every row.
windowSpec = (
    Window.partitionBy('order_id')
    .orderBy('created_at')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Status on the last row of each order, broadcast to all rows of that order.
final_status = last(col('status')).over(windowSpec)

# Flag every row of an order as closed when the order's final status is 'CLOSED'.
# when/otherwise keeps the flag non-null even if the final status is null.
source_dedup_df = source_df.withColumn(
    'isClosed',
    when(final_status == 'CLOSED', True).otherwise(False),
)
source_dedup_df.show()

# Timestamps noted by the author for one order's two statuses
# (presumably decoded created_at values — TODO confirm):
# OPEN:   Wednesday, September 6, 2023 4:05:08 AM
# CLOSED: Wednesday, September 6, 2023 5:50:08 AM

In [None]:
# Keep only OPEN rows of orders that were never closed. Column predicates are
# combined with the bitwise operators (& and ~), each operand parenthesised
# where needed, because Python's `and` / `not` do not work on Spark columns.
open_and_not_closed = (col("status") == "OPEN") & ~col("isClosed")
source_open_dedup_df = source_dedup_df.where(open_and_not_closed)

# Preview the remaining rows
source_open_dedup_df.show()

# Number of rows that survive the filter
source_open_dedup_df.count()

In [None]:
# Value of each order: size multiplied by price.
order_value = col("size") * col("price")
source_open_dedup_df = source_open_dedup_df.withColumn("total", order_value)

# Running window per order_side, sorted by created_at, from the first row of
# the partition up to and including the current row.
windowSpec = (
    Window.partitionBy('order_side')
    .orderBy('created_at')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Cumulative total within each side. `sum` here is the PySpark aggregate
# brought in by the star import of pyspark.sql.functions, not Python's builtin.
running_total = sum(col("total")).over(windowSpec)
source_cumsum_df = source_open_dedup_df.withColumn('cumulative_sum', running_total)
source_cumsum_df.show()

# Timestamps noted by the author for one order's two statuses
# (presumably decoded created_at values — TODO confirm):
# OPEN:   Wednesday, September 6, 2023 4:05:08 AM
# CLOSED: Wednesday, September 6, 2023 5:50:08 AM

In [None]:
# Window used only so row_number() can be evaluated per side. Ordering by a
# constant means Spark is free to number the rows of a side in any order.
# NOTE(review): row numbers (and therefore the buy_N / sell_N labels) are not
# reproducible between runs; order by a real column if a stable numbering is
# needed.
window_spec = Window.partitionBy("order_side").orderBy(lit(0))


def _label_side_rows(orders_df, side, label_column):
    """Return the rows of one order side, numbered and labelled.

    Args:
        orders_df: DataFrame with an ``order_side`` column.
        side: Value of ``order_side`` to keep, e.g. ``"BUY"``.
        label_column: Name that replaces ``order_side``; it holds labels of
            the form ``<side lower-cased>_<row_num>``, e.g. ``buy_1``.

    Returns:
        The filtered DataFrame with an added ``row_num`` column and with
        ``order_side`` replaced (in the same position) by ``label_column``.
    """
    numbered_df = orders_df.filter(col("order_side") == side).withColumn(
        "row_num", row_number().over(window_spec)
    )
    return numbered_df.withColumnRenamed("order_side", label_column).withColumn(
        label_column, concat(lit(side.lower() + "_"), col("row_num"))
    )


def _with_generic_side(labelled_df, label_column):
    """Return ``labelled_df`` with ``label_column`` moved first and renamed ``side``.

    The per-side column is dropped rather than kept next to ``side``, so the
    BUY and SELL frames end up with identical column names.
    """
    other_columns = [name for name in labelled_df.columns if name != label_column]
    return labelled_df.select(col(label_column).alias("side"), *other_columns)


# Filter BUY and SELL into separate DataFrames and add row numbers
buy_df = _label_side_rows(source_cumsum_df, "BUY", "buy_side")
sell_df = _label_side_rows(source_cumsum_df, "SELL", "sell_side")

# Combine BUY and SELL and interleave them by row number.
# Previously both frames kept their per-side column and were combined with a
# positional union, so the result carried a redundant `buy_side` column that
# held sell_N labels for the SELL rows. Only the generic `side` column is kept
# now, and the union matches columns by name. `side` is added to the sort so
# the BUY/SELL order within one row number is deterministic.
combined_df = (
    _with_generic_side(buy_df, "buy_side")
    .unionByName(_with_generic_side(sell_df, "sell_side"))
    .sort("row_num", "side")
)

combined_df.show()
combined_df.count()