In [None]:
import os
#from pandas.tests.frame.methods.test_sort_values import ascending
from datetime import datetime, timezone, timedelta

from pandas.tests.frame.methods.test_sort_values import ascending

os.environ["SPARK_HOME"] = "/mnt/d/learn/DE/Semina_project/spark"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,ArrayType
from pyspark.sql.functions import input_file_name, explode, col,lit, date_format, to_timestamp
import duckdb
import pyarrow as pa

In [None]:
# MinIO / S3 connection settings.
# Each value can be overridden with an environment variable of the same name so
# credentials do not have to live in the notebook; the fallbacks are the local
# development values, so behavior is unchanged when nothing is set.
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://localhost:9000")
S3_ACCESS = os.environ.get("S3_ACCESS", "minio")
S3_SECRET = os.environ.get("S3_SECRET", "minio123")
S3_BUCKET = os.environ.get("S3_BUCKET", "trading-okx")
def get_latest_file(bucket_name, prefix, days_lookback):
    """List recently modified objects under a prefix as ``s3a://`` paths.

    Args:
        bucket_name: Bucket to list.
        prefix: Key prefix the listing is restricted to.
        days_lookback: Only objects whose ``LastModified`` is newer than
            "now minus this many days" are returned.

    Returns:
        A list of ``s3a://<bucket>/<key>`` strings ordered from the most
        recently modified object to the oldest. Empty when nothing matches.
    """
    s3 = boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS,
        aws_secret_access_key=S3_SECRET,
        region_name="us-east-1"
    )
    # Compute the cutoff once so every page is filtered against the same
    # instant instead of a "now" that moves while pages are being fetched.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days_lookback)

    # A paginator is needed because one listing call returns a limited number
    # of keys.
    paginator = s3.get_paginator('list_objects_v2')
    recent_objects = [
        obj
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for obj in page.get('Contents', [])
        if obj['LastModified'] > cutoff
    ]
    recent_objects.sort(key=lambda obj: obj['LastModified'], reverse=True)
    return [f"s3a://{bucket_name}/{obj['Key']}" for obj in recent_objects]

# Collect the bronze trade files modified within the lookback window.
latest_files = get_latest_file(
        bucket_name=S3_BUCKET,
        prefix="bronze/okx_trades",
        days_lookback=1 # change this when re-running over the whole history
)
latest_files

In [None]:
# Spark session that talks to the MinIO endpoint through the S3A connector
# (path-style access, no TLS).
spark = (
    SparkSession.builder
    .appName("OKX_Bronze_To_Silver_Book_new")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

In [None]:
# One trade record: every field is read as a nullable string.
trade_schema = (
    StructType()
    .add("instId", StringType(), True)
    .add("tradeId", StringType(), True)
    .add("px", StringType(), True)
    .add("sz", StringType(), True)
    .add("side", StringType(), True)
    .add("ts", StringType(), True)
    .add("count", StringType(), True)
    .add("source", StringType(), True)
    .add("seqId", StringType(), True)
)
# Envelope of a bronze message: receive time plus a payload whose `data`
# field is an array of trade records.
schema = (
    StructType()
    .add("received_at", StringType(), True)
    .add("payload", StructType().add("data", ArrayType(trade_schema), True), True)
)

df = spark.read.schema(schema).json(latest_files)
df.show(truncate=False)

In [None]:
# Produce one row per element of payload.data, keeping the receive time and
# the instrument/channel identifiers.
# NOTE(review): the `schema` used to read `df` declares `payload` with a single
# `data` field (an array of trade structs). The `payload.arg.*` columns and the
# positional `candle[i]` lookups below are not part of that schema — confirm
# which feed/schema this cell is meant to run against.
df_exploded = df.select(
    col("received_at"),
    col("payload.arg.instId").alias("symbol"),
    col("payload.arg.channel").alias("channel"),  # e.g. candle1s, candle1m
    explode(col("payload.data")).alias("candle")  # each row is one array ["ts", "o", ...]
)
# Cast the positional candle fields to typed, named columns.
df_silver = df_exploded.select(
    col("symbol"),
    col("channel"),
    to_timestamp(col("candle")[0].cast("long") / 1000).alias("candle_time"),  # element 0 is divided by 1000 before conversion
    col("candle")[1].cast("double").alias("open"),
    col("candle")[2].cast("double").alias("high"),
    col("candle")[3].cast("double").alias("low"),
    col("candle")[4].cast("double").alias("close"),
    col("candle")[5].cast("double").alias("volume"),
    col("candle")[8].cast("int").alias("is_confirmed"),
    col("received_at").cast("timestamp").alias("ingestion_time")
)
df_silver.show()

In [None]:
#df.show(truncate=False)

#df_exploded.show(truncate=False)
# B3.2: Cast types and select columns

#df_silver.show(truncate=False)
# Drop rows missing an open price or a candle timestamp, then derive the
# yyyy-MM-dd string column `date_part` from the candle timestamp.
df_cleaned = (
    df_silver
    .dropna(subset=["open", "candle_time"])
    .withColumn("date_part", date_format(col("candle_time"), "yyyy-MM-dd"))
)
df_cleaned.show()


In [None]:
# Print the column names and types of the cleaned frame.
df_cleaned.printSchema()

In [None]:
# Append the silver frame to object storage as Parquet.
# NOTE(review): this writes `df_silver`, not `df_cleaned`, so the dropna filter
# and the `date_part` column are not part of what is stored — confirm intended.
output = f"s3a://trading-okx/silver/okx-funding/"
df_silver.write.mode('append').format("parquet").save(output)

# config dw duckdb

In [None]:
# Open a connection to the local DuckDB warehouse file.
duck_path='/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
con=duckdb.connect(duck_path)

In [None]:
# NOTE(review): this closes the connection opened in the previous cell, while
# the cells that follow still call con.execute / con.sql. On a top-to-bottom
# run they would be using a closed connection — confirm this cell's position.
con.close()

In [None]:
# Read the source_db.sql script (table creation, per the original note) and
# execute it on the DuckDB connection.
with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_database/source_db.sql', 'r') as f:
    sql_script = f.read()
con.execute(sql_script)

In [None]:
# Sanity check: list the tables that exist in the database.
con.sql("show TABLEs ; ")
#con.sql("select * from fact_ohlc;")


In [None]:
# Read and execute warehouse_source.sql (per the original note: sets up the
# MinIO connection and creates the dim/fact tables).
with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
    sql_script = f.read()
con.execute(sql_script)

In [None]:
# Collect the cleaned Spark frame to pandas, convert it to an Arrow table and
# register it with DuckDB under the name "arrow_table" so SQL can query it.
arrow_table = pa.Table.from_pandas(df_cleaned.toPandas())
con.register("arrow_table", arrow_table)
con.sql('select * from arrow_table')
#con.close()

In [None]:
# Copy every row of the registered Arrow table into fact_ohlc, naming the
# columns explicitly on both sides so the mapping does not rely on position.
con.execute('''
                INSERT INTO fact_ohlc(
                                            symbol,
                                            channel,
                                            candle_time,
                                            open,
                                            high,
                                            low,
                                            close,
                                            volume,
                                            is_confirmed,
                                            ingestion_time,
                                            date_part)
                SELECT
                    symbol ,
                    channel ,
                    candle_time ,
                    open ,
                    high ,
                    low ,
                    close ,
                    volume ,
                    is_confirmed ,
                    ingestion_time ,
                    date_part
                FROM arrow_table
                ''')

In [None]:
# Re-run warehouse_source.sql (per the original note: sets up the MinIO
# connection and creates the dim/fact tables).
with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
    sql_script = f.read()
con.execute(sql_script)

# Glob over the Parquet files written to the silver "okx-funding" prefix.
output_path = f"s3://trading-okx/silver/okx-funding/*.parquet"
try:
    # Full load: every Parquet file matched by the glob is inserted.
    # (A date filter could be added here to load only new data.)
    con.execute(f"""
        INSERT INTO fact_funding_rate
        SELECT * FROM read_parquet('{output_path}')
       --WHERE ingestion_time NOT IN (SELECT ingestion_time FROM fact_orders_books) -- Tr√°nh tr√πng l·∫∑p (Dedup ƒë∆°n gi·∫£n)
    """)

    # Total number of rows now in the table (not only the rows just inserted).
    count = con.execute("SELECT count(*) FROM fact_funding_rate").fetchone()[0]
    print(f"‚úÖ Data loaded. Total rows in DuckDB: {count}")

except Exception as e:
    # Errors are reported and swallowed so the notebook keeps running.
    print(f"‚ùå Error loading data: {e}")


In [None]:
# Release the DuckDB connection and stop the Spark session.
con.close()
spark.stop()

In [None]:
import boto3
import os, duckdb
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, window, max, min, sum,
    struct, date_format, lit)
from pyspark.sql import functions as F
from datetime import datetime, timezone, timedelta
# MinIO connection settings used by the OHLC aggregation section.
S3_ENDPOINT = "http://localhost:9000"
ACCESS_KEY = 'minio'
SECRET_KEY = 'minio123'
S3_BUCKET = 'trading-okx'

# Spark session for the trades -> OHLC aggregation, reading MinIO via S3A.
spark = (
    SparkSession.builder
    .appName("ETL_Trades_To_OHLC_Aggregator")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Read every silver trades Parquet file across all partition folders.
# The path is stored in `trades_glob` rather than `input`, so the built-in
# input() function is not shadowed for the rest of the kernel session.
trades_glob = f's3a://{S3_BUCKET}/silver/trades/*/*.parquet'
df = spark.read.parquet(trades_glob)
df.show(5, False)

In [None]:
# Print the schema Spark read from the silver trades files.
df.printSchema()

In [None]:
CHECKPOINT_ROOT = f"s3a://{S3_BUCKET}/checkpoints/ohlc_parquet_v1/"
WATERMARK_DELAY = "10 minutes"
# Candle intervals to build: short label -> Spark window duration.
interval={
    "1m": "1 minute",
    "5m": "5 minutes",
    "15m": "15 minutes",
    # "1h": "1 hour",
    # "4h": "4 hours",
    # "1d": "1 day"
}
# Process today's partition (local clock of the machine running the notebook).
today = datetime.now().strftime("%Y-%m-%d")
process_date_str=today
input_path = f"s3a://{S3_BUCKET}/silver/trades/date_part={process_date_str}/*.parquet"

df_trades = spark.read.parquet(input_path)

for interval_name, interval_window in interval.items():
    print(f"   ‚è≥ T√≠nh to√°n khung: {interval_name}...")
    df_ohlc = df_trades.groupBy(
        col("symbol"),
        window(col("trade_time"), interval_window).alias("window_time")
    ).agg(
        # min/max over struct(trade_time, price) orders by trade_time first, so
        # the price taken is that of the earliest / latest trade in the window.
        min(struct(col("trade_time"), col("price"))).getItem("price").alias("open"),
        max("price").alias("high"),
        min("price").alias("low"),
        max(struct(col("trade_time"), col("price"))).getItem("price").alias("close"),
        sum("quantity").alias("volume")
    )
    df_final = df_ohlc.select(
        col("symbol"),
        col("window_time.start").alias("candle_time"),
        col("open"), col("high"), col("low"), col("close"), col("volume"),
        lit(interval_name).alias("interval"),
        # Recreate date_part so Spark knows which partition folder to write to.
        date_format(col("window_time.start"), "yyyy-MM-dd").alias("date_part")
    )
    output_path = f"s3a://{S3_BUCKET}/silver/calculated_ohlc/"

    # IMPORTANT: mode("overwrite") must be combined with dynamic partition
    # overwrite. With the default (static) mode every iteration would delete
    # the whole output_path first, wiping the other intervals and all older
    # dates. In dynamic mode only the interval/date_part folders present in
    # df_final are replaced; everything else under output_path is kept.
    df_final.write \
        .mode("overwrite") \
        .option("partitionOverwriteMode", "dynamic") \
        .partitionBy("interval", "date_part") \
        .format("parquet") \
        .save(output_path)

    print(f"      ‚úÖ ƒê√£ l∆∞u (Overwrite) partition: {process_date_str}")

In [None]:
# Preview the first 10 rows of df_final (the value left by the last loop
# iteration) without truncating column values.
df_final.show(10,False)

## ohlc from trades

In [None]:
import duckdb
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, max, min, sum, struct, lit, date_format, current_timestamp

# --- CONFIG ---
# MinIO connection settings.
# NOTE(review): this cell's imports and settings are repeated verbatim at the
# top of the next cell.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"

In [None]:
import duckdb
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, max, min, sum, struct, lit, date_format, current_timestamp

# --- CONFIG ---
# MinIO connection settings.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"

# Checkpoint location (required for the streaming job to resume).
# Spark stores its progress here; do not delete this directory if the job
# should continue from where it stopped.
CHECKPOINT_ROOT = f"s3a://{S3_BUCKET}/checkpoints/ohlc_parquet_v1/"
# Local DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Allowed lateness (watermark), passed to withWatermark by the streaming job.
# Per the original note: data arriving more than 10 minutes late is ignored and
# a candle is closed after 10 minutes.
WATERMARK_DELAY = "10 minutes"
def get_spark():
    """Build (or reuse) the SparkSession configured for MinIO access via S3A.

    Returns:
        The session returned by ``getOrCreate()`` after the application name
        and the S3A / shuffle settings below have been applied to the builder.
    """
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
        "spark.sql.shuffle.partitions": "1",
    }
    builder = SparkSession.builder.appName("OKX_Bronze_To_Silver_trade")
    # Apply the settings in the order they are listed above.
    for option_name, option_value in settings.items():
        builder = builder.config(option_name, option_value)
    return builder.getOrCreate()
def get_duckdb_conn():
    """Open the DuckDB warehouse and run the warehouse setup script on it.

    Returns:
        An open DuckDB connection on which warehouse_source.sql has been
        executed. The caller is responsible for closing it.

    Raises:
        OSError: if the setup script cannot be read (raised before any
            connection is opened).
        Exception: any error raised while executing the script is propagated
            after the connection has been closed.
    """
    # Read the script first so a missing file never leaves a connection open.
    with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
        sql_script = f.read()
    con = duckdb.connect(DUCKDB_PATH)
    try:
        con.execute(sql_script)
    except Exception:
        # Do not leak the connection when the setup script fails.
        con.close()
        raise
    return con
def load_to_duckdb(interval_name):
    """
    Load the OHLC Parquet files of one interval into the DuckDB warehouse.

    Meant to run AFTER Spark has finished writing the Parquet files. It runs
    three statements in order: an incremental insert into the staging table
    `ohlc`, an insert of missing dates into `dim_time`, and an incremental
    insert into `fact_ohlc_calculated`.

    Args:
        interval_name: Interval label (e.g. "1m"); selects the
            `interval=<interval_name>` folder and is stored as the `interval`
            value of the inserted rows.

    Returns:
        None. DuckDB errors are printed, not raised; the connection is always
        closed.
    """
    print(f"ü¶Ü [DuckDB] ƒêang n·∫°p d·ªØ li·ªáu OHLC ({interval_name}) v√†o Warehouse...")
    con = get_duckdb_conn()

    # Spark partitions the output as: interval=1h/date_part=2023-12-01
    parquet_source = f"s3://{S3_BUCKET}/silver/calculated_ohlc/interval={interval_name}/*/*.parquet"

    try:
        # 2.1 Load into staging (incremental).
        # Staging table: ohlc
        # Note: the interval value comes from the function argument because
        # the files are read from that interval's folder.
        # Only candles newer than the latest one already staged for this
        # interval are inserted.
        con.execute(f"""
            INSERT INTO ohlc (symbol, candle_time, open, high, low, close, volume, interval)
            SELECT symbol, candle_time, open, high, low, close, volume, '{interval_name}'
            FROM read_parquet('{parquet_source}', hive_partitioning=1)
            WHERE candle_time > (SELECT COALESCE(MAX(candle_time), '1970-01-01'::TIMESTAMP) FROM ohlc WHERE interval = '{interval_name}')
        """)
        print("   ‚úÖ Staging Loaded.")

        # 2.2 Update dim_time.
        # Add a row for every candle date whose date_key is not present yet.
        con.execute("""
            INSERT INTO dim_time
            SELECT DISTINCT
                CAST(strftime(candle_time, '%Y%m%d') AS INTEGER) as date_key,
                CAST(candle_time AS DATE),
                EXTRACT(YEAR FROM candle_time), EXTRACT(QUARTER FROM candle_time),
                EXTRACT(MONTH FROM candle_time), EXTRACT(DAY FROM candle_time),
                ISODOW(candle_time), CASE WHEN ISODOW(candle_time) IN (6, 7) THEN TRUE ELSE FALSE END
            FROM ohlc
            WHERE CAST(strftime(candle_time, '%Y%m%d') AS INTEGER) NOT IN (SELECT date_key FROM dim_time)
        """)

        # 2.3 Insert into the fact table (fact_ohlc_calculated).
        # Joins dim_symbol to resolve symbol_key; date_key is derived from
        # candle_time. Only candles newer than the latest fact row of this
        # interval are inserted.
        con.execute(f"""
            INSERT INTO fact_ohlc_calculated (symbol_key, date_key, interval, candle_time, open, high, low, close, volume)
            SELECT
                d.symbol_key,
                CAST(strftime(s.candle_time, '%Y%m%d') AS INTEGER) as date_key,
                s.interval,
                s.candle_time,
                s.open, s.high, s.low, s.close, s.volume
            FROM ohlc s
            JOIN dim_symbol d ON s.symbol = d.symbol_code
            WHERE s.candle_time > (SELECT COALESCE(MAX(candle_time), '1970-01-01'::TIMESTAMP) FROM fact_ohlc_calculated WHERE interval = '{interval_name}')
            AND s.interval = '{interval_name}'
        """)
        print("   ‚úÖ Gold (Fact) Loaded.")

    except Exception as e:
        print(f"   ‚ö†Ô∏è DuckDB Load Error: {e}")
        # Deliberately not re-raised so the pipeline can continue with the
        # other intervals.
    finally:
        con.close()


