# Notebook de ingestão Glue — Bronze e Silver

Este notebook combina a lógica dos scripts `processamento_dados_bronze.py` e `processamento_dados_silver.py` para ser executado em um Glue Notebook (Glue Studio).

O notebook faz:
- Processamento Bronze: lê arquivos Parquet de um prefix S3, aplica casts explícitos conforme schema (yellow/green), adiciona `ano`, `mes` e `ingestion_ts` e grava em `s3://ifood-case-bronze/...` particionado por `ano`/`mes`.
- Processamento Silver: lê as tabelas Bronze do catálogo (`dados_taxi_bronze.taxi_yellow` e `dados_taxi_bronze.taxi_green`), seleciona/renomeia colunas, adiciona `tipo_taxi` e grava em `s3://ifood-case-silver/corridas_taxi/` particionado por `ano`/`mes`.

Instruções rápidas:
- Configure o Glue Notebook para usar um role com permissões S3/Glue/Catalog/Athena.
- Ajuste os caminhos `INPUT_*` / `OUTPUT_*` nas células abaixo se necessário.

In [None]:
# Imports e inicialização Glue/Spark
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
import boto3
import traceback

# Em notebooks Glue a célula de inicialização pode já prover glueContext; garantimos criação segura
try:
    sc = SparkContext.getOrCreate()
except Exception:
    sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = None
try:
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job = Job(glue_context)
    job.init(args['JOB_NAME'], args)
except Exception:
    # Em alguns ambientes de notebook getResolvedOptions falha; seguimos sem Job para testes locais
    pass

s3 = boto3.client('s3')

In [None]:
# Schemas (copiados dos scripts)
yellow_schema = T.StructType([
    T.StructField('VendorID', T.LongType(), True),
    T.StructField('tpep_pickup_datetime', T.TimestampType(), True),
    T.StructField('tpep_dropoff_datetime', T.TimestampType(), True),
    T.StructField('passenger_count', T.LongType(), True),
    T.StructField('trip_distance', T.DoubleType(), True),
    T.StructField('RatecodeID', T.LongType(), True),
    T.StructField('store_and_fwd_flag', T.StringType(), True),
    T.StructField('PULocationID', T.LongType(), True),
    T.StructField('DOLocationID', T.LongType(), True),
    T.StructField('payment_type', T.LongType(), True),
    T.StructField('fare_amount', T.DoubleType(), True),
    T.StructField('extra', T.DoubleType(), True),
    T.StructField('mta_tax', T.DoubleType(), True),
    T.StructField('tip_amount', T.DoubleType(), True),
    T.StructField('tolls_amount', T.DoubleType(), True),
    T.StructField('improvement_surcharge', T.DoubleType(), True),
    T.StructField('total_amount', T.DoubleType(), True),
    T.StructField('congestion_surcharge', T.DoubleType(), True)
])

green_schema = T.StructType([
    T.StructField('VendorID', T.LongType(), True),
    T.StructField('lpep_pickup_datetime', T.TimestampType(), True),
    T.StructField('lpep_dropoff_datetime', T.TimestampType(), True),
    T.StructField('store_and_fwd_flag', T.StringType(), True),
    T.StructField('RatecodeID', T.LongType(), True),
    T.StructField('PULocationID', T.LongType(), True),
    T.StructField('DOLocationID', T.LongType(), True),
    T.StructField('passenger_count', T.LongType(), True),
    T.StructField('trip_distance', T.DoubleType(), True),
    T.StructField('fare_amount', T.DoubleType(), True),
    T.StructField('extra', T.DoubleType(), True),
    T.StructField('mta_tax', T.DoubleType(), True),
    T.StructField('tip_amount', T.DoubleType(), True),
    T.StructField('tolls_amount', T.DoubleType(), True),
    T.StructField('improvement_surcharge', T.DoubleType(), True),
    T.StructField('total_amount', T.DoubleType(), True),
    T.StructField('payment_type', T.LongType(), True),
    T.StructField('congestion_surcharge', T.DoubleType(), True)
])

