In [0]:
# install this maven lib to run
# com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22 

from pyspark.sql.functions import *
from pyspark.sql.types import *

symbols = ["BTCUSDT", "ETHBTC", 'BNBBTC']

In [0]:
%sql
DROP  CATALOG IF EXISTS streaming CASCADE;
DROP SCHEMA IF  EXISTS streaming.bronze CASCADE ;
DROP SCHEMA IF  EXISTS streaming.silver CASCADE ;
DROP SCHEMA IF EXISTS streaming.gold CASCADE ;

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS streaming MANAGED LOCATION '';
CREATE SCHEMA IF NOT EXISTS streaming.bronze;
CREATE SCHEMA IF NOT EXISTS streaming.silver;
CREATE SCHEMA IF NOT EXISTS streaming.gold;


In [0]:

# Config
# Replace with your Event Hub namespace, name, and key
connectionString = ""
eventHubName = ""

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}

# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load() \

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()


path_bronze = {
    'catalog' :'streaming',
    'schema' :'bronze',
    'table' : 'data'
}

df.writeStream\
    .option("checkpointLocation", f"/mnt/{path_bronze['catalog']}/{path_bronze['schema']}/{path_bronze['table']}")\
    .outputMode("append")\
    .format("delta")\
    .toTable(f"{path_bronze['catalog']}.{path_bronze['schema']}.{path_bronze['table']}")

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2Njk4MicsICdzeW1ib2wnOiAnRVRIQlRDJywgJ3RyYWRlX2lkJzogJzQzNTMwNTE1MCcsICdwcmljZSc6ICcwLjA1NzcyMDAwJywgJ3F1YW4= (truncated),5,72176,183,2024-03-06T20:02:51.852Z,,ETHBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> ETHBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2MTYyNScsICdzeW1ib2wnOiAnQlRDVVNEVCcsICd0cmFkZV9pZCc6ICczNDYzMTcwMTAyJywgJ3ByaWNlJzogJzY2ODA5LjE0MDAwMDAwJyw= (truncated),1,76496,190,2024-03-06T20:02:54.024Z,,BTCUSDT,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BTCUSDT)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2Njk4MicsICdzeW1ib2wnOiAnRVRIQlRDJywgJ3RyYWRlX2lkJzogJzQzNTMwNTE1MScsICdwcmljZSc6ICcwLjA1NzcyMDAwJywgJ3F1YW4= (truncated),5,72576,184,2024-03-06T20:02:56.196Z,,ETHBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> ETHBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2MTY5NicsICdzeW1ib2wnOiAnQlRDVVNEVCcsICd0cmFkZV9pZCc6ICczNDYzMTcwMTAzJywgJ3ByaWNlJzogJzY2ODA5LjE0MDAwMDAwJyw= (truncated),1,76904,191,2024-03-06T20:02:58.384Z,,BTCUSDT,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BTCUSDT)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM3NTQzNycsICdzeW1ib2wnOiAnQk5CQlRDJywgJ3RyYWRlX2lkJzogJzIzNTE3NDU1NycsICdwcmljZSc6ICcwLjAwNjM2OTAwJywgJ3F1YW4= (truncated),4,71376,181,2024-03-06T20:03:00.571Z,,BNBBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BNBBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2Njk4MicsICdzeW1ib2wnOiAnRVRIQlRDJywgJ3RyYWRlX2lkJzogJzQzNTMwNTE1MicsICdwcmljZSc6ICcwLjA1NzcyMDAwJywgJ3F1YW4= (truncated),5,72976,185,2024-03-06T20:03:02.728Z,,ETHBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> ETHBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2MTcxMScsICdzeW1ib2wnOiAnQlRDVVNEVCcsICd0cmFkZV9pZCc6ICczNDYzMTcwMTA0JywgJ3ByaWNlJzogJzY2ODA5LjEzMDAwMDAwJyw= (truncated),1,77312,192,2024-03-06T20:03:04.9Z,,BTCUSDT,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BTCUSDT)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM3NjQzNycsICdzeW1ib2wnOiAnQk5CQlRDJywgJ3RyYWRlX2lkJzogJzIzNTE3NDU1OCcsICdwcmljZSc6ICcwLjAwNjM2OTAwJywgJ3F1YW4= (truncated),4,71776,182,2024-03-06T20:03:07.088Z,,BNBBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BNBBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2ODc0NicsICdzeW1ib2wnOiAnRVRIQlRDJywgJ3RyYWRlX2lkJzogJzQzNTMwNTE1MycsICdwcmljZSc6ICcwLjA1NzczMDAwJywgJ3F1YW4= (truncated),5,73376,186,2024-03-06T20:03:09.244Z,,ETHBTC,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> ETHBTC)"
eydldmVudF90eXBlJzogJ3RyYWRlJywgJ2V2ZW50X3RpbWUnOiAnMTcwOTc1NTM2MTc4NicsICdzeW1ib2wnOiAnQlRDVVNEVCcsICd0cmFkZV9pZCc6ICczNDYzMTcwMTA1JywgJ3ByaWNlJzogJzY2ODA5LjE0MDAwMDAwJyw= (truncated),1,77720,193,2024-03-06T20:03:11.416Z,,BTCUSDT,Map(),"Map(x-opt-sequence-number-epoch -> -1, x-opt-partition-key -> BTCUSDT)"


In [0]:

# Define the schema
schema = StructType([
    StructField("event_type", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("trade_id", StringType(), True),
    StructField("price", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("buyer_order_id", StringType(), True),
    StructField("seller_order_id", StringType(), True),
    StructField("trade_time", StringType(), True),
    StructField("is_market_maker", StringType(), True),
    StructField("ignore_field", StringType(), True)
])

silver_df = spark.readStream\
    .format("delta")\
    .table(f"{path_bronze['catalog']}.{path_bronze['schema']}.{path_bronze['table']}")

# Extract body and partition key from the event data
silver_df = silver_df.withColumn("body", col('body').cast('string')) \
    .withColumn("body", from_json(col("body"), schema)) \
    .select(
      col("body.event_type"),
      from_unixtime(col('body.event_time') / 1000).cast('timestamp').alias("event_time"),
      col("body.symbol"),
      col("body.trade_id").cast('long').alias("trade_id"),
      col("body.price").cast('float').alias("price"),
      col("body.quantity").cast('float').alias("quantity"),
      col("body.buyer_order_id").cast('long').alias("buyer_order_id"),
      col("body.seller_order_id").cast('long').alias("seller_order_id"),
      from_unixtime(col('body.trade_time') / 1000).cast('timestamp').alias("trade_time"),
      col("body.is_market_maker").cast('boolean').alias("is_market_maker")
            )

silver_df.display()

path_silver = {
    'catalog' :'streaming',
    'schema' :'silver',
    'table' : 'silver_df',

}

silver_df.writeStream\
    .option("checkpointLocation", f"/mnt/{path_silver['catalog']}/{path_silver['schema']}/{path_silver['table']}")\
    .format("delta")\
    .outputMode("append")\
    .toTable(f"{path_silver['catalog']}.{path_silver['schema']}.{path_silver['table']}")

event_type,event_time,symbol,trade_id,price,quantity,buyer_order_id,seller_order_id,trade_time,is_market_maker
trade,2024-03-06T19:44:02Z,BTCUSDT,3463132814,66811.88,8e-05,25416592251,25416589428,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:02Z,BNBBTC,235173765,0.006355,3.49,1841854277,1841854388,2024-03-06T19:44:02Z,True
trade,2024-03-06T19:44:02Z,ETHBTC,435303085,0.05774,0.4,3882904434,3882902176,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:02Z,BTCUSDT,3463132815,66811.9,8e-05,25416592260,25416592253,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:02Z,BNBBTC,235173766,0.006355,2.126,1841854291,1841854388,2024-03-06T19:44:02Z,True
trade,2024-03-06T19:44:02Z,ETHBTC,435303086,0.05774,0.1,3882904435,3882902176,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:02Z,BTCUSDT,3463132816,66813.12,8e-05,25416592260,25416592227,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:04Z,BNBBTC,235173767,0.006355,0.123,1841854291,1841854418,2024-03-06T19:44:04Z,True
trade,2024-03-06T19:44:02Z,BTCUSDT,3463132817,66813.12,0.01488,25416592331,25416592227,2024-03-06T19:44:02Z,False
trade,2024-03-06T19:44:02Z,ETHBTC,435303087,0.05774,0.3,3882904435,3882902198,2024-03-06T19:44:02Z,False


In [0]:
path_gold = {
    'catalog' :'streaming',
    'schema' :'gold',
    'table': 'symbols'
}

gold_df = spark.readStream\
    .format("delta")\
    .table(f"{path_silver['catalog']}.{path_silver['schema']}.{path_silver['table']}")


for symbol in symbols:
    gold_df.where(col('symbol') == symbol ).writeStream\
        .option("checkpointLocation", f"/mnt/{path_gold['catalog']}/{path_gold['schema']}/{path_gold['table']}/{symbol}")\
        .format("delta")\
        .outputMode("append")\
        .toTable(f"{path_gold['catalog']}.{path_gold['schema']}.{symbol}")


In [0]:
gold_df.groupBy('symbol', 'is_market_maker').count().orderBy(col('count').desc()).display()


symbol,is_market_maker,count
ETHBTC,False,286
BTCUSDT,True,274
BNBBTC,False,245
BNBBTC,True,235
BTCUSDT,False,230
ETHBTC,True,200


In [0]:
gold_df.groupBy('symbol').count().orderBy(col('count').desc()).display()

symbol,count
BTCUSDT,504
ETHBTC,486
BNBBTC,480


In [0]:
gold_df.groupBy('symbol', 'buyer_order_id' ).count().orderBy(col('count').desc()).display()

symbol,buyer_order_id,count
BTCUSDT,25418321358,24
BNBBTC,1841872207,22
BNBBTC,1841869050,18
BTCUSDT,25417017724,16
BTCUSDT,25417653833,15
BTCUSDT,25417148592,15
ETHBTC,3882960209,13
BTCUSDT,25417729705,12
BTCUSDT,25417729532,12
BTCUSDT,25417404053,11


In [0]:
gold_df.groupBy('symbol','seller_order_id' ).count().orderBy(col('count').desc()).display()


symbol,seller_order_id,count
BTCUSDT,25417017633,15
BTCUSDT,25416952416,13
BTCUSDT,25417404418,12
BTCUSDT,25417653802,12
BTCUSDT,25417560838,11
BNBBTC,1841933544,11
BNBBTC,1841880358,11
BTCUSDT,25418321201,10
BTCUSDT,25417561573,10
ETHBTC,3882975560,9