In [None]:

def run_streaming_ohlc(spark, interval_name="1m", interval_window="1 minute"):
    """Incrementally aggregate silver trades into OHLC candles for one interval.

    Reads the Parquet files under ``silver/trades/`` as a stream, builds
    event-time windows of ``interval_window`` per symbol, appends the closed
    candles under ``silver/calculated_ohlc/interval=<interval_name>/`` and then
    calls ``load_to_duckdb(interval_name)``.

    Args:
        spark: Active SparkSession.
        interval_name: Short interval label (e.g. "1m"); selects the output
            folder and the checkpoint sub-directory.
        interval_window: Spark window duration string (e.g. "1 minute").

    Returns:
        None. Returns early, without loading DuckDB, when the trades path
        cannot be read.
    """
    print(f"üöÄ ƒêang x·ª≠ l√Ω khung th·ªùi gian: {interval_name}")

    # 1. INPUT: READ STREAM (only files not processed yet are picked up).
    input_path = f"s3a://{S3_BUCKET}/silver/trades/"

    # A streaming file source needs an explicit schema, so take it from a
    # batch read of the same path.
    try:
        schema = spark.read.parquet(input_path).schema
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C / kernel interrupts
        # are not swallowed and misreported as "no data".
        print("‚ö†Ô∏è Ch∆∞a c√≥ data trades. Tho√°t.")
        return

    df_trades = spark.readStream \
        .schema(schema) \
        .format("parquet") \
        .option("maxFilesPerTrigger", 1000) \
        .load(input_path)

    # 2. TRANSFORM: AGGREGATE WITH A WATERMARK
    # withWatermark is required to use output mode 'append' on an aggregation.
    df_ohlc = df_trades \
        .withWatermark("trade_time", WATERMARK_DELAY) \
        .groupBy(
            col("symbol"),
            window(col("trade_time"), interval_window).alias("window_time")
        ).agg(
            # min/max over struct(trade_time, price) orders by trade_time
            # first, giving the price of the earliest / latest trade.
            min(struct(col("trade_time"), col("price"))).getItem("price").alias("open"),
            max("price").alias("high"),
            min("price").alias("low"),
            max(struct(col("trade_time"), col("price"))).getItem("price").alias("close"),
            sum("quantity").alias("volume")
        )

    # Normalise the output. The interval is not written as a column: it is
    # encoded in the output folder name (interval=<name>), exactly as a
    # partitionBy("interval", ...) layout stores it.
    df_final = df_ohlc.select(
        col("symbol"),
        col("window_time.start").alias("candle_time"),
        col("open"), col("high"), col("low"), col("close"), col("volume"),
        date_format(col("window_time.start"), "yyyy-MM-dd").alias("date_part")
    )

    # 3. OUTPUT: WRITE STREAM (APPEND ONLY)
    # One sink directory per interval. Spark's file sink keeps its commit log
    # (_spark_metadata) inside the output directory and skips any batch id it
    # already finds there. If every interval wrote to the same directory with
    # its own checkpoint, the batch ids of a later query would collide with
    # those committed by an earlier one and its data would be silently
    # dropped. The resulting files still land at
    # silver/calculated_ohlc/interval=<name>/date_part=<day>/, which is the
    # layout load_to_duckdb reads.
    output_path = f"s3a://{S3_BUCKET}/silver/calculated_ohlc/interval={interval_name}/"
    # Separate checkpoint per interval (important).
    checkpoint_path = f"{CHECKPOINT_ROOT}/{interval_name}"

    query = df_final.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(availableNow=True) \
        .partitionBy("date_part") \
        .start(output_path)

    # trigger(availableNow=True):
    # - processes all data available now -> writes Parquet -> saves the
    #   checkpoint -> stops, instead of waiting for more data like a
    #   continuously running stream.
    query.awaitTermination()

    print(f"‚úÖ Ho√†n t·∫•t x·ª≠ l√Ω {interval_name}. Data ƒë√£ ƒë∆∞·ª£c Append v√†o MinIO.")
    load_to_duckdb(interval_name)

# Entry point of the streaming OHLC job: build the session, process each
# interval in turn, then stop Spark.
if __name__ == "__main__":
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    # (label, Spark window duration) pairs; add entries to run more intervals.
    intervals = [
        ("1m", "1 minute"),
        ("5m", "5 minute")
    ]

    for name, window_duration in intervals:
        run_streaming_ohlc(spark, name, window_duration)

    spark.stop()

In [None]:
# Ad-hoc inspection of the warehouse: print the `trades` and `fact_trades`
# tables, then close the connection.
con = duckdb.connect('/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb')
# The string literal below is disabled setup code kept as a no-op expression.
'''with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
     sql_script = f.read()
con.execute(sql_script)'''
##print(con.execute('drop table fact_ohlc; '))
#print(con.sql('PRAGMA show_tables;'))
print(con.sql('select * from trades'))
print(con.sql('select * from fact_trades'))

con.close()

In [None]:
def calculate_complex_features(spark, date_process, interval_name="5m", interval_window="5 minutes"):
    """Compute per-symbol window features for one day of trades and store them.

    Reads ``silver/trades/date_part=<date_process>/``, aggregates the trades
    into windows of ``interval_window`` per symbol and writes the result to
    ``gold/features_ml/`` partitioned by ``interval`` and ``date_part``.

    Args:
        spark: Active SparkSession.
        date_process: Day to process as a ``YYYY-MM-DD`` string; used for the
            input folder and as the ``date_part`` value of the output.
        interval_name: Short interval label stored in the ``interval`` column.
        interval_window: Spark window duration string (e.g. "5 minutes").

    Returns:
        True when the features were written, False when the input for
        ``date_process`` could not be read.
    """
    print(f"üöÄ Processing Features for Date: {date_process} | Interval: {interval_name}")

    input_path = f"s3a://{S3_BUCKET}/silver/trades/date_part={date_process}/*.parquet"
    try:
        df = spark.read.parquet(input_path)
    except Exception:
        # `except Exception` (not a bare except) so kernel interrupts are not
        # swallowed and misreported as missing data.
        print(f"‚ö†Ô∏è Kh√¥ng t√¨m th·∫•y data ng√†y {date_process}")
        return False

    # 2. Pre-calculation: helper column needed before grouping.
    df = df.withColumn("turnover", F.col("price") * F.col("quantity"))

    # 3. Group by symbol and time window, e.g. [00:00,01:00), [01:00,02:00).
    features_df = df.groupBy(
        F.col("symbol"),
        F.window(F.col("trade_time"), interval_window).alias("window")
    ).agg(
        # --- A. BASIC OHLCV ---
        # min/max over struct(trade_time, price) orders by trade_time first,
        # giving the price of the earliest / latest trade in the window.
        F.min(F.struct("trade_time", "price")).getItem("price").alias("open"),
        F.max("price").alias("high"),
        F.min("price").alias("low"),
        F.max(F.struct("trade_time", "price")).getItem("price").alias("close"),
        F.sum("quantity").alias("volume"),
        F.sum("turnover").alias("total_turnover"),
        F.count("*").alias("trade_count"),

        # --- B. BUY/SELL PRESSURE ---
        F.sum(F.when(F.col("side") == "buy", F.col("quantity")).otherwise(0)).alias("vol_buy"),
        F.sum(F.when(F.col("side") == "sell", F.col("quantity")).otherwise(0)).alias("vol_sell"),
        # count() ignores nulls, so when() without otherwise() counts matches only.
        F.count(F.when(F.col("side") == "buy", 1)).alias("count_buy"),
        F.count(F.when(F.col("side") == "sell", 1)).alias("count_sell"),

        # --- C. STATISTICAL FEATURES (price distribution) ---
        F.stddev("price").alias("price_std"),
        F.skewness("price").alias("price_skew"),
        F.kurtosis("price").alias("price_kurtosis"),

        # Approximate quantiles (0.25, 0.5, 0.75).
        F.percentile_approx("price", 0.25).alias("price_q25"),
        F.percentile_approx("price", 0.50).alias("price_median"),
        F.percentile_approx("price", 0.75).alias("price_q75"),

        # --- D. SIZE DISTRIBUTION (trade size) ---
        F.max("quantity").alias("size_max"),
        F.avg("quantity").alias("size_mean"),
        F.stddev("quantity").alias("size_std")
    )

    # 4. POST-CALCULATION on the aggregated result.
    final_df = features_df.select(
        F.col("symbol"),
        F.col("window.start").alias("candle_time"),
        F.lit(interval_name).alias("interval"),
        # Basic
        "open", "high", "low", "close", "volume", "trade_count",
        # Advanced
        "vol_buy", "vol_sell",
        "count_buy", "count_sell",
        (F.col("vol_buy") - F.col("vol_sell")).alias("net_volume"),  # net volume
        # VWAP (Volume Weighted Average Price)
        (F.col("total_turnover") / F.col("volume")).alias("vwap"),
        # Stats
        "price_std", "price_skew", "price_kurtosis",
        "price_q25", "price_median", "price_q75",
        "size_max", "size_mean", "size_std",
        # Partition column
        F.lit(date_process).alias("date_part")
    )

    # 5. WRITE TO MINIO
    # The write must replace only this run's interval/date_part folder so that
    # re-running a day is idempotent. That requires dynamic partition
    # overwrite: with the default (static) mode, mode("overwrite") deletes the
    # whole output_path first, wiping every other interval and day.
    output_path = f"s3a://{S3_BUCKET}/gold/features_ml/"

    print(f"üíæ Saving ML Features to: {output_path}")
    final_df.write \
        .mode("overwrite") \
        .option("partitionOverwriteMode", "dynamic") \
        .partitionBy("interval", "date_part") \
        .format("parquet") \
        .save(output_path)

    return True

def load_features_to_duckdb(interval_name, date_process):
    """Replace one interval/day of features in DuckDB's fact_market_features.

    Opens the warehouse, runs warehouse_source.sql, then creates the table if
    missing (using the Parquet files' columns), deletes the rows of this
    interval and day, and inserts the Parquet content of that partition.
    DuckDB errors in those three statements are printed, not raised; the
    connection is always closed.

    Args:
        interval_name: Interval label of the partition to load.
        date_process: ``date_part`` value of the partition to load.
    """
    print(f"ü¶Ü Loading Features ({interval_name}) to DuckDB...")
    warehouse = duckdb.connect(DUCKDB_PATH)

    # Warehouse setup script (MinIO access, per the original note).
    with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as setup_file:
        warehouse.execute(setup_file.read())

    # Hive-style partition folder written by the Spark job.
    parquet_path = f"s3://{S3_BUCKET}/gold/features_ml/interval={interval_name}/date_part={date_process}/*.parquet"

    try:
        # CREATE ... AS SELECT ... LIMIT 0 copies the column layout of the
        # Parquet files without copying any rows.
        create_sql = f"""
            CREATE TABLE IF NOT EXISTS fact_market_features AS
            SELECT * FROM read_parquet('{parquet_path}') LIMIT 0
        """
        # Remove this interval/day first so a re-run does not duplicate rows.
        delete_sql = f"DELETE FROM fact_market_features WHERE interval='{interval_name}' AND date_part='{date_process}'"
        insert_sql = f"""
            INSERT INTO fact_market_features
            SELECT * FROM read_parquet('{parquet_path}')
        """
        for statement in (create_sql, delete_sql, insert_sql):
            warehouse.execute(statement)
        print("‚úÖ Features Loaded Successfully.")
    except Exception as err:
        print(f"‚ö†Ô∏è DuckDB Error: {err}")
    finally:
        warehouse.close()

# Entry point of the daily feature job: compute the features of one day for
# each interval and load every successful result into DuckDB.
if __name__ == "__main__":
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    # Day to process (typically yesterday, T-1, or a scheduled date).
    # Example: run for 2025-12-06.
    target_date = "2025-12-06"

    # Intervals to compute: (label, Spark window duration).
    intervals = [("5m", "5 minutes"), ("1h", "1 hour")]

    # The loop variable is named `window_duration`, not `window`: this code
    # runs at notebook scope, so a variable called `window` would overwrite the
    # `window` function imported from pyspark.sql.functions and break any later
    # re-run of the cells that call it.
    for name, window_duration in intervals:
        success = calculate_complex_features(spark, target_date, name, window_duration)
        if success:
            load_features_to_duckdb(name, target_date)

    spark.stop()

upgrade1: fit for training ML/AI

In [None]:
import duckdb
from datetime import datetime, timedelta
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

# --- CONFIG ---
# MinIO connection settings.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Local DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Prefix under which Spark writes each run's aggregated features; the DuckDB
# merge step reads them back from the same location.
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/silver/agg_trades/"

def get_spark():
    """Build (or reuse) the SparkSession for the feature-engineering job.

    NOTE(review): a function with the same name is defined earlier in the
    notebook; once this cell runs, this definition replaces it.

    Returns:
        The session returned by ``getOrCreate()`` with the S3A settings for
        the MinIO endpoint applied to the builder.
    """
    session_builder = (
        SparkSession.builder
        .appName("Feature_Engineering_Job")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS)
        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    )
    return session_builder.getOrCreate()
def parse_interval_to_minutes(interval_str):
    """Convert an interval label ('1m', '1h', '1d') to a number of minutes.

    Used to size the safety look-back buffer.

    Args:
        interval_str: Label made of an integer followed by a unit letter:
            ``m`` (minutes), ``h`` (hours) or ``d`` (days), case-insensitive.

    Returns:
        The interval length in minutes as an int. Falls back to 60 when the
        label is empty, the numeric part is not an integer, or the unit is
        not one of m/h/d.
    """
    fallback_minutes = 60
    minutes_per_unit = {'m': 1, 'h': 60, 'd': 1440}

    # An empty label has no unit character to look at.
    if not interval_str:
        return fallback_minutes

    unit = interval_str[-1].lower()
    try:
        value = int(interval_str[:-1])
    except ValueError:
        return fallback_minutes

    if unit not in minutes_per_unit:
        return fallback_minutes
    return value * minutes_per_unit[unit]
