In [1]:
from smartpool_config import *

# Create the Spark session for this notebook through the project helper.
spark = create_spark("smartpool-silver")

# Echo the effective configuration so the run output is self-describing.
print("Spark OK:", spark.version)
for label, value in (
    ("BASE:", BASE),
    ("BRONZE:", BRONZE),
    ("SILVER:", SILVER),
    ("GOLD:", GOLD),
    ("STATE:", STATE),
    ("JDBC:", JDBC_URL),
):
    print(label, value)


:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/jovyan/.ivy2.5.2/cache
The jars for the packages stored in: /home/jovyan/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b0779132-aeb9-4591-9e6a-0c418b228499;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 119ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf  

Spark OK: 4.0.1
BASE: s3a://spark/medallion
BRONZE: s3a://spark/medallion/bronze
SILVER: s3a://spark/medallion/silver
GOLD: s3a://spark/medallion/gold
STATE: s3a://spark/medallion/_state
JDBC: jdbc:sqlserver://sqlserver:1433;databaseName=smartpool;encrypt=true;trustServerCertificate=true;


In [19]:
# Storage locations of the two tables handled by this notebook, one per layer.
# BRONZE / SILVER are the layer roots (presumably supplied by smartpool_config).
_POOLS_TABLE = "pools_dim"
_EVENTS_TABLE = "maintenance_events"

POOLS_BRONZE = f"{BRONZE}/{_POOLS_TABLE}"
EVENTS_BRONZE = f"{BRONZE}/{_EVENTS_TABLE}"

POOLS_SILVER = f"{SILVER}/{_POOLS_TABLE}"
EVENTS_SILVER = f"{SILVER}/{_EVENTS_TABLE}"

POOLS_SILVER : s3a://spark/medallion/silver/pools_dim
EVENTS_SILVER: s3a://spark/medallion/silver/maintenance_events
GOLD_DAILY   : s3a://spark/medallion/gold/pool_daily_metrics
GOLD_LATEST  : s3a://spark/medallion/gold/pool_latest_event


In [11]:
def delta_exists(path: str) -> bool:
    """Return True when `path` holds a Delta table, False otherwise.

    This is a best-effort probe: any exception raised by the check is treated
    as "not a Delta table" so callers can fall back to creating the table.
    The exception is no longer swallowed silently, though - it is surfaced as
    a RuntimeWarning, because a False caused by e.g. a credentials or network
    error would otherwise be indistinguishable from a genuinely missing table.

    Args:
        path: Location of the candidate table.

    Returns:
        The result of `DeltaTable.isDeltaTable`, or False if the check raised.
    """
    import warnings  # local import: the notebook has no explicit import cell to extend

    try:
        return DeltaTable.isDeltaTable(spark, path)
    except Exception as exc:
        warnings.warn(
            f"delta_exists({path!r}) failed and is treated as False: {exc!r}",
            RuntimeWarning,
            stacklevel=2,
        )
        return False


In [12]:
bronze_pools = spark.read.format("delta").load(POOLS_BRONZE)

# Normalize column types defensively and drop rows without a key,
# since they cannot take part in the per-pool deduplication below.
pools = (bronze_pools
    .select(
        F.col("pool_id").cast("int").alias("pool_id"),
        F.col("pool_name").cast("string").alias("pool_name"),
        F.col("location").cast("string").alias("location"),
        F.col("volume_liters").cast("int").alias("volume_liters"),
        F.col("is_heated").cast("boolean").alias("is_heated"),
        F.col("owner_type").cast("string").alias("owner_type"),
        F.col("updated_at").cast("timestamp").alias("updated_at"),
    )
    .filter(F.col("pool_id").isNotNull())
)

# Latest version per pool_id. Every row inside a partition shares the same
# pool_id, so using pool_id as a tie-breaker orders nothing; when two versions
# carry the same updated_at the remaining columns are used instead, which makes
# the surviving row deterministic across runs (ties are then only possible
# between fully identical rows).
w = Window.partitionBy("pool_id").orderBy(
    F.col("updated_at").desc_nulls_last(),
    F.col("pool_name"),
    F.col("location"),
    F.col("volume_liters"),
    F.col("is_heated"),
    F.col("owner_type"),
)

pools_latest = (pools
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# Full refresh of the Silver dimension (schema is allowed to change).
(pools_latest.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(POOLS_SILVER)
)

# Report the row count of what was actually written, read back from Silver,
# instead of re-running the whole Bronze -> window lineage a second time.
print("[SILVER:pools_dim] OK ->", POOLS_SILVER, "rows:", spark.read.format("delta").load(POOLS_SILVER).count())
display(pools_latest.orderBy("pool_id"))


[SILVER:pools_dim] OK -> s3a://spark/medallion/silver/pools_dim rows: 6


DataFrame[pool_id: int, pool_name: string, location: string, volume_liters: int, is_heated: boolean, owner_type: string, updated_at: timestamp]

In [13]:
bronze_events = spark.read.format("delta").load(EVENTS_BRONZE)

# Curation: project the expected columns and enforce their types.
events = (bronze_events
    .select(
        F.col("id").cast("int").alias("id"),
        F.col("pool_id").cast("int").alias("pool_id"),
        F.col("event_time").cast("timestamp").alias("event_time"),
        F.col("intervention_type").cast("string").alias("intervention_type"),
        F.col("product_type").cast("string").alias("product_type"),
        F.col("product_amount").cast("double").alias("product_amount"),
        F.col("notes").cast("string").alias("notes"),
        F.col("updated_at").cast("timestamp").alias("updated_at"),
    )
)

# Minimal data quality rules (extend as needed): mandatory fields present and
# intervention_type restricted to the known catalog.
# NOTE(review): these filters run BEFORE the dedup below, so if the newest
# version of an event is invalid, an older valid version of the same id wins.
# Confirm that this is the intended policy.
allowed = ["chlorine", "refill", "ph_correction", "filter_backwash"]
events = (events
    .filter(F.col("id").isNotNull())
    .filter(F.col("pool_id").isNotNull())
    .filter(F.col("event_time").isNotNull())
    .filter(F.col("intervention_type").isNotNull())
    .filter(F.col("intervention_type").isin(allowed))
)

# Derived column used for partitioning and date-based queries.
events = events.withColumn("event_date", F.to_date("event_time"))

# Referential integrity: keep only events whose pool exists in Silver pools_dim.
silver_pools = spark.read.format("delta").load(POOLS_SILVER).select("pool_id").dropDuplicates(["pool_id"])
events = events.join(silver_pools, on="pool_id", how="inner")

# Dedup by id: keep the version with the latest updated_at. Every row inside a
# partition shares the same id, so id cannot break ties; the remaining columns
# are used instead so the MERGE source is deterministic when two versions carry
# the same updated_at (ties are then only possible between identical rows).
w = Window.partitionBy("id").orderBy(
    F.col("updated_at").desc_nulls_last(),
    F.col("event_time").desc(),
    F.col("pool_id"),
    F.col("intervention_type"),
    F.col("product_type"),
    F.col("product_amount"),
    F.col("notes"),
)
events_latest = (events
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)

# Create the Silver table if it does not exist yet (partitioned by event_date).
# An empty frame is written so that only the schema and partitioning are set up;
# the data itself is loaded by the MERGE below.
if not delta_exists(EVENTS_SILVER):
    (events_latest.limit(0).write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .partitionBy("event_date")
        .save(EVENTS_SILVER)
    )
    print("[SILVER:maintenance_events] creada ->", EVENTS_SILVER)

tgt = DeltaTable.forPath(spark, EVENTS_SILVER)

# Upsert keyed on id: existing events are overwritten, new ones inserted.
(tgt.alias("t")
    .merge(events_latest.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

# The count is the size of the source (upsert) dataset, not of the Silver table.
print("[SILVER:maintenance_events] MERGE OK ->", EVENTS_SILVER, "rows(updates dataset):", events_latest.count())
display(events_latest.orderBy(F.col("updated_at").desc()).limit(50))


26/01/25 20:12:11 WARN MapPartitionsRDD: RDD 385 was locally checkpointed, its lineage has been truncated and cannot be recomputed after unpersisting


[SILVER:maintenance_events] MERGE OK -> s3a://spark/medallion/silver/maintenance_events rows(updates dataset): 16


DataFrame[pool_id: int, id: int, event_time: timestamp, intervention_type: string, product_type: string, product_amount: double, notes: string, updated_at: timestamp, event_date: date]

In [14]:
# Quick sanity check: row counts of both Silver tables, then the 30 most recent events.
def _read_silver(path):
    """Load the Delta table stored at `path` as a DataFrame."""
    return spark.read.format("delta").load(path)

print("SILVER pools rows:", _read_silver(POOLS_SILVER).count())
print("SILVER events rows:", _read_silver(EVENTS_SILVER).count())

display(_read_silver(EVENTS_SILVER).orderBy(F.desc("event_time")).limit(30))

SILVER pools rows: 6
SILVER events rows: 16


DataFrame[pool_id: int, id: int, event_time: timestamp, intervention_type: string, product_type: string, product_amount: double, notes: string, updated_at: timestamp, event_date: date]

In [15]:
# =================
# QA / VALIDATIONS
# =================

def _require_zero(offenders, message):
    """Fail the QA run when a check found offending rows (count is appended to the message)."""
    if offenders != 0:
        raise Exception(f"{message}: {offenders}")


def _any_null(*names):
    """Build a Column that is true when at least one of the named columns is NULL."""
    condition = F.col(names[0]).isNull()
    for name in names[1:]:
        condition = condition | F.col(name).isNull()
    return condition


pools_s = spark.read.format("delta").load(POOLS_SILVER)
events_s = spark.read.format("delta").load(EVENTS_SILVER)

print("SILVER pools:", POOLS_SILVER, "rows:", pools_s.count())
print("SILVER events:", EVENTS_SILVER, "rows:", events_s.count())

# 1) Key uniqueness in Silver: number of keys that appear more than once.
dup_pools = pools_s.groupBy("pool_id").count().filter(F.col("count") > 1).count()
dup_events = events_s.groupBy("id").count().filter(F.col("count") > 1).count()

_require_zero(dup_pools, "[QA] pools_dim tiene duplicados por pool_id")
_require_zero(dup_events, "[QA] maintenance_events tiene duplicados por id")

print("[QA] Unicidad OK (pool_id, id)")

# 2) Critical columns must never be NULL.
null_pools = pools_s.filter(_any_null("pool_id", "pool_name", "updated_at")).count()
null_events = events_s.filter(
    _any_null("id", "pool_id", "event_time", "intervention_type", "updated_at")
).count()

_require_zero(null_pools, "[QA] pools_dim tiene nulos críticos")
_require_zero(null_events, "[QA] maintenance_events tiene nulos críticos")

print("[QA] Not-null críticos OK")

# 3) intervention_type domain (in case something slipped through).
allowed = {"chlorine", "refill", "ph_correction", "filter_backwash"}
in_catalog = F.col("intervention_type").isin(list(allowed))
bad_types = events_s.filter(~in_catalog).count()
_require_zero(bad_types, "[QA] intervention_type fuera de catálogo")
print("[QA] Catálogo intervention_type OK")

# 4) Referential integrity: every pool_id used by events must exist in pools_dim.
event_pool_ids = events_s.select("pool_id").distinct()
known_pool_ids = pools_s.select("pool_id").distinct()
missing_fk = event_pool_ids.join(known_pool_ids, on="pool_id", how="left_anti").count()
_require_zero(missing_fk, "[QA] Hay pool_id en maintenance_events que no existen en pools_dim")

print("[QA] FK events.pool_id -> pools_dim.pool_id OK")

# 5) Check the table's actual partitioning (expected: event_date).
detail_rows = spark.sql(f"DESCRIBE DETAIL delta.`{EVENTS_SILVER}`").collect()
detail = detail_rows[0]
print("[QA] Partition columns EVENTS_SILVER:", detail["partitionColumns"])
if "event_date" not in detail["partitionColumns"]:
    raise Exception("[QA] EVENTS_SILVER no está particionada por event_date (revisa .partitionBy('event_date'))")

print("[QA] Particionado OK")

# 6) Quick smoke test: top-N rows for visual inspection.
display(pools_s.orderBy(F.col("updated_at").desc(), F.col("pool_id")).limit(20))
display(events_s.orderBy(F.col("updated_at").desc(), F.col("id")).limit(50))

# 7) Delta history (useful to show the MERGE / overwrite operations).
print("[QA] Delta history pools_dim")
display(DeltaTable.forPath(spark, POOLS_SILVER).history(10))
print("[QA] Delta history maintenance_events")
display(DeltaTable.forPath(spark, EVENTS_SILVER).history(10))

print("QA FINAL OK ✅")


SILVER pools: s3a://spark/medallion/silver/pools_dim rows: 6
SILVER events: s3a://spark/medallion/silver/maintenance_events rows: 16


26/01/25 20:12:23 ERROR TaskSchedulerImpl: Lost executor 1 on 172.20.0.8: Command exited with code 137
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_361_1!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_461_11!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_1!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_73_8!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_14_36!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_10_45!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_365_7!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_361_39!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_457_46!
26/01/25 20:12:23 WARN BlockManagerMasterEndpoint: No more 

[QA] Unicidad OK (pool_id, id)


                                                                                

[QA] Not-null críticos OK
[QA] Catálogo intervention_type OK
[QA] FK events.pool_id -> pools_dim.pool_id OK
[QA] Partition columns EVENTS_SILVER: ['event_date']
[QA] Particionado OK


DataFrame[pool_id: int, pool_name: string, location: string, volume_liters: int, is_heated: boolean, owner_type: string, updated_at: timestamp]

DataFrame[pool_id: int, id: int, event_time: timestamp, intervention_type: string, product_type: string, product_amount: double, notes: string, updated_at: timestamp, event_date: date]

[QA] Delta history pools_dim


DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

[QA] Delta history maintenance_events


DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

QA FINAL OK ✅


In [16]:
# Simple per-pool metric: number of events and the timestamp of the latest one,
# listed from the busiest pool downwards.
events_s = spark.read.format("delta").load(EVENTS_SILVER)

events_by_pool = events_s.groupBy("pool_id")
agg = events_by_pool.agg(
    F.count("*").alias("num_events"),
    F.max("event_time").alias("last_event_time"),
).orderBy(F.desc("num_events"))

display(agg)

DataFrame[pool_id: int, num_events: bigint, last_event_time: timestamp]

In [18]:
# Stop the Spark session created in the first cell; cells that use `spark`
# cannot be re-run after this without creating a new session.
spark.stop()