In [0]:
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

In [0]:
# Comentário: Define o caminho que o erro apontou como já existente
path_raw = "s3://aws-data-lakehouse/raw/kafka-crypto"  
path_bronze = "s3://aws-data-lakehouse/bronze/kafka-crypto"  


In [0]:
# Comentário: Verifica se a pasta '_spark_metadata' e os primeiros arquivos JSON surgiram
display(dbutils.fs.ls("s3://aws-data-lakehouse/raw/kafka-crypto/"))

In [0]:
# ==========================================
# 1. DEFINIÇÃO DO SCHEMA MANUAL (EVITA INFERÊNCIA)
# ==========================================
# Forçamos tipos String para timestamps para garantir a leitura sem erros de conversão inicial
schema_final = StructType([
    StructField("topic", StringType(), True),
    StructField("partition", LongType(), True),
    StructField("offset", LongType(), True),
    StructField("kafka_timestamp", StringType(), True),
    StructField("ingested_at", StringType(), True),
    StructField("id", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("name", StringType(), True),
    StructField("current_price", DoubleType(), True),
    StructField("last_updated", StringType(), True)
])

# ==========================================
# 2. LEITURA ROBUSTA NO UNITY CATALOG
# ==========================================
# Usamos o caminho com wildcard /*.json para garantir que não estamos tentando ler a pasta _spark_metadata
path_raw_glob = "s3://aws-data-lakehouse/raw/kafka-crypto/*.json"

df_raw = spark.read \
    .schema(schema_final) \
    .json(path_raw_glob)

# Comentário: Filtramos registros onde o 'id' é nulo (arquivos vazios ou corrompidos)
df_clean = df_raw.filter(col("id").isNotNull())

# Comentário: Adição de metadados de auditoria
df_bronze = df_clean.select(
    "*",
    col("_metadata.file_path").alias("origin_file")
).withColumn("processed_to_bronze", current_timestamp())

# ==========================================
# 3. ESCRITA DELTA
# ==========================================
df_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(path_bronze)

# ==========================================
# 4. VALIDAÇÃO REAL
# ==========================================
total_rows = spark.read.format("delta").load(path_bronze).count()
print(f"✅ Sucesso! {total_rows} linhas processadas na Bronze.")

In [0]:
# Cria uma visão temporária na memória para permitir consultas SQL
df_bronze.createOrReplaceTempView("v_bronze")

# Executa a query SQL sobre a visão criada
df_ultimos = spark.sql("""
    SELECT * FROM v_bronze 
    ORDER BY ingested_at desc 
    LIMIT 5
""")

display(df_ultimos)