def get_last_processed_time(interval_name):
    """Return the latest candle_time stored in DuckDB for an interval.

    Args:
        interval_name: Interval label to look up in ``fact_market_features``.

    Returns:
        The ``MAX(candle_time)`` value for that interval, or None when the
        table does not exist, holds no rows for the interval, or the lookup
        fails (the failure is printed so it is not mistaken for "no data").
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # Check whether the table exists before querying it.
        table_exists = con.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'fact_market_features'").fetchone()[0]
        if table_exists == 0:
            return None

        query = f"SELECT MAX(candle_time) FROM fact_market_features WHERE interval = '{interval_name}'"
        return con.execute(query).fetchone()[0]
    except Exception as e:
        # Callers treat None as "nothing processed yet", so report why the
        # lookup failed instead of hiding it.
        print(f"   WARNING: could not read last processed time for {interval_name}: {e}")
        return None
    finally:
        con.close()

def calculate_features_incremental(spark, interval_name, interval_window):
    """Compute window features for recent trades and write them to staging.

    Looks up the last processed candle time for the interval, reads the silver
    trades from a buffered start time onwards (or everything when there is no
    previous run), aggregates per symbol and time window, keeps only windows
    that ended before the cutoff, and overwrites the interval's staging folder
    with the result.

    Args:
        spark: Active SparkSession.
        interval_name: Interval label such as "1m" or "1h".
        interval_window: Spark window duration string such as "1 minute".

    Returns:
        True when features were written to staging; False when no trades were
        found, the minimum-time check failed, or no window passed the filter.
    """
    print(f"\nüöÄ Processing Features: {interval_name} ({interval_window})")
    # 1. COMPUTE THE LOOK-BACK BUFFER DYNAMICALLY
    interval_minutes = parse_interval_to_minutes(interval_name)

    # Safety rule: step back two candle lengths plus 10 minutes,
    # e.g. 1m -> 12 minutes, 12h -> 24 hours 10 minutes.
    buffer_minutes = (interval_minutes * 2) + 10

    last_time = get_last_processed_time(interval_name)
    # Windows ending after this instant are dropped further below.
    # NOTE(review): datetime.now() is the naive local clock — confirm it matches
    # the time zone of trade_time.
    cutoff_time = datetime.now() - timedelta(minutes=1)
    input_path = f"s3a://{S3_BUCKET}/silver/trades/"

    # Read the whole folder; the filters below restrict what is kept.
    df = spark.read.parquet(input_path)

    # (An earlier variant filtered on trade_time only, without the date_part
    # filter used below.)
    if last_time:
        # Start of the range to re-read (last processed time minus buffer).
        start_timestamp = last_time - timedelta(minutes=buffer_minutes)

        # --- PARTITION PRUNING ---
        # Turn the timestamp into a date string (YYYY-MM-DD).
        start_date_str = start_timestamp.strftime("%Y-%m-%d")

        print(f"   ‚ÑπÔ∏è Incremental Mode: Reading from {start_timestamp}")
        print(f"   üìÇ Partition Pruning: Reading folders from date_part >= {start_date_str}")

        # Step 1: coarse filter on date_part, intended to let Spark skip older
        # folders. Per the original note, date_part is present in the silver
        # Parquet data but is not inserted into the database table.
        df = df.filter(F.col("date_part") >= F.lit(start_date_str))

        # Step 2: exact filter on the trade timestamp.
        df = df.filter(F.col("trade_time") >= F.lit(start_timestamp))

    else:
        print("   ‚ÑπÔ∏è Full Load Mode: Reading all partitions")
    try:
        # Smallest trade_time in the current batch.
        min_row = df.select(F.min("trade_time")).collect()
        min_data_time = min_row[0][0]

        if min_data_time is None:
            print("   ‚ö†Ô∏è No data found in this range.")
            return False

        print(f"   üïí Batch Min Data Time: {min_data_time}")
    except Exception as e:
        print(f"   ‚ö†Ô∏è Error checking min time: {e}")
        return False
    # 2. Compute the features.
    df = df.withColumn("turnover", F.col("price") * F.col("quantity"))

    features_df = df.groupBy(
        F.col("symbol"),
        F.window(F.col("trade_time"), interval_window).alias("window")
    ).agg(
        # Basic
        # min/max over struct(trade_time, price) orders by trade_time first,
        # giving the price of the earliest / latest trade in the window.
        F.min(F.struct("trade_time", "price")).getItem("price").alias("open"),
        F.max("price").alias("high"),
        F.min("price").alias("low"),
        F.max(F.struct("trade_time", "price")).getItem("price").alias("close"),
        F.sum("quantity").alias("volume"),
        F.sum("turnover").alias("total_turnover"),
        F.count("*").alias("trade_count"),
        # Advanced (skew, kurtosis, ...)
        F.stddev("price").alias("price_std"),
        F.skewness("price").alias("price_skew"),
        F.kurtosis("price").alias("price_kurtosis"),
        # Approximate percentiles of the price.
        F.percentile_approx("price", 0.25).alias("price_q25"),
        F.percentile_approx("price", 0.50).alias("price_median"),
        F.percentile_approx("price", 0.75).alias("price_q75"),
        # Buy/Sell volume
        F.sum(F.when(F.col("side") == "buy", F.col("quantity")).otherwise(0)).alias("vol_buy"),
        F.sum(F.when(F.col("side") == "sell", F.col("quantity")).otherwise(0)).alias("vol_sell"),
        F.count(F.when(F.col("side") == "buy", 1)).alias("count_buy"),
        F.count(F.when(F.col("side") == "sell", 1)).alias("count_sell"),
        # --- D. SIZE DISTRIBUTION (trade size) ---
        F.max("quantity").alias("size_max"),
        F.avg("quantity").alias("size_mean"),
        F.stddev("quantity").alias("size_std")
    )
    # Keep only windows that ended by the cutoff and that start at or after
    # the first trade read in this batch.
    features_df = features_df.filter(
        (F.col("window.end") <= F.lit(cutoff_time)) &
        (F.col("window.start") >= F.lit(min_data_time))
    )

    # Stop when nothing is left after filtering.
    if features_df.rdd.isEmpty():
        print(f"   ‚ö†Ô∏è No closed candles found for {interval_name}. Waiting for more data...")
        return False
    final_df = features_df.select(
        F.col("symbol"),
        F.col("window.start").alias("candle_time"),
        F.lit(interval_name).alias("interval"),
        "open", "high", "low", "close", "volume", "trade_count",
        "price_std","price_skew", "price_kurtosis", "price_q25","price_median","price_q75",
        "vol_buy", "vol_sell","count_buy","count_sell","size_max","size_mean","size_std",
        (F.col("total_turnover") / F.col("volume")).alias("vwap"),
        F.current_timestamp().alias("ingestion_time")
    )

    # 3. Write to the staging path in OVERWRITE mode, so the folder holds only
    # the output of this run.
    # (Adding a date_part partition column here is currently disabled.)
    #final_df = final_df.withColumn("date_part", F.date_format("candle_time", "yyyy-MM-dd"))

    # Separate staging folder per interval.
    staging_path = f"{STAGING_OUTPUT_PATH}/{interval_name}"

    print(f"   üíæ Writing to Staging: {staging_path}")
    final_df.write.mode("overwrite").parquet(staging_path)

    return True

def merge_to_duckdb(interval_name):
    """Merge the staged Parquet output of one interval into ``fact_market_features``.

    The staged files are read from
    ``s3://{S3_BUCKET}/silver/agg_trades/{interval_name}/*.parquet``. Rows already
    present for the same (interval, symbol, candle_time) are deleted and the staged
    rows are inserted, so re-running the merge for the same staging data does not
    create duplicates.

    The DELETE and INSERT run inside one transaction: if the INSERT fails, the
    DELETE is rolled back instead of leaving the fact table with missing candles.

    Args:
        interval_name: Short interval label (e.g. ``"1m"``) used both as the staging
            sub-folder name and as the value of the ``interval`` column.

    Returns:
        None. Merge errors are printed, not raised (best-effort, as before).

    Raises:
        Any error raised while opening/executing the warehouse bootstrap script
        propagates to the caller (unchanged behavior); the connection is closed first.
    """
    print(f"ü¶Ü Merging {interval_name} into DuckDB...")
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # Bootstrap script; presumably configures the S3/MinIO access used by
        # read_parquet below -- confirm against warehouse_source.sql.
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)
        staging_source = f"s3://{S3_BUCKET}/silver/agg_trades/{interval_name}/*.parquet"

        in_transaction = False
        try:
            # Create the table if it does not exist yet (schema copied from staging).
            con.execute(f"""
                CREATE TABLE IF NOT EXISTS fact_market_features AS
                SELECT * FROM read_parquet('{staging_source}') LIMIT 0
            """)

            con.execute("BEGIN TRANSACTION")
            in_transaction = True

            # Remove overlapping rows (matched on symbol, time AND interval).
            # Because this runs in a loop, only rows of the current interval are deleted.
            print("   üîÑ Cleaning overlapping data...")
            con.execute(f"""
                DELETE FROM fact_market_features
                WHERE interval = '{interval_name}'
                AND (symbol, candle_time) IN (
                    SELECT symbol, candle_time FROM read_parquet('{staging_source}')
                )
            """)

            # Insert the fresh rows.
            print("   üì• Inserting new data...")
            con.execute(f"""
                INSERT INTO fact_market_features
                SELECT * FROM read_parquet('{staging_source}')
            """)

            con.execute("COMMIT")
            in_transaction = False
            print("   ‚úÖ Merge Complete.")

        except Exception as e:
            if in_transaction:
                # Undo the DELETE so a failed INSERT cannot lose existing candles.
                try:
                    con.execute("ROLLBACK")
                except Exception as rollback_error:
                    print(f"   Rollback failed: {rollback_error}")
            print(f"   ‚ö†Ô∏è DuckDB Merge Error: {e}")
    finally:
        # Always release the connection (and the database file lock).
        con.close()

if __name__ == "__main__":
    # Batch entry point: for each interval, compute the aggregated trade features
    # with Spark and then merge the staged result into DuckDB.
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    # Intervals to run: (short label used for paths/rows, Spark window duration)
    intervals = [
        ("1m", "1 minute"),
        ("5m", "5 minutes"),
        ("15m", "15 minutes"),
        ("1h", "1 hour"),
        ("4h", "4 hours"),
        ("12h", "12 hours") # Test with a large interval
    ]

    print(f"STARTING BATCH PIPELINE FOR {len(intervals)} INTERVALS...")

    try:
        for name, window in intervals:
            # Step 1: compute features; the function returns False when nothing
            # new was written to staging (e.g. no closed candles yet).
            has_data = calculate_features_incremental(spark, name, window)
            # Step 2: load into the fact table only when staging was refreshed,
            # so stale (or missing) staging files are not merged again.
            if has_data:
                merge_to_duckdb(name)
    finally:
        # Stop Spark even if one interval fails, so the session is not left running.
        spark.stop()

In [None]:
# Ad-hoc inspection cell: open the warehouse, run the bootstrap script and print
# the `trades` relation. The connection is closed in `finally` so a failing query
# does not leave the DuckDB file locked for the rest of the kernel session.
con = duckdb.connect('/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb')
try:
    with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
        sql_script = f.read()
    con.execute(sql_script)
    #print(con.sql('PRAGMA show_tables;'))
    #print(con.execute('drop table fact_trades'))
    print(con.sql('select * from trades '))
    #print(con.sql('select * from fact_market_features '))
    #print(con.sql('select * from fact_ohlc_calculated'))
finally:
    con.close()

Upgrade 2: adapted for the dashboard chatbot

In [None]:
import duckdb
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType

# --- CONFIG ---
# NOTE(review): the endpoint, credentials and absolute paths below are hard-coded;
# consider reading them from environment variables before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'

# A checkpoint is REQUIRED so Spark remembers its state (how far it has processed, how many trades the 5m candle has collected so far)
CHECKPOINT_ROOT = f"s3a://{S3_BUCKET}/checkpoints/features_streaming_v1/"
WATERMARK_DELAY = "1 minute" # Allowed lateness. One minute after a candle closes, Spark writes the file.

def get_spark():
    """Return the SparkSession for the streaming feature job (created on first call).

    The session is configured to reach the S3-compatible store described by the
    module-level ``S3_ENDPOINT`` / ``S3_ACCESS`` / ``S3_SECRET`` constants through
    the ``s3a`` filesystem, with path-style access and SSL disabled.
    """
    settings = [
        ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4"),
        ("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT),
        ("spark.hadoop.fs.s3a.access.key", S3_ACCESS),
        ("spark.hadoop.fs.s3a.secret.key", S3_SECRET),
        ("spark.hadoop.fs.s3a.path.style.access", "true"),
        ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        ("spark.hadoop.fs.s3a.connection.ssl.enabled", "false"),
        # Low shuffle-partition count; the original note says this is an
        # optimization for stateful streaming.
        ("spark.sql.shuffle.partitions", "4"),
    ]

    builder = SparkSession.builder.appName("Streaming_Feature_Engineering")
    for option_name, option_value in settings:
        builder = builder.config(option_name, option_value)
    return builder.getOrCreate()


def get_duckdb_conn():
    """Open the warehouse database and run the bootstrap SQL script on it.

    Returns:
        An open DuckDB connection to ``DUCKDB_PATH``; the caller must close it.

    Raises:
        Whatever opening or executing ``warehouse_source.sql`` raises. In that case
        the connection is closed before the error propagates, so a failed
        bootstrap does not leave the database file locked.
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)
    except BaseException:
        # Nobody else holds a reference to `con` yet, so close it here.
        con.close()
        raise
    return con

# --- AGGREGATION LOGIC (shared by all intervals) ---
def build_aggregation_plan(df_stream, interval_name, interval_window):
    """Build the windowed per-symbol feature aggregation for a trades stream.

    Args:
        df_stream: Streaming DataFrame; the code reads the columns ``symbol``,
            ``trade_time``, ``price``, ``quantity`` and ``side``.
        interval_name: Label written to the ``interval`` output column.
        interval_window: Window duration string passed to ``F.window``.

    Returns:
        A DataFrame with one row per (symbol, window) holding OHLC, volume,
        buy/sell volume, VWAP, price statistics, trade-size statistics and a
        ``date_part`` string (``yyyy-MM-dd`` of the window start).
    """
    # Struct ordered by trade_time first: min()/max() over it pick the price of
    # the earliest/latest trade in the window.
    time_then_price = F.struct("trade_time", "price")
    is_buy = F.col("side") == "buy"
    is_sell = F.col("side") == "sell"

    # 1. Turnover (price * quantity) is needed for the VWAP
    with_turnover = df_stream.withColumn("turnover", F.col("price") * F.col("quantity"))

    aggregates = [
        # Basic OHLC
        F.min(time_then_price).getItem("price").alias("open"),
        F.max("price").alias("high"),
        F.min("price").alias("low"),
        F.max(time_then_price).getItem("price").alias("close"),
        # Volume & counts
        F.sum("quantity").alias("volume"),
        F.sum("turnover").alias("total_turnover"),
        F.count("*").alias("trade_count"),
        # Buy/sell pressure
        F.sum(F.when(is_buy, F.col("quantity")).otherwise(0)).alias("vol_buy"),
        F.sum(F.when(is_sell, F.col("quantity")).otherwise(0)).alias("vol_sell"),
        # Price statistics; quantiles use the approximate percentile function
        F.stddev("price").alias("price_std"),
        F.skewness("price").alias("price_skew"),
        F.kurtosis("price").alias("price_kurtosis"),
        F.percentile_approx("price", 0.25).alias("price_q25"),
        F.percentile_approx("price", 0.50).alias("price_median"),
        F.percentile_approx("price", 0.75).alias("price_q75"),
        # Trade-size statistics
        F.max("quantity").alias("size_max"),
        F.avg("quantity").alias("size_mean"),
        F.stddev("quantity").alias("size_std"),
    ]

    # 2. Group by symbol and time window; the watermark is what allows the
    #    result to be written in append mode.
    grouped = (
        with_turnover
        .withWatermark("trade_time", WATERMARK_DELAY)
        .groupBy(
            F.col("symbol"),
            F.window(F.col("trade_time"), interval_window).alias("window"),
        )
    )
    agg_df = grouped.agg(*aggregates)

    # 3. Final projection
    output_columns = [
        F.col("symbol"),
        F.col("window.start").alias("candle_time"),
        F.lit(interval_name).alias("interval"),
        "open", "high", "low", "close", "volume", "trade_count",
        "vol_buy", "vol_sell",
        (F.col("total_turnover") / F.col("volume")).alias("vwap"),
        "price_std", "price_skew", "price_kurtosis",
        "price_q25", "price_median", "price_q75",
        "size_max", "size_mean", "size_std",
        # Partition column (Hive-style)
        F.date_format(F.col("window.start"), "yyyy-MM-dd").alias("date_part"),
    ]
    return agg_df.select(*output_columns)

# --- SPARK STREAMING RUNNER ---
def process_stream(spark, interval_name, interval_window):
    """Run one available-now streaming pass that writes features for one interval.

    Reads the Parquet files under ``s3a://{S3_BUCKET}/silver/trades/`` as a stream,
    aggregates them with ``build_aggregation_plan`` and appends the result to
    ``s3a://{S3_BUCKET}/silver/features/{interval_name}``, partitioned by
    ``date_part``. Progress is tracked in a per-interval checkpoint directory.

    Args:
        spark: Active SparkSession.
        interval_name: Interval label, used for the output and checkpoint folders.
        interval_window: Window duration string forwarded to the aggregation.

    Returns:
        None. Returns early (after printing a notice) when the input schema
        cannot be read, e.g. because no trades data exists yet.
    """
    print(f"\nüöÄ SPARK STREAM: Processing {interval_name}...")

    # 1. Input: read stream from Silver trades
    input_path = f"s3a://{S3_BUCKET}/silver/trades/"
    try:
        # File streams need an explicit schema; take it from the existing files.
        schema = spark.read.parquet(input_path).schema
    except Exception as exc:
        # Catch Exception only: a bare `except` would also swallow
        # KeyboardInterrupt, which the surrounding loop uses to stop the pipeline.
        print("‚ö†Ô∏è No trades data found.")
        print(f"   Reason: {exc}")
        return

    df_trades = spark.readStream \
        .schema(schema) \
        .format("parquet") \
        .option("maxFilesPerTrigger", 1000) \
        .load(input_path)

    # 2. Transform
    final_df = build_aggregation_plan(df_trades, interval_name, interval_window)

    # 3. Output: write stream (append mode)
    # Written to a dedicated folder: silver/features/{interval_name}/
    output_path = f"s3a://{S3_BUCKET}/silver/features/{interval_name}"
    checkpoint_path = f"{CHECKPOINT_ROOT}/{interval_name}"

    query = final_df.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(availableNow=True) \
        .partitionBy("date_part") \
        .start(output_path)

    # availableNow: process all currently available data, then stop -> suitable
    # for periodic triggering or for being called from a loop.
    query.awaitTermination()
    print(f"   ‚úÖ Spark {interval_name}: Done writing batch.")

# --- DUCKDB LOADER (INCREMENTAL) ---
def load_to_duckdb(interval_name):
    """Incrementally load one interval's feature files into the DuckDB warehouse.

    Only rows whose ``candle_time`` is later than the newest ``candle_time``
    already stored for this interval are loaded. New calendar days are added to
    ``dim_time`` and the feature rows are inserted into ``fact_market_features``
    (joined to ``dim_symbol`` for the surrogate key).

    Both inserts run in a single transaction, so a failure of the fact insert
    does not leave ``dim_time`` updated on its own.

    NOTE(review): the ``CREATE TABLE IF NOT EXISTS ... AS SELECT`` fallback copies
    the Parquet schema, which has no ``symbol_key`` / ``date_key`` columns, while
    the INSERT below targets those columns. Presumably the table is pre-created
    by the warehouse bootstrap script -- confirm.

    Args:
        interval_name: Interval label; selects the source folder and the rows of
            ``fact_market_features`` used to compute the high-water mark.

    Returns:
        None. Errors during the load are printed, not raised.
    """
    print(f"ü¶Ü [DuckDB] Syncing {interval_name} features...")
    con = get_duckdb_conn()

    # Parquet source path (recursive glob)
    parquet_source = f"s3://{S3_BUCKET}/silver/features/{interval_name}/**/*.parquet"
    in_transaction = False

    try:
        # Make sure the fact table exists
        con.execute("CREATE TABLE IF NOT EXISTS fact_market_features AS SELECT * FROM read_parquet('" + parquet_source + "', hive_partitioning=1) LIMIT 0")

        # High-water mark: newest candle_time already stored for this interval
        max_time_query = f"SELECT COALESCE(MAX(candle_time), '1970-01-01'::TIMESTAMP) FROM fact_market_features WHERE interval = '{interval_name}'"
        max_time = con.execute(max_time_query).fetchone()[0]
        print(f"   ‚ÑπÔ∏è Last time in DB: {max_time}")

        con.execute("BEGIN TRANSACTION")
        in_transaction = True

        # 1. Update dim_time (new rows only, and only days not present yet)
        con.execute(f"""
            INSERT INTO dim_time
            SELECT DISTINCT
                CAST(strftime(candle_time, '%Y%m%d') AS INTEGER) as date_key,
                CAST(candle_time AS DATE),
                EXTRACT(YEAR FROM candle_time), EXTRACT(QUARTER FROM candle_time),
                EXTRACT(MONTH FROM candle_time), EXTRACT(DAY FROM candle_time),
                ISODOW(candle_time), CASE WHEN ISODOW(candle_time) IN (6, 7) THEN TRUE ELSE FALSE END
            FROM read_parquet('{parquet_source}', hive_partitioning=1)
            WHERE candle_time > '{max_time}'
            AND CAST(strftime(candle_time, '%Y%m%d') AS INTEGER) NOT IN (SELECT date_key FROM dim_time)
        """)

        # 2. Insert fact rows (new rows only)
        con.execute(f"""
            INSERT INTO fact_market_features (
                symbol_key, date_key, interval, candle_time,
                open, high, low, close, volume, trade_count,
                vol_buy, vol_sell, vwap,
                price_std, price_skew, price_kurtosis,
                price_q25, price_median, price_q75,
                size_max, size_mean, size_std
            )
            SELECT
                d.symbol_key,
                CAST(strftime(s.candle_time, '%Y%m%d') AS INTEGER) as date_key,
                s.interval, s.candle_time,
                s.open, s.high, s.low, s.close, s.volume, s.trade_count,
                s.vol_buy, s.vol_sell, s.vwap,
                s.price_std, s.price_skew, s.price_kurtosis,
                s.price_q25, s.price_median, s.price_q75,
                s.size_max, s.size_mean, s.size_std
            FROM read_parquet('{parquet_source}', hive_partitioning=1) s
            JOIN dim_symbol d ON s.symbol = d.symbol_code
            WHERE s.candle_time > '{max_time}'
        """)

        con.execute("COMMIT")
        in_transaction = False
        print("   ‚úÖ DuckDB Sync Success.")

    except Exception as e:
        if in_transaction:
            # Keep dim_time and the fact table consistent with each other.
            try:
                con.execute("ROLLBACK")
            except Exception as rollback_error:
                print(f"   Rollback failed: {rollback_error}")
        print(f"   ‚ö†Ô∏è DuckDB Error: {e}")
    finally:
        con.close()

