In [0]:
# ====================================================================
# ================= SILVER OHLCV 1m (STATEFUL STREAMING) ==============
# ====================================================================

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lag, when, log
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StructType, StructField,
    TimestampType, StringType,
    DoubleType, IntegerType
)
import math
from pyspark.sql import Window
from pyspark.sql.functions import (
    col, when, from_utc_timestamp, regexp_replace,
    lead, sequence, explode, last, expr
)
from pyspark.sql import Window
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (
    col, when, from_utc_timestamp, regexp_replace
)
from delta.tables import DeltaTable

In [0]:
# =========================================================
# ================= SILVER OHLCV 1m (STREAMING) ===========
# =========================================================

# ================= CONFIG =================

# Storage locations of the Bronze (input) and Silver (output) Delta tables.
bronze_path = "abfss://datos@mastertfm002sta.dfs.core.windows.net/bronze/activos"
silver_path = "abfss://datos@mastertfm002sta.dfs.core.windows.net/silver/activos"
# The streaming checkpoint lives in a subfolder of the Silver table path.
checkpoint_path = f"{silver_path}/_checkpoint"

# ================= SPARK =================

# Reuse the active session if there is one, otherwise build one with the
# Delta Lake SQL extension and catalog configured.
spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

#spark.conf.set("spark.sql.session.timeZone", "UTC")

In [0]:
# # ================= DIMENSIÓN DE ACTIVOS =================
# # symbol | asset_class | asset_name

# Asset dimension rows, declared grouped by asset class and flattened into
# Row(symbol, asset_class, asset_name) in declaration order.
asset_rows = [
    Row(symbol, asset_class, asset_name)
    for asset_class, members in (
        # Stocks
        ("Acciones", (
            ("TSLA", "Tesla"),
            ("NVDA", "Nvidia"),
            ("AMD", "AMD"),
            ("COIN", "Coinbase"),
            ("PLTR", "Palantir"),
            ("RIVN", "Rivian"),
            ("SHOP", "Shopify"),
            ("LCID", "Lucid Motors"),
            ("ZM", "Zoom"),
            ("SPCE", "Virgin Galactic"),
            ("KO", "Coca Cola Company"),
            ("PG", "Procter & Gamble"),
            ("JNJ", "Johnson & Johnson"),
            ("PEP", "PepsiCo"),
            ("WMT", "Walmart"),
            ("MCD", "McDonalds"),
            ("VZ", "Verizon"),
            ("DUK", "Duke Energy"),
            ("UL", "Unilever"),
            ("V", "VISA"),
        )),
        # Funds / ETFs
        ("Fondos", (
            ("SPY", "SPDR S&P 500 ETF Trust"),
            ("QQQ", "Invesco QQQ Trust"),
            ("EEM", "iShares MSCI Emerging Markets ETF"),
            ("VGK", "Vanguard FTSE Europe ETF"),
            ("AGG", "iShares Core U.S. Aggregate Bond ETF"),
            ("VNQ", "Vanguard Real Estate ETF"),
            ("ARKK", "ARK Innovation ETF"),
            ("VUG", "Vanguard Growth ETF"),
            ("SCHD", "Schwab US Dividend Equity ETF"),
            ("SOXX", "iShares Semiconductor ETF"),
        )),
        # Forex pairs
        ("Forex", (
            ("EURUSD", "Euro / Dólar"),
            ("USDJPY", "Dólar / Yen"),
            ("GBPUSD", "Libra / Dólar"),
            ("USDCHF", "Dólar / Franco Suizo"),
            ("AUDUSD", "Dólar Australiano / Dólar"),
            ("USDCAD", "Dólar / Dólar Canadiense"),
            ("NZDUSD", "Dólar Nueva Zelanda / Dólar"),
            ("EURGBP", "Euro / Libra"),
            ("EURJPY", "Euro / Yen"),
            ("GBPJPY", "Libra / Yen"),
        )),
        # Crypto
        ("Cripto", (
            ("BTC-USD", "Bitcoin"),
            ("ETH-USD", "Ethereum"),
            ("BNB-USD", "Binance Coin"),
            ("XRP-USD", "Ripple"),
            ("SOL-USD", "Solana"),
            ("TRX-USD", "Tron"),
            ("DOGE-USD", "Dogecoin"),
            ("ADA-USD", "Cardano"),
            ("AVAX-USD", "Avalanche"),
            ("LTC-USD", "Litecoin"),
        )),
        # Commodities
        ("Commodities", (
            ("GLD", "Oro"),
            ("SLV", "Plata"),
            ("PPLT", "Platino"),
            ("PALL", "Paladio"),
            ("USO", "Petróleo"),
            ("UNG", "Gas Natural"),
            ("CORN", "Maiz"),
            ("SOYB", "Soja"),
            ("WEAT", "Trigo"),
            ("CANE", "Azucar"),
        )),
    )
    for symbol, asset_name in members
]

