In [1]:
import os
import datetime
import json
import logging
import yaml
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.window import Window
from pyspark.sql.functions import year, month, dayofmonth,to_timestamp,to_date,split,substring,col,when
from pyspark.sql.types import StructType,StructField,DateType,TimestampType,StringType,IntegerType,DecimalType
from pyspark.sql.functions import udf, lit,split,concat,ceil


In [2]:
# Build (or reuse an existing) Spark session with Hive support so tables can be saved to Hive.
spark = (
    SparkSession.builder
    .appName("Analytical_ETL_spark")
    .enableHiveSupport()
    .getOrCreate()
)

In [5]:
# Load the current trade date's partition (2021-07-29) into a dataframe (df).
# NOTE(review): absolute local Windows path — consider a configurable base directory.
df = spark.read.parquet("C:/Users/KEVAL/Desktop/AnalyticalETL/trade/partition=2021-07-29")

SyntaxError: EOL while scanning string literal (<ipython-input-5-1c945f076be1>, line 2)

In [None]:
df.createOrReplaceTempView(name="tmp_trade_moving_avg")  # expose current-day trades to Spark SQL

In [None]:
# Calculating the moving average for last 30 minutes
# For each trade row, mov_avg_pr is the average trade_pr over the same symbol's trades (across all
# exchanges, since the window partitions by symbol only) whose event time lies within the 30 minutes
# up to and including this row's time. RANGE framing means rows sharing the same timestamp are all
# included in each other's average.
mov_avg_df = spark.sql("""select symbol, exchange, trade_dt,event_tm, event_seq_nb, trade_pr,
                        AVG(trade_pr) OVER(PARTITION BY (symbol) ORDER BY CAST(event_tm AS timestamp) 
                        RANGE BETWEEN INTERVAL 30 MINUTES PRECEDING AND CURRENT ROW) as mov_avg_pr
                        from tmp_trade_moving_avg""")

In [None]:

# Remove any earlier copy of the moving-average Hive table before saving a fresh one.
spark.sql(sqlQuery="DROP TABLE IF EXISTS temp_trade_moving_avg")

In [None]:

# Save the moving-average dataframe as a Hive table.
# saveAsTable's default save mode raises if the table already exists; overwrite keeps this cell
# re-runnable even when the DROP cell above was skipped.
mov_avg_df.write.mode("overwrite").saveAsTable("temp_trade_moving_avg")

In [None]:
mov_avg_df.show(n=5)  # preview the first rows of the moving-average result

In [None]:
# Load the previous trade date's partition (2021-07-28) into a dataframe (df), replacing the
# current-day df loaded earlier (its view was already registered above).
df = spark.read.parquet("C:/Users/KEVAL/Desktop/AnalyticalETL/trade/partition=2021-07-28")

In [None]:
# Expose the prior-day trades to Spark SQL.
# NOTE(review): this temp view shares its name with the Hive table "temp_last_trade" that is
# dropped and re-saved further down; unqualified references resolve to the temp view while it
# exists. A distinct view name would remove the ambiguity.
df.createOrReplaceTempView(name="temp_last_trade")

In [None]:
# Prior-day closing price per symbol.
# last_pr is the trade price of the symbol's final trade of the day (latest event_tm, ties broken
# by event_seq_nb, nulls ignored), repeated on every row of that symbol, so a
# `select distinct symbol, last_pr` over this result yields exactly one close per symbol.
# NOTE(review): event_seq_nb is used only as a tie-breaker; presumably numeric — confirm its type.
last_pr_df = spark.sql("""select symbol, trade_dt, exchange, event_tm, event_seq_nb, trade_pr,
                        last_value(trade_pr, true) OVER(PARTITION BY symbol
                        ORDER BY CAST(event_tm AS timestamp), event_seq_nb
                        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_pr
                        from temp_last_trade""")

In [None]:
# Remove any earlier "temp_last_trade" before saving the prior-day result.
# NOTE(review): a temp view with this same unqualified name is registered above; confirm which
# object this DROP removes on your Spark version (the temp view may be dropped instead of the
# Hive table).
spark.sql(sqlQuery="DROP TABLE IF EXISTS temp_last_trade")