# --- MAIN LOOP ---
if __name__ == "__main__":
    # Continuous driver: every cycle runs one available-now Spark pass and one
    # DuckDB sync per interval, then sleeps. Stop it with Ctrl+C.
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    # Time frames to track: (label, Spark window duration)
    intervals = [
        ("1m", "1 minute"),
        ("5m", "5 minutes"),
        # ("1h", "1 hour")
    ]

    print("üöÄ STARTING CONTINUOUS PIPELINE (Ctrl+C to stop)")

    try:
        while True:
            for name, window in intervals:
                # 1. Spark processes the new data (micro-batch)
                process_stream(spark, name, window)

                # 2. DuckDB loads the new data into the warehouse
                load_to_duckdb(name)

            # Pause before the next scan (avoids burning CPU when there is no new data).
            # With the availableNow trigger, a pass without new data finishes quickly.
            sleep_time = 60
            print(f"üí§ Sleeping {sleep_time}s before next micro-batch...")
            time.sleep(sleep_time)

    except KeyboardInterrupt:
        print("\nüõë Pipeline Stopped.")
    finally:
        # Stop Spark on every exit path, not only on Ctrl+C, so an unexpected
        # error does not leave the session running.
        spark.stop()

Order book features


In [None]:
import duckdb
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

# --- CONFIG ---
# NOTE(review): the endpoint, credentials and absolute paths below are hard-coded;
# consider reading them from environment variables before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Root folder for the per-interval staging output written by Spark.
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/silver/agg_orderbook/"

def get_spark():
    """Return the SparkSession for the order-book feature job (created on first call).

    Connects to the S3-compatible store through ``s3a`` using the module-level
    ``S3_ENDPOINT`` / ``S3_ACCESS`` / ``S3_SECRET`` constants, with path-style
    access and SSL disabled.
    """
    return (
        SparkSession.builder
        .appName("Orderbook_Feature_Engineering")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS)
        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .getOrCreate()
    )

def parse_interval_to_minutes(interval_str):
    """Convert an interval label such as ``"5m"``, ``"4h"`` or ``"1d"`` to minutes.

    The last character is the unit (``m`` minutes, ``h`` hours, ``d`` days,
    case-insensitive) and everything before it is the integer amount.

    Args:
        interval_str: Interval label.

    Returns:
        The interval length in minutes. Falls back to 60 when the label is
        empty, the amount is not an integer, or the unit is unknown.
    """
    default_minutes = 60
    minutes_per_unit = {'m': 1, 'h': 60, 'd': 1440}

    # An empty label has no unit character; indexing it would raise IndexError.
    if not interval_str:
        return default_minutes

    unit = interval_str[-1].lower()
    try:
        value = int(interval_str[:-1])
    except ValueError:
        # Non-numeric (or missing) amount, e.g. "xm" or "m".
        return default_minutes

    factor = minutes_per_unit.get(unit)
    if factor is None:
        return default_minutes
    return value * factor

def get_last_processed_time(interval_name):
    """Return the newest ``candle_time`` stored for an interval, or None.

    Args:
        interval_name: Interval label matched against the ``interval`` column of
            ``fact_orderbook_features``.

    Returns:
        ``MAX(candle_time)`` for the interval, or ``None`` when the table does not
        exist, holds no rows for the interval, or the lookup fails. Callers treat
        ``None`` as "run a full load", so a failed lookup is reported on stdout
        instead of being swallowed silently.
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        table_exists = con.execute("SELECT count(*) FROM information_schema.tables WHERE table_name = 'fact_orderbook_features'").fetchone()[0]
        if table_exists == 0:
            return None

        query = f"SELECT MAX(candle_time) FROM fact_orderbook_features WHERE interval = '{interval_name}'"
        return con.execute(query).fetchone()[0]
    except Exception as e:
        # Still best-effort, but say why: otherwise a transient DB error would
        # look exactly like "first run" and trigger an unexplained full reload.
        print(f"   WARNING: could not read last processed time for {interval_name}: {e}")
        return None
    finally:
        con.close()

def calculate_orderbook_features(spark, interval_name, interval_window):
    """Aggregate silver order-book rows into per-interval features and stage them as Parquet.

    Steps:
      1. Read ``s3a://{S3_BUCKET}/silver/order_books/``; when a previous run is
         recorded in DuckDB, keep only rows from ``last_time - buffer`` onwards.
      2. Collapse the per-price-level rows into one row per (symbol, snapshot_time)
         and derive spread, mid price, weighted price, imbalance and book pressure.
      3. Group the snapshots into time windows of ``interval_window`` and compute
         the candle-level statistics.
      4. Keep only windows that ended before the cutoff and started at or after the
         earliest timestamp of the batch, then overwrite
         ``{STAGING_OUTPUT_PATH}/{interval_name}`` with the result.

    Args:
        spark: Active SparkSession.
        interval_name: Interval label (e.g. "5m"); used for the look-back buffer,
            the ``interval`` column and the staging folder name.
        interval_window: Window duration string passed to ``F.window``.

    Returns:
        True when staging data was written; False when the batch has no rows, the
        minimum-time check failed, or no closed window was found.
    """
    print(f"\nüìö Processing Orderbook: {interval_name} ({interval_window})")
    interval_minutes = parse_interval_to_minutes(interval_name)
    # Look back two full intervals plus 10 minutes before the last stored candle.
    buffer_minutes = (interval_minutes * 2) + 10
    last_time = get_last_processed_time(interval_name)
    # Cutoff time: current time minus 1 minute (to be sure the data has arrived).
    # Only windows that end BEFORE this moment are processed.
    # NOTE(review): datetime.now() is a naive local timestamp -- confirm it matches
    # the time zone Spark uses for snapshot_time.
    cutoff_time = datetime.now() - timedelta(minutes=1)

    input_path = f"s3a://{S3_BUCKET}/silver/order_books/"
    df = spark.read.parquet(input_path)

    if last_time:
        start_timestamp = last_time - timedelta(minutes=buffer_minutes)
        start_date_str = start_timestamp.strftime("%Y-%m-%d")

        print(f"   ‚ÑπÔ∏è Mode: Incremental (Reading >= {start_timestamp})")

        # Partition pruning (filters folders via the date_part column)
        df = df.filter(F.col("date_part") >= F.lit(start_date_str))
        # Row filtering (by timestamp)
        df = df.filter(F.col("snapshot_time") >= F.lit(start_timestamp))
    else:
        print("   ‚ÑπÔ∏è Mode: Full Load")
    try:
        # Earliest snapshot_time in the current batch
        min_row = df.select(F.min("snapshot_time")).collect()
        min_data_time = min_row[0][0]

        if min_data_time is None:
            print("   ‚ö†Ô∏è No data found in this range.")
            return False

        print(f"   üïí Batch Min Data Time: {min_data_time}")
    except Exception as e:
        print(f"   ‚ö†Ô∏è Error checking min time: {e}")
        return False
    # ---------------------------------------------------------
    # STEP 1: SNAPSHOT AGGREGATION (rebuild the order book per snapshot)
    # ---------------------------------------------------------
    # The silver data is already exploded (one row per price level), so the rows
    # are grouped back together to compute best bid, best ask, total depth, ...
    # for each snapshot_time.

    snapshot_df = df.groupBy("symbol", "snapshot_time").agg(
        # Best prices: highest bid and lowest ask
        F.max(F.when(F.col("side") == "bid", F.col("price"))).alias("best_bid"),
        F.min(F.when(F.col("side") == "ask", F.col("price"))).alias("best_ask"),

        # Total volume (depth) per side
        F.sum(F.when(F.col("side") == "bid", F.col("quantity"))).alias("sum_bid"),
        F.sum(F.when(F.col("side") == "ask", F.col("quantity"))).alias("sum_ask"),

        # Weighted price components (total money = price * qty), over both sides
        F.sum(F.col("price") * F.col("quantity")).alias("total_turnover"),
        F.sum("quantity").alias("total_qty")
    )

    # Derived features (the original note says: same as the Polars version)
    # 1. Spread
    snapshot_df = snapshot_df.withColumn("spread", F.col("best_ask") - F.col("best_bid"))

    # 2. Mid price
    snapshot_df = snapshot_df.withColumn("mid_price", (F.col("best_ask") + F.col("best_bid")) / 2)

    # 3. Weighted mid price (WMP) ~ total_turnover / total_qty
    snapshot_df = snapshot_df.withColumn("wmp", F.col("total_turnover") / F.col("total_qty"))

    # 4. Imbalance (bid / (bid + ask)); the 1e-9 term keeps the denominator non-zero
    snapshot_df = snapshot_df.withColumn("imbalance",
        F.col("sum_bid") / (F.col("sum_bid") + F.col("sum_ask") + 1e-9)
    )

    # 5. Book pressure (WMP - mid price)
    snapshot_df = snapshot_df.withColumn("book_pressure", F.col("wmp") - F.col("mid_price"))

    # ---------------------------------------------------------
    # STEP 2: WINDOW AGGREGATION (group snapshots into OHLC candles)
    # ---------------------------------------------------------
    features_df = snapshot_df.groupBy(
        F.col("symbol"),
        F.window(F.col("snapshot_time"), interval_window).alias("window")
    ).agg(
        # --- A. MID PRICE OHLC ---
        # min/max over a (snapshot_time, mid_price) struct pick the mid price of
        # the earliest/latest snapshot in the window.
        F.min(F.struct("snapshot_time", "mid_price")).getItem("mid_price").alias("mid_open"),
        F.max("mid_price").alias("mid_high"),
        F.min("mid_price").alias("mid_low"),
        F.max(F.struct("snapshot_time", "mid_price")).getItem("mid_price").alias("mid_close"),

        # --- B. WMP STATS (weighted mid price) ---
        F.avg("wmp").alias("wmp_mean"),
        F.stddev("wmp").alias("wmp_std"),

        # --- C. SPREAD STATS ---
        F.avg("spread").alias("spread_mean"),
        F.max("spread").alias("spread_max"),

        # --- D. IMBALANCE STATS ---
        F.avg("imbalance").alias("imbal_mean"),
        F.min("imbalance").alias("imbal_min"),
        F.max("imbalance").alias("imbal_max"),
        F.percentile_approx("imbalance", 0.5).alias("imbal_median"),

        # --- E. BOOK PRESSURE ---
        F.avg("book_pressure").alias("pressure_mean"),
        F.stddev("book_pressure").alias("pressure_std"),

        # --- F. DEPTH STATS ---
        F.avg("sum_bid").alias("depth_bid_mean"),
        F.avg("sum_ask").alias("depth_ask_mean"),

        # Number of snapshots in the window
        F.count("*").alias("snapshot_count")
    )

    # ---------------------------------------------------------
    # STEP 3: STRICT FILTER (keep only closed candles)
    # ---------------------------------------------------------
    # window.end <= cutoff drops windows that are still open; window.start >=
    # min_data_time drops a first window that starts before the batch's data.
    features_df = features_df.filter(
        (F.col("window.end") <= F.lit(cutoff_time)) &
        (F.col("window.start") >= F.lit(min_data_time))
    )
    # NOTE(review): no cache() is applied, so this emptiness check and the write
    # below presumably each evaluate the aggregation -- confirm if runtime matters.
    if features_df.rdd.isEmpty():
        print(f"   ‚ö†Ô∏è No closed candles found for {interval_name}. Waiting for data...")
        return False

    final_df = features_df.select(
        F.col("symbol"),
        F.col("window.start").alias("candle_time"),
        F.lit(interval_name).alias("interval"),

        "mid_open", "mid_high", "mid_low", "mid_close",
        "wmp_mean", "wmp_std",
        "spread_mean", "spread_max",
        "imbal_mean", "imbal_min", "imbal_max", "imbal_median",
        "pressure_mean", "pressure_std",
        "depth_bid_mean", "depth_ask_mean",
        "snapshot_count",
        F.current_timestamp().alias("ingestion_time")
    )

    # 4. WRITE TO STAGING (overwrites the interval-specific folder)
    staging_path = f"{STAGING_OUTPUT_PATH}/{interval_name}"
    print(f"   üíæ Writing to Staging: {staging_path}")
    final_df.write.mode("overwrite").parquet(staging_path)

    return True

def merge_to_duckdb(interval_name):
    """Merge the staged order-book features of one interval into ``fact_orderbook_features``.

    Reads ``s3://{S3_BUCKET}/silver/agg_orderbook/{interval_name}/*.parquet``,
    deletes rows already stored for the same (interval, symbol, candle_time) and
    inserts the staged rows. DELETE and INSERT run in one transaction, so a failed
    INSERT rolls the DELETE back instead of losing existing candles.

    Args:
        interval_name: Interval label; selects the staging folder and the rows of
            the fact table that may be replaced.

    Returns:
        None. Merge errors are printed, not raised; a "No files found" error is
        reported as "no new data".

    Raises:
        Errors from opening/executing the warehouse bootstrap script propagate to
        the caller (unchanged behavior); the connection is closed first.
    """
    print(f"ü¶Ü Merging Orderbook {interval_name} into DuckDB...")
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # Bootstrap script (the original comment says it configures MinIO)
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)

        staging_source = f"s3://{S3_BUCKET}/silver/agg_orderbook/{interval_name}/*.parquet"

        in_transaction = False
        try:
            # Create the table if needed (schema copied from the staging files)
            con.execute(f"""
                CREATE TABLE IF NOT EXISTS fact_orderbook_features AS
                SELECT * FROM read_parquet('{staging_source}') LIMIT 0
            """)

            con.execute("BEGIN TRANSACTION")
            in_transaction = True

            # Clean overlapping data
            print("   üîÑ Cleaning overlapping data...")
            con.execute(f"""
                DELETE FROM fact_orderbook_features
                WHERE interval = '{interval_name}'
                AND (symbol, candle_time) IN (
                    SELECT symbol, candle_time FROM read_parquet('{staging_source}')
                )
            """)

            # Insert new data
            print("   üì• Inserting new data...")
            con.execute(f"""
                INSERT INTO fact_orderbook_features
                SELECT * FROM read_parquet('{staging_source}')
            """)

            con.execute("COMMIT")
            in_transaction = False
            print("   ‚úÖ Merge Complete.")

        except Exception as e:
            if in_transaction:
                # Undo the DELETE so a failed INSERT cannot lose existing candles.
                try:
                    con.execute("ROLLBACK")
                except Exception as rollback_error:
                    print(f"   Rollback failed: {rollback_error}")
            if "No files found" in str(e):
                print(f"   ‚ÑπÔ∏è No new data for {interval_name}.")
            else:
                print(f"   ‚ö†Ô∏è DuckDB Error: {e}")
    finally:
        # Always release the connection (and the database file lock).
        con.close()

if __name__ == "__main__":
    # Batch entry point for the order-book features: compute each interval with
    # Spark and merge it into DuckDB when new staging data was written.
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    # (label, Spark window duration)
    intervals = [
        ("1m", "1 minute"),
        ("5m", "5 minutes"),
        ("15m", "15 minutes"),
        ("1h", "1 hour"),
        ("4h", "4 hours")
    ]

    print("STARTING ORDERBOOK BATCH PIPELINE...")

    try:
        for name, window in intervals:
            has_data = calculate_orderbook_features(spark, name, window)
            if has_data:
                merge_to_duckdb(name)
    finally:
        # Stop Spark even if one interval raises, so the session is not left running.
        spark.stop()

Insert data into tables

In [None]:
import boto3
import duckdb
from datetime import datetime, timezone, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, to_timestamp, date_format, lit, current_timestamp
from pyspark.sql.types import (StructType, StructField, StringType, ArrayType, DoubleType)
# --- CONFIG ---
# NOTE(review): the endpoint, credentials and absolute path below are hard-coded;
# consider reading them from environment variables before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
def get_spark():
    """Return the SparkSession for the bronze-to-silver loader (created on first call).

    Connects to the S3-compatible store through ``s3a`` using the module-level
    ``S3_ENDPOINT`` / ``S3_ACCESS`` / ``S3_SECRET`` constants with path-style access.
    """
    options = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }

    session_builder = SparkSession.builder.appName("ETL_Silver_Loader")
    for key, value in options.items():
        session_builder = session_builder.config(key, value)
    return session_builder.getOrCreate()
def get_duckdb_conn():
    """Open the warehouse database and run the bootstrap SQL script on it.

    Returns:
        An open DuckDB connection to ``DUCKDB_PATH``; the caller must close it.

    Raises:
        Whatever opening or executing ``warehouse_source.sql`` raises. In that case
        the connection is closed before the error propagates, so a failed
        bootstrap does not leave the database file locked.
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)
    except BaseException:
        # Nobody else holds a reference to `con` yet, so close it here.
        con.close()
        raise
    return con