# Lookup DataFrame that process_batch left-joins on "symbol".
asset_dim = spark.createDataFrame(
    asset_rows,
    schema=["symbol", "asset_class", "asset_name"],
)

In [0]:
# ================= FOREACH BATCH =================

def process_batch(df, batch_id):
    """Clean, enrich, gap-fill and upsert one Bronze micro-batch into Silver.

    Used as the ``foreachBatch`` callback of the Bronze -> Silver stream.

    Steps:
      1. Strip a trailing ``=X`` from ``symbol``, drop rows whose
         ``timestamp``, ``close``, ``symbol`` or ``source`` is null, then
         de-duplicate on ``(symbol, timestamp)``.
      2. Left-join ``asset_dim`` and derive ``coste_opera_h`` from
         ``asset_class``.
      3. Insert one row per missing minute between consecutive rows of the
         same symbol within this batch.
      4. Create the Silver Delta table if it does not exist yet, otherwise
         MERGE into it on ``(symbol, timestamp)``.

    Args:
        df: Micro-batch DataFrame read from the Bronze table.
        batch_id: Micro-batch identifier passed by ``foreachBatch``; only
            used in log messages.

    Raises:
        Exception: Any error raised while processing is printed with its
            traceback and then re-raised unchanged.
    """
    try:
        print(f"[SILVER] Procesando batch_id={batch_id}")

        # Normalise the symbol FIRST (Forex tickers may carry an "=X" suffix),
        # then drop incomplete rows, and only then de-duplicate:
        #  * de-duplicating before the suffix is stripped would let "EURUSD=X"
        #    and "EURUSD" survive as two rows with the same final key, which
        #    makes the MERGE below fail on multiple source matches;
        #  * de-duplicating before the null filters could keep the incomplete
        #    duplicate and then discard it, losing the valid row.
        df_clean = (
            df
            .withColumn("symbol", regexp_replace(col("symbol"), "=X$", ""))
            .filter(col("timestamp").isNotNull())
            .filter(col("close").isNotNull())
            .filter(col("symbol").isNotNull())
            .filter(col("source").isNotNull())
            .dropDuplicates(["symbol", "timestamp"])
        )

        # Nothing left after cleaning: skip the write entirely.
        if not df_clean.take(1):
            print(f"[SILVER] batch_id={batch_id} vacío")
            return

        # Enrich with the asset dimension. Symbols missing from asset_dim keep
        # null asset_class / asset_name and therefore a null coste_opera_h.
        silver_df = (
            df_clean
            .join(asset_dim, on="symbol", how="left")
            .withColumn(
                "coste_opera_h",
                when(col("asset_class") == "Acciones", 0.01)
                .when(col("asset_class") == "Fondos", 0.0001)
                .when(col("asset_class") == "Forex", 0.0002)
                .when(col("asset_class") == "Cripto", 0.01)
                .when(col("asset_class") == "Commodities", 0.01)
                .otherwise(None)
            )
            .select(
                "timestamp",
                "symbol",
                "asset_class",
                "asset_name",
                "coste_opera_h",
                "open",
                "high",
                "low",
                "close",
                "volume",
                "timezone",
                "source",
                "year",
                "month",
                "day",
            )
        )

        # =========================================================
        # Fill missing minutes (per symbol, within this batch)
        # =========================================================

        w_sym = Window.partitionBy("symbol").orderBy("timestamp")

        one_minute = expr("INTERVAL 1 MINUTE")
        # Last minute that still belongs to the current row's gap.
        gap_end = col("next_ts") - one_minute

        # Each row is expanded into one row per minute from its own timestamp
        # up to the minute before the next row of the same symbol.
        # The end bound is clamped to the row's own timestamp: sequence()
        # raises when start > stop with a positive step, which would happen
        # for rows less than one minute apart. A null next_ts (last row of the
        # symbol) also falls through to the clamp and yields a single row.
        #
        # NOTE(review): the generated rows copy every column of the preceding
        # real row, including open/high/low/volume and year/month/day. If
        # year/month/day are derived from the timestamp (TODO confirm), rows
        # generated across a day boundary keep the previous day's values.
        # NOTE(review): gaps between the last row of one batch and the first
        # row of the next batch are not filled here.
        silver_filled = (
            silver_df
            .withColumn("next_ts", lead("timestamp").over(w_sym))
            .withColumn(
                "timestamp_filled",
                explode(
                    sequence(
                        col("timestamp"),
                        when(gap_end > col("timestamp"), gap_end)
                        .otherwise(col("timestamp")),
                        one_minute
                    )
                )
            )
            .drop("timestamp", "next_ts")
            .withColumnRenamed("timestamp_filled", "timestamp")
        )

        # Forward-fill of close. Null closes were filtered out above and the
        # generated rows already carry the preceding close, so this only acts
        # as a safety net.
        w_fill = (
            Window.partitionBy("symbol")
            .orderBy("timestamp")
            .rowsBetween(Window.unboundedPreceding, 0)
        )

        silver_filled = silver_filled.withColumn(
            "close",
            last("close", ignorenulls=True).over(w_fill)
        )

        # =========================================================
        # Write to Silver
        # =========================================================

        if not DeltaTable.isDeltaTable(spark, silver_path):

            # First write: create the partitioned Delta table.
            silver_filled.write \
                .format("delta") \
                .partitionBy("year", "month", "day") \
                .mode("overwrite") \
                .save(silver_path)

        else:

            delta_silver = DeltaTable.forPath(spark, silver_path)

            # Upsert on (symbol, timestamp).
            # NOTE(review): a gap-filled row in this batch overwrites an
            # existing row with the same key, even if that row was real data
            # written by an earlier batch.
            (
                delta_silver.alias("t")
                .merge(
                    silver_filled.alias("s"),
                    "t.symbol = s.symbol AND t.timestamp = s.timestamp"
                )
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )

        print(f"[SILVER] batch_id={batch_id} escrito correctamente")

    except Exception:
        print("🔥 ERROR REAL EN FOREACH BATCH 🔥")
        import traceback
        traceback.print_exc()
        # Bare raise re-raises the active exception with its original traceback.
        raise



