In [0]:
# Run with non-ANSI SQL semantics: a cast that cannot parse its input yields
# NULL instead of raising an error.
spark.conf.set("spark.sql.ansi.enabled", "false")

# ADLS Gen2 roots for the dev environment: raw exchange drops land in the
# landing container, bronze Delta artifacts (schemas, checkpoints) in bronze.
LANDING_PATH = "abfss://landing-dev@stcryptomedallion.dfs.core.windows.net/"
BRONZE_PATH = "abfss://bronze-dev@stcryptomedallion.dfs.core.windows.net/"
# Parent folder for per-stream checkpoint directories.
BASE_CHECKPOINT = BRONZE_PATH + "_checkpoints/"
# Catalog.schema that receives the `<name>_raw` bronze tables.
BASE_TABLE = "crypto.bronze"

In [0]:
from pyspark.sql.functions import explode, col, lit, array, struct, posexplode, input_file_name, regexp_extract


# Candle intervals extracted for every exchange, mapped to their length in
# milliseconds. Dict order fixes the order of the exploded interval rows.
_INTERVAL_MS = {
    "15m": 15 * 60 * 1000,
    "1h": 60 * 60 * 1000,
    "4h": 4 * 60 * 60 * 1000,
    "1d": 24 * 60 * 60 * 1000,
}

# Context columns carried unchanged from the per-symbol level to each candle row.
_CONTEXT_COLS = [
    "exchange",
    "ingestion_timestamp",
    "file_date",
    "file_hour",
    "file_minute",
    "file_second",
    "symbol",
    "symbol_timestamp",
]


def _read_landing_json(name: str, path: str):
    """Open an Auto Loader JSON stream over ``path`` and add file-path metadata.

    The inferred schema is tracked under ``{BRONZE_PATH}{name}/_schema`` and
    unparseable fields go to ``_rescued_data``. Four string columns are parsed
    from the source file path (empty string when a pattern does not match):
    ``file_date`` from a ``date=YYYY-MM-DD`` segment, ``file_hour`` from an
    ``hour=HH`` segment, and ``file_minute`` / ``file_second`` from a
    ``<8 digits>_<6 digits>.json`` file name (presumably ``YYYYMMDD_HHMMSS``).
    """
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{BRONZE_PATH}{name}/_schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.rescuedDataColumn", "_rescued_data")
        .option("recursiveFileLookup", "true")
        .option("multiLine", "true")
        .load(path)
    )
    # Databricks recommends the hidden _metadata column over the deprecated
    # input_file_name(), which is also unavailable in some Unity Catalog
    # access modes.
    file_path = col("_metadata.file_path")
    return (
        df.withColumn("file_date", regexp_extract(file_path, r"date=(\d{4}-\d{2}-\d{2})", 1))
        .withColumn("file_hour", regexp_extract(file_path, r"hour=(\d{2})", 1))
        .withColumn("file_minute", regexp_extract(file_path, r"\d{8}_\d{2}(\d{2})\d{2}\.json", 1))
        .withColumn("file_second", regexp_extract(file_path, r"\d{8}_\d{4}(\d{2})\.json", 1))
    )


def _explode_symbols(df, exchange: str):
    """Emit one row per element of the top-level ``symbols`` array.

    Each row is tagged with the literal ``exchange`` and keeps the file
    metadata, the file-level ``timestamp`` (as ``ingestion_timestamp``) and
    the symbol's ``symbol``, ``timestamp`` and ``klines`` fields.
    """
    per_symbol = df.select(
        col("timestamp").alias("ingestion_timestamp"),
        "file_date",
        "file_hour",
        "file_minute",
        "file_second",
        explode(col("symbols")).alias("symbol_data"),
    )
    return per_symbol.select(
        lit(exchange).alias("exchange"),
        "ingestion_timestamp",
        "file_date",
        "file_hour",
        "file_minute",
        "file_second",
        col("symbol_data.symbol").alias("symbol"),
        col("symbol_data.timestamp").alias("symbol_timestamp"),
        col("symbol_data.klines").alias("klines"),
    )


def _explode_candles(df, candles_for):
    """Emit one row per candle for every interval in ``_INTERVAL_MS``.

    ``candles_for(interval)`` must return the Column holding that interval's
    candle array. Output rows keep ``_CONTEXT_COLS`` and add ``interval``,
    ``interval_ms``, ``candle_index`` (0-based position in the source array)
    and ``candle_array`` (the raw candle values). Null or empty candle arrays
    produce no rows.
    """
    intervals = array(*[
        struct(
            lit(label).alias("interval"),
            lit(ms).alias("interval_ms"),
            candles_for(label).alias("candles"),
        )
        for label, ms in _INTERVAL_MS.items()
    ])
    per_interval = df.select(*_CONTEXT_COLS, explode(intervals).alias("interval_data"))
    return per_interval.select(
        *_CONTEXT_COLS,
        col("interval_data.interval").alias("interval"),
        col("interval_data.interval_ms").alias("interval_ms"),
        posexplode(col("interval_data.candles")).alias("candle_index", "candle_array"),
    )


