In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

# Connection settings are taken from the process environment.
# Each constant is None when its variable is not set.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT")
NESSIE_URI = os.environ.get("NESSIE_URI")


def create_spark_session(appname):
    """Build (or reuse) a SparkSession configured for Iceberg on Nessie with S3A storage.

    The session targets the standalone master at ``spark://spark-master:7077``
    and registers an Iceberg catalog named ``nessie`` whose warehouse is
    ``s3a://lakehouse/``. Credentials, the S3 endpoint and the Nessie URI are
    read from the module-level constants populated from the environment.

    Parameters
    ----------
    appname : str
        Spark application name.

    Returns
    -------
    SparkSession
        The session returned by ``SparkSession.builder...getOrCreate()``; an
        already-running session is reused rather than replaced.

    Raises
    ------
    ValueError
        If any of the environment-backed settings is missing or empty.
    """
    # SparkConf.set() stringifies its value, so an unset variable would be
    # configured as the literal text "None" and only fail much later with an
    # unrelated-looking connection error. Fail early and name the culprit.
    required_settings = {
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_KEY,
        "S3_ENDPOINT": S3_ENDPOINT,
        "NESSIE_URI": NESSIE_URI,
    }
    missing = sorted(name for name, value in required_settings.items() if not value)
    if missing:
        raise ValueError(
            "Missing required environment variables: " + ", ".join(missing)
        )

    master = "spark://spark-master:7077"

    conf = (
        pyspark.SparkConf()
        .setAppName(appname)
        .set("spark.master", master)

        # Iceberg SQL extensions
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        )

        # Nessie catalog registration
        .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.nessie.type", "nessie")
        .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .set("spark.sql.catalog.nessie.uri", NESSIE_URI)
        .set("spark.sql.catalog.nessie.ref", "main")
        .set("spark.sql.catalog.nessie.authentication.type", "NONE")
        .set("spark.sql.catalog.nessie.cache-enabled", "false")
        .set("spark.sql.catalog.nessie.warehouse", "s3a://lakehouse/")

        # S3 settings used by the Iceberg catalog
        .set("spark.sql.catalog.nessie.s3.path-style-access", "true")
        .set("spark.sql.catalog.nessie.s3.endpoint", S3_ENDPOINT)

        # Hadoop S3A settings (used for plain s3a:// reads such as the landing CSVs)
        .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
        .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
        .set("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set(
            "spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        )
        .set("spark.hadoop.fs.defaultFS", "s3a://lakehouse")

        # Resources
        .set("spark.executor.memory", "1g")
        .set("spark.executor.cores", "1")
        .set("spark.driver.memory", "1g")
        .set("spark.executor.instances", "1")
    )

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark



In [17]:
# Stop a session left over from a previous run. On a fresh kernel no session
# exists yet (and the name `spark` is not defined), so this is a no-op.
_stale_session = SparkSession.getActiveSession()
if _stale_session is not None:
    _stale_session.stop()

In [19]:
# Create (or reuse) the session for this notebook; "bronze" is the Spark application name.
spark = create_spark_session("bronze")

In [3]:
# Show the session object as the cell output.
spark

In [14]:
from pyspark.sql import functions as F
from datetime import datetime
from uuid import uuid4
import logging

# Root logger: INFO and above, formatted as "timestamp - level - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)


