In [0]:
# LIBRERÍAS NECESARIAS
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    current_date, lit, col, trim, regexp_replace, when, to_date,
    current_timestamp, date_format, min as spark_min, max as spark_max
)
from delta.tables import DeltaTable
import pytz
from datetime import datetime

In [0]:
# Widget configuration: declare an "Ambiente" dropdown (default "Produccion",
# choices "Desarrollo" / "Produccion") and read the selected value into
# `environment`, which drives the environment-specific settings below.
dbutils.widgets.dropdown("Ambiente", "Produccion", ["Desarrollo","Produccion"])
environment = dbutils.widgets.get("Ambiente")

In [0]:
# Resolve the storage account, catalog names and container ("bucket") names for
# the selected environment. These module-level names are interpolated into the
# abfss:// paths and the CREATE TABLE statements further down.
if environment == "Produccion":
    storage_account = "stuaoprod003"
    catalog_gold   = "gold-shir"
    catalog_silver = "silver-shir"
    catalog_bronze = "bronze-shir"
    bucket_gold    = "gold"
    bucket_silver  = "silver"
    bucket_bronze  = "bronze"

elif environment == "Desarrollo":
    # NOTE(review): every value here is identical to "Produccion", so a
    # "Desarrollo" run reads and writes the same storage account and catalogs.
    # Confirm whether dedicated development resources should be configured.
    storage_account = "stuaoprod003"
    catalog_gold   = "gold-shir"
    catalog_silver = "silver-shir"
    catalog_bronze = "bronze-shir"
    bucket_gold    = "gold"
    bucket_silver  = "silver"
    bucket_bronze  = "bronze"

else:
    # Fail fast with a clear message instead of hitting a NameError in a later
    # cell when none of the settings above were defined.
    raise ValueError(
        f"Ambiente no soportado: {environment!r}. "
        "Valores válidos: 'Desarrollo', 'Produccion'."
    )

In [0]:
# Current date in the America/Bogota timezone, formatted as YYYY-MM-DD.
# NOTE(review): `today` is not referenced by any other visible cell; the
# `f_cargue` load date is populated with Spark's current_date() instead.
# Confirm which of the two is the intended source of the load date.
tzInfo = pytz.timezone('America/Bogota')
today = datetime.now(tz=tzInfo).strftime('%Y-%m-%d')