def get_schemas():
    """Return the Spark schemas of one ``payload.data`` element per feed.

    Returns:
        dict with the keys ``"trades"``, ``"funding"``, ``"ohlc"`` and ``"book"``.
        All leaf values are declared as nullable strings; casting to numeric and
        timestamp types is done later by the transform functions.
    """
    def _all_strings(*names):
        # Struct whose fields are all nullable strings, in the given order.
        return StructType([StructField(name, StringType(), True) for name in names])

    schemas = {}
    schemas["trades"] = _all_strings("instId", "tradeId", "px", "sz", "side", "ts")
    schemas["funding"] = _all_strings(
        "instId", "fundingRate", "nextFundingRate", "fundingTime", "nextFundingTime"
    )
    # A candle arrives as a plain array of strings.
    schemas["ohlc"] = ArrayType(StringType())
    # A book element carries two arrays of string arrays plus a timestamp;
    # no instId field is declared inside the element.
    schemas["book"] = StructType([
        StructField("asks", ArrayType(ArrayType(StringType())), True),
        StructField("bids", ArrayType(ArrayType(StringType())), True),
        StructField("ts", StringType(), True),
    ])
    return schemas
def get_raw_schema(data_schema):
    """Wrap a per-element schema into the schema of a raw bronze record.

    Args:
        data_schema: Spark type of one element of ``payload.data``.

    Returns:
        StructType with ``received_at`` (string) and ``payload``, where ``payload``
        holds ``data`` (array of ``data_schema``) and ``arg`` (``channel`` and
        ``instId`` strings). Every field is nullable.
    """
    arg_struct = StructType([
        StructField("channel", StringType(), True),
        StructField("instId", StringType(), True),
    ])
    payload_struct = StructType([
        StructField("data", ArrayType(data_schema), True),
        StructField("arg", arg_struct, True),
    ])
    record_fields = [
        StructField("received_at", StringType(), True),
        StructField("payload", payload_struct, True),
    ]
    return StructType(record_fields)

def transform_trades(df):
    """Flatten raw trade records into one typed row per trade.

    Args:
        df: Raw DataFrame with ``received_at`` and ``payload.data`` (array of
            trade structs with ``instId``, ``tradeId``, ``side``, ``px``, ``sz``, ``ts``).

    Returns:
        DataFrame with ``symbol``, ``tradeId``, ``side``, ``price`` (double),
        ``quantity`` (double), ``trade_time`` (timestamp from the millisecond
        ``ts``) and ``ingestion_time`` (timestamp), de-duplicated per
        (symbol, tradeId).
    """
    exploded = df.select(
        col("received_at"), explode(col("payload.data")).alias("data")
    )
    trades = exploded.select(
        col("data.instId").alias("symbol"),
        col("data.tradeId").alias("tradeId"),
        col("data.side").alias("side"),
        col("data.px").cast("double").alias("price"),
        col("data.sz").cast("double").alias("quantity"),
        # ts is divided by 1000 before the timestamp cast (milliseconds -> seconds)
        (col("data.ts").cast("long") / 1000).cast("timestamp").alias("trade_time"),
        col("received_at").cast("timestamp").alias("ingestion_time")
    )
    # De-duplicate on (symbol, tradeId) rather than tradeId alone: trade ids are
    # only meaningful per instrument, so two symbols sharing an id must not
    # cause one of the trades to be dropped.
    return trades.dropDuplicates(["symbol", "tradeId"])
def transform_funding(df_raw):
    """Flatten raw funding-rate records into one typed row per data element.

    Args:
        df_raw: Raw DataFrame with ``received_at`` and ``payload.data``.

    Returns:
        DataFrame with ``symbol``, a constant ``instrument_type`` of ``"SWAP"``,
        ``funding_rate`` / ``next_funding_rate`` (double), ``funding_time`` /
        ``next_funding_time`` (timestamps from millisecond values) and
        ``ingestion_time``.
    """
    def _epoch_ms_to_timestamp(field_path):
        # Divide by 1000 before casting: the value is treated as milliseconds.
        return (col(field_path).cast("long") / 1000).cast("timestamp")

    exploded = df_raw.select(
        col("received_at"),
        explode(col("payload.data")).alias("data"),
    )

    projection = [
        col("data.instId").alias("symbol"),
        # Funding records are labelled as SWAP instruments
        lit("SWAP").alias("instrument_type"),
        col("data.fundingRate").cast("double").alias("funding_rate"),
        col("data.nextFundingRate").cast("double").alias("next_funding_rate"),
        _epoch_ms_to_timestamp("data.fundingTime").alias("funding_time"),
        _epoch_ms_to_timestamp("data.nextFundingTime").alias("next_funding_time"),
        col("received_at").cast("timestamp").alias("ingestion_time"),
    ]
    return exploded.select(*projection)
def transform_ohlc(df_raw, candle_type="mark"):
    """Flatten raw candle records into one typed OHLC row per candle.

    Each ``payload.data`` element is an array read by position:
    0 = timestamp (ms), 1 = open, 2 = high, 3 = low, 4 = close.

    Args:
        df_raw: Raw DataFrame with ``received_at``, ``payload.arg`` and ``payload.data``.
        candle_type: Currently unused; kept for interface compatibility.

    Returns:
        DataFrame with ``symbol``, ``channel``, ``candle_time``, ``open``,
        ``high``, ``low``, ``close`` and ``ingestion_time``.
    """
    candle = col("c")

    per_candle = df_raw.select(
        col("received_at"),
        col("payload.arg.instId").alias("symbol"),
        col("payload.arg.channel").alias("channel"),
        explode(col("payload.data")).alias("c"),
    )

    price_columns = [
        candle[position].cast("double").alias(label)
        for position, label in ((1, "open"), (2, "high"), (3, "low"), (4, "close"))
    ]

    return per_candle.select(
        col("symbol"),
        col("channel"),
        (candle[0].cast("long") / 1000).cast("timestamp").alias("candle_time"),
        *price_columns,
        col("received_at").cast("timestamp").alias("ingestion_time"),
    )
def transform_orderbook(df_raw):
    """Flatten raw order-book records into one row per price level and side.

    The symbol is taken from ``payload.arg.instId`` (the book elements themselves
    carry no instrument id). Ask levels and bid levels are exploded separately,
    tagged with ``side`` = ``"ask"`` / ``"bid"`` and unioned.

    Args:
        df_raw: Raw DataFrame with ``received_at``, ``payload.arg`` and ``payload.data``.

    Returns:
        DataFrame with ``symbol``, ``side``, ``price`` (double), ``quantity``
        (double), ``snapshot_time`` (timestamp from the millisecond ``ts``) and
        ``ingestion_time`` (timestamp from ``received_at``).
    """
    books = df_raw.select(
        col("received_at"),
        col("payload.arg.instId").alias("symbol"),
        explode(col("payload.data")).alias("book"),
    )

    def _levels_for(levels_field, side_label):
        # One row per price level of the given side; index 0 is the price and
        # index 1 the quantity of a level.
        level = col("level")
        return books.select(
            col("symbol"),
            col("book.ts").alias("ts"),
            col("received_at"),
            explode(col(levels_field)).alias("level"),
            lit(side_label).alias("side"),
        ).select(
            col("symbol"), col("side"),
            level[0].cast("double").alias("price"),
            level[1].cast("double").alias("quantity"),
            col("ts"), col("received_at"),
        )

    ask_rows = _levels_for("book.asks", "ask")
    bid_rows = _levels_for("book.bids", "bid")

    # Union both sides, then normalise the time columns; received_at is
    # effectively renamed to ingestion_time here.
    combined = ask_rows.union(bid_rows)
    combined = combined.withColumn("snapshot_time", (col("ts").cast("long") / 1000).cast("timestamp"))
    combined = combined.withColumn("ingestion_time", col("received_at").cast("timestamp"))
    return combined.drop("ts", "received_at")

def load_gold_layer(con, stg_table):
    """Populate the dimension and fact tables from one staging (silver) table.

    Steps, all executed on ``con``:
      1. Insert symbols not yet present into ``dim_symbol`` (base/quote currency
         are the first and second ``-``-separated parts of the symbol).
      2. Insert calendar days not yet present into ``dim_time``, derived from the
         staging table's time column.
      3. For ``"trades"`` and ``"funding_rate"`` only, insert rows newer than the
         newest row already stored into the matching fact table. Other staging
         tables only update the dimensions.

    Args:
        con: Open DuckDB connection.
        stg_table: Staging table name; it is interpolated directly into the SQL
            text, so it must be a trusted identifier.

    Returns:
        None.
    """
    print(f"   ‚ú® Loading Gold Layer for {stg_table}...")

    # 1. Update Dim Symbol
    con.execute(f"""
        INSERT INTO dim_symbol (symbol_code, base_currency, quote_currency)
        SELECT DISTINCT symbol, split_part(symbol, '-', 1), split_part(symbol, '-', 2)
        FROM {stg_table}
        WHERE symbol NOT IN (SELECT symbol_code FROM dim_symbol)
    """)

    # 2. Update Dim Time
    # Time column of each known staging table; unknown tables fall back to
    # ingestion_time.
    time_col_map = {
        "trades": "trade_time",
        "funding_rate": "funding_time",
        "ohlc_mark": "candle_time",
        "order_books": "snapshot_time"
    }
    t_col = time_col_map.get(stg_table, "ingestion_time")

    con.execute(f"""
        INSERT INTO dim_time
        SELECT DISTINCT
            CAST(strftime({t_col}, '%Y%m%d') AS INTEGER) as date_key,
            CAST({t_col} AS DATE),
            EXTRACT(YEAR FROM {t_col}), EXTRACT(QUARTER FROM {t_col}),
            EXTRACT(MONTH FROM {t_col}), EXTRACT(DAY FROM {t_col}),
            ISODOW({t_col}), CASE WHEN ISODOW({t_col}) IN (6, 7) THEN TRUE ELSE FALSE END
        FROM {stg_table}
        WHERE CAST(strftime({t_col}, '%Y%m%d') AS INTEGER) NOT IN (SELECT date_key FROM dim_time)
    """)

    # 3. Insert Fact Tables
    # NOTE(review): the high-water mark is MAX(time) over the whole fact table,
    # not per symbol -- confirm that late rows of slower symbols cannot be skipped.
    if stg_table == "trades":
        con.execute("""
                    INSERT INTO fact_trades (symbol_key, date_key, trade_id, price, quantity, trade_time)
                    SELECT d.symbol_key,
                           CAST(strftime(s.trade_time, '%Y%m%d') AS INTEGER),
                           s.tradeId,
                           s.price,
                           s.quantity,
                           s.trade_time
                    FROM trades s
                             JOIN dim_symbol d ON s.symbol = d.symbol_code
                    WHERE s.trade_time > (SELECT COALESCE(MAX(trade_time), '1970-01-01'::TIMESTAMP) FROM fact_trades)
                    """)
    elif stg_table == "funding_rate":
        con.execute("""
                    INSERT INTO fact_funding_rate (symbol_key, date_key, funding_rate, next_funding_rate, funding_time)
                    SELECT d.symbol_key,
                           CAST(strftime(s.funding_time, '%Y%m%d') AS INTEGER),
                           s.funding_rate,
                           s.next_funding_rate,
                           s.funding_time
                    FROM funding_rate s
                             JOIN dim_symbol d ON s.symbol = d.symbol_code
                    WHERE s.funding_time >
                          (SELECT COALESCE(MAX(funding_time), '1970-01-01'::TIMESTAMP) FROM fact_funding_rate)
                    """)

    print("   ‚úÖ Gold Load Success")

In [None]:
# Bronze -> silver load for the order-book feed: read the raw gzipped JSONL
# files, flatten them with transform_orderbook and append the result to the
# silver/order_books Parquet dataset, partitioned by date_part.
# NOTE(review): the write uses mode("append"), so re-running this cell over the
# same bronze files presumably appends the same rows again -- confirm.
spark=get_spark()
config_key='book'
bronze_prefix="bronze/okx_orderbook/"
stg_table= "order_books"
#transform_orderbook_func=transform_orderbook()
data_schema=get_schemas()["book"]
print(f"\nüöÄ Processing: {config_key} -> {stg_table}")

# 1. Read every file two folder levels below the bronze prefix
df_raw = spark.read.schema(get_raw_schema(data_schema)).json(f"s3a://{S3_BUCKET}/{bronze_prefix}*/*/*.jsonl.gz")
    # 2. Transform
df_silver = transform_orderbook(df_raw)
# Pick the first known event-time column present in the result; date_part is
# derived from it, or from the current timestamp when none is present.
time_col = None
for c in ["trade_time", "funding_time", "candle_time", "snapshot_time"]:
    if c in df_silver.columns: time_col = col(c); break
df_silver = df_silver.withColumn(
    "date_part",
    date_format(time_col if time_col is not None else current_timestamp(), "yyyy-MM-dd"))
#df_silver.show()
# 3. Append to the silver dataset, one folder per date_part
silver_path = f"s3a://{S3_BUCKET}/silver/{stg_table}/"
df_silver.write.mode("append").partitionBy("date_part").format("parquet").save(silver_path)

index-price

In [None]:
import duckdb
from datetime import datetime, timedelta
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, TimestampType

# --- CONFIG ---
# Connection settings for the S3-compatible object store and the DuckDB file.
# NOTE(review): the access key / secret are hard-coded; consider reading them
# from the environment before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Absolute local path of the DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Root of the staged index-price klines; note the trailing slash.
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/silver/indexPriceKlines/"
def get_spark():
    """Return the shared SparkSession configured for the S3A object store.

    Uses ``getOrCreate``, so an already running session is reused.
    """
    # Settings are applied in this order, mirroring a chained .config() call.
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }
    builder = SparkSession.builder.appName("IndexPriceKlines_Aggregator")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

def parse_interval_to_minutes(interval_str):
    """Convert an interval label such as "5m", "4h" or "1d" to minutes.

    Args:
        interval_str: Number followed by a one-letter unit
            (m = minutes, h = hours, d = days; case-insensitive).

    Returns:
        The interval length in minutes. Falls back to 60 when the input is
        empty/None, the numeric part is not an integer, or the unit is unknown.
    """
    default_minutes = 60
    # Empty or None input used to raise on the [-1] index; treat it like any
    # other unparseable value.
    if not interval_str:
        return default_minutes

    unit = interval_str[-1].lower()
    try:
        value = int(interval_str[:-1])
    except ValueError:
        return default_minutes

    minutes_per_unit = {'m': 1, 'h': 60, 'd': 1440}
    if unit not in minutes_per_unit:
        return default_minutes
    return value * minutes_per_unit[unit]

def parse_interval_to_ms(interval_str):
    """Convert an interval label (e.g. "5m") to milliseconds."""
    minutes = parse_interval_to_minutes(interval_str)
    return minutes * 60 * 1000