In [None]:
# Save the prior-day dataframe as the Hive table "temp_last_trade".
# NOTE(review): "temp_last_trade" is also the name of the temp view registered from the raw
# prior-day trades. Spark resolves an unqualified DROP TABLE against a same-named temp view
# first, so the DROP cell above may remove the view rather than this table. Overwrite mode keeps
# this save from failing with "table already exists" when the notebook is re-run.
last_pr_df.write.mode("overwrite").saveAsTable("temp_last_trade")

In [None]:
last_pr_df.show(n=5)  # preview the first rows of the prior-day result

In [None]:
# Load quote records from the partition=Q parquet dataset on HDFS.
# NOTE(review): these quotes were labelled as 2021-04-18 data, while the trade partitions above are
# 2021-07-28/29, and nothing below filters by date — confirm the dates line up.
quotes = (
    spark.read
    .format("parquet")
    .load("hdfs://localhost:8020/stock/partition=Q")
)

In [None]:
quotes.createOrReplaceTempView(name="quotes")  # expose quotes to Spark SQL

In [None]:
# Union quote and trade records into one event stream. Trade rows are tagged rec_type "T" and
# carry null quote columns; quote rows carry null trade_pr / mov_avg_pr.
# UNION ALL rather than UNION: the inputs are independent event streams, so a distinct UNION
# would only add a full-row de-duplication over every event and silently collapse identical rows.
quote_union = spark.sql("""
SELECT trade_dt,rec_type,symbol,event_tm,event_seq_nb,exchange,bid_pr,bid_size,ask_pr,
ask_size,null as trade_pr,null as mov_avg_pr FROM quotes
UNION ALL
SELECT trade_dt,"T" as rec_type,symbol,event_tm,event_seq_nb,exchange,null as bid_pr,null as bid_size,null as ask_pr,
null as ask_size,trade_pr,mov_avg_pr FROM temp_trade_moving_avg
""")

In [None]:
quote_union.createOrReplaceTempView(name="quote_union")  # expose the unioned events to Spark SQL

In [None]:
# Carry the most recent non-null trade price and moving-average price forward onto every row
# of the same symbol. The window is ordered by event time ASCENDING with a frame that ends at
# the current row, so each quote only sees trades at or before its own timestamp.
# (Ordering DESC with the default frame would pick the nearest trade AFTER the quote.)
quote_union_update = spark.sql("""
select *,
last_value(trade_pr, true) OVER(PARTITION BY symbol
ORDER BY CAST(event_tm AS timestamp) ASC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_trade_pr,
last_value(mov_avg_pr, true) OVER(PARTITION BY symbol
ORDER BY CAST(event_tm AS timestamp) ASC
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as last_mov_avg_pr
from quote_union
""")

In [None]:
quote_union_update.createOrReplaceTempView(name="quote_union_update")  # expose enriched events to SQL

In [None]:
# Keep only quote rows (rec_type 'Q'), now enriched with the carried-forward trade prices,
# and register them as the "quote_update" view.
quote_update = (
    spark.table("quote_union_update")
    .where("rec_type = 'Q'")
    .select(
        "trade_dt", "symbol", "event_tm", "event_seq_nb", "exchange",
        "bid_pr", "bid_size", "ask_pr", "ask_size", "last_trade_pr", "last_mov_avg_pr",
    )
)
quote_update.createOrReplaceTempView("quote_update")

In [None]:
# Join with temp_last_trade to retrieve the prior day's close price per symbol, then express
# bid/ask as moves relative to that close.
# The close is the trade_pr of each symbol's latest prior-day event (ties broken by
# event_seq_nb), selected with row_number so B has exactly one row per symbol and the LEFT JOIN
# cannot duplicate quote rows.
quote_final = spark.sql("""
select trade_dt, A.symbol, event_tm, event_seq_nb, exchange,
bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr,
bid_pr - close_pr as bid_pr_mv, ask_pr - close_pr as ask_pr_mv
from quote_update A LEFT OUTER JOIN (
    select symbol, close_pr from (
        select symbol, trade_pr as close_pr,
        row_number() OVER(PARTITION BY symbol
            ORDER BY CAST(event_tm AS timestamp) DESC, event_seq_nb DESC) as rn
        from temp_last_trade
    ) ranked
    where rn = 1
) B
on A.symbol = B.symbol
""")

In [None]:
quote_final.show(n=5)  # preview the final quote analytics

In [None]:
trade_date = "2021-07-29"
quote_final.write.mode("overwrite").parquet("C:/Users/KEVAL/Desktop/AnalyticalETL/stock/quote-trade-analytical/date={}".fo