In [0]:
# Spark session: get (or create) a session named "SilverTransform" configured
# with the Delta Lake SQL extension and the Delta catalog implementation.
spark = (
    SparkSession.builder.appName("SilverTransform")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [0]:
# Bronze (source) and Silver (target) locations for posts, built from the
# environment-specific container and storage account names.

ruta_posts_bronze = f"abfss://{bucket_bronze}@{storage_account}.dfs.core.windows.net/posts/posts_2020.parquet"
ruta_posts_silver = f"abfss://{bucket_silver}@{storage_account}.dfs.core.windows.net/posts"

# Read the posts Parquet data from Bronze
df_posts = spark.read.format("parquet").load(ruta_posts_bronze)

In [0]:
# Print the schema of the posts DataFrame exactly as read from Bronze.
print("=== POSTS BRONZE ===")
df_posts.printSchema()

=== POSTS BRONZE ===
root
 |-- Id: long (nullable = true)
 |-- PostTypeId: long (nullable = true)
 |-- AcceptedAnswerId: long (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- Score: long (nullable = true)
 |-- ViewCount: long (nullable = true)
 |-- Body: binary (nullable = true)
 |-- OwnerUserId: long (nullable = true)
 |-- OwnerDisplayName: binary (nullable = true)
 |-- LastEditorUserId: long (nullable = true)
 |-- LastEditorDisplayName: binary (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- Title: binary (nullable = true)
 |-- Tags: binary (nullable = true)
 |-- AnswerCount: long (nullable = true)
 |-- CommentCount: long (nullable = true)
 |-- FavoriteCount: long (nullable = true)
 |-- ContentLicense: binary (nullable = true)
 |-- ParentId: binary (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)



In [0]:
# Row count of the Bronze posts data (triggers a full scan of the source).
print(f"Total registros leídos desde Bronze: {df_posts.count()}")

Total registros leídos desde Bronze: 685915


In [0]:
# Date range: earliest and latest CreationDate present in the Bronze posts data.
df_posts.select(
    spark_min("CreationDate").alias("min_fecha"),
    spark_max("CreationDate").alias("max_fecha")
).show(truncate=False)

+-----------------------+-----------------------+
|min_fecha              |max_fecha              |
+-----------------------+-----------------------+
|2020-01-01 00:00:04.637|2020-02-29 23:59:55.427|
+-----------------------+-----------------------+



In [0]:
# Preview five Bronze rows for a subset of columns, without truncating values.
# Title and Tags are binary in the Bronze schema printed above, so they are
# shown here before any decoding.
df_posts.select("Id", "PostTypeId", "CreationDate", "Score", "ViewCount", "Title", "Tags").show(5, truncate=False)


+--------+----------+-----------------------+-----+---------+-----+----+
|Id      |PostTypeId|CreationDate           |Score|ViewCount|Title|Tags|
+--------+----------+-----------------------+-----+---------+-----+----+
|59959331|2         |2020-01-29 01:16:03.837|0    |0        |[]   |[]  |
|59959334|2         |2020-01-29 01:16:10.05 |0    |0        |[]   |[]  |
|59959336|2         |2020-01-29 01:16:15.39 |3    |0        |[]   |[]  |
|59959338|2         |2020-01-29 01:16:31.583|3    |0        |[]   |[]  |
|59959345|2         |2020-01-29 01:17:50.753|2    |0        |[]   |[]  |
+--------+----------+-----------------------+-----+---------+-----+----+
only showing top 5 rows


In [0]:
# FUNCIONES DE TRANSFORMACIÓN

def limpiar_columnas(df):
    """Return ``df`` without the columns that are excluded from Silver.

    Only the listed columns that actually appear in ``df.columns`` are passed
    to ``df.drop``, in the order in which they are listed below.

    Args:
        df: DataFrame-like object exposing ``columns`` and ``drop(*names)``.

    Returns:
        The result of ``df.drop`` for the listed columns present in ``df``.
    """
    descartables = (
        "OwnerDisplayName",
        "LastEditorDisplayName",
        "ContentLicense",
        "CommunityOwnedDate",
        "ClosedDate",
        "Body",
        "LastEditorUserId",
        "LastEditDate",
        "LastActivityDate",
        "ParentId",
        "AnswerCount",
        "CommentCount",
        "ViewCount",
        "FavoriteCount",
        "Title",
    )
    presentes = set(df.columns)
    return df.drop(*(nombre for nombre in descartables if nombre in presentes))

def decodificar_columnas(df):
    """Decode the ``Tags`` column to text and normalise its delimiters.

    When ``df`` has a ``Tags`` column, each value is decoded as UTF-8
    (undecodable bytes are dropped), null values become the empty string,
    ``<`` and ``>`` are replaced by spaces, whitespace runs are collapsed to a
    single space and the result is trimmed. A DataFrame without a ``Tags``
    column is returned unchanged.

    Args:
        df: Spark DataFrame, optionally containing a ``Tags`` column.

    Returns:
        The DataFrame with ``Tags`` replaced by its cleaned string form.
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def force_decode(data):
        """Best-effort conversion of one cell value to ``str``."""
        if data is None:
            return None
        try:
            if isinstance(data, (bytes, bytearray)):
                return data.decode('utf-8', errors='ignore')
            return str(data)
        except Exception:
            # Deliberate best effort: a value that cannot be converted becomes
            # an empty string. Only ordinary exceptions are caught, so
            # SystemExit / KeyboardInterrupt are no longer swallowed.
            return ""

    decode_udf = udf(force_decode, StringType())
    if "Tags" in df.columns:
        decoded = when(col("Tags").isNotNull(), decode_udf(col("Tags"))).otherwise(lit(""))
        sin_delimitadores = regexp_replace(decoded, "[<>]", " ")
        # Trim last: replacing "<" / ">" with spaces introduces leading and
        # trailing spaces, so trimming before the replacement left the value
        # padded (e.g. "<a><b>" became " a b ").
        # NOTE(review): the sample Silver output shows pipe-delimited tags
        # ("|android|spacing|"), which this pattern leaves untouched. Confirm
        # whether "|" should be normalised as well.
        df = df.withColumn("Tags", trim(regexp_replace(sin_delimitadores, "\\s+", " ")))
    return df

def normalizar_datos(df):
    """Deduplicate posts and normalise their column types and values.

    Keeps one row per ``Id``, casts ``Id``, ``AcceptedAnswerId``, ``Score`` and
    ``OwnerUserId`` to integer, reduces ``CreationDate`` to a date, replaces
    ``PostTypeId`` with a descriptive label (values without a label are kept
    in their string form) and appends the ``f_cargue`` load date.

    Args:
        df: Spark DataFrame containing the columns named above.

    Returns:
        The normalised Spark DataFrame.
    """
    etiquetas_tipo_post = (
        ("1", "Question"),
        ("2", "Answer"),
        ("3", "Wiki"),
        ("4", "TagWikiExcerpt"),
        ("5", "TagWiki"),
        ("6", "ModeratorNomination"),
    )

    resultado = df.dropDuplicates(["Id"])

    for nombre in ("Id", "AcceptedAnswerId", "Score", "OwnerUserId"):
        resultado = resultado.withColumn(nombre, col(nombre).cast("integer"))

    resultado = resultado.withColumn("CreationDate", to_date(col("CreationDate")))
    resultado = resultado.withColumn("PostTypeId", col("PostTypeId").cast("string"))

    # Build the CASE WHEN chain from the table above; the fallback keeps the
    # original (string) code when no label is defined for it.
    tipo_post = col("PostTypeId")
    expresion = None
    for codigo, etiqueta in etiquetas_tipo_post:
        coincide = tipo_post == codigo
        expresion = when(coincide, etiqueta) if expresion is None else expresion.when(coincide, etiqueta)
    resultado = resultado.withColumn("PostTypeId", expresion.otherwise(tipo_post))

    # Load date
    return resultado.withColumn("f_cargue", current_date())

In [0]:
 # APLICAR TRANSFORMACIONES

# Pipeline: drop the excluded columns -> decode/clean Tags -> normalise types,
# label PostTypeId and add the load date.
df_posts_clean = limpiar_columnas(df_posts)
df_posts_decoded = decodificar_columnas(df_posts_clean)
df_posts_silver = normalizar_datos(df_posts_decoded)


In [0]:
# Inspect the schema of the transformed (Silver) posts DataFrame.
print("=== POSTS SILVER ===")
df_posts_silver.printSchema()

# Row count after the transformations (re-evaluates the whole pipeline).
print(f"Total registros transformados: {df_posts_silver.count()}")


=== POSTS SILVER ===
root
 |-- Id: integer (nullable = true)
 |-- PostTypeId: string (nullable = true)
 |-- AcceptedAnswerId: integer (nullable = true)
 |-- CreationDate: date (nullable = true)
 |-- Score: integer (nullable = true)
 |-- OwnerUserId: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- f_cargue: date (nullable = false)

Total registros transformados: 685915


In [0]:
# Preview five transformed rows for a subset of the Silver columns.
df_posts_silver.select("Id", "PostTypeId", "CreationDate", "Score", "AcceptedAnswerId", "Tags", "f_cargue").show(5)

+--------+----------+------------+-----+----------------+----+----------+
|      Id|PostTypeId|CreationDate|Score|AcceptedAnswerId|Tags|  f_cargue|
+--------+----------+------------+-----+----------------+----+----------+
|59549218|    Answer|  2020-01-01|    2|               0|    |2025-11-16|
|59549225|    Answer|  2020-01-01|    1|               0|    |2025-11-16|
|59549243|    Answer|  2020-01-01|    2|               0|    |2025-11-16|
|59549257|    Answer|  2020-01-01|   19|               0|    |2025-11-16|
|59549289|    Answer|  2020-01-01|    2|               0|    |2025-11-16|
+--------+----------+------------+-----+----------------+----+----------+
only showing top 5 rows


In [0]:
# DELTA WRITE WITH MERGE (UPSERT)
# If a Delta table already exists at the Silver path, upsert the transformed
# posts into it keyed by Id; otherwise create the table from scratch. In both
# cases the table is then registered in the catalog.

posts_silver_existe = DeltaTable.isDeltaTable(spark, ruta_posts_silver)

if posts_silver_existe:
    delta_table = DeltaTable.forPath(spark, ruta_posts_silver)

    df_updates = df_posts_silver.alias("updates")
    df_target = delta_table.toDF().alias("target")

    condition = "target.Id = updates.Id"

    # Informational only: how many incoming Ids are new vs. already present.
    # Each count runs its own Spark job over the incoming data.
    df_target_ids = df_target.select("Id")
    df_updates_ids = df_updates.select("Id")

    nuevos = df_updates_ids.subtract(df_target_ids).count()
    actualizados = df_updates_ids.intersect(df_target_ids).count()

    print(f"Registros nuevos: {nuevos}")
    print(f"Registros a actualizar: {actualizados}")

    # MERGE: insert rows with new Ids and overwrite rows with matching Ids
    (
        delta_table.alias("target")
        .merge(
            df_updates.alias("updates"),
            condition
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print("Merge completado en tabla Delta Silver.")

else:
    print("Creando nueva tabla Delta Silver...")
    (
        df_posts_silver.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(ruta_posts_silver)
    )

# Register the table on every run. The statement is idempotent, and running it
# after a merge as well covers the case where Delta files already exist at the
# path but the catalog entry is missing; the %sql cells below would otherwise
# fail in that case.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS `{catalog_silver}`.posts.posts_silver
    USING DELTA
    LOCATION '{ruta_posts_silver}'
""")

if not posts_silver_existe:
    print("Tabla Delta Silver creada y registrada en el catálogo.")


Creando nueva tabla Delta Silver...
Tabla Delta Silver creada y registrada en el catálogo.


In [0]:
# FINAL VALIDATION: read the Silver Delta table back from its path and report
# the number of rows it holds.

final_df = spark.read.format("delta").load(ruta_posts_silver)
final_count = final_df.count()
print(f"Total registros finales en Silver: {final_count}")

Total registros finales en Silver: 685915


In [0]:
# List the distinct load dates (f_cargue) present in the Silver posts table.
print("Fechas de cargue registradas:")
final_df.select("f_cargue").distinct().show()

Fechas de cargue registradas:
+----------+
|  f_cargue|
+----------+
|2025-11-16|
+----------+



In [0]:
%sql
-- Show a few rows from the Silver posts table.
-- NOTE(review): the catalog name is hard-coded here instead of following the
-- environment-specific `catalog_silver` value used by the Python cells.
SELECT * FROM `silver-shir`.posts.posts_silver LIMIT 10;


Id,PostTypeId,AcceptedAnswerId,CreationDate,Score,OwnerUserId,Tags,f_cargue
59549217,Answer,0,2020-01-01,10,2684539,,2025-11-16
59549218,Answer,0,2020-01-01,2,12046409,,2025-11-16
59549220,Answer,0,2020-01-01,0,8134799,,2025-11-16
59549221,Answer,0,2020-01-01,3,974045,,2025-11-16
59549222,Question,59549731,2020-01-01,0,32834,|android|spacing|android-gridlayout|,2025-11-16
59549224,Answer,0,2020-01-01,1,3744182,,2025-11-16
59549225,Answer,0,2020-01-01,1,9473764,,2025-11-16
59549229,Answer,0,2020-01-01,1,7366631,,2025-11-16
59549231,Answer,0,2020-01-01,2,12634364,,2025-11-16
59549232,Answer,0,2020-01-01,0,4518341,,2025-11-16


In [0]:
%sql
-- Verify the row count of the Silver posts table through the catalog.
SELECT COUNT(*) FROM `silver-shir`.posts.posts_silver 

COUNT(*)
685915


## Votes: el mismo proceso Bronze → Silver aplicado a los datos de votes

In [0]:
# Bronze (source) and Silver (target) locations for votes.
#
# The Bronze path points at the Parquet dataset directory, mirroring how posts
# are read, instead of at one generated part file inside it. A pinned
# "part-00000-tid-..." file name stops existing as soon as the dataset is
# rewritten, and any sibling part files would be silently ignored.
# NOTE(review): if the directory holds more than one part file, all of them
# are now read. Confirm this matches the expected Bronze row count.

ruta_votes_bronze = f"abfss://{bucket_bronze}@{storage_account}.dfs.core.windows.net/votes/votes_2020.parquet"
# NOTE(review): the Silver location ends in ".parquet" although a Delta table
# is written there. It is left unchanged because the catalog table is created
# with this LOCATION.
ruta_votes_silver = f"abfss://{bucket_silver}@{storage_account}.dfs.core.windows.net/votes/votes_2020.parquet"

# Read the votes Parquet data from Bronze

df_votes = spark.read.format("parquet").load(ruta_votes_bronze)

In [0]:
# Print the schema of the votes DataFrame exactly as read from Bronze.
print("=== Votes BRONZE ===")
df_votes.printSchema()

=== Votes BRONZE ===
root
 |-- Id: long (nullable = true)
 |-- PostId: long (nullable = true)
 |-- VoteTypeId: long (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- UserId: long (nullable = true)
 |-- BountyAmount: long (nullable = true)



In [0]:
# Row count of the Bronze votes data (triggers a full scan of the source).
print(f"Total registros leídos desde Bronze: {df_votes.count()}")

Total registros leídos desde Bronze: 3223079


In [0]:
# Date range: earliest and latest CreationDate present in the Bronze votes data.
df_votes.select(
    spark_min("CreationDate").alias("min_fecha"),
    spark_max("CreationDate").alias("max_fecha")
).show(truncate=False)

+-------------------+-------------------+
|min_fecha          |max_fecha          |
+-------------------+-------------------+
|2020-01-01 00:00:00|2020-02-29 00:00:00|
+-------------------+-------------------+



In [0]:
from pyspark.sql.functions import col, when, current_date, to_date

def normalizar_votes(df):
    """Deduplicate votes, normalise column types and label the vote type.

    Keeps one row per ``Id``, casts ``Id``, ``PostId``, ``VoteTypeId``,
    ``UserId`` and ``BountyAmount`` to integer, reduces ``CreationDate`` to a
    date, adds ``VoteTypeName`` with a descriptive label for ``VoteTypeId``
    (``"Unknown"`` when no label is defined) and appends the ``f_cargue`` load
    date.

    Args:
        df: Spark DataFrame containing the columns named above.

    Returns:
        The normalised Spark DataFrame.
    """
    nombres_tipo_voto = (
        (-1, "InformModerator"),
        (0, "UndoMod"),
        (1, "AcceptedByOriginator"),
        (2, "UpMod"),                      # upvote
        (3, "DownMod"),                    # downvote
        (4, "Offensive"),
        (5, "Favorite"),                   # bookmark (legacy)
        (6, "Close"),
        (7, "Reopen"),
        (8, "BountyStart"),
        (9, "BountyClose"),
        (10, "Deletion"),
        (11, "Undeletion"),
        (12, "Spam"),
        (15, "ModeratorReview"),
        (16, "ApproveEditSuggestion"),
        (17, "Reaction1"),                 # Teams: celebrate
        (18, "Helpful"),
        (19, "ThankYou"),
        (20, "WellWritten"),
        (21, "Follow"),
        (22, "Reaction2"),                 # Teams: smile
        (23, "Reaction3"),                 # Teams: mind blown
        (24, "Reaction4"),                 # Teams: clap
        (25, "Reaction5"),                 # Teams: heart
        (26, "Reaction6"),                 # Teams: fire
        (27, "Reaction7"),                 # Teams: trophy
        (28, "Reaction8"),                 # Teams: wave
        (29, "Outdated"),
        (30, "NotOutdated"),
        (31, "PreVote"),
        (32, "CollectiveDiscussionUpvote"),
        (33, "CollectiveDiscussionDownvote"),
        (35, "privateAiAnswerCorrect"),
        (36, "privateAiAnswerIncorrect"),
        (37, "privateAiAnswerPartiallyCorrect"),
    )

    resultado = df.dropDuplicates(["Id"])

    for nombre in ("Id", "PostId", "VoteTypeId", "UserId", "BountyAmount"):
        resultado = resultado.withColumn(nombre, col(nombre).cast("integer"))

    resultado = resultado.withColumn("CreationDate", to_date(col("CreationDate")))

    # Build the CASE WHEN chain from the table above, in the same order.
    tipo_voto = col("VoteTypeId")
    expresion = None
    for codigo, etiqueta in nombres_tipo_voto:
        coincide = tipo_voto == codigo
        expresion = when(coincide, etiqueta) if expresion is None else expresion.when(coincide, etiqueta)
    resultado = resultado.withColumn("VoteTypeName", expresion.otherwise("Unknown"))

    # Load date
    return resultado.withColumn("f_cargue", current_date())

In [0]:
# Apply the votes normalisation to the Bronze data to obtain the Silver DataFrame.
df_votes_silver = normalizar_votes(df_votes)

In [0]:
# Preview five rows of the transformed votes.
df_votes_silver.show(5)

+---------+--------+----------+------------+------+------------+------------+----------+
|       Id|  PostId|VoteTypeId|CreationDate|UserId|BountyAmount|VoteTypeName|  f_cargue|
+---------+--------+----------+------------+------+------------+------------+----------+
|202051180|12353288|         2|  2020-01-25|     0|           0|       UpMod|2025-11-16|
|202078426|15302448|         2|  2020-01-25|     0|           0|       UpMod|2025-11-16|
|202068088|17914105|         2|  2020-01-25|     0|           0|       UpMod|2025-11-16|
|202069635|18919091|         2|  2020-01-25|     0|           0|       UpMod|2025-11-16|
|202082244|23331548|         2|  2020-01-25|     0|           0|       UpMod|2025-11-16|
+---------+--------+----------+------------+------+------------+------------+----------+
only showing top 5 rows


In [0]:
# DELTA WRITE WITH MERGE (UPSERT) FOR VOTES
# If a Delta table already exists at the Silver path, upsert the transformed
# votes into it keyed by Id; otherwise create the table from scratch. In both
# cases the table is then registered in the catalog.

votes_silver_existe = DeltaTable.isDeltaTable(spark, ruta_votes_silver)

if votes_silver_existe:
    delta_table = DeltaTable.forPath(spark, ruta_votes_silver)

    df_updates = df_votes_silver.alias("updates")
    df_target = delta_table.toDF().alias("target")

    condition = "target.Id = updates.Id"

    # Informational only: how many incoming Ids are new vs. already present.
    # Each count runs its own Spark job over the incoming data.
    df_target_ids = df_target.select("Id")
    df_updates_ids = df_updates.select("Id")

    nuevos = df_updates_ids.subtract(df_target_ids).count()
    actualizados = df_updates_ids.intersect(df_target_ids).count()

    print(f"Registros nuevos: {nuevos}")
    print(f"Registros a actualizar: {actualizados}")

    # MERGE: insert rows with new Ids and overwrite rows with matching Ids
    (
        delta_table.alias("target")
        .merge(
            df_updates.alias("updates"),
            condition
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print("Merge completado en tabla Delta Silver.")

else:
    print("Creando nueva tabla Delta Silver...")

    (
        df_votes_silver.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(ruta_votes_silver)
    )

# Register the table on every run. The statement is idempotent, and running it
# after a merge as well covers the case where Delta files already exist at the
# path but the catalog entry is missing; the %sql cells below would otherwise
# fail in that case.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS `{catalog_silver}`.votes.votes_silver
    USING DELTA
    LOCATION '{ruta_votes_silver}'
""")

if not votes_silver_existe:
    print("Tabla Delta Silver creada y registrada en el catálogo.")

Creando nueva tabla Delta Silver...
Tabla Delta Silver creada y registrada en el catálogo.


In [0]:
# FINAL VALIDATION: read the Silver votes Delta table back from its path and
# report the number of rows it holds.
# NOTE: `final_count` is reassigned here; it previously held the posts count.

final_df_votes = spark.read.format("delta").load(ruta_votes_silver)
final_count = final_df_votes.count()
print(f"Total registros finales en Silver: {final_count}")

Total registros finales en Silver: 3223079


In [0]:
# List the distinct load dates (f_cargue) present in the Silver votes table.
print("Fechas de cargue registradas:")
final_df_votes.select("f_cargue").distinct().show()

Fechas de cargue registradas:
+----------+
|  f_cargue|
+----------+
|2025-11-16|
+----------+



In [0]:
%sql
-- Show a few rows from the Silver votes table.
-- NOTE(review): the catalog name is hard-coded here instead of following the
-- environment-specific `catalog_silver` value used by the Python cells.
SELECT * FROM `silver-shir`.votes.votes_silver LIMIT 10;

Id,PostId,VoteTypeId,CreationDate,UserId,BountyAmount,VoteTypeName,f_cargue
202051180,12353288,2,2020-01-25,0,0,UpMod,2025-11-16
202078426,15302448,2,2020-01-25,0,0,UpMod,2025-11-16
202068088,17914105,2,2020-01-25,0,0,UpMod,2025-11-16
202069635,18919091,2,2020-01-25,0,0,UpMod,2025-11-16
202082244,23331548,2,2020-01-25,0,0,UpMod,2025-11-16
202057816,48957722,2,2020-01-25,0,0,UpMod,2025-11-16
202069157,49894620,2,2020-01-25,0,0,UpMod,2025-11-16
201291049,10524651,2,2020-01-13,0,0,UpMod,2025-11-16
201251037,10668109,2,2020-01-13,0,0,UpMod,2025-11-16
201297107,12198445,2,2020-01-13,0,0,UpMod,2025-11-16


In [0]:
%sql
-- Show a few rows from the Silver votes table.
-- NOTE(review): this cell repeats the query of the preceding %sql cell;
-- consider removing one of the two.
SELECT * FROM `silver-shir`.votes.votes_silver LIMIT 10;



Id,PostId,VoteTypeId,CreationDate,UserId,BountyAmount,VoteTypeName,f_cargue
202051180,12353288,2,2020-01-25,0,0,UpMod,2025-11-16
202078426,15302448,2,2020-01-25,0,0,UpMod,2025-11-16
202068088,17914105,2,2020-01-25,0,0,UpMod,2025-11-16
202069635,18919091,2,2020-01-25,0,0,UpMod,2025-11-16
202082244,23331548,2,2020-01-25,0,0,UpMod,2025-11-16
202057816,48957722,2,2020-01-25,0,0,UpMod,2025-11-16
202069157,49894620,2,2020-01-25,0,0,UpMod,2025-11-16
201291049,10524651,2,2020-01-13,0,0,UpMod,2025-11-16
201251037,10668109,2,2020-01-13,0,0,UpMod,2025-11-16
201297107,12198445,2,2020-01-13,0,0,UpMod,2025-11-16
