## Importações e Spark Session

In [1]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col
from pyspark.sql.functions import lower, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing
            credential is reported here instead of being handed to Spark
            as ``None``.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Variável de ambiente obrigatória não definida: {name}")
    return value


# Build (or reuse) the Spark session, pulling in the hadoop-aws package needed
# to reach MinIO (our S3-compatible store) through the s3a:// filesystem.
# NOTE(review): the app name says "oo1" (letter o) while the data lake paths
# use "001" -- looks like a typo; left unchanged, confirm before renaming.
spark = (
    SparkSession.builder
    .appName("silver_oo1_breweries")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    # Credentials come from the environment; fail fast when they are missing.
    .config("spark.hadoop.fs.s3a.access.key", _require_env("MINIO_ROOT_USER"))
    .config("spark.hadoop.fs.s3a.secret.key", _require_env("MINIO_ROOT_PASSWORD"))
    # Path-style addressing (endpoint/bucket/key) instead of virtual-host style.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)

## Lendo da camada bronze e gravando na camada silver em Parquet

In [2]:
# Expected schema of the bronze breweries JSON. Every column is declared as a
# nullable string (latitude and longitude included); the order below is the
# column order of the resulting DataFrame.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "address_1",
        "address_2",
        "address_3",
        "brewery_type",
        "city",
        "country",
        "id",
        "latitude",
        "longitude",
        "name",
        "phone",
        "postal_code",
        "state",
        "state_province",
        "street",
        "website_url",
    )
])

# Bronze -> silver pipeline: read the raw breweries JSON, normalise column
# names, deduplicate by brewery id, stamp the ingestion time and write the
# result as Parquet partitioned by country and state.
try:
    # Read with the explicit schema declared above (no schema inference).
    df = spark.read.schema(schema).json("s3a://datalake/1_bronze/001_breweries/breweries.json")

    # Normalise column names: lower-case, with spaces replaced by underscores.
    df_cleaned = df.select([
        col(c).alias(c.lower().replace(" ", "_")) for c in df.columns
    ])

    # Deduplicate on the brewery id.
    df_dedup = df_cleaned.dropDuplicates(["id"])

    # Add the ingestion timestamp.
    df_with_ts = df_dedup.withColumn("ingestion_timestamp", current_timestamp())

    # Write partitioned by country and state; "overwrite" mode replaces
    # whatever already exists at the target path.
    (
        df_with_ts.write
        .mode("overwrite")
        .partitionBy("country", "state")
        .parquet("s3a://datalake/2_silver/001_breweries/breweries/")
    )

    print("✅ Pipeline executado com sucesso!")

except Exception as e:
    # Report on stderr and re-raise so the failure is not swallowed.
    # Relies on `sys` being imported in the notebook's import cell; without it
    # this handler raised NameError and hid the original error.
    print(f"❌ Falha na execução do pipeline: {e}", file=sys.stderr)
    raise

✅ Pipeline executado com sucesso!
