In [0]:
import json
import time
from os import getenv
import asyncio
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import expr
from alpaca_trade_api.stream import Stream
from delta.tables import DeltaTable
import nest_asyncio

nest_asyncio.apply() # This line is needed when working with Juypter Notebook


In [0]:
## CREAT A NEW SPARK SESSION 

spark = SparkSession.builder.appName('ILUORE').getOrCreate()
spark.getActiveSession()

In [0]:
# LOAD ENVIRONMENTAL VARIABLES CONTAINING ALpaca API KEYS
alpaca_api_key = getenv("APCA_API_KEY_ID")
alpaca_secret_key = getenv("APCA_API_SECRET_KEY")
alpaca_base_url = getenv("APCA_API_BASE_URL")

In [0]:
#from urllib.error import HTTPError
def run_connection(conn):
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
  
    try:
        conn.run()
    except KeyboardInterrupt:
        print("Interrupted execution by user")
        loop.run_until_complete(conn.stop_ws())
        exit(0)
    except Exception as e:
        print(f'Exception from websocket connection: {e}')
    finally:
        print("Trying to re-establish connection")
        time.sleep(3)
        run_connection(conn)

async def on_crypto_bar(bar):
    # Ignore Bars data that is not exchanged on Coinbase
    if bar.exchange != 'CBSE':
        return
  # Extract the values from the Alpaca responce class and add them to a new dict
    values = bar.__dict__.pop('_raw')
    bars = dict()
    bars['symbol'] = values.pop('symbol')
    bars['timestamp'] = values.pop('timestamp')
    bars['exchange'] = values.pop('exchange')
    bars['open'] = values.pop('open')
    bars['high'] = values.pop('high')
    bars['low'] = values.pop('low')
    bars['close'] = values.pop('close')
    bars['volume'] = values.pop('volume')
    bars['trade_count'] = values.pop('trade_count')
    bars['vwap'] = values.pop('vwap')
  
    df = spark.read.json(sc.parallelize([bars]))
    df.coalesce(1).write.mode('append').json('alpaca/crypto/btc/bars')
    #print(dbutils.fs.ls('./alpaca/crypto/btc/bars'))

if __name__ == '__main__':
    conn = Stream(alpaca_api_key, 
                  alpaca_secret_key,
                  data_feed='iex')

    conn.subscribe_crypto_bars(on_crypto_bar, "BTCUSD")

    run_connection(conn)

data websocket error, restarting connection: code = 1006 (connection closed abnormally [internal]), no reason


In [0]:
# Import Libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
## LET US START A STRUCTURED STREAMING QUERY FOR BRONZE DATA

#Start a Structured Streaming Query with Bronze Data
# Path on S3 that has all our Raw files produced by the websocket listener

inputPath = "/alpaca/crypto/btc/bars/"
jsonSchema = StructType([
  StructField("close", StringType(), True),
  StructField("exchange", StringType(), True),
  StructField("high", StringType(), True),
  StructField("low", StringType(), True),
  StructField("open", StringType(), True),
  StructField("symbol", StringType(), True),
  StructField("timestamp", LongType(), True),
  StructField("trade_count", StringType(), True),
  StructField("volume", StringType(), True),
  StructField("vwap", StringType(), True)])

# Lets create a Structured Streaming DataFrame using readStream. Note, the the crypto timestamp is in epoch time (unix)
# So it is necessary to convert this to a usable format before creating our Silve Table
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)
    .json(inputPath)
    .withColumn('timestamp', f.from_utc_timestamp(f.from_unixtime(f.col('timestamp')/1000000000, "yyyy-MM-dd hh:mm:ss"), tz='America/New_York'))
)

display(streamingInputDF, processingTime = "5 seconds")



close,exchange,high,low,open,symbol,timestamp,trade_count,volume,vwap
54764.54,CBSE,54874.18,54714.97,54874.18,BTCUSD,2021-11-25T21:41:00.000+0000,1129,35.61272538,54780.8957598994
54761.87,CBSE,54874.18,54714.97,54874.18,BTCUSD,2021-11-25T21:41:00.000+0000,1135,35.67833973,54780.8536867723
54430.51,CBSE,54517.64,54379.87,54517.64,BTCUSD,2021-11-27T03:42:00.000+0000,1108,39.26169427,54452.6410397947
54383.81,CBSE,54517.64,54379.87,54517.64,BTCUSD,2021-11-27T03:42:00.000+0000,1135,39.66372052,54452.3021481624
54505.46,CBSE,54586.29,54484.83,54584.88,BTCUSD,2021-11-25T21:14:00.000+0000,699,21.79585028,54527.0775865232
54505.45,CBSE,54586.29,54484.83,54584.88,BTCUSD,2021-11-25T21:14:00.000+0000,707,22.29999949,54526.5643180105
54495.05,CBSE,54528.73,54490.83,54505.45,BTCUSD,2021-11-25T21:15:00.000+0000,572,12.38149577,54503.3155630053
54583.44,CBSE,54599.38,54459.09,54459.09,BTCUSD,2021-11-25T21:18:00.000+0000,569,12.52682563,54550.0329220079
54583.45,CBSE,54599.38,54459.09,54459.09,BTCUSD,2021-11-25T21:18:00.000+0000,571,12.53489705,54550.0544397971
54706.43,CBSE,54707.43,54512.16,54583.44,BTCUSD,2021-11-25T21:19:00.000+0000,843,22.07215023,54591.3876502547


In [0]:
###  USING STRUCTURE STREAMING + DELTA As a DATA SINK (SILVER)