def get_last_processed_time(interval_name):
    """Return the newest ``close_time`` already stored in DuckDB for an interval.

    Args:
        interval_name: Interval label (e.g. "1m") matched against the
            ``interval`` column of ``fact_indexpriceklines_features``.

    Returns:
        The MAX(close_time) value, or None when the fact table does not exist,
        holds no rows for the interval, or the lookup fails.
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        table_exists = con.execute(
            "SELECT count(*) FROM information_schema.tables "
            "WHERE table_name = 'fact_indexpriceklines_features'"
        ).fetchone()[0]
        if table_exists == 0:
            return None
        # Bind the interval as a parameter instead of splicing it into the SQL text.
        row = con.execute(
            "SELECT MAX(close_time) FROM fact_indexpriceklines_features WHERE interval = ?",
            [interval_name],
        ).fetchone()
        return row[0]
    except Exception as e:
        # Still best-effort (the caller falls back to a full load), but the
        # reason is now visible instead of being swallowed silently.
        print(f" Warning: could not read last processed time for {interval_name}: {e}")
        return None
    finally:
        con.close()
def calculate_indexpriceklines_features(spark, interval_name, interval_window_ms):
    """Aggregate silver index candles into closed klines for one interval and stage them.

    Reads the ``silver/ohlc_index`` parquet data, keeps the rows whose channel is
    ``index-candle{interval_name}``, groups them per symbol into tumbling
    windows of the interval length and appends the closed windows to
    ``STAGING_OUTPUT_PATH/<interval_name>`` partitioned by ``date_part``.

    Args:
        spark: Active SparkSession.
        interval_name: Interval label such as "1m" or "4h".
        interval_window_ms: Window length in milliseconds. Only used in the
            progress message; the window itself is derived from ``interval_name``.

    Returns:
        True when at least one closed candle was written, False when there was
        no input data, no closed candle, or the minimum-time check failed.
    """
    print(f"\nüìà Processing IndexPriceKlines: {interval_name} (window: {interval_window_ms / 1000 / 60} minutes)")

    # 1. Time setup (incremental + buffer)
    interval_ms = parse_interval_to_ms(interval_name)
    buffer_ms = interval_ms * 2  # re-read two intervals before the last stored candle
    last_time = get_last_processed_time(interval_name)
    cutoff_time = datetime.now() - timedelta(minutes=1)  # exclude the candle still forming

    # Input is the silver parquet table (not the raw bronze files).
    input_path = f"s3a://{S3_BUCKET}/silver/ohlc_index/"
    df = spark.read.parquet(input_path)
    df = df.filter(F.col("channel") == f"index-candle{interval_name}")
    df = df.withColumnRenamed("candle_time", "close_time")
    time_col = F.col("close_time")

    if last_time:
        start_timestamp = last_time - timedelta(milliseconds=buffer_ms)
        prev_date_str = (start_timestamp - timedelta(days=1)).strftime("%Y-%m-%d")
        print(f" ‚ÑπÔ∏è Incremental Mode: Reading >= {start_timestamp}, including prev day {prev_date_str}")
        df = df.filter(time_col >= F.lit(start_timestamp))
    else:
        print(" ‚ÑπÔ∏è Mode: Full Load")

    # 2. Minimum data time, used below to drop a partially covered first window.
    try:
        min_row = df.select(F.min(time_col).alias("min_time")).collect()[0]
        min_data_time = min_row["min_time"]
        if min_data_time is None:
            print(" ‚ö†Ô∏è No data found in this range.")
            return False
        print(f" üïí Batch Min Data Time: {min_data_time}")
    except Exception as e:
        print(f" ‚ö†Ô∏è Error checking min time: {e}")
        return False

    # 3. Window aggregation
    interval_minutes = int(interval_ms / 1000 / 60)
    interval_window = f"{interval_minutes} minutes"
    features_df = df.groupBy(
        F.col("symbol"),
        F.window(time_col.cast(TimestampType()), interval_window).alias("window")
    ).agg(
        # first()/last() depend on row order, which is not guaranteed after a
        # shuffle. Taking min/max of a (close_time, value) struct picks the value
        # belonging to the earliest / latest row of the window deterministically.
        F.min(F.struct("close_time", "open")).getField("open").alias("open"),
        F.max("high").alias("high"),
        F.min("low").alias("low"),
        F.max(F.struct("close_time", "close")).getField("close").alias("close"),

        # Stats over the close prices in the window
        F.avg("close").alias("mean"),
        F.stddev("close").alias("std")
    )

    # 4. Strict filter: only windows that already ended and that start at or
    #    after the first row of this batch.
    features_df = features_df.filter(
        (F.col("window.end") <= F.lit(cutoff_time)) &
        (F.col("window.start") >= F.lit(min_data_time))
    )

    if len(features_df.head(1)) == 0:
        print(f" ‚ö†Ô∏è No closed candles found for {interval_name}.")
        return False

    # Metadata and partition column; the window end is used as close_time.
    final_df = features_df.select(
        F.col("symbol"),
        F.col("window.end").alias("close_time"),
        F.lit(interval_name).alias("interval"),
        "open", "high", "low", "close", "mean", "std",
        F.current_timestamp().alias("ingestion_time")
    ).withColumn(
        "date_part",
        F.date_format(F.col("close_time"), "yyyy-MM-dd")
    )

    # 5. Write to staging, partitioned by date_part. STAGING_OUTPUT_PATH may end
    #    with "/", so strip it to avoid a double slash in the path.
    staging_path = f"{STAGING_OUTPUT_PATH.rstrip('/')}/{interval_name}"
    print(f" üíæ Writing to Staging: {staging_path}")
    # NOTE: append mode plus the re-read buffer means overlapping candles are
    # staged again on every run; the DuckDB merge must de-duplicate them.
    final_df.write.mode("append").partitionBy("date_part").parquet(staging_path)
    return True

def merge_to_duckdb(interval_name):
    """Upsert the staged index-price klines of one interval into DuckDB.

    Loads the warehouse config script, makes sure
    ``fact_indexpriceklines_features`` exists, then replaces every
    (symbol, close_time) found in staging with the most recently ingested
    staged row. Delete and insert run in one transaction.

    Args:
        interval_name: Interval label (e.g. "1m"); selects the staging folder
            and the ``interval`` rows that get replaced.

    Errors raised by the config script propagate to the caller; errors during
    the merge itself are printed and not re-raised.
    """
    print(f"ü¶Ü Merging IndexPriceKlines {interval_name} into DuckDB...")
    con = duckdb.connect(DUCKDB_PATH)

    # Partitioned parquet written by the Spark staging step.
    staging_source = f"s3://{S3_BUCKET}/silver/indexPriceKlines/{interval_name}/*/*.parquet"

    try:
        # Object-store configuration for DuckDB. Kept inside the outer try so the
        # connection is closed even when the script fails.
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)

        in_transaction = False
        try:
            # Create the fact table on first use.
            con.execute("""
                CREATE TABLE IF NOT EXISTS fact_indexpriceklines_features (
                    symbol VARCHAR,
                    close_time TIMESTAMP,
                    interval VARCHAR,
                    open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
                    mean DOUBLE, std DOUBLE,
                    ingestion_time TIMESTAMP
                )
            """)

            # Delete + insert must succeed or fail together, otherwise a failed
            # insert would leave the overlapping rows deleted.
            con.execute("BEGIN TRANSACTION")
            in_transaction = True

            print(" üîÑ Cleaning overlapping data...")
            con.execute(f"""
                DELETE FROM fact_indexpriceklines_features
                WHERE interval = ?
                AND (symbol, close_time) IN (
                    SELECT symbol, close_time FROM read_parquet('{staging_source}', hive_partitioning=1)
                )
            """, [interval_name])

            print(" üì• Inserting new data...")
            # Staging is written in append mode with an overlap buffer, so the
            # same candle can be staged several times; keep only the newest copy.
            con.execute(f"""
                INSERT INTO fact_indexpriceklines_features
                SELECT
                    symbol,
                    close_time,
                    interval,
                    open, high, low, close,
                    mean, std,
                    ingestion_time
                FROM read_parquet('{staging_source}', hive_partitioning=1)
                QUALIFY ROW_NUMBER() OVER (
                    PARTITION BY symbol, close_time ORDER BY ingestion_time DESC
                ) = 1
            """)
            con.execute("COMMIT")
            in_transaction = False
            print(" ‚úÖ Merge Complete.")
        except Exception as e:
            if in_transaction:
                con.execute("ROLLBACK")
            if "No files found" in str(e):
                print(f" ‚ÑπÔ∏è No new data for {interval_name}.")
            else:
                print(f" ‚ö†Ô∏è DuckDB Error: {e}")
    finally:
        con.close()

if __name__ == "__main__":
    # Entry point of the index-price kline batch: aggregate each interval with
    # Spark, then merge the staged result into DuckDB when something was written.
    spark = get_spark()
    spark.sparkContext.setLogLevel("ERROR")

    interval_names = ["1m", "5m", "15m", "1h", "4h", "1d"]
    intervals = [(name, parse_interval_to_ms(name)) for name in interval_names]

    print("STARTING INDEXPRICEKLINES BATCH PIPELINE...")
    try:
        for name, window_ms in intervals:
            has_data = calculate_indexpriceklines_features(spark, name, window_ms)
            if has_data:
                merge_to_duckdb(name)
    finally:
        # Stop the session even when one interval raises.
        spark.stop()

mark-price

In [None]:
import duckdb
from datetime import datetime, timedelta
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, TimestampType

# --- CONFIG ---
# Connection settings for the S3-compatible object store and the DuckDB file.
# These repeat the values of the index-price cell.
# NOTE(review): the access key / secret are hard-coded; consider reading them
# from the environment before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Absolute local path of the DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Rebinds STAGING_OUTPUT_PATH (the index-price cell pointed it at indexPriceKlines).
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/silver/markindex/"
def get_spark():
    """Return the shared SparkSession configured for the S3A object store.

    Uses ``getOrCreate``, so an already running session is reused.
    """
    # Settings are applied in this order, mirroring a chained .config() call.
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }
    builder = SparkSession.builder.appName("IndexPriceKlines_Aggregator")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

def parse_interval_to_minutes(interval_str):
    """Convert an interval label such as "5m", "4h" or "1d" to minutes.

    Args:
        interval_str: Number followed by a one-letter unit
            (m = minutes, h = hours, d = days; case-insensitive).

    Returns:
        The interval length in minutes. Falls back to 60 when the input is
        empty/None, the numeric part is not an integer, or the unit is unknown.
    """
    default_minutes = 60
    # Empty or None input used to raise on the [-1] index; treat it like any
    # other unparseable value.
    if not interval_str:
        return default_minutes

    unit = interval_str[-1].lower()
    try:
        value = int(interval_str[:-1])
    except ValueError:
        return default_minutes

    minutes_per_unit = {'m': 1, 'h': 60, 'd': 1440}
    if unit not in minutes_per_unit:
        return default_minutes
    return value * minutes_per_unit[unit]

def parse_interval_to_ms(interval_str):
    """Convert an interval label (e.g. "5m") to milliseconds."""
    minutes = parse_interval_to_minutes(interval_str)
    return minutes * 60 * 1000

def get_last_processed_time(interval_name):
    """Return the newest ``close_time`` already stored in DuckDB for an interval.

    Args:
        interval_name: Interval label (e.g. "1m") matched against the
            ``interval`` column of ``fact_indexpriceklines_features``.

    Returns:
        The MAX(close_time) value, or None when the fact table does not exist,
        holds no rows for the interval, or the lookup fails.
    """
    # NOTE(review): this cell is labelled "mark-price" but still queries the
    # index-price fact table - confirm which table is intended.
    con = duckdb.connect(DUCKDB_PATH)
    try:
        table_exists = con.execute(
            "SELECT count(*) FROM information_schema.tables "
            "WHERE table_name = 'fact_indexpriceklines_features'"
        ).fetchone()[0]
        if table_exists == 0:
            return None
        # Bind the interval as a parameter instead of splicing it into the SQL text.
        row = con.execute(
            "SELECT MAX(close_time) FROM fact_indexpriceklines_features WHERE interval = ?",
            [interval_name],
        ).fetchone()
        return row[0]
    except Exception as e:
        # Still best-effort (callers treat None as "full load"), but the reason
        # is now visible instead of being swallowed silently.
        print(f" Warning: could not read last processed time for {interval_name}: {e}")
        return None
    finally:
        con.close()

In [None]:
# Ad-hoc inspection of the silver ohlc_index table: show the most recently
# ingested rows, then dump the whole table as a single CSV part under silver/123/.
path='s3a://trading-okx/silver/ohlc_index/'
spark=get_spark()
df=spark.read.parquet(path)
df.sort("ingestion_time",ascending=False).show(truncate=False)
out='s3a://trading-okx/silver/123/'
# coalesce(1) funnels all rows through one task; 'overwrite' replaces any
# previous export at this location.
df.coalesce(1).write.mode('overwrite').csv(out)

In [None]:
import duckdb
from datetime import datetime, timedelta
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, TimestampType

# --- CONFIG ---
# Connection settings for the S3-compatible object store and the DuckDB file
# (same values as the earlier config cells).
# NOTE(review): the access key / secret are hard-coded; consider reading them
# from the environment before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Absolute local path of the DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/silver/markindex/"


def get_spark():
    """Return the shared SparkSession configured for the S3A object store.

    Uses ``getOrCreate``, so an already running session is reused.
    """
    # Settings are applied in this order, mirroring a chained .config() call.
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }
    builder = SparkSession.builder.appName("IndexPriceKlines_Aggregator")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


In [None]:
# Ad-hoc inspection: read the aggregated order-book parquet files two levels
# below silver/agg_orderbook and print the first rows untruncated.
input_path = f"s3a://trading-okx/silver/agg_orderbook/*/*"
spark=get_spark()
df = spark.read.parquet(input_path)
df.show(truncate=False)

In [None]:
import sys
import duckdb
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import broadcast
from functools import reduce
# Connection settings for the S3-compatible object store and the DuckDB file.
# NOTE(review): the access key / secret are hard-coded; consider reading them
# from the environment before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Absolute local path of the DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Root of the silver layer that the aggregated sources are read from.
PATH_SILVER_BASE = f"s3a://trading-okx/silver"
# SQL script executed on every DuckDB connection before the merge queries.
DUCKDB_INIT_SCRIPT= '/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql'
#SYMBOL= "btc-usdt-swap"
# Written as a literal `symbol` column on the merged output.
SYMBOL= "btc-usdt"
# Gold-layer locations: staging output of the merger and the final fact path.
PATH_GOLD_STAGING = "s3a://trading-okx/gold/staging_merged_features"
PATH_GOLD_FINAL = "s3a://trading-okx/gold/fact_merged_features"
# alias -> silver table folder; the alias becomes the prefix of that source's
# feature columns after merging.
SOURCE_MAPPING = {
    "trade": "agg_trades",
    "book": "agg_orderbook",
    "index": "indexPriceKlines",
    "mark": "markPriceKlines"
}
# Intervals processed by the merger.
INTERVALS = ["1m","5m", "15m", "1h", "4h", "1d"]
# --- 1. UTILS ---
def get_spark_session():
    """Return the SparkSession for the merger job, configured for the S3A object store.

    Uses ``getOrCreate``, so an already running session is reused.
    """
    # Settings are applied in this order, mirroring a chained .config() call.
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }
    builder = SparkSession.builder.appName("MultiTimeframe_Merger")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
def get_last_processed_time(interval):
    """Return the last merged timestamp stored in DuckDB for an interval.

    Used to decide between an incremental and a full run. Returns None when
    ``fact_merged_features`` does not exist, has no rows for the interval, or
    the lookup raises (the error is printed).
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # Run the DuckDB init script (object-store configuration) first.
        with open(DUCKDB_INIT_SCRIPT, 'r') as init_file:
            con.execute(init_file.read())

        table_count = con.execute(
            "SELECT count(*) FROM information_schema.tables WHERE table_name = 'fact_merged_features'"
        ).fetchone()[0]

        if table_count != 0:
            # Latest timestamp recorded for this interval.
            max_time_sql = f"SELECT MAX(timestamp) FROM fact_merged_features WHERE interval = '{interval}'"
            return con.execute(max_time_sql).fetchone()[0]
        return None
    except Exception as e:
        print(f"‚ö†Ô∏è DuckDB Info: {e}")
        return None
    finally:
        con.close()
def load_agg_source(spark, source_alias, table_name, interval, last_time):
    """Load one aggregated silver source and normalise it for the timestamp join.

    Reads ``{PATH_SILVER_BASE}/{table_name}/{interval}/``, optionally restricts
    it to rows newer than ``last_time`` minus a one-hour buffer, renames the
    time column to ``timestamp``, prefixes every feature column with
    ``{source_alias}_`` and drops the silver metadata columns.

    Args:
        spark: Active SparkSession.
        source_alias: Prefix for this source's feature columns (e.g. "trade").
        table_name: Folder name of the source under the silver base path.
        interval: Interval label used as sub-folder (e.g. "5m").
        last_time: Last processed timestamp, or None/falsy for a full load.

    Returns:
        The normalised DataFrame, or None when the path cannot be read, no
        time column is present, or no rows remain after filtering.
    """
    path = f"{PATH_SILVER_BASE}/{table_name}/{interval}/"

    try:
        # With a glob, Spark treats each matched `date_part=...` directory as its
        # own base path and does not expose date_part as a column. Setting
        # basePath restores partition discovery so the date filter below works.
        df = spark.read.option("basePath", path).parquet(path + "*")
    except Exception:
        print(f"   ‚ö†Ô∏è Path not found: {path}")
        return None

    # Resolve the time column once so that filtering and the rename to
    # `timestamp` always refer to the same column.
    time_col = next(
        (c for c in ["close_time", "window_end", "candle_time", "timestamp"] if c in df.columns),
        None,
    )
    if time_col is None:
        print(f"   Warning: no time column found in {path}; skipping {source_alias}")
        return None

    if last_time:
        # Fixed one-hour look-back so rows at the boundary are picked up again.
        buffer_time = last_time - timedelta(hours=1)
        start_date_str = buffer_time.strftime("%Y-%m-%d")

        print(f"   ‚ÑπÔ∏è Incr Load {source_alias}: >= {buffer_time}")
        # Partition filter first (cheap pruning), then the exact time filter.
        if "date_part" in df.columns:
            df = df.filter(F.col("date_part") >= F.lit(start_date_str))
        df = df.filter(F.col(time_col) >= F.lit(buffer_time))

    if len(df.head(1)) == 0:
        return None

    # --- Standardization ---
    # 1. Normalise the time column to `timestamp`.
    if time_col != "timestamp":
        df = df.withColumnRenamed(time_col, "timestamp")
    df = df.withColumn("timestamp", F.col("timestamp").cast(TimestampType()))

    # 2. Prefix feature columns with the source alias (e.g. open -> trade_open);
    #    the key and metadata columns keep their names.
    metadata_cols = ["symbol", "interval", "date_part", "ingestion_time"]
    for column_name in df.columns:
        if column_name != "timestamp" and column_name not in metadata_cols:
            df = df.withColumnRenamed(column_name, f"{source_alias}_{column_name}")

    # 3. Remove the silver metadata columns.
    # NOTE(review): rows are not filtered by symbol before `symbol` is dropped;
    # if a source holds several symbols the join on timestamp will mix them.
    df = df.drop(*[c for c in metadata_cols if c in df.columns])

    return df
def load_funding_rate(spark, last_time):
    """Load silver funding-rate rows as a (timestamp, funding_rate) DataFrame.

    Args:
        spark: Active SparkSession.
        last_time: Last processed timestamp; when set, only rows whose
            ``funding_time`` is within one day before it (or later) are kept.

    Returns:
        DataFrame with columns ``timestamp`` and ``funding_rate``, or None when
        the silver funding-rate path cannot be read.
    """
    path = f"s3a://{S3_BUCKET}/silver/funding_rate/*/*"
    try:
        df = spark.read.parquet(path)
    except Exception as e:
        # Report why the read failed instead of hiding every error.
        print("   ‚ö†Ô∏è No Funding Rate data found")
        print(f"      reason: {e}")
        return None

    if last_time:
        # Wider buffer than the other sources (one day back).
        buffer_funding = last_time - timedelta(days=1)
        df = df.filter(F.col("funding_time") >= F.lit(buffer_funding))

    df = df.withColumnRenamed("funding_time", "timestamp")

    # Make sure the join key is a TimestampType.
    df = df.withColumn("timestamp", F.col("timestamp").cast(TimestampType()))

    # Keep only the columns needed downstream.
    return df.select("timestamp", "funding_rate")