def _ohlcv_columns():
    """Return the open-time and OHLCV columns shared by every exchange layout."""
    candle = col("candle_array")
    return [
        candle[0].cast("long").alias("open_time"),
        candle[1].cast("double").alias("open"),
        candle[2].cast("double").alias("high"),
        candle[3].cast("double").alias("low"),
        candle[4].cast("double").alias("close"),
        candle[5].cast("double").alias("volume"),
    ]


def _write_bronze(df, name: str):
    """Append ``df`` to ``{BASE_TABLE}.{name}_raw`` over all currently available input."""
    return (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{BASE_CHECKPOINT}{name}")
        .option("mergeSchema", "true")
        .outputMode("append")
        .trigger(availableNow=True)
        .toTable(f"{BASE_TABLE}.{name}_raw")
    )


# Autoloader for Binance
def autoload_binance(name: str, path: str):
    """Start an available-now Auto Loader stream for Binance kline snapshots.

    Candles are read from ``symbols[].klines.<interval>`` and each candle array
    is mapped positionally to open_time, OHLCV, close_time, quote_volume,
    trades, taker_buy_base and taker_buy_quote. Rows are appended to
    ``{BASE_TABLE}.{name}_raw``.

    Returns the started StreamingQuery, or None after printing the error if
    the stream could not be set up or started.
    """
    try:
        candles = _explode_candles(
            _explode_symbols(_read_landing_json(name, path), "binance"),
            lambda interval: col(f"klines.{interval}"),
        )
        candle = col("candle_array")
        df_final = candles.select(
            *_CONTEXT_COLS,
            "interval",
            "candle_index",
            *_ohlcv_columns(),
            candle[6].cast("long").alias("close_time"),
            candle[7].cast("double").alias("quote_volume"),
            candle[8].cast("int").alias("trades"),
            candle[9].cast("double").alias("taker_buy_base"),
            candle[10].cast("double").alias("taker_buy_quote"),
        )
        stream = _write_bronze(df_final, name)
    except Exception as e:
        # Best-effort: one exchange failing must not stop the others.
        print(f"✗ Failed to start {name}: {e}")
        return None

    print(f"✓ Started stream: {name}")
    return stream


# Autoloader for Bybit
def autoload_bybit(name: str, path: str):
    """Start an available-now Auto Loader stream for Bybit kline snapshots.

    Candles are read from ``symbols[].klines.<interval>.result.list``. Bybit v5
    kline rows are ``[startTime, open, high, low, close, volume, turnover]``.
    There is no close time, so ``close_time`` is derived Binance-style as
    ``open_time + interval_ms - 1``, and ``turnover`` is stored as
    ``quote_volume``. Rows are appended to ``{BASE_TABLE}.{name}_raw``.

    NOTE(review): Bybit documents its list as newest-first while Binance is
    oldest-first, so ``candle_index`` ordering likely differs between the two
    tables — confirm before relying on it downstream.

    Returns the started StreamingQuery, or None after printing the error if
    the stream could not be set up or started.
    """
    try:
        candles = _explode_candles(
            _explode_symbols(_read_landing_json(name, path), "bybit"),
            lambda interval: col(f"klines.{interval}.result.list"),
        )
        candle = col("candle_array")
        df_final = candles.select(
            *_CONTEXT_COLS,
            "interval",
            "candle_index",
            *_ohlcv_columns(),
            # Index 6 is turnover, not a close time; derive the close time instead.
            (candle[0].cast("long") + col("interval_ms") - 1).alias("close_time"),
            candle[6].cast("double").alias("quote_volume"),
        )
        stream = _write_bronze(df_final, name)
    except Exception as e:
        # Best-effort: one exchange failing must not stop the others.
        print(f"✗ Failed to start {name}: {e}")
        return None

    print(f"✓ Started stream: {name}")
    return stream


def autoload_safe(name: str, path: str):
    """Route ``name`` to its exchange-specific Auto Loader starter.

    Only "binance" and "bybit" are recognised. For any other name a message is
    printed and None is returned; otherwise the starter's result is returned.
    """
    if name not in ("binance", "bybit"):
        print(f"Unknown exchange: {name}")
        return None

    starter = autoload_binance if name == "binance" else autoload_bybit
    return starter(name, path)

# One landing sub-folder per exchange, named after the exchange itself.
exchanges = [
    {"name": exchange, "path": f"{LANDING_PATH}{exchange}/"}
    for exchange in ("binance", "bybit")
]

# Start every stream; failed starts come back as None and are filtered out.
# NOTE(review): availableNow queries run asynchronously and nothing here waits
# on them, so errors raised after start are not surfaced in this cell —
# consider awaitTermination() on each query if that matters.
streams = [autoload_safe(name=ex["name"], path=ex["path"]) for ex in exchanges]
active = [query for query in streams if query is not None]

print(f"Successfully started {len(active)} stream(s)")

✓ Started stream: binance
✓ Started stream: bybit
Successfully started 2 stream(s)
