In [0]:
from datetime import datetime, timedelta
from decimal import Decimal

import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType, StringType, DateType, TimestampType
from pyspark.sql import functions as F
# Reuse the active Spark session if there is one; otherwise create one named 'AnalyticalETL'.
session_builder = SparkSession.builder.appName('AnalyticalETL')
spark = session_builder.getOrCreate()

In [0]:
# Materialise the loaded trade parquet files as the table `trades`, replacing
# any previous contents. saveAsTable() returns None, so the result is
# deliberately not bound to a name.
spark.read.parquet('/mnt/loaded/trade').write.mode('overwrite').saveAsTable('trades')

In [0]:
# Date (as a 'YYYY-MM-DD' string) used to filter trades and quotes on trade_dt in the cells below.
trade_date = '2020-08-06'
# Hard-coded here; in real runtime this would be datetime.today().strftime('%Y-%m-%d')

In [0]:
# Select the trade columns needed downstream, restricted to rows whose trade_dt equals trade_date.
current_day_trades_sql = f"select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr from trades where trade_dt = '{trade_date}'"
df = spark.sql(current_day_trades_sql)

In [0]:
# Register df as the temp view `tmp_trade_moving_avg` so the moving-average SQL can query it by name.
df.createOrReplaceTempView("tmp_trade_moving_avg")

In [0]:
# Trailing 30-minute moving average of trade_pr, computed per symbol.
# The range window is ordered by event time in epoch seconds and spans the
# preceding 1800 seconds up to and including the current trade's second.
# A correlated subquery bounded only by "difference <= 30 minutes" would also
# average in every LATER trade (their difference is negative), so a window
# with an explicit lower bound is used instead.
mov_avg_df = spark.sql("""
select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr,
       avg(trade_pr) over (
           partition by symbol
           order by unix_timestamp(event_tm)
           range between 1800 preceding and current row
       ) as mov_avg_pr
from tmp_trade_moving_avg
""")

In [0]:
# Store the moving-average result as table `temp_trade_moving_avg`, replacing earlier contents.
moving_avg_writer = mov_avg_df.write.mode('overwrite').option('mergeSchema', 'true')
moving_avg_writer.saveAsTable("temp_trade_moving_avg")

In [0]:
# Derive the calendar day before trade_date as a 'YYYY-MM-DD' string.
# Uses the `datetime` class and `timedelta` from the notebook's top-level
# imports; a mid-notebook `import datetime` would rebind the name `datetime`
# from the class to the module and break any later `datetime.today()` call.
# NOTE(review): this is the previous calendar day, not the previous trading
# day — confirm that is acceptable around weekends and holidays.
date = datetime.strptime(trade_date, '%Y-%m-%d')
prev_date_str = (date - timedelta(days=1)).strftime('%Y-%m-%d')


In [0]:
# Select the same trade columns, this time for rows whose trade_dt equals prev_date_str.
previous_day_trades_sql = f"select trade_dt, symbol, exchange, event_tm, event_seq_nb, trade_pr from trades where trade_dt = '{prev_date_str}'"
df = spark.sql(previous_day_trades_sql)

In [0]:
# Register df as the temp view `tmp_last_trade` so the last-price SQL can query it by name.
df.createOrReplaceTempView("tmp_last_trade")

In [0]:
# Last price per symbol over the rows in tmp_last_trade: the trailing
# 30-minute average of trade_pr, evaluated at the symbol's latest event_tm.
# Window functions are used because a correlated subquery with a HAVING
# clause but no GROUP BY leaves the unqualified event_tm ambiguous, and a
# time filter with only an upper bound would pull later trades into the average.
# Every row that shares the symbol's latest event_tm is kept.
last_pr_df = spark.sql("""
select trade_dt, symbol, exchange, last_pr
from (
    select trade_dt, symbol, exchange, event_tm,
           avg(trade_pr) over (
               partition by symbol
               order by unix_timestamp(event_tm)
               range between 1800 preceding and current row
           ) as last_pr,
           max(event_tm) over (partition by symbol) as last_event_tm
    from tmp_last_trade
) a
where event_tm = last_event_tm
  and last_pr is not null
""")

In [0]:
# Store the last-price result as table `temp_last_trade`, replacing earlier contents.
last_price_writer = last_pr_df.write.mode('overwrite').option('mergeSchema', 'true')
last_price_writer.saveAsTable("temp_last_trade")