# DBTITLE 1,Use Structure Streaming + Delta as a Data Sink (Silver) 
streamingInputDF.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/alpaca/crypto/btc/silver_bars/_checkpoints/elt-from-json") \
  .start("/alpaca/crypto/btc/silver_bars") 


Out[3]: <pyspark.sql.streaming.StreamingQuery at 0x7f229a5c5910>

In [0]:
# VISUALIZING SOME DATA FROM THE DELTA SILVER (SILVER)


# DBTITLE 1,Visualize Some Data From the Delta Sink (Silver)
silver_df = spark.readStream.format("delta").load("/alpaca/crypto/btc/silver_bars")
display(silver_df, processingTime = "5 seconds")

close,exchange,high,low,open,symbol,timestamp,trade_count,volume,vwap
55088.64,CBSE,55100.0,55047.18,55047.96,BTCUSD,2021-11-27T03:24:00.000+0000,258,4.22907979,55082.9000301666
55069.55,CBSE,55096.54,55054.6,55088.65,BTCUSD,2021-11-27T03:25:00.000+0000,190,1.54549172,55071.1818941577
55049.37,CBSE,55067.47,55038.64,55062.46,BTCUSD,2021-11-27T03:26:00.000+0000,191,1.60593103,55054.157574212
54980.19,CBSE,55019.97,54980.19,55013.8,BTCUSD,2021-11-27T03:31:00.000+0000,214,4.57934479,55001.6359078348
54989.92,CBSE,55008.03,54971.9,54981.92,BTCUSD,2021-11-27T03:32:00.000+0000,278,5.64331173,54981.8543362617
54979.45,CBSE,55008.03,54971.9,54981.92,BTCUSD,2021-11-27T03:32:00.000+0000,280,5.66880818,54981.8738565524
54958.59,CBSE,54988.12,54958.59,54986.0,BTCUSD,2021-11-27T03:33:00.000+0000,285,3.07305256,54970.9314151216
54886.66,CBSE,54958.6,54870.29,54958.6,BTCUSD,2021-11-27T03:34:00.000+0000,448,11.30170254,54896.3302359559
54886.66,CBSE,54958.6,54870.29,54958.6,BTCUSD,2021-11-27T03:34:00.000+0000,449,11.30260912,54896.3294603082
54474.8,CBSE,54474.8,54389.85,54436.36,BTCUSD,2021-11-27T03:44:00.000+0000,439,10.03455637,54451.5926192753


In [0]:
##  LETS Create a Streaming Aggregate DataFrame that Summarizing Data on an Hourly Basis and use Delta as a Sink (Gold)
# Remove any duplicate entries that may impact our aggregates
transf_df = silver_df.distinct()

# Truncate timestamp for aggregations
transf_df = transf_df \
  .withColumn('year', f.date_trunc("Year", f.col('timestamp'))) \
  .withColumn('month', f.date_trunc("Month", f.col('timestamp'))) \
  .withColumn('day', f.date_trunc("Day", f.col('timestamp'))) \
  .withColumn('hour', f.date_trunc("Hour", f.col('timestamp')))

# Use groupBy to aggregate our live data to hourly summaries data that we can store in Gold
gold = transf_df \
  .groupBy('symbol', 'exchange', 'hour') \
  .agg(
    f.max(f.col('high')).alias('high'),
    f.min(f.col('low')).alias('low'),
    f.first(f.col('open')).alias('open'),
    f.last(f.col('close')).alias('close'),
    f.sum(f.col('trade_count')).alias('trade_count'),
    f.sum(f.col('volume')).alias('volume'))

# Because we are ingesting and processing data for the current hour we need to set our gold table to "complete" mode. 
# This will ensure each time new rows are processed our aggregate table reflects the correct values.
# Since Spark 2.1.1, a new output mode called Update can be used to sink only the changed rows since the last trigger
gold.writeStream \
  .format("delta") \
  .outputMode("complete") \
  .option("checkpointLocation", "/alpaca/crypto/btc/gold_bars/_checkpoints/hourly-aggregate") \
  .start("/alpaca/crypto/btc/gold_bars")
  
display(gold, processingTime = "60 seconds") 

symbol,exchange,hour,high,low,open,close,trade_count,volume
BTCUSD,CBSE,2021-11-28T01:00:00.000+0000,54629.12,53327.0,54028.87,53933.8,59873.0,1423.2577595700004
BTCUSD,CBSE,2021-11-26T21:00:00.000+0000,55265.83,54736.91,54802.48,55097.59,20874.0,572.5019083900004
BTCUSD,CBSE,2021-11-27T22:00:00.000+0000,54479.95,54068.76,54318.65,54322.57,37217.0,602.05273444
BTCUSD,CBSE,2021-11-25T21:00:00.000+0000,54923.98,54437.2,54854.83,54505.46,64855.0,1950.42445138
BTCUSD,CBSE,2021-11-27T01:00:00.000+0000,55105.24,54710.87,54900.38,54842.89,26730.0,488.4729603700001
BTCUSD,CBSE,2021-11-27T02:00:00.000+0000,55050.0,54887.48,54972.31,55000.36,20201.0,342.1901196299999
BTCUSD,CBSE,2021-11-25T22:00:00.000+0000,54725.49,54260.0,54440.01,54523.54,84159.0,4719.038311599999
BTCUSD,CBSE,2021-11-28T00:00:00.000+0000,54430.0,53830.05,54212.65,53892.01,44961.0,827.1747722200001
BTCUSD,CBSE,2021-11-28T02:00:00.000+0000,54953.68,54082.0,54092.31,54102.98,44054.0,1007.21253997
BTCUSD,CBSE,2021-11-27T00:00:00.000+0000,55163.19,54877.17,54955.88,54955.89,23957.0,533.63882402