In [None]:
# Helpers: casting, listing S3, e process_dataset (adaptado do seu script)
def cast_columns(df, table_name):
    if 'taxi_yellow' in table_name:
        datetime_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
        long_cols = ['VendorID', 'passenger_count', 'PULocationID', 'DOLocationID']
        double_cols = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount','RatecodeID', 'payment_type', 'trip_type',
                       'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']
        string_cols = ['store_and_fwd_flag']
    else:  # taxi_green
        datetime_cols = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']
        long_cols = ['VendorID',  'PULocationID', 'DOLocationID', 'passenger_count']
        double_cols = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount','RatecodeID', 'payment_type', 'trip_type',
                       'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge']
        string_cols = ['store_and_fwd_flag', 'ehail_fee']

    for col_name in datetime_cols:
        df = df.withColumn(col_name, F.to_timestamp(F.col(col_name)))
    for col_name in long_cols:
        df = df.withColumn(col_name, F.col(col_name).cast(T.LongType()))
    for col_name in double_cols:
        df = df.withColumn(col_name, F.col(col_name).cast(T.DoubleType()))
    for col_name in string_cols:
        df = df.withColumn(col_name, F.col(col_name).cast(T.StringType()))
    return df

def cast_df_to_structtype(df, struct):
    # Aplica casts conforme StructType e garante colunas presentes na ordem do schema
    for field in struct.fields:
        if field.name in df.columns:
            df = df.withColumn(field.name, F.col(field.name).cast(field.dataType))
        else:
            df = df.withColumn(field.name, F.lit(None).cast(field.dataType))
    schema_cols = [f.name for f in struct.fields]
    other_cols = [c for c in df.columns if c not in schema_cols]
    return df.select(*(schema_cols + other_cols))

def list_s3_files(bucket_path):
    client = boto3.client('s3')
    bucket = bucket_path.replace('s3://', '').split('/')[0]
    prefix = '/'.join(bucket_path.replace('s3://', '').split('/')[1:])
    paginator = client.get_paginator('list_objects_v2')
    files = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []) or []:
            if obj['Key'].endswith('.parquet'):
                files.append(f's3://{bucket}/{obj['Key']}')
    return files

def process_dataset(input_path: str, output_path: str, table_name: str, schema):
    print(f'Iniciando processamento do dataset: {table_name}')
    try:
        files = list_s3_files(input_path)
        if not files:
            print(f'Nenhum arquivo encontrado em {input_path}')
            return

        for file in files:
            print(f'Processando arquivo: {file}')
            df = spark.read.parquet(file)
            df = cast_columns(df, table_name)
            # extrai partições do nome do arquivo (formato YYYY-MM no nome do arquivo)
            df = df.withColumn('source_file', F.lit(file))
            df = df.withColumn('ano', F.regexp_extract(F.col('source_file'), r'({4})-({2})', 1).cast(T.IntegerType())) \
                   .withColumn('mes', F.regexp_extract(F.col('source_file'), r'({4})-({2})', 2).cast(T.IntegerType())) \
                   .withColumn('ingestion_ts', F.current_timestamp()) \
                   .drop('source_file')

            # Cast explícito para o schema fornecido (evita incompatibilidades no catálogo)
            try:
                df = cast_df_to_structtype(df, schema)
            except Exception as e:
                print(f'Falha ao aplicar cast para o schema: {e}; seguindo sem cast explícito')

            # Grava em append particionado por ano/mes (escolha append para evitar sobrescrever todo o prefix)
            df.write.mode('append') \
                  .format('parquet') \
                  .partitionBy('ano', 'mes') \
                  .option('compression', 'snappy') \
                  .save(output_path)

        # Atualiza partições no Glue Catalog
        spark.sql(f'MSCK REPAIR TABLE {table_name}')
        print(f'Processamento finalizado com sucesso para {table_name}')

    except Exception as e:
        print('--------------------------------------------------')
        print(f'Erro ao processar {table_name}')
        print(f'Input: {input_path}')
        print(f'Destino: {output_path}')
        print('Mensagem:', str(e))
        traceback.print_exc()

