In [0]:
# %sql
# -- Tabela de origem: Lê os arquivos JSON do bucket S3
# CREATE OR REFRESH STREAMING LIVE TABLE bronze_customers
# COMMENT "Dados brutos dos novos clientes do bucket S3"
# AS SELECT *
# FROM cloud_files(
#     "s3a://databricks-bootcamp-julio",
#     "json",
#     MAP(
#         "inferSchema", "false",
#         "multiline", "true",
#         "cloudFiles.schemaHints", "customer_id STRING, name STRING, email STRING, btc_balance DOUBLE, usd_balance DOUBLE, last_update TIMESTAMP",
#         "cloudFiles.schemaLocation", "/FileStore/autoloader_schemas/bronze_customers"
#     )
# );

In [0]:
# Importa os tipos de dados necessários do PySpark SQL
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define o caminho do seu bucket S3
# Certifique-se de que este bucket está acessível pelo seu cluster Databricks.
# Isso geralmente é feito através de roles IAM (AWS) ou credenciais de acesso (Azure/GCP)
# configuradas no cluster.
s3_bucket_path = "s3a://databricks-bootcamp-julio/"

# 1. Definição explícita do esquema dos seus dados JSON
# É crucial definir o esquema explicitamente para garantir que os dados sejam lidos corretamente
# e para evitar a inferência de esquema (que pode ser inconsistente em datasets grandes ou com dados sujos).
# Baseado na sua definição anterior:
# customer_id STRING, name STRING, email STRING, btc_balance DOUBLE, usd_balance DOUBLE, last_update TIMESTAMP

customer_schema = StructType([
    StructField("customer_id", StringType(), True),   # customer_id como String
    StructField("name", StringType(), True),          # name como String
    StructField("email", StringType(), True),         # email como String
    StructField("btc_balance", DoubleType(), True),   # btc_balance como Double
    StructField("usd_balance", DoubleType(), True),   # usd_balance como Double
    StructField("last_update", TimestampType(), True) # last_update como Timestamp
])

# 2. Leitura dos arquivos JSON do S3 em modo batch
# Usamos spark.read para operações em lote.
# .option("multiline", "true") é importante se cada arquivo JSON contém um único objeto JSON formatado em várias linhas.
# .schema(customer_schema) aplica o esquema que definimos.
print(f"Lendo arquivos JSON do caminho S3: {s3_bucket_path}")
try:
    df_customers = spark.read \
        .format("json") \
        .option("multiline", "true") \
        .schema(customer_schema) \
        .load(s3_bucket_path)

    print("Pré-visualização dos dados lidos:")
    df_customers.show(5, truncate=False) # Mostra as 5 primeiras linhas sem truncar o conteúdo
    df_customers.printSchema() # Mostra o esquema inferido (que será o que definimos)

    # 3. Salvando os dados em uma tabela Delta Lake chamada 'bronze_customers'
    # Usamos o modo 'overwrite' para recriar a tabela a cada execução, ou 'append' para adicionar novos dados.
    # Para a primeira carga, 'overwrite' é comum. Se for reexecutar e quiser manter os dados, mude para 'append'.
    table_name = "bronze_customers"
    print(f"Salvando dados na tabela Delta: {table_name}")

    # Certifique-se de ter um catálogo e schema padrão selecionados ou especifique-os:
    # Por exemplo: spark.sql("USE CATALOG default;") ou spark.sql("USE default.my_schema;")
    # Em DLT, ele geralmente usa o catálogo/schema do pipeline. Para notebooks, pode ser 'default.default'.

    # Para garantir que você está escrevendo para o Hive Metastore ou Unity Catalog (se habilitado)
    # use saveAsTable.
    df_customers.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name)

    print(f"Dados salvos com sucesso na tabela Delta '{table_name}'.")

    # 4. Verificando a tabela Delta criada
    print(f"Verificando os dados na tabela '{table_name}':")
    display(spark.sql(f"SELECT * FROM {table_name} LIMIT 10"))

except Exception as e:
    print(f"Ocorreu um erro ao processar os dados: {e}")