def _build_merged_features(spark, interval):
    """Join all aggregated sources of one interval on ``timestamp``.

    Returns the merged DataFrame (with ``interval``, ``symbol``, ``date_part``
    and ``processed_at`` added), or None when no source produced data.
    """
    print(f"\nüöÄ Processing Interval: {interval}")
    last_time = get_last_processed_time(interval)
    data_frames = []

    # 1. Load every aggregated source for this interval.
    for alias, table_name in SOURCE_MAPPING.items():
        print(f"   üìÇ Loading {table_name}...")
        df = load_agg_source(spark, alias, table_name, interval, last_time)
        if df is not None:
            data_frames.append(df)

    # 2. Funding rate is currently disabled.
    # funding_df = load_funding_rate(spark, last_time)

    if not data_frames:
        print(f"   ‚ö†Ô∏è No agg data found for {interval}. Skipping.")
        return None

    # 3. Merge the aggregated sources.
    print(f"   üîó Merging {len(data_frames)} agg sources...")

    def join_dfs(df1, df2):
        # A full outer join on a column *name* already yields a single
        # `timestamp` column coalesced from both sides, so no extra
        # withColumn(coalesce(...)) on the pre-join columns is needed.
        return df1.join(df2, on="timestamp", how="full_outer")

    merged_df = reduce(join_dfs, data_frames)

    # 4. Join & forward-fill funding rate (disabled):
    # if funding_df is not None:
    #     merged_df = merged_df.join(funding_df, on="timestamp", how="full_outer")
    #     window_ffill = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    #     merged_df = merged_df.withColumn(
    #         "funding_rate",
    #         F.last("funding_rate", ignorenulls=True).over(window_ffill)
    #     )

    # 5. Clean & enrich
    return merged_df \
        .withColumn("interval", F.lit(interval)) \
        .withColumn("symbol", F.lit(SYMBOL)) \
        .withColumn("date_part", F.date_format("timestamp", "yyyy-MM-dd")) \
        .withColumn("processed_at", F.current_timestamp())


def merge_features_for_interval(spark, interval):
    """Merge, stage and sync the features of a single interval.

    Writes the merged data to ``{PATH_GOLD_STAGING}/{interval}`` (overwrite,
    partitioned by ``date_part``) and syncs that folder into DuckDB.

    Returns:
        True when data was written, False when no source had data.
    """
    merged_df = _build_merged_features(spark, interval)
    if merged_df is None:
        return False

    # 6. Write staging
    output_path = f"{PATH_GOLD_STAGING}/{interval}"
    print(f"   üíæ Writing to Staging: {output_path}")
    merged_df.write.mode("overwrite").partitionBy("date_part").parquet(output_path)

    # 7. Sync DuckDB
    sync_to_duckdb(output_path, interval)
    return True


def process_and_merge_all(spark):
    """Merge every interval, write one unified staging dataset and sync it once.

    The unified dataset is written to ``PATH_GOLD_STAGING`` (overwrite),
    partitioned by ``date_part`` and ``interval``.

    Returns:
        True when data was written, False when no interval had data.
    """
    print("\nüöÄ Starting Unified Merger Process...")

    all_interval_dfs = []

    # 1. Collect the merged DataFrame of each interval. The builder returns a
    #    DataFrame or None (merge_features_for_interval returns a bool and also
    #    writes/syncs on its own, so it cannot be used here).
    for interval in INTERVALS:
        df = _build_merged_features(spark, interval)
        if df is not None:
            all_interval_dfs.append(df)

    if not all_interval_dfs:
        print("‚ùå No data found for any interval.")
        return False

    print(f"\nüîó Unioning {len(all_interval_dfs)} intervals into one DataFrame...")

    # 2. Union by column name; columns missing in one interval become NULL.
    final_big_df = reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), all_interval_dfs)

    # 3. Write to staging, partitioned by date_part AND interval.
    print(f"üíæ Writing unified dataset to: {PATH_GOLD_STAGING}")

    final_big_df.write \
        .mode("overwrite") \
        .partitionBy("date_part", "interval") \
        .parquet(PATH_GOLD_STAGING)

    # 4. Sync DuckDB once for all intervals.
    sync_to_duckdb(PATH_GOLD_STAGING)
    return True


def sync_to_duckdb(staging_path, interval=None):
    """Sync staged merged features into the DuckDB table ``fact_merged_features``.

    Args:
        staging_path: ``s3a://`` folder that Spark wrote the staging data to.
        interval: When given, ``staging_path`` is a per-interval folder laid out
            as ``date_part=.../*.parquet``. When None, it is the unified folder
            laid out as ``date_part=.../interval=.../*.parquet``.

    Existing rows with the same (interval, timestamp) as a staged row are
    replaced. Errors are printed, not raised.
    """
    print("   ü¶Ü Syncing to DuckDB...")
    con = duckdb.connect(DUCKDB_PATH)

    # DuckDB reads s3:// rather than s3a://; the glob depth depends on the layout.
    base_path = staging_path.replace("s3a://", "s3://").rstrip("/")
    glob_suffix = "/*/*/*.parquet" if interval is None else "/*/*.parquet"
    duck_read_path = base_path + glob_suffix

    in_transaction = False
    try:
        with open(DUCKDB_INIT_SCRIPT, 'r') as f:
            con.execute(f.read())

        # 1. Create the fact table with the schema of the staged files (no rows).
        con.execute(f"""
            CREATE TABLE IF NOT EXISTS fact_merged_features AS
            SELECT * FROM read_parquet('{duck_read_path}', hive_partitioning=1) LIMIT 0
        """)

        # Delete + insert succeed or fail together.
        con.execute("BEGIN TRANSACTION")
        in_transaction = True

        # 2. Remove rows that are about to be re-inserted (idempotency). Staged
        #    rows carry `interval` in both layouts, so matching on the pair
        #    works for a single interval as well as for the unified dataset.
        con.execute(f"""
            DELETE FROM fact_merged_features
            WHERE (interval, timestamp) IN (
                SELECT interval, timestamp FROM read_parquet('{duck_read_path}', hive_partitioning=1)
            )
        """)

        # 3. Insert the staged rows; BY NAME maps columns regardless of order.
        con.execute(f"""
            INSERT INTO fact_merged_features BY NAME
            SELECT * FROM read_parquet('{duck_read_path}', hive_partitioning=1)
        """)
        con.execute("COMMIT")
        in_transaction = False
        print("   ‚úÖ Sync Complete.")

    except Exception as e:
        if in_transaction:
            con.execute("ROLLBACK")
        if "No files found" in str(e):
            print("   ‚ö†Ô∏è No new files to sync.")
        else:
            print(f"   ‚ùå DuckDB Error: {e}")
    finally:
        con.close()

# ==========================================
# 4. MAIN ENTRYPOINT
# ==========================================
def main():
    """Run the gold-layer feature merger for all intervals."""
    spark = get_spark_session()
    spark.sparkContext.setLogLevel("ERROR")

    print("===========================================")
    print("   GOLD LAYER: FEATURE MERGER PIPELINE    ")
    print("===========================================")

    try:
        process_and_merge_all(spark)
    finally:
        # Stop the session even when the merge raises.
        spark.stop()
    print("\nüèÅ Pipeline Finished.")
if __name__ == "__main__":
    main()

create features

In [None]:
import duckdb
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, TimestampType

# --- CONFIG ---
# Connection settings for the S3-compatible object store and the DuckDB file.
# NOTE(review): the access key / secret are hard-coded; consider reading them
# from the environment before sharing this notebook.
S3_ENDPOINT = "http://localhost:9000"
S3_ACCESS = "minio"
S3_SECRET = "minio123"
S3_BUCKET = "trading-okx"
# Absolute local path of the DuckDB warehouse file.
DUCKDB_PATH = '/mnt/d/learn/DE/Semina_project/datawarehouse.duckdb'
# Gold-layer locations for the derived-feature job.
STAGING_OUTPUT_PATH = f"s3a://{S3_BUCKET}/gold/staging_derive"
FINAL_OUTPUT_PATH = f"s3a://{S3_BUCKET}/gold/derive_final"
# Input: the staging folder of merged features.
INPUT_PATH = f"s3a://{S3_BUCKET}/gold/staging_merged_features/"
# Intervals for which derived features are built (no "1m" here, unlike the merger cell).
INTERVALS = ["5m", "15m", "1h", "4h", "1d"]
def get_spark_session():
    """Return the SparkSession for the derived-features job, configured for S3A.

    Uses ``getOrCreate``, so an already running session is reused.
    """
    # Settings are applied in this order, mirroring a chained .config() call.
    settings = {
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.4",
        "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
        "spark.hadoop.fs.s3a.access.key": S3_ACCESS,
        "spark.hadoop.fs.s3a.secret.key": S3_SECRET,
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }
    builder = SparkSession.builder.appName("Derived_Features_Creator")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
def get_last_processed_time():
    """Return MAX(timestamp) of ``fact_derived_features``, or None.

    None is returned when the table does not exist, is empty, or the lookup
    raises (a warning is printed in that case).
    """
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # Run the warehouse config script before querying.
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as config_file:
            con.execute(config_file.read())

        found = con.execute(
            "SELECT count(*) FROM information_schema.tables WHERE table_name = 'fact_derived_features'"
        ).fetchone()[0]

        if found != 0:
            latest = con.execute(
                "SELECT MAX(timestamp) FROM fact_derived_features"
            ).fetchone()
            return latest[0]
        return None
    except Exception as e:
        print(f"Warning: Could not get last processed time: {e}")
        return None
    finally:
        con.close()
def load_data(spark):
    """Load the merged features from ``INPUT_PATH``, incrementally when possible.

    When DuckDB already holds derived features, only rows with a timestamp
    within one day before the last processed time (or later) are kept.

    Args:
        spark: Active SparkSession.

    Returns:
        The loaded DataFrame with ``timestamp`` cast to TimestampType, or None
        when no rows were loaded.
    """
    last_time = get_last_processed_time()
    df = spark.read.parquet(INPUT_PATH)

    if last_time:
        start_timestamp = last_time - timedelta(days=1)  # one-day buffer
        print(f" ‚ÑπÔ∏è Incremental load: timestamp >= {start_timestamp}")
        df = df.filter(F.col("timestamp") >= F.lit(start_timestamp))

    # Normalise the timestamp type.
    df = df.withColumn("timestamp", F.col("timestamp").cast(TimestampType()))

    # One count serves both as the emptiness check and for the log line,
    # instead of an rdd.isEmpty() job followed by a second full count.
    row_count = df.count()
    if row_count == 0:
        print("Error: No data loaded")
        return None

    print(f"‚úì Loaded input data: {row_count} rows")
    return df
def create_macro_basis_features(df):
    """Add basis / premium / volatility-spread features for each interval.

    For an interval the features are only added when the trade mean, index
    mean and mark mean columns all exist; the volatility spread additionally
    needs both std columns. Returns the extended DataFrame.
    """
    print("üîß Creating Macro & Basis features...")
    eps = 1e-9  # keeps the denominators away from zero
    for interval in INTERVALS:
        trade_mean_name = f"trades_{interval}_price_mean_trade"
        index_mean_name = f"index_price_{interval}_mean"
        mark_mean_name = f"mark_price_{interval}_mean"
        trade_std_name = f"trades_{interval}_price_std_trade"
        index_std_name = f"index_price_{interval}_std"

        required = (trade_mean_name, index_mean_name, mark_mean_name)
        if any(name not in df.columns for name in required):
            continue

        trade_mean = F.col(trade_mean_name)
        index_mean = F.col(index_mean_name)
        mark_mean = F.col(mark_mean_name)

        df = df.withColumn(f"feat_basis_spread_{interval}", trade_mean - index_mean)
        df = df.withColumn(
            f"feat_basis_ratio_{interval}",
            (trade_mean - index_mean) / (index_mean + eps),
        )
        df = df.withColumn(
            f"feat_premium_index_{interval}",
            (mark_mean - index_mean) / (index_mean + eps),
        )

        has_std_columns = trade_std_name in df.columns and index_std_name in df.columns
        if has_std_columns:
            df = df.withColumn(
                f"feat_volatility_spread_{interval}",
                F.col(trade_std_name) - F.col(index_std_name),
            )
    return df
def create_log_returns(df):
    """Add one-step log returns and their divergence for each interval.

    The log return compares a column with its previous row in ``timestamp``
    order. The trade return is computed from the order-book ``wmp_last``
    column and the index return from the index ``close`` column; the
    divergence is added when both returns exist. Returns the extended DataFrame.
    """
    print("üîß Creating log returns...")
    by_time = Window.orderBy("timestamp")

    def log_return(column_name):
        # log(current / (previous + epsilon)), previous = lag of one row
        previous_value = F.lag(F.col(column_name), 1).over(by_time)
        return F.log(F.col(column_name) / (previous_value + 1e-9))

    for interval in INTERVALS:
        wmp_last = f"orderbook_{interval}_wmp_last"
        index_close = f"index_price_{interval}_close"
        trade_ret = f"feat_log_return_trade_{interval}"
        index_ret = f"feat_log_return_index_{interval}"

        if wmp_last in df.columns:
            df = df.withColumn(trade_ret, log_return(wmp_last))

        if index_close in df.columns:
            df = df.withColumn(index_ret, log_return(index_close))

        # Trend divergence needs both returns to be present.
        both_returns_present = trade_ret in df.columns and index_ret in df.columns
        if both_returns_present:
            df = df.withColumn(
                f"feat_trend_divergence_{interval}",
                F.col(trade_ret) - F.col(index_ret),
            )

    return df
def create_funding_sentiment_features(df):
    """Add funding-rate based features for each interval.

    Adds ``feat_funding_cost_*`` and ``feat_funding_trend_*`` when both the
    order-book ``wmp_last`` column and the funding-rate column exist, and
    ``feat_funding_basis_corr_*`` when both the basis ratio and the funding
    rate exist. Returns the extended DataFrame.
    """
    print("üîß Creating Funding & Sentiment features...")
    window_12 = Window.orderBy("timestamp").rowsBetween(-11, 0)  # current row + 11 preceding

    funding_rate = "funding_funding_rate"
    # Every feature here needs the funding rate; check it once.
    has_funding = funding_rate in df.columns

    for interval in INTERVALS:
        prefix_orderbook = f"orderbook_{interval}_"

        wmp_last = f"{prefix_orderbook}wmp_last"
        basis_ratio = f"feat_basis_ratio_{interval}"

        if has_funding and wmp_last in df.columns:
            df = df.withColumn(f"feat_funding_cost_{interval}", F.col(funding_rate) * F.col(wmp_last))
            df = df.withColumn(f"feat_funding_trend_{interval}", F.col(funding_rate) - F.mean(funding_rate).over(window_12))

        # Previously only basis_ratio was checked, so a frame without the
        # funding column failed on an unresolved column reference.
        if has_funding and basis_ratio in df.columns:
            df = df.withColumn(f"feat_funding_basis_corr_{interval}", F.col(funding_rate) * F.col(basis_ratio))

    return df
def create_momentum_trend_features(df):
    """Add momentum / trend features from the order-book WMP columns.

    For each interval where ``wmp_last``, ``wmp_min`` and ``wmp_max`` exist,
    adds price velocity (change over 3 rows / 3), moving-average divergence
    (against the mean of the current and 11 preceding rows), an RSI-like
    position of ``wmp_last`` between min and max, and price acceleration
    (one-row change of the velocity). Returns the extended DataFrame.
    """
    print("üîß Creating Momentum & Trend features...")
    # lag() is a window function and must be evaluated over an ordered window;
    # without .over(...) Spark rejects the expression.
    # NOTE(review): the window has no partitionBy, so all rows are ordered
    # together - confirm the input holds a single series.
    window_ordered = Window.orderBy("timestamp")
    window_12 = Window.orderBy("timestamp").rowsBetween(-11, 0)  # current row + 11 preceding

    for interval in INTERVALS:
        prefix_orderbook = f"orderbook_{interval}_"

        wmp_last = f"{prefix_orderbook}wmp_last"
        wmp_min = f"{prefix_orderbook}wmp_min"
        wmp_max = f"{prefix_orderbook}wmp_max"

        if all(c in df.columns for c in [wmp_last, wmp_min, wmp_max]):
            df = df.withColumn(
                f"feat_price_velocity_{interval}",
                (F.col(wmp_last) - F.lag(F.col(wmp_last), 3).over(window_ordered)) / 3,
            )
            df = df.withColumn(f"feat_ma_divergence_{interval}", F.col(wmp_last) / (F.mean(wmp_last).over(window_12) + 1e-9) - 1)
            df = df.withColumn(f"feat_rsi_proxy_{interval}", (F.col(wmp_last) - F.col(wmp_min)) / (F.col(wmp_max) - F.col(wmp_min) + 1e-9))

            # Acceleration is the one-row change of the velocity computed above.
            velocity_col = f"feat_price_velocity_{interval}"
            df = df.withColumn(
                f"feat_price_accel_{interval}",
                F.col(velocity_col) - F.lag(F.col(velocity_col), 1).over(window_ordered),
            )

    return df
