In [29]:
import os
import re
import time

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, explode, from_json, col, current_date,current_timestamp, lit, to_date, month

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, StructType, StructField, ArrayType


from pyspark.sql import SparkSession
import pyspark

In [30]:
# Service endpoints of the local lakehouse stack (Nessie catalog, MinIO object store).
CATALOG_URI = "http://nessie:19120/api/v1"
WAREHOUSE = "s3a://silver/"
STORAGE_URI = "http://minio:9000"

# Credentials are taken from the environment so real secrets never have to be
# committed with the notebook. The fallbacks are the previous hard-coded
# development values, so an environment without these variables behaves as before.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "admin")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

In [31]:
# Spark configuration for the silver transformation job. All settings are
# applied in one pass, in the order listed below.
conf = pyspark.SparkConf().setAppName('silver_transform').setAll([
    # Required dependencies (Maven coordinates)
    ("spark.jars.packages", ",".join([
        "org.postgresql:postgresql:42.7.3",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
        "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1",
        "software.amazon.awssdk:bundle:2.24.8",
        "software.amazon.awssdk:url-connection-client:2.24.8",
        "org.apache.hadoop:hadoop-aws:3.3.4"
    ])),

    # Iceberg + Nessie SQL extensions
    ("spark.sql.extensions", ",".join([
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
    ])),

    # Nessie catalog
    ("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog"),
    ("spark.sql.catalog.nessie.uri", CATALOG_URI),
    ("spark.sql.catalog.nessie.ref", "main"),
    ("spark.sql.catalog.nessie.authentication.type", "NONE"),
    ("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO"),
    ("spark.sql.catalog.nessie.warehouse", WAREHOUSE),

    # S3A configuration for MinIO
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("spark.hadoop.fs.s3a.endpoint", STORAGE_URI),
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
    ("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY),
    ("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY),
    ("spark.hadoop.fs.s3a.aws.credentials.provider",
     "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
    ("spark.hadoop.fs.s3a.connection.ssl.enabled", "false"),

    # Execution tuning
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ("spark.sql.parquet.filterPushdown", "true"),
    ("spark.sql.parquet.mergeSchema", "false"),
    ("spark.sql.shuffle.partitions", "64"),          # more partitions to spread the load
    ("spark.sql.files.maxPartitionBytes", "64MB"),   # smaller tasks to avoid saturation

    ("spark.driver.memory", "5g"),                   # driver may use up to 5 GB (of the 6g available)
    ("spark.executor.memory", "6g"),                 # each executor may use up to 6 GB (of the 8g available)
    ("spark.executor.cores", "4"),                   # more cores per executor for parallelism
    ("spark.driver.maxResultSize", "2g"),            # raises the driver result-size limit
    ("spark.network.timeout", "600s"),               # more generous timeout for large loads
    ("spark.executor.heartbeatInterval", "60s"),     # heartbeat kept consistent with the timeout

    # Write settings
    ("spark.sql.parquet.compression.codec", "snappy"),
])

In [32]:
# Build the session from `conf`, or reuse one that is already active.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)
print("Spark Session Started")

Spark Session Started


In [33]:
def list_parquet_files(base_path):
    """Recursively collect the ``.parquet`` files stored under ``base_path``.

    The helper is defined at cell level (outside any ``try``) so later cells can
    always call it, and it resolves the Hadoop FileSystem from the path itself,
    so it does not depend on the ``fs`` variable created below.

    Args:
        base_path (str): Root location to scan, e.g. ``"s3a://bronze/posts/"``.

    Returns:
        list[str]: Paths of every file whose name ends with ``.parquet``. The
        list is empty when ``base_path`` does not exist. Directories that fail
        to list are reported with a warning and skipped, so the result may be
        partial in that case.
    """
    results = []
    path = spark._jvm.org.apache.hadoop.fs.Path(base_path)
    # Resolve the FileSystem that owns this path (scheme + bucket).
    path_fs = path.getFileSystem(spark._jsc.hadoopConfiguration())

    if not path_fs.exists(path):
        print(f"⚠️ Path does not exist: {base_path}")
        return results

    def recurse(p):
        # Depth-first walk; a failure in one directory must not abort the scan.
        try:
            status_list = path_fs.listStatus(p)
            for f in status_list:
                if f.isDirectory():
                    recurse(f.getPath())
                elif f.getPath().getName().endswith(".parquet"):
                    results.append(f.getPath().toString())
        except Exception as e:
            print(f"⚠️ Error listing {p}: {e}")

    recurse(path)
    return results


# Diagnostic listing of the bronze bucket. This part stays best-effort: any
# problem is printed instead of stopping the notebook.
try:
    from py4j.java_gateway import java_import

    # Import Hadoop classes
    java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')
    java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
    java_import(spark._jvm, 'java.net.URI')

    # S3A-aware FileSystem bound to the bronze bucket
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark._jvm.java.net.URI("s3a://bronze/"),
        spark._jsc.hadoopConfiguration()
    )

    # ===============================
    # List posts and votes folders
    # ===============================
    for subdir in ["posts", "votes"]:
        base = f"s3a://bronze/{subdir}/"
        print(f"\n🔍 Revisando {base}")

        try:
            path = spark._jvm.org.apache.hadoop.fs.Path(base)
            status = fs.listStatus(path)

            if len(status) == 0:
                print("⚠️ The bucket exists but is empty.")
            else:
                parquet_files = list_parquet_files(base)
                if parquet_files:
                    # The space after "in" was missing in the original format string.
                    print(f"📦 {len(parquet_files)} Parquet files found in {subdir}:")
                    for f in parquet_files:
                        print("   📄", f)
                else:
                    print("⚠️ Not .parquet files found", base)

        except Exception as e:
            print("❌ Error listing bucket:", e)

except Exception as e:
    print("❌ Error:", e)





🔍 Revisando s3a://bronze/posts/
📦 2 Parquet files found inposts:
   📄 s3a://bronze/posts/2022/2022.parquet
   📄 s3a://bronze/posts/2023/2023.parquet

🔍 Revisando s3a://bronze/votes/
📦 2 Parquet files found invotes:
   📄 s3a://bronze/votes/_2022/1760303320.9018185.377ef04ff9.parquet
   📄 s3a://bronze/votes/_2023/1760303382.8533797.761f32b28e.parquet


In [34]:
# Enumerate every Parquet file under each bronze dataset prefix (posts first, then votes).
posts_paths, votes_paths = (
    list_parquet_files(prefix)
    for prefix in ("s3a://bronze/posts/", "s3a://bronze/votes/")
)

In [35]:
def filter_by_year(paths, year):
    """Return the entries of ``paths`` that contain ``year`` as a delimited token.

    The year must be preceded by ``_``, ``/`` or ``.`` and be followed by one of
    those characters or by the end of the string. Input order is preserved.
    """
    year_token = re.compile(rf"[_/\.]{year}([_/\.]|$)")
    selected = []
    for candidate in paths:
        if year_token.search(candidate) is not None:
            selected.append(candidate)
    return selected

# Split each dataset's file list into one list per year.
votes_2022_paths, votes_2023_paths = (
    filter_by_year(votes_paths, yr) for yr in (2022, 2023)
)
posts_2022_paths, posts_2023_paths = (
    filter_by_year(posts_paths, yr) for yr in (2022, 2023)
)


In [36]:
# Load each per-year file list into its own DataFrame (votes first, then posts).
votes_2022, votes_2023, posts_2022, posts_2023 = (
    spark.read.parquet(*year_paths)
    for year_paths in (
        votes_2022_paths,
        votes_2023_paths,
        posts_2022_paths,
        posts_2023_paths,
    )
)

In [37]:
def clean_votes_sql(df):
    """
    Cleans and enriches the VOTES dataset using Spark SQL.

    - Removes exact duplicate rows (SELECT DISTINCT over the selected columns;
      this is not a de-duplication by 'id' alone)
    - Drops rows whose 'id' or 'creation_date' is NULL
    - Fills missing values: post_id, vote_type_id and user_id become -1,
      bounty_amount becomes the approximate median of the non-null values
      (0 when there are none)
    - Adds creation_ts, creation_date_str, creation_year and load_date

    Args:
        df (pyspark.sql.DataFrame): Raw votes with the columns id, post_id,
            vote_type_id, creation_date, user_id and bounty_amount.

    Returns:
        pyspark.sql.DataFrame: The cleaned votes.

    Raises:
        Exception: Any error raised by Spark is logged and re-raised, so a
            failed cleaning step can never hand the raw input to later cells
            as if it had been cleaned.
    """
    try:
        print("🧹 Cleaning & enriching VOTES dataset with Spark SQL...")

        # Registers/overwrites the session-wide temp view used by the query below.
        df.createOrReplaceTempView("raw_votes")

        # Approximate median of bounty_amount (relative error 0.1). approxQuantile
        # ignores NULLs and returns an empty list when no value is left, so a
        # separate count() pass over the data is not needed.
        quantiles = df.approxQuantile("bounty_amount", [0.5], 0.1)
        median_bounty = quantiles[0] if quantiles else 0

        cleaned_df = spark.sql(f"""
            WITH base AS (
                SELECT DISTINCT id, post_id, vote_type_id, creation_date, user_id, bounty_amount
                FROM raw_votes
                WHERE id IS NOT NULL AND creation_date IS NOT NULL
            )
            SELECT
                id,
                COALESCE(post_id, -1) AS post_id_clean,
                COALESCE(vote_type_id, -1) AS vote_type_id_clean,
                TO_TIMESTAMP(creation_date) AS creation_ts,
                DATE_FORMAT(TO_TIMESTAMP(creation_date), 'yyyy-MM-dd') AS creation_date_str,
                YEAR(TO_TIMESTAMP(creation_date)) AS creation_year,
                CURRENT_TIMESTAMP() AS load_date,
                COALESCE(user_id, -1) AS user_id_clean,
                COALESCE(bounty_amount, {median_bounty}) AS bounty_amount_clean
            FROM base
        """)

        print("✅ VOTES cleaning complete with Spark SQL.\n")
        return cleaned_df

    except Exception as e:
        print(f"❌ Error cleaning VOTES with Spark SQL: {e}")
        # Returning the raw frame here would silently push uncleaned data downstream.
        raise

In [38]:
# Apply the votes cleaning step to the 2022 data.
votes_2022_clean = clean_votes_sql(votes_2022)

🧹 Cleaning & enriching VOTES dataset with Spark SQL...
✅ VOTES cleaning complete with Spark SQL.



In [39]:
# Apply the same votes cleaning step to the 2023 data.
votes_2023_clean = clean_votes_sql(votes_2023)

🧹 Cleaning & enriching VOTES dataset with Spark SQL...
✅ VOTES cleaning complete with Spark SQL.



In [40]:
def clean_posts_sql(df):
    """
    Cleans and enriches the POSTS dataset using Spark SQL for the Silver layer.

    - Removes exact duplicate rows (SELECT DISTINCT *; this is not a
      de-duplication by Id alone)
    - Keeps only Id, the five counters, Body and the five date columns
    - Casts the counters to BIGINT and replaces NULL with 0; NULL Body becomes ''
    - Converts the date fields to timestamps
    - Drops rows whose Id or converted CreationDate is NULL
    - Adds derived columns: creation_date, creation_date_str, year, load_date

    Args:
        df (pyspark.sql.DataFrame): Raw posts.

    Returns:
        pyspark.sql.DataFrame: The cleaned posts.

    Raises:
        Exception: Any error raised by Spark is logged and re-raised. Returning
            the raw input instead would be dangerous here: it still has a
            CreationDate column, so later filter/merge cells would accept it
            and load uncleaned rows into the Silver table.
    """
    try:
        print("🧹 Cleaning & enriching POSTS dataset with Spark SQL...")

        # Register temp view (session-wide; overwritten on every call)
        df.createOrReplaceTempView("raw_posts")

        # NOTE(review): the `creation_date` column added below differs from
        # `CreationDate` only by letter case. With case-insensitive name
        # resolution that can make references to either column ambiguous —
        # confirm the session's spark.sql.caseSensitive setting.
        cleaned_df = spark.sql("""
            WITH base AS (
                SELECT DISTINCT * FROM raw_posts
            ),
            casted AS (
                SELECT
                    Id,
                    COALESCE(CAST(Score AS BIGINT), 0) AS Score,
                    COALESCE(CAST(ViewCount AS BIGINT), 0) AS ViewCount,
                    COALESCE(CAST(AnswerCount AS BIGINT), 0) AS AnswerCount,
                    COALESCE(CAST(CommentCount AS BIGINT), 0) AS CommentCount,
                    COALESCE(CAST(FavoriteCount AS BIGINT), 0) AS FavoriteCount,
                    CASE WHEN Body IS NULL THEN '' ELSE CAST(Body AS STRING) END AS Body,
                    TO_TIMESTAMP(CreationDate) AS CreationDate,
                    TO_TIMESTAMP(LastEditDate) AS LastEditDate,
                    TO_TIMESTAMP(LastActivityDate) AS LastActivityDate,
                    TO_TIMESTAMP(CommunityOwnedDate) AS CommunityOwnedDate,
                    TO_TIMESTAMP(ClosedDate) AS ClosedDate
                FROM base
            ),
            enriched AS (
                SELECT *,
                       CreationDate AS creation_date,
                       DATE_FORMAT(CreationDate, 'yyyy-MM-dd') AS creation_date_str,
                       YEAR(CreationDate) AS year,
                       CURRENT_TIMESTAMP() AS load_date
                FROM casted
                WHERE Id IS NOT NULL AND CreationDate IS NOT NULL
            )
            SELECT * FROM enriched
        """)

        print("✅ POSTS cleaning complete with Spark SQL.\n")
        return cleaned_df

    except Exception as e:
        print(f"❌ Error cleaning POSTS with Spark SQL: {e}")
        # Fail loudly instead of passing the raw frame on as "clean".
        raise

In [41]:
# Apply the posts cleaning step to the 2022 data.
posts_2022_clean = clean_posts_sql(posts_2022)

🧹 Cleaning & enriching POSTS dataset with Spark SQL...
✅ POSTS cleaning complete with Spark SQL.



In [42]:
# Apply the same posts cleaning step to the 2023 data.
posts_2023_clean = clean_posts_sql(posts_2023)

🧹 Cleaning & enriching POSTS dataset with Spark SQL...
✅ POSTS cleaning complete with Spark SQL.



In [43]:
# Make sure the target namespace exists before any table is written into it.
# IF NOT EXISTS keeps the statement safe to re-run; .show() only prints the (empty) result.
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.silver").show()

++
||
++
++



In [44]:
# Inspect the column names and types of the cleaned 2023 posts.
posts_2023_clean.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Score: long (nullable = false)
 |-- ViewCount: long (nullable = false)
 |-- AnswerCount: long (nullable = false)
 |-- CommentCount: long (nullable = false)
 |-- FavoriteCount: long (nullable = false)
 |-- Body: string (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- LastEditDate: timestamp (nullable = true)
 |-- LastActivityDate: timestamp (nullable = true)
 |-- CommunityOwnedDate: timestamp (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- creation_date_str: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- load_date: timestamp (nullable = false)



In [45]:
# Inspect the column names and types of the cleaned 2023 votes.
votes_2023_clean.printSchema()

root
 |-- id: long (nullable = true)
 |-- post_id_clean: long (nullable = false)
 |-- vote_type_id_clean: long (nullable = false)
 |-- creation_ts: timestamp (nullable = true)
 |-- creation_date_str: string (nullable = true)
 |-- creation_year: integer (nullable = true)
 |-- load_date: timestamp (nullable = false)
 |-- user_id_clean: long (nullable = false)
 |-- bounty_amount_clean: decimal(21,1) (nullable = false)



In [46]:
# Register the cleaned posts as temporary views so they can be queried with SQL
posts_2022_clean.createOrReplaceTempView("posts_2022")
posts_2023_clean.createOrReplaceTempView("posts_2023")

# Keep only the first four months (January-April), filtering on the CreationDate column
filtered_posts_2022 = spark.sql("""
    SELECT * FROM posts_2022
    WHERE MONTH(CreationDate) BETWEEN 1 AND 4
""")

filtered_posts_2023 = spark.sql("""
    SELECT * FROM posts_2023
    WHERE MONTH(CreationDate) BETWEEN 1 AND 4
""")

In [47]:
# Check the monthly distribution of the filtered DataFrames (only months 1-4 should appear)
filtered_posts_2022.groupBy(month("CreationDate").alias("month")).count().orderBy("month").show()
filtered_posts_2023.groupBy(month("CreationDate").alias("month")).count().orderBy("month").show()

+-----+------+
|month| count|
+-----+------+
|    1|275404|
|    2|260478|
|    3|279094|
|    4|260778|
+-----+------+

+-----+------+
|month| count|
+-----+------+
|    1|225487|
|    2|198825|
|    3|203410|
|    4|173205|
+-----+------+



In [48]:
def merge_posts_sql(filtered_posts_2022, filtered_posts_2023, catalog="nessie.silver", table_name="posts"):
    """
    Merges filtered POSTS datasets into a Nessie/Iceberg table using Spark SQL with deduplication and transactional upsert.

    - Registers the inputs as the temp views posts_2022 and posts_2023
      (replacing any views with those names already present in the session)
    - Combines both datasets using UNION ALL
    - Deduplicates by 'Id' keeping the most recent record based on 'LastActivityDate'
    - If the target Iceberg table does not exist, creates it from the deduplicated data
    - Otherwise performs a MERGE INTO operation for transactional upsert

    Args:
        filtered_posts_2022 (pyspark.sql.DataFrame): Cleaned and filtered posts DataFrame for year 2022.
        filtered_posts_2023 (pyspark.sql.DataFrame): Cleaned and filtered posts DataFrame for year 2023.
        catalog (str, optional): Target Nessie catalog and layer (e.g., "nessie.silver"). Defaults to "nessie.silver".
        table_name (str, optional): Target table name for merge. Defaults to "posts".

    Returns:
        None

    Raises:
        Exception: Any error raised while building the update set, creating the
            table or merging is logged and re-raised, so a failed load is never
            mistaken for a successful one.

    Notes:
        - Ensures a single record per 'Id' (latest by 'LastActivityDate'); rows that
          tie on 'LastActivityDate' are resolved in an unspecified order.
        - Supports incremental Silver-layer processing in a medallion data architecture.
    """
    try:
        # Register temporary views
        filtered_posts_2022.createOrReplaceTempView("posts_2022")
        filtered_posts_2023.createOrReplaceTempView("posts_2023")

        # Build the combined, de-duplicated update set.
        # NOTE(review): the helper column `rank` is part of posts_updates and is
        # therefore written to the table when it is created below. It is kept
        # because an existing table may already contain it and the MERGE uses
        # `SET *` / `INSERT *` — confirm before dropping it.
        spark.sql("""
            CREATE OR REPLACE TEMP VIEW posts_updates AS
            WITH combined AS (
                SELECT * FROM posts_2022
                UNION ALL
                SELECT * FROM posts_2023
            ),
            ranked AS (
                SELECT *,
                       ROW_NUMBER() OVER (
                           PARTITION BY Id
                           ORDER BY LastActivityDate DESC
                       ) AS rank
                FROM combined
            )
            SELECT * FROM ranked WHERE rank = 1
        """)

        # Check whether the target table already exists
        table_path = f"{catalog}.{table_name}"
        table_exists = spark.catalog.tableExists(table_path)

        if not table_exists:
            print(f"⚙️ Table {table_path} not found. Creating it now...")
            spark.table("posts_updates").writeTo(table_path).create()
            print(f"✅ Table {table_path} created successfully.")
        else:
            # Upsert: update matching Ids, insert the rest
            spark.sql(f"""
                MERGE INTO {table_path} AS target
                USING posts_updates AS source
                ON target.Id = source.Id
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """)
            print(f"✅ Posts merged successfully into {table_path} using Iceberg MERGE")

    except Exception as e:
        print(f"❌ Error merging posts with MERGE INTO: {e}")
        # Surface the failure to the caller instead of ending silently.
        raise


In [49]:
# Upsert the January-April posts of both years into nessie.silver.posts.
merge_posts_sql(filtered_posts_2022, filtered_posts_2023, catalog="nessie.silver", table_name="posts")

✅ Posts merged successfully into nessie.silver.posts using Iceberg MERGE


In [50]:
# Register the cleaned votes as temporary views so they can be queried with SQL
votes_2022_clean.createOrReplaceTempView("votes_2022")
votes_2023_clean.createOrReplaceTempView("votes_2023")

# Keep only the first four months (January-April), filtering on creation_ts
filtered_votes_2022 = spark.sql("""
    SELECT * FROM votes_2022
    WHERE MONTH(creation_ts) BETWEEN 1 AND 4
""")

filtered_votes_2023 = spark.sql("""
    SELECT * FROM votes_2023
    WHERE MONTH(creation_ts) BETWEEN 1 AND 4
""")

In [51]:
# Check the monthly distribution of the filtered DataFrames (only months 1-4 should appear)
filtered_votes_2022.groupBy(month("creation_ts").alias("month")).count().orderBy("month").show()
filtered_votes_2023.groupBy(month("creation_ts").alias("month")).count().orderBy("month").show()

+-----+-------+
|month|  count|
+-----+-------+
|    1|1445524|
|    2|1324142|
|    3|1387773|
|    4|1305101|
+-----+-------+

+-----+-------+
|month|  count|
+-----+-------+
|    1|1256117|
|    2|1129914|
|    3|1173429|
|    4| 942774|
+-----+-------+



In [52]:
def merge_votes_sql(filtered_votes_2022, filtered_votes_2023, catalog="nessie.silver", table_name="votes"):
    """
    Merges filtered VOTES datasets incrementally into an Iceberg/Nessie table using Spark SQL MERGE.

    - Registers the inputs as the temp views votes_2022 and votes_2023
      (replacing any views with those names already present in the session)
    - Combines both datasets with UNION ALL
    - Deduplicates by 'id', keeping the most recent record by 'creation_ts'
    - Creates the table from the deduplicated data if it does not exist
    - Otherwise performs an upsert (update/insert) merge into the target table

    Args:
        filtered_votes_2022 (pyspark.sql.DataFrame): Cleaned and filtered votes DataFrame for year 2022.
        filtered_votes_2023 (pyspark.sql.DataFrame): Cleaned and filtered votes DataFrame for year 2023.
        catalog (str, optional): Target Nessie catalog and layer (e.g., "nessie.silver"). Defaults to "nessie.silver".
        table_name (str, optional): Target table name. Defaults to "votes".

    Returns:
        None

    Raises:
        Exception: Any error raised while building the update set, creating the
            table or merging is logged and re-raised, so a failed load is never
            mistaken for a successful one.

    Notes:
        - Ensures only one record per 'id' remains (latest by 'creation_ts'); rows that
          tie on 'creation_ts' are resolved in an unspecified order.
        - Automatically creates the table if it does not exist.
        - Uses Iceberg MERGE for efficient incremental updates.
    """
    try:
        # Register the filtered views
        filtered_votes_2022.createOrReplaceTempView("votes_2022")
        filtered_votes_2023.createOrReplaceTempView("votes_2023")

        # Build the combined, de-duplicated update set.
        # NOTE(review): the helper column `rank` is part of the result and is
        # therefore written to the table when it is created below. It is kept
        # because an existing table may already contain it and the MERGE uses
        # `SET *` / `INSERT *` — confirm before dropping it.
        merged_votes = spark.sql("""
            WITH combined AS (
                SELECT * FROM votes_2022
                UNION ALL
                SELECT * FROM votes_2023
            ),
            ranked AS (
                SELECT *,
                       ROW_NUMBER() OVER (
                           PARTITION BY id
                           ORDER BY creation_ts DESC
                       ) AS rank
                FROM combined
            )
            SELECT * FROM ranked WHERE rank = 1
        """)
        merged_votes.createOrReplaceTempView("votes_updates")

        # Check whether the target table already exists
        table_path = f"{catalog}.{table_name}"
        table_exists = spark.catalog.tableExists(table_path)

        if not table_exists:
            print(f"⚙️ Table {table_path} not found. Creating it now...")
            merged_votes.writeTo(table_path).create()
            print(f"✅ Table {table_path} created successfully.")
        else:
            # Upsert: update matching ids, insert the rest
            spark.sql(f"""
                MERGE INTO {table_path} AS target
                USING votes_updates AS source
                ON target.id = source.id
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """)
            print(f"✅ Votes merged successfully into {table_path} using Iceberg MERGE")

    except Exception as e:
        print(f"❌ Error merging votes with MERGE INTO: {e}")
        # Surface the failure to the caller instead of ending silently.
        raise


In [53]:
# Upsert the January-April votes of both years into nessie.silver.votes.
merge_votes_sql(filtered_votes_2022, filtered_votes_2023, catalog="nessie.silver", table_name="votes")

✅ Votes merged successfully into nessie.silver.votes using Iceberg MERGE


In [54]:
# Shut down the Spark session created earlier in the notebook.
spark.stop()