In [0]:
# Keep only the quotes whose trade_dt equals trade_date and store them as table `quotes`.
quotes_source = spark.read.parquet('/mnt/loaded/quote')
quotes_for_day = quotes_source.filter(F.col('trade_dt') == trade_date)
quotes_for_day.write.mode('overwrite').saveAsTable('quotes')

In [0]:
# Stack trade rows (rec_type 'T') and quote rows (rec_type 'Q') into one common schema.
# Columns that do not apply to a record type are filled with NULL: bid/ask fields
# (and event_seq_nb) for trades, trade_pr and mov_avg_pr for quotes.
# NOTE(review): plain UNION removes duplicate rows; confirm UNION ALL is not what is wanted.
quote_union = spark.sql("""SELECT * FROM 
                                (SELECT trade_dt, "T" AS rec_type, symbol, event_tm, NULL AS event_seq_nb, exchange, NULL AS bid_pr, NULL AS bid_size, NULL AS ask_pr, NULL AS ask_size, trade_pr, mov_avg_pr
                                FROM temp_trade_moving_avg)
                                UNION
                                (SELECT trade_dt, "Q" AS rec_type, symbol, event_tm, event_seq_nb, exchange, bid_pr, bid_size, ask_pr, ask_size, NULL AS trade_pr, NULL AS mov_avg_pr
                                FROM quotes)
                        """)
# Make the combined rows queryable by name in the following cells.
quote_union.createOrReplaceTempView("quote_union")

In [0]:
# Carry the most recent trade price and moving average forward onto every row.
# Rows are ordered by ascending event_tm within each symbol, and
# last_value(..., true) skips NULLs, so each row receives the latest non-null
# value at or before its own position. Ordering descending with first_value
# over the same frame would instead return the latest value at or AFTER the
# row, i.e. look ahead to trades that had not happened yet.
# NOTE(review): rows sharing an event_tm have no defined relative order here.
quote_union_update = spark.sql("""
    SELECT *,
           last_value(trade_pr, true) OVER (
               PARTITION BY symbol
               ORDER BY event_tm
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS last_trade_pr,
           last_value(mov_avg_pr, true) OVER (
               PARTITION BY symbol
               ORDER BY event_tm
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS last_mov_avg_pr
    FROM quote_union
""")

In [0]:
# Register the enriched union as the temp view `quote_union_update` for the quote-only projection.
quote_union_update.createOrReplaceTempView("quote_union_update")

In [0]:
# Keep only quote records (rec_type = 'Q') and project the quote fields together with
# the last_trade_pr / last_mov_avg_pr columns computed on the combined trade+quote rows.
quote_update = spark.sql("""
select trade_dt, symbol, event_tm, event_seq_nb, exchange,
bid_pr, bid_size, ask_pr, ask_size, quote_union_update.last_trade_pr, quote_union_update.last_mov_avg_pr
from quote_union_update
where rec_type = 'Q'
""")
# Make the quote-only result queryable by name for the final join.
quote_update.createOrReplaceTempView("quote_update")

In [0]:
# Attach temp_last_trade.last_pr to each quote as close_pr and derive the bid/ask
# movement columns (bid_pr - close_pr, ask_pr - close_pr).
# The LEFT OUTER JOIN keeps quotes whose symbol has no row in temp_last_trade; for
# those, close_pr is NULL and so are bid_pr_mv / ask_pr_mv.
# The BROADCAST(t) hint asks Spark to broadcast temp_last_trade for the join.
# NOTE(review): the join key is symbol only; if temp_last_trade holds more than one
# row per symbol, each matching quote row is duplicated — confirm uniqueness.
quote_final = spark.sql('''
                        select trade_dt, symbol, event_tm, event_seq_nb, exchange,
                    bid_pr, bid_size, ask_pr, ask_size, last_trade_pr, last_mov_avg_pr,
                    bid_pr - close_pr as bid_pr_mv, ask_pr - close_pr as ask_pr_mv
                    from (
                        SELECT /*+ BROADCAST(t) */ q.*, t.last_pr AS close_pr
                        FROM
                        quote_update q LEFT OUTER JOIN temp_last_trade t ON q.symbol = t.symbol
                        )
                        ''')

In [0]:
# Persist the enriched quotes as parquet, one directory per trade_dt.
# Dynamic partition overwrite replaces only the trade_dt partitions present in
# quote_final. With the default static mode, 'overwrite' on a partitioned path
# first deletes every existing partition under it, which would wipe the output
# of earlier daily runs.
(
    quote_final.write
    .mode('overwrite')
    .option('partitionOverwriteMode', 'dynamic')
    .partitionBy('trade_dt')
    .parquet('/mnt/quote-trade-analytical')
)