def create_order_flow_features(df):
    """Add order-flow features (trade/depth imbalance, consumption, aggressiveness).

    For each interval in INTERVALS, and only when the required input columns
    are present in ``df``, the following columns are added:

    * ``feat_trade_imbalance_{interval}`` - (volume_buy - volume_sell) / volume_total.
    * ``feat_depth_imbalance_{interval}`` - (sum_bid_50 - sum_ask_50) / total_depth_50.
    * ``feat_buy_consumption_{interval}`` - volume_buy / sum_ask_50.
    * ``feat_sell_consumption_{interval}`` - volume_sell / sum_bid_50.
    * ``feat_aggressiveness_{interval}`` - count_buy / count_sell.
    * ``feat_smart_money_div_{interval}`` - trade imbalance minus depth imbalance.

    Every denominator is offset by 1e-9 to avoid division by zero.

    Args:
        df: Spark DataFrame with ``trades_{interval}_*`` and ``orderbook_{interval}_*`` columns.

    Returns:
        The DataFrame with the available feature columns appended.
    """
    print("üîß Creating Order Flow features...")
    for interval in INTERVALS:
        prefix_trades = f"trades_{interval}_"
        prefix_orderbook = f"orderbook_{interval}_"

        volume_buy = f"{prefix_trades}volume_buy"
        volume_sell = f"{prefix_trades}volume_sell"
        volume_total = f"{prefix_trades}volume_total"
        count_buy = f"{prefix_trades}count_buy"
        count_sell = f"{prefix_trades}count_sell"

        sum_bid_50 = f"{prefix_orderbook}sum_bid_50"
        sum_ask_50 = f"{prefix_orderbook}sum_ask_50"
        total_depth_50 = f"{prefix_orderbook}total_depth_50"

        if all(c in df.columns for c in [volume_buy, volume_sell, volume_total]):
            df = df.withColumn(f"feat_trade_imbalance_{interval}", (F.col(volume_buy) - F.col(volume_sell)) / (F.col(volume_total) + 1e-9))

        if all(c in df.columns for c in [sum_bid_50, sum_ask_50, total_depth_50]):
            df = df.withColumn(f"feat_depth_imbalance_{interval}", (F.col(sum_bid_50) - F.col(sum_ask_50)) / (F.col(total_depth_50) + 1e-9))
            # The consumption ratios also read the trade volumes, so they need
            # their own presence check; otherwise a frame that has depth columns
            # but no trade volumes fails on an unresolved column.
            if volume_buy in df.columns and volume_sell in df.columns:
                df = df.withColumn(f"feat_buy_consumption_{interval}", F.col(volume_buy) / (F.col(sum_ask_50) + 1e-9))
                df = df.withColumn(f"feat_sell_consumption_{interval}", F.col(volume_sell) / (F.col(sum_bid_50) + 1e-9))

        if all(c in df.columns for c in [count_buy, count_sell]):
            df = df.withColumn(f"feat_aggressiveness_{interval}", F.col(count_buy) / (F.col(count_sell) + 1e-9))

        # Smart-money divergence is built from the two imbalance columns above,
        # so it is only available when both of them were created.
        trade_imbal = f"feat_trade_imbalance_{interval}"
        depth_imbal = f"feat_depth_imbalance_{interval}"
        if trade_imbal in df.columns and depth_imbal in df.columns:
            df = df.withColumn(f"feat_smart_money_div_{interval}", F.col(trade_imbal) - F.col(depth_imbal))

    return df
def create_volatility_liquidity_features(df):
    """Add spread, liquidity-density and trade-range features per interval.

    For each interval in INTERVALS, when the needed inputs exist in ``df``:

    * ``feat_rel_spread_{interval}`` - spread_mean / wmp_mean.
    * ``feat_liq_density_{interval}`` - total_depth_50 / spread_mean.
    * ``feat_candle_range_{interval}`` - (max - min trade price) / mean trade price.
    * ``feat_tail_extension_{interval}`` - (max - last trade price) / (max - min trade price).

    All denominators carry a 1e-9 offset. Returns the extended DataFrame.
    """
    print("üîß Creating Volatility & Liquidity features...")
    eps = 1e-9

    for interval in INTERVALS:
        # Orderbook-side inputs.
        spread_mean = f"orderbook_{interval}_spread_mean"
        wmp_mean = f"orderbook_{interval}_wmp_mean"
        total_depth_50 = f"orderbook_{interval}_total_depth_50"

        # Trade-side inputs.
        px_high = f"trades_{interval}_price_max_trade"
        px_low = f"trades_{interval}_price_min_trade"
        px_avg = f"trades_{interval}_price_mean_trade"
        px_close = f"trades_{interval}_price_last_trade"

        if {spread_mean, wmp_mean}.issubset(df.columns):
            rel_spread = F.col(spread_mean) / (F.col(wmp_mean) + eps)
            df = df.withColumn(f"feat_rel_spread_{interval}", rel_spread)

        if {total_depth_50, spread_mean}.issubset(df.columns):
            liq_density = F.col(total_depth_50) / (F.col(spread_mean) + eps)
            df = df.withColumn(f"feat_liq_density_{interval}", liq_density)

        if {px_high, px_low, px_avg, px_close}.issubset(df.columns):
            candle_range = (F.col(px_high) - F.col(px_low)) / (F.col(px_avg) + eps)
            tail_extension = (F.col(px_high) - F.col(px_close)) / (F.col(px_high) - F.col(px_low) + eps)
            df = (
                df.withColumn(f"feat_candle_range_{interval}", candle_range)
                .withColumn(f"feat_tail_extension_{interval}", tail_extension)
            )

    return df
def create_time_features(df):
    """Add cyclical hour-of-day features derived from ``timestamp``.

    Adds ``feat_hour_sin`` and ``feat_hour_cos`` (sine and cosine of the hour
    mapped onto a 24-hour circle). The intermediate ``hour`` column is dropped
    before returning.
    """
    print("üîß Creating Time features...")
    # Angle of the hour on a 24-hour circle; shared by both projections.
    hour_angle = 2 * 3.141592653589793 * F.col("hour") / 24
    return (
        df.withColumn("hour", F.hour("timestamp"))
        .withColumn("feat_hour_sin", F.sin(hour_angle))
        .withColumn("feat_hour_cos", F.cos(hour_angle))
        .drop("hour")
    )
def create_efficiency_features(df):
    """Add market-efficiency features for every interval in INTERVALS.

    For each interval whose ``orderbook_{interval}_wmp_last`` column exists:

    * ``feat_efficiency_ratio_{interval}`` - absolute price change versus 12 rows
      earlier, divided by the sum of absolute row-to-row changes over the
      trailing 12 rows.
    * ``feat_price_entropy_{interval}`` - trailing 12-row standard deviation of
      ``wmp_last`` divided by its trailing 12-row mean.

    Rows are ordered by ``timestamp``. A temporary ``price_diff`` column is used
    during the computation and removed before returning.

    Args:
        df: Spark DataFrame with ``timestamp`` and the per-interval ``wmp_last`` columns.

    Returns:
        The DataFrame with the feature columns appended.
    """
    print("üîß Creating Market Efficiency features...")
    # lag() must be evaluated over an ordered window without an explicit frame;
    # the rolling aggregates use the framed 12-row variant of the same ordering.
    ordered = Window.orderBy("timestamp")
    window_12 = ordered.rowsBetween(-11, 0)

    for interval in INTERVALS:
        prefix_orderbook = f"orderbook_{interval}_"
        wmp_last = f"{prefix_orderbook}wmp_last"

        if wmp_last in df.columns:
            df = df.withColumn(
                "price_diff",
                F.abs(F.col(wmp_last) - F.lag(F.col(wmp_last), 1).over(ordered)),
            )
            df = df.withColumn(
                f"feat_efficiency_ratio_{interval}",
                F.abs(F.col(wmp_last) - F.lag(F.col(wmp_last), 12).over(ordered))
                / (F.sum("price_diff").over(window_12) + 1e-9),
            )
            # Use the per-interval column name; the bare literal "wmp_last" is
            # not one of the orderbook_{interval}_* columns this loop works on.
            df = df.withColumn(
                f"feat_price_entropy_{interval}",
                F.stddev(wmp_last).over(window_12) / (F.mean(wmp_last).over(window_12) + 1e-9),
            )
    return df.drop("price_diff") if "price_diff" in df.columns else df
def create_orderbook_shape_features(df):
    """Add orderbook shape features (slopes and depth convexity) per interval.

    For each interval in INTERVALS, when the needed inputs exist in ``df``:

    * ``feat_bid_slope_{interval}`` - sum_bid_20 / (best_bid - bid_px_20).
    * ``feat_ask_slope_{interval}`` - sum_ask_20 / (ask_px_20 - best_ask).
    * ``feat_slope_imbalance_{interval}`` - bid slope / ask slope.
    * ``feat_depth_convexity_{interval}`` - sum_bid_5 / sum_bid_50.

    All denominators carry a 1e-9 offset. Returns the extended DataFrame.
    """
    print("üîß Creating Orderbook Shape features...")
    eps = 1e-9

    for interval in INTERVALS:
        book = f"orderbook_{interval}_"

        bid_depth_20 = book + "sum_bid_20"
        ask_depth_20 = book + "sum_ask_20"
        bid_depth_5 = book + "sum_bid_5"
        bid_depth_50 = book + "sum_bid_50"
        top_bid = book + "best_bid"
        top_ask = book + "best_ask"
        bid_level_20 = book + "bid_px_20"
        ask_level_20 = book + "ask_px_20"

        slope_inputs = (bid_depth_20, ask_depth_20, top_bid, top_ask, bid_level_20, ask_level_20)
        if not any(name not in df.columns for name in slope_inputs):
            bid_slope = f"feat_bid_slope_{interval}"
            ask_slope = f"feat_ask_slope_{interval}"
            df = (
                df.withColumn(bid_slope, F.col(bid_depth_20) / (F.col(top_bid) - F.col(bid_level_20) + eps))
                .withColumn(ask_slope, F.col(ask_depth_20) / (F.col(ask_level_20) - F.col(top_ask) + eps))
                # The imbalance reads the two slope columns created just above.
                .withColumn(f"feat_slope_imbalance_{interval}", F.col(bid_slope) / (F.col(ask_slope) + eps))
            )

        if bid_depth_5 in df.columns and bid_depth_50 in df.columns:
            convexity = F.col(bid_depth_5) / (F.col(bid_depth_50) + eps)
            df = df.withColumn(f"feat_depth_convexity_{interval}", convexity)

    return df
def create_statistical_normalization(df):
    """Add rolling z-score features per interval.

    For each interval in INTERVALS, a z-score over the trailing 12 rows
    (ordered by ``timestamp``) is computed for every source column that exists:

    * ``trades_{interval}_volume_total``   -> ``feat_z_volume_{interval}``
    * ``orderbook_{interval}_spread_mean`` -> ``feat_z_spread_{interval}``
    * ``feat_trade_imbalance_{interval}``  -> ``feat_z_imbalance_{interval}``

    The standard deviation is offset by 1e-9. Returns the extended DataFrame.
    """
    print("üîß Creating Statistical Normalization features...")
    # 12 rows; the original note equates this to 1h on a 5m base.
    window_1h = Window.orderBy("timestamp").rowsBetween(-11, 0)

    for interval in INTERVALS:
        # (source column, z-score column) pairs, processed in this order.
        zscore_plan = (
            (f"trades_{interval}_volume_total", f"feat_z_volume_{interval}"),
            (f"orderbook_{interval}_spread_mean", f"feat_z_spread_{interval}"),
            (f"feat_trade_imbalance_{interval}", f"feat_z_imbalance_{interval}"),
        )
        for source_col, target_col in zscore_plan:
            if source_col not in df.columns:
                continue
            rolling_mean = F.mean(source_col).over(window_1h)
            rolling_std = F.stddev(source_col).over(window_1h)
            df = df.withColumn(target_col, (F.col(source_col) - rolling_mean) / (rolling_std + 1e-9))

    return df
def create_vwap_pivot_features(df):
    """Add distance-to-VWAP and distance-to-rolling-extreme features per interval.

    For each interval in INTERVALS where both ``orderbook_{interval}_wmp_last``
    and ``trades_{interval}_vwap`` exist in ``df``:

    * ``feat_dist_vwap_{interval}`` - (wmp_last - vwap) / vwap.
    * ``feat_dist_max_{interval}`` - wmp_last relative to its trailing 12-row maximum.
    * ``feat_dist_min_{interval}`` - wmp_last relative to its trailing 12-row minimum.

    All denominators carry a 1e-9 offset. Returns the extended DataFrame.
    """
    print("üîß Creating VWAP & Pivot features...")
    window_1h = Window.orderBy("timestamp").rowsBetween(-11, 0)

    for interval in INTERVALS:
        wmp_last = f"orderbook_{interval}_wmp_last"
        vwap = f"trades_{interval}_vwap"  # only present if the trades aggregation produced a vwap column

        if wmp_last not in df.columns or vwap not in df.columns:
            continue

        price = F.col(wmp_last)
        rolling_high = F.max(wmp_last).over(window_1h)
        rolling_low = F.min(wmp_last).over(window_1h)

        df = (
            df.withColumn(f"feat_dist_vwap_{interval}", (price - F.col(vwap)) / (F.col(vwap) + 1e-9))
            .withColumn(f"feat_dist_max_{interval}", (price - rolling_high) / (rolling_high + 1e-9))
            .withColumn(f"feat_dist_min_{interval}", (price - rolling_low) / (rolling_low + 1e-9))
        )

    return df
def create_all_features(df):
    """Run every feature creator in sequence on a timestamp-ordered DataFrame.

    The stage order matters where later stages read columns produced by earlier
    ones (for example the normalization stage reads ``feat_trade_imbalance_*``
    created by the order-flow stage).

    Returns the DataFrame after all stages have been applied.
    """
    print("üöÄ Creating derived features...")
    enriched = df.orderBy("timestamp")

    pipeline = (
        create_log_returns,
        create_macro_basis_features,
        create_funding_sentiment_features,
        create_momentum_trend_features,
        create_order_flow_features,
        create_volatility_liquidity_features,
        create_time_features,
        create_efficiency_features,
        create_orderbook_shape_features,
        create_statistical_normalization,
        create_vwap_pivot_features,
    )
    for stage in pipeline:
        enriched = stage(enriched)

    return enriched
def save_features_data(df):
    """Write the feature frame to staging, upsert it into DuckDB, then write the final gold copy.

    Steps, in order:
      1. Overwrite STAGING_OUTPUT_PATH with parquet partitioned by ``date_part``.
      2. Open DUCKDB_PATH, run the warehouse SQL script, create
         ``fact_derived_features`` if missing, delete rows whose ``timestamp``
         appears in staging, and insert the staged rows (without ``date_part``).
      3. Overwrite FINAL_OUTPUT_PATH with the unpartitioned frame (``date_part`` dropped).

    Any exception raised in the DuckDB step is printed and swallowed, so the
    final gold write in step 3 still runs after a DuckDB failure.

    Args:
        df: Spark DataFrame to persist; if None the function returns immediately.
            It must already contain a ``date_part`` column, because the line
            that would add it below is commented out.

    Returns:
        None.
    """
    if df is None:
        return

    # Add date_part for partitioning (currently disabled; df is expected to carry it already).
    #df = df.withColumn("date_part", F.date_format("timestamp", "yyyy-MM-dd"))

    # Write staging
    print(f" üíæ Writing staging to {STAGING_OUTPUT_PATH}")
    df.write.mode("overwrite").partitionBy("date_part").parquet(STAGING_OUTPUT_PATH)

    # Upsert into DuckDB
    con = duckdb.connect(DUCKDB_PATH)
    try:
        # NOTE(review): the contents of this SQL script are not visible here;
        # presumably it sets up the warehouse schema/sources - confirm.
        with open('/mnt/d/learn/DE/Semina_project/SQL_db/config_dw/warehouse_source.sql', 'r') as f:
            sql_script = f.read()
        con.execute(sql_script)

        # Create the fact table if it does not exist yet (LIMIT 0 copies the schema only)
        con.execute(f"""
            CREATE TABLE IF NOT EXISTS fact_derived_features AS
            SELECT *  EXCLUDE date_part FROM read_parquet('{STAGING_OUTPUT_PATH}/*/*.parquet', hive_partitioning=1) LIMIT 0
        """)

        # Delete existing rows whose timestamp overlaps the staged data
        print(" üîÑ Cleaning overlapping data...")
        con.execute(f"""
            DELETE FROM fact_derived_features
            WHERE timestamp IN (
                SELECT timestamp FROM read_parquet('{STAGING_OUTPUT_PATH}/*/*.parquet', hive_partitioning=1)
            )
        """)

        # Insert the new data
        print(" üì• Inserting new data...")
        con.execute(f"""
            INSERT INTO fact_derived_features
            SELECT * EXCLUDE date_part
            FROM read_parquet('{STAGING_OUTPUT_PATH}/*/*.parquet', hive_partitioning=1)
        """)
        print(" ‚úÖ DuckDB upsert completed.")
    except Exception as e:
        # Best-effort: report the DuckDB failure and continue to the final write.
        print(f" ‚ö†Ô∏è DuckDB Error: {e}")
    finally:
        con.close()

    # Write final gold (not partitioned); note the message is printed before the write runs
    print(f" üíæ Final gold saved to {FINAL_OUTPUT_PATH}")
    df.drop("date_part").write.mode("overwrite").parquet(FINAL_OUTPUT_PATH)


def main():
    """Entry point: load the data, build all derived features and persist them.

    Exits the process with status 1 when no data is loaded or when any step
    raises an exception. The Spark session is stopped in every case.
    """
    spark = get_spark_session()
    spark.sparkContext.setLogLevel("ERROR")

    print("STARTING DERIVED FEATURES PIPELINE...")

    try:
        source_df = load_data(spark)
        if source_df is None:
            print("No data loaded. Exiting.")
            sys.exit(1)

        # Build the features and hand the result straight to the writer.
        save_features_data(create_all_features(source_df))
        print("PIPELINE COMPLETED SUCCESSFULLY!")
    except Exception as err:
        print(f"Pipeline failed: {err}")
        sys.exit(1)
    finally:
        # Runs on success, on failure and on sys.exit alike.
        spark.stop()
# Run the pipeline only when this code is executed as the main module, not when imported.
if __name__ == "__main__":
    main()

In [None]:
from dotenv import load_dotenv