In [8]:
from pyspark.sql import SparkSession
import os

# Definimos los paquetes necesarios:
# 1. Iceberg Runtime para Spark 3.5
# 2. AWS Java SDK bundle (para conectar con S3/MinIO)
# 3. Hadoop AWS (puente entre Hadoop FS y AWS SDK)
DEPENDENCIES = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2," \
               "org.apache.hadoop:hadoop-aws:3.3.4"

spark = SparkSession.builder \
    .appName("Iceberg Lab") \
    .config("spark.jars.packages", DEPENDENCIES) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_catalog.type", "hadoop") \
    .config("spark.sql.catalog.my_catalog.warehouse", "s3://warehouse/") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("Spark Session creada con soporte Iceberg y MinIO")

Spark Session creada con soporte Iceberg y MinIO


In [9]:
# Crear base de datos y tabla usando el catálogo 'my_catalog' definido arriba
spark.sql("CREATE NAMESPACE IF NOT EXISTS my_catalog.iot")

# Nota el 'PARTITIONED BY (days(ts))'. ¡Iceberg maneja la partición oculta!
spark.sql("""
    CREATE TABLE IF NOT EXISTS my_catalog.iot.sensors (
        sensor_id STRING,
        temperature DOUBLE,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(ts))
""")

# Insertar datos iniciales
spark.sql("""
    INSERT INTO my_catalog.iot.sensors VALUES 
    ('s1', 20.5, CAST('2023-10-01 11:00:00' AS TIMESTAMP)),
    ('s2', 15.0, CAST('2023-10-01 11:05:00' AS TIMESTAMP)),
    ('s3', 22.1, CAST('2023-10-01 11:10:00' AS TIMESTAMP))
""")

print("Datos insertados:")
spark.sql("SELECT * FROM my_catalog.iot.sensors").show()

Datos insertados:
+---------+-----------+-------------------+
|sensor_id|temperature|                 ts|
+---------+-----------+-------------------+
|       s1|       21.0|2023-10-01 10:00:00|
|       s2|       15.0|2023-10-01 10:05:00|
|       s3|       22.1|2023-10-01 10:10:00|
|       s4|       18.5|2023-10-01 12:00:00|
|       s1|       20.5|2023-10-01 11:00:00|
|       s2|       15.0|2023-10-01 11:05:00|
|       s3|       22.1|2023-10-01 11:10:00|
+---------+-----------+-------------------+



In [10]:
from pyspark.sql import functions as F

# Crear una vista temporal con los datos entrantes (CDC o Batch nuevo)
incoming_data = [
    ('s1', 21.0, "2023-10-01 10:00:00"), # Corrección de temperatura
    ('s4', 18.5, "2023-10-01 12:00:00")  # Nuevo dato
]
df_updates = spark.createDataFrame(incoming_data, ["sensor_id", "temperature", "ts_str"]) \
                  .withColumn("ts", F.to_timestamp("ts_str")) \
                  .drop("ts_str")

df_updates.createOrReplaceTempView("incoming_updates")

# Ejecutar MERGE INTO (Estándar SQL ANSI)
spark.sql("""
    MERGE INTO my_catalog.iot.sensors AS target
    USING incoming_updates AS source
    ON target.sensor_id = source.sensor_id
    WHEN MATCHED THEN 
        UPDATE SET target.temperature = source.temperature
    WHEN NOT MATCHED THEN 
        INSERT (sensor_id, temperature, ts) VALUES (source.sensor_id, source.temperature, source.ts)
""")

print("Tabla después del Upsert (s1 cambió a 21.0, s4 agregado):")
spark.sql("SELECT * FROM my_catalog.iot.sensors ORDER BY sensor_id").show()

Tabla después del Upsert (s1 cambió a 21.0, s4 agregado):
+---------+-----------+-------------------+
|sensor_id|temperature|                 ts|
+---------+-----------+-------------------+
|       s1|       21.0|2023-10-01 10:00:00|
|       s1|       21.0|2023-10-01 11:00:00|
|       s2|       15.0|2023-10-01 10:05:00|
|       s2|       15.0|2023-10-01 11:05:00|
|       s3|       22.1|2023-10-01 10:10:00|
|       s3|       22.1|2023-10-01 11:10:00|
|       s4|       18.5|2023-10-01 12:00:00|
+---------+-----------+-------------------+



In [11]:
# Listar los snapshots (versiones de la tabla)
df_history = spark.sql("SELECT * FROM my_catalog.iot.sensors.history")
df_history.show(truncate=False)

# Obtener el primer Snapshot ID
first_snapshot_id = df_history.sort("made_current_at").first()["snapshot_id"]

print(f"Viajando al pasado (Snapshot ID: {first_snapshot_id})...")

# Consultar usando 'VERSION AS OF'
spark.read \
    .option("snapshot-id", first_snapshot_id) \
    .table("my_catalog.iot.sensors") \
    .filter("sensor_id = 's1'") \
    .show()

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-12-01 20:15:34.161|9191955214042176975|NULL               |true               |
|2025-12-01 20:18:24.593|3866507153852542820|9191955214042176975|true               |
|2025-12-01 23:54:58.062|290670476390267970 |3866507153852542820|true               |
|2025-12-01 23:55:03.476|8872319056561868535|290670476390267970 |true               |
+-----------------------+-------------------+-------------------+-------------------+

Viajando al pasado (Snapshot ID: 9191955214042176975)...
+---------+-----------+-------------------+
|sensor_id|temperature|                 ts|
+---------+-----------+-------------------+
|       s1|       20.5|2023-10-01 10:00:00|
+---------+-----------+-------------------+



In [12]:
from pyspark.sql.functions import col, days

# 1. Creamos un DataFrame de ejemplo (tu fuente de datos)
data = [
    (1, "sensor_A", 25.5, "2023-10-01 10:00:00"),
    (2, "sensor_B", 30.1, "2023-10-01 11:00:00")
]
columns = ["id", "device_id", "temp", "ts"]

df = spark.createDataFrame(data, columns) \
          .withColumn("ts", col("ts").cast("timestamp"))

# 2. Escribir a Iceberg en S3 sin SQL
# Supongamos que tu catálogo configurado se llama 'my_catalog'
table_name = "my_catalog.iot.sensors_programmatic"

df.writeTo(table_name) \
    .tableProperty("format-version", "2") \
    .partitionedBy(col("device_id"), days(col("ts"))) \
    .create()  # Ojo: Esto falla si la tabla ya existe