In [None]:
# Configurações (ajuste conforme ambiente)
DATASETS = [
    {
        'input': 's3://ifood-case-repo/yellow/',
        'table': 'dados_taxi_bronze.taxi_yellow',
        'output': 's3://ifood-case-bronze/taxi_yellow/',
        'schema': yellow_schema
    },
    {
        'input': 's3://ifood-case-repo/green/',
        'table': 'dados_taxi_bronze.taxi_green',
        'output': 's3://ifood-case-bronze/taxi_green/',
        'schema': green_schema
    }
]

# Executa o processamento Bronze para cada dataset
for ds in DATASETS:
    process_dataset(ds['input'], ds['output'], ds['table'], ds['schema'])

In [None]:
# --- Passo Silver: ler Bronze do catálogo e escrever Silver particionado ---
def process_bronze_table_to_silver(input_table: str, taxi_type: str):
    print(f'Processando tabela Bronze: {input_table}')
    try:
        df = spark.table(input_table)
        if taxi_type == 'yellow':
            df_silver = df.select(
                F.col('VendorID').alias('id_fornecedor'),
                F.col('passenger_count').cast(T.IntegerType()).alias('quantidade_passageiros'),
                F.col('total_amount').alias('valor_total'),
                F.col('tpep_pickup_datetime').alias('data_inicio_viagem'),
                F.col('tpep_dropoff_datetime').alias('data_fim_viagem'),
                F.lit('yellow').alias('tipo_taxi'),
                F.col('ano'),
                F.col('mes')
            )
        else:
            df_silver = df.select(
                F.col('VendorID').alias('id_fornecedor'),
                F.col('passenger_count').cast(T.IntegerType()).alias('quantidade_passageiros'),
                F.col('total_amount').alias('valor_total'),
                F.col('lpep_pickup_datetime').alias('data_inicio_viagem'),
                F.col('lpep_dropoff_datetime').alias('data_fim_viagem'),
                F.lit('green').alias('tipo_taxi'),
                F.col('ano'),
                F.col('mes')
            )
        return df_silver
    except Exception as e:
        print('Erro ao processar', input_table, e)
        traceback.print_exc()
        return None

# Concatena e grava Silver
bronze_tables = [
    {'table': 'dados_taxi_bronze.taxi_yellow', 'tipo': 'yellow'},
    {'table': 'dados_taxi_bronze.taxi_green', 'tipo': 'green'}
]
df_silver_all = None
for bt in bronze_tables:
    df_silver = process_bronze_table_to_silver(bt['table'], bt['tipo'])
    if df_silver is not None:
        if df_silver_all is None:
            df_silver_all = df_silver
        else:
            df_silver_all = df_silver_all.unionByName(df_silver)

output_silver_path = 's3://ifood-case-silver/corridas_taxi/'
if df_silver_all is not None:
    df_silver_all.write.mode('overwrite') \
        .format('parquet') \
        .partitionBy('ano', 'mes') \
        .option('compression', 'snappy') \
        .save(output_silver_path)

    # Atualiza partições no Glue Catalog (MSCK REPAIR)
    spark.sql('MSCK REPAIR TABLE dados_taxi_silver.corridas_taxi')
    print('Ingestão para Silver finalizada com sucesso.')

## Notas finais e próximos passos

- Se encontrar erros HIVE_BAD_DATA (incompatibilidade de tipos), apague a partição S3 afetada e reexecute a célula Bronze após garantir que os casts corretos são aplicados.
- Para testes locais, você pode apontar `DATASETS` para prefixes/paths de um bucket de teste e rodar célula-a-célula.
- Quando terminar, se criou um `Job` via getResolvedOptions, chame `job.commit()` em uma célula final (opcional).