class Bronze:
    """
    Bronze-layer loader for the Tibia lakehouse.

    What the class does:
    - Creates the Iceberg namespace and table when they do not exist.
    - Reads the CSV files of one day from the landing area.
    - Standardises column names and types.
    - Adds operational metadata columns.
    - Appends the result to the Iceberg table.

    The ``nessie`` catalog must already be configured on the session that is
    passed in; this class only issues SQL and writes against ``nessie.*``.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session.
    date_str : str | None, default None
        Landing date in 'YYYY-MM-DD' format. When None (or empty), the current
        date is used to build the landing path.
    """

    def __init__(self, spark, date_str=None):
        self.spark = spark
        self.date_str = date_str

    def _landing_path(self):
        """Return the landing directory for ``date_str`` (today when it is unset).

        Year, month and day are written without zero padding.
        Raises ValueError when ``date_str`` is not in 'YYYY-MM-DD' format.
        """
        target_date = (
            datetime.strptime(self.date_str, "%Y-%m-%d")
            if self.date_str
            else datetime.today()
        )
        partition = f"year={target_date.year}/month={target_date.month}/day={target_date.day}"
        return f"s3a://lakehouse/landing/{partition}/experience/"

    # ============================================================
    #   METHOD: VOCATION
    # ============================================================
    def vocation(self):
        """
        Run the Bronze load for the vocation data.

        1. Creates the namespace and the table when they do not exist.
        2. Reads the CSV files from the landing path of the configured date.
        3. Validates that the required columns are present.
        4. Normalises column names and types.
        5. Generates a batch_id and the audit columns.
        6. Deduplicates by (name, world).
        7. Appends the rows to the Bronze table.

        When a required column is missing, the problem is logged and the
        method returns without writing anything.

        The write is a plain append: running the load again for the same
        date adds the same rows again under a new batch_id.

        Target table: nessie.bronze.vocation
        """

        # Namespace
        self.spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.bronze")

        # Iceberg table
        self.spark.sql("""
        CREATE TABLE IF NOT EXISTS nessie.bronze.vocation (
            name STRING,
            vocation STRING,
            level INT,
            world STRING,
            experience LONG,
            world_type STRING,
            ingestion_time TIMESTAMP,
            ingestion_date DATE,
            source_system STRING,
            batch_id STRING
        )
        USING iceberg
        PARTITIONED BY (world, ingestion_date)
        TBLPROPERTIES (
            'format-version' = '2',
            'write.format.default' = 'parquet',
            'write.metadata.compression' = 'gzip',
            'write.delete.mode' = 'merge-on-read'
        )
        """)

        # Landing path derived from the configured date
        path = self._landing_path()
        logging.info(f"Lendo dados de: {path}")

        # Every column is read as a string (no schema inference); casts happen below.
        df_raw = self.spark.read.csv(path, header=True)

        # Required-column check
        colunas_esperadas = {"Rank", "Name", "Vocation", "World", "Level", "Points", "WorldType"}
        colunas_faltando = colunas_esperadas - set(df_raw.columns)

        if colunas_faltando:
            logging.error(f"Colunas ausentes no CSV: {colunas_faltando}")
            return

        # One batch_id per run, for auditing
        batch_id = str(uuid4())
        logging.info(f"Gerando batch_id: {batch_id}")

        df_raw.printSchema()

        # Normalisation. ingestion_time / ingestion_date reflect when the job
        # runs, not the landing date in date_str.
        df_bronze = (
            df_raw.drop("Rank")
            .withColumnRenamed("Name", "name")
            .withColumnRenamed("Vocation", "vocation")
            .withColumnRenamed("Level", "level")
            .withColumnRenamed("World", "world")
            .withColumnRenamed("Points", "experience")
            .withColumnRenamed("WorldType", "world_type")
            .withColumn("ingestion_time", F.current_timestamp())
            .withColumn("ingestion_date", F.current_date())
            .withColumn("source_system", F.lit("highscore_tibia_page"))
            .withColumn("batch_id", F.lit(batch_id))
            # Points carry thousands separators (commas) that must go before the cast.
            .withColumn("experience", F.regexp_replace("experience", ",", "").cast("long"))
            .withColumn("level", F.col("level").cast("int"))
            .withColumn("vocation", F.trim(F.lower("vocation")))
            .withColumn("world", F.trim(F.lower("world")))
            .dropDuplicates(["name", "world"])
        )

        # NOTE(review): this is a Spark session setting for Spark's own Parquet
        # writer. Iceberg writes are presumably governed by the table property
        # 'write.parquet.compression-codec' instead - confirm whether this line
        # has any effect on the append below.
        self.spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

        # count() and append() are two separate actions; without persisting, the
        # CSV would be read and deduplicated twice. Persisting also guarantees
        # that the logged count describes exactly the rows that get written.
        df_bronze.persist()
        try:
            record_count = df_bronze.count()

            if record_count > 0:
                logging.info(f"Inserindo {record_count} registros na Bronze com batch_id {batch_id}...")
                df_bronze.writeTo("nessie.bronze.vocation").append()
            else:
                logging.warning("Nenhum registro encontrado para gravar na Bronze.")
        finally:
            df_bronze.unpersist()

In [15]:
# Bronze loader for the landing data of 2025-12-25.
bronze = Bronze(spark, "2025-12-25")

In [16]:
# Run the vocation load: reads the landing CSVs and appends to nessie.bronze.vocation.
bronze.vocation()

2025-12-25 23:57:23,117 - INFO - Lendo dados de: s3a://lakehouse/landing/year=2025/month=12/day=25/experience/
2025-12-25 23:57:23,797 - INFO - Gerando batch_id: e852ac20-c5d1-4d23-9c31-ae84045a88af


root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Vocation: string (nullable = true)
 |-- World: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Points: string (nullable = true)
 |-- WorldType: string (nullable = true)



2025-12-25 23:57:25,394 - INFO - Inserindo 500 registros na Bronze com batch_id e852ac20-c5d1-4d23-9c31-ae84045a88af...
                                                                                

In [17]:
# NOTE(review): run top to bottom, this stops the session before the read cell
# below, which then has no live session. The execution counts suggest this cell
# was run out of order - confirm whether it should be removed.
spark.stop()

In [20]:
# Load the Bronze table and preview its first five rows.
df = spark.read.table("nessie.bronze.vocation")
df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+-----------+-----+-------+-----------+--------------+--------------------+--------------+--------------------+--------------------+
|         name|   vocation|level|  world| experience|    world_type|      ingestion_time|ingestion_date|       source_system|            batch_id|
+-------------+-----------+-----+-------+-----------+--------------+--------------------+--------------+--------------------+--------------------+
|   Appov Boss|elder druid| 1017|eclipta|17452496195|Retro Open PvP|2025-12-25 23:47:...|    2025-12-25|highscore_tibia_page|f782c035-7bfc-4a0...|
|        Aztev|elder druid| 1025|eclipta|17893581139|Retro Open PvP|2025-12-25 23:47:...|    2025-12-25|highscore_tibia_page|f782c035-7bfc-4a0...|
|   Bre no zin|elder druid|  874|eclipta|11084969190|Retro Open PvP|2025-12-25 23:47:...|    2025-12-25|highscore_tibia_page|f782c035-7bfc-4a0...|
|Brunon Brunon|elder druid| 1168|eclipta|26432758562|Retro Open PvP|2025-12-25 23:47:...|    2025-12-25|highscore_tibi

                                                                                

In [21]:
# Total number of rows in the Bronze table. The recorded result (1000) is twice
# the 500 rows inserted by the run above, which suggests the same day was
# appended by two separate runs.
df.count()

                                                                                

1000

In [22]:
# Stop the session at the end of the notebook.
spark.stop()