In [0]:
# ================= STREAM =================

# Streaming read of the Bronze Delta table, with the Delta "ignoreDeletes"
# source option enabled.
bronze_stream = spark.readStream \
    .format("delta") \
    .option("ignoreDeletes", "true") \
    .load(bronze_path)

# Every micro-batch is handed to process_batch; progress is tracked in
# checkpoint_path.
query = bronze_stream.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", checkpoint_path) \
    .start()

# Block this cell until the streaming query stops or fails.
query.awaitTermination()

[SILVER] Procesando batch_id=0
[SILVER] batch_id=0 escrito correctamente
[SILVER] Procesando batch_id=1
[SILVER] batch_id=1 escrito correctamente
[SILVER] Procesando batch_id=2
[SILVER] batch_id=2 escrito correctamente
[SILVER] Procesando batch_id=3
[SILVER] batch_id=3 escrito correctamente
[SILVER] Procesando batch_id=4
[SILVER] batch_id=4 escrito correctamente
[SILVER] Procesando batch_id=5
[SILVER] batch_id=5 escrito correctamente
[SILVER] Procesando batch_id=6
[SILVER] batch_id=6 escrito correctamente
[SILVER] Procesando batch_id=7
[SILVER] batch_id=7 escrito correctamente
[SILVER] Procesando batch_id=8
[SILVER] batch_id=8 escrito correctamente
[SILVER] Procesando batch_id=9
[SILVER] batch_id=9 escrito correctamente
[SILVER] Procesando batch_id=10
[SILVER] batch_id=10 escrito correctamente
[SILVER] Procesando batch_id=11
[SILVER] batch_id=11 escrito correctamente
[SILVER] Procesando batch_id=12
[SILVER] batch_id=12 escrito correctamente
[SILVER] Procesando batch_id=13
[SILVER] batc

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:190)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# dbutils.fs.rm(
#     "abfss://datos@mastertfm002sta.dfs.core.windows.net/silver/activos",
#     True
# )

# dbutils.fs.rm(
#     "abfss://datos@mastertfm002sta.dfs.core.windows.net/silver/activos/_checkpoint",
#     True
# )


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:190)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:465)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.data

In [0]:
#================= INIT SILVER (DELTA VACÍO) =================

# (
#     spark.read
#     .format("delta")
#     .load(bronze_path)
#     .limit(0)
#     .join(asset_dim, on="symbol", how="left")
#     .withColumn(
#         "coste_opera_h",
#         when(col("asset_class") == "Acciones", 0.01)
#         .when(col("asset_class") == "Fondos", 0.0001)
#         .when(col("asset_class") == "Forex", 0.0002)
#         .when(col("asset_class") == "Cripto", 0.01)
#         .when(col("asset_class") == "Commodities", 0.01)
#         .otherwise(None)
#     )
#     .select(
#         "timestamp",
#         "symbol",
#         "asset_class",
#         "asset_name",
#         "coste_opera_h",
#         "open",
#         "high",
#         "low",
#         "close",
#         "volume",
#         "timezone",
#         "source",
#         "year",
#         "month",
#         "day",
#     )
#     .write
#     .format("delta")
#     .mode("overwrite")
#     .partitionBy("year", "month", "day")
#     .save(silver_path)
# )



com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:190)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:465)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.data