In [0]:
import os

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Configure access to ADLS Gen2 by registering the storage account key for
# "stmeteoend2end.dfs.core.windows.net" in the Spark session configuration.
#
# The key is taken from the ADLS_ACCOUNT_KEY environment variable so the real
# secret never has to be pasted into (and committed with) the notebook. When
# the variable is not set, the original "TU_KEY" placeholder is used, so the
# behavior is unchanged until the variable is provided.
# NOTE(review): if this runs on Databricks, a secret scope is probably the
# better source for this value — confirm with the platform setup.
spark.conf.set(
    "fs.azure.account.key.stmeteoend2end.dfs.core.windows.net",
    os.environ.get("ADLS_ACCOUNT_KEY", "TU_KEY"),
)

# Data Lake locations. Both layers live under the same abfss root, so the
# root is spelled out once and the layer-specific suffix is appended.
_LAKE_ROOT = "abfss://datalake@stmeteoend2end.dfs.core.windows.net"
BRONZE_BATCH_PATH = f"{_LAKE_ROOT}/bronze/batch/weather"  # input read by this notebook
SILVER_PATH = f"{_LAKE_ROOT}/silver/weather"  # output written by this notebook

In [0]:
# Explicit schema applied when reading the bronze CSV files.
# Each entry is (column name, Spark type); every column is declared nullable.
# "timestamp" is kept as a string here and parsed later in the silver step.
bronze_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("timestamp", StringType()),
        ("ip", StringType()),
        ("temperature", DoubleType()),
        ("humidity", DoubleType()),
        ("pm25", DoubleType()),
        ("light", IntegerType()),
        ("uv_level", IntegerType()),
        ("pressure", DoubleType()),
        ("rain_raw", IntegerType()),
        ("wind_raw", DoubleType()),
        ("vibration", IntegerType()),
    )
])

In [0]:
# Read the bronze batch files as CSV with a header row, using the explicit
# bronze_schema (no schema inference).
df_bronze = spark.read.load(
    BRONZE_BATCH_PATH,
    format="csv",
    schema=bronze_schema,
    header="true",
)

In [0]:
# Step 1: replace the string "timestamp" column with its parsed timestamp value.
# NOTE(review): values that cannot be parsed presumably end up as null and are
# kept — confirm whether such rows should be filtered out.
bronze_parsed = df_bronze.withColumn("timestamp", to_timestamp("timestamp"))

# Step 2: keep a single row per (timestamp, ip) pair.
# NOTE(review): which of the duplicate rows survives is not controlled here.
bronze_deduped = bronze_parsed.dropDuplicates(["timestamp", "ip"])

# Step 3: add ingestion metadata — a constant source tag plus the current
# date and timestamp.
df_silver = (
    bronze_deduped
    .withColumn("source", lit("batch"))
    .withColumn("ingestion_date", current_date())
    .withColumn("ingestion_timestamp", current_timestamp())
)

In [0]:
# Persist the silver DataFrame in Delta format at SILVER_PATH, appending to
# whatever is already stored there.
# NOTE(review): because the mode is "append", re-running the notebook over the
# same bronze files looks like it would write the same rows again — confirm
# whether that is intended.
(
    df_silver.write
    .format("delta")
    .mode("append")
    .save(SILVER_PATH)
)