1-Download da fonte de dados e armazenamento

In [None]:
# === RAW INGESTION: Download de arquivos públicos e envio para o bucket GCS ===

from google.cloud import storage
import requests, io

# Project and bucket settings
PROJECT_ID = "potent-poetry-295317"
BUCKET_NAME = "teste_ifood_nyc"
# Root prefix of the raw layer inside the bucket
RAW_PREFIX = "nyc_tlc_raw"
# Public source the trip files are downloaded from
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

# Ingestion scope: years, months and service types
YEARS = [2023]
MONTHS = list(range(1, 6))  # January through May
TYPES = [
    "yellow",
    "green",
    "fhv",
    "fhvhv",
]

# Build the GCS client and a handle to the target bucket
gcs = storage.Client(project=PROJECT_ID)
bucket = gcs.bucket(BUCKET_NAME)

# Fail fast when the bucket cannot be found
if not bucket.exists():
    raise RuntimeError(f"Bucket '{BUCKET_NAME}' não existe no projeto '{PROJECT_ID}'")

# Upload an in-memory binary payload to the GCS bucket
def upload_bytes(path: str, content: bytes):
    """Store ``content`` in the bucket under the object name ``path``.

    The bytes are wrapped in an in-memory stream, uploaded to the blob named
    ``path``, and the destination URI is printed afterwards.
    """
    payload = io.BytesIO(content)
    target = bucket.blob(path)
    target.upload_from_file(payload, rewind=True)
    print(f"Arquivo enviado para: gs://{BUCKET_NAME}/{path}")

# Download one public trip file and push it to the raw layer in GCS
def fetch_and_push(filetype: str, year: int, month: int):
    """Fetch the parquet file for one (type, year, month) and upload it.

    The source URL is built from ``BASE_URL`` and the destination object name
    from ``RAW_PREFIX``; the month is zero-padded to two digits in both.
    An HTTP error status raises through ``raise_for_status()``.
    """
    mm = f"{month:02d}"
    dst_path = f"{RAW_PREFIX}/{year}/{mm}/{filetype}_tripdata_{year}-{mm}.parquet"
    src_url = f"{BASE_URL}/{filetype}_tripdata_{year}-{mm}.parquet"
    print(f"Processando: {src_url} → gs://{BUCKET_NAME}/{dst_path}")

    response = requests.get(src_url, timeout=120)
    response.raise_for_status()
    payload = response.content
    upload_bytes(dst_path, payload)

# Main loop: ingest every combination, years outermost and types innermost
for y, m, t in [(yy, mm, tt) for yy in YEARS for mm in MONTHS for tt in TYPES]:
    try:
        fetch_and_push(t, y, m)
    except Exception as e:
        # Best effort: report the failure and continue with the next file
        print(f"Aviso: falha ao processar {t} {y}-{m:02d}: {e}")


2-Criação dos Datasets

In [None]:
from google.cloud import bigquery

# === PROJECT AND LOCATION SETTINGS ===
PROJECT = "potent-poetry-295317"
LOCATION = "us-east1"  # Region where the datasets are created

# BigQuery client bound to the project
bq_client = bigquery.Client(project=PROJECT)

# Fully-qualified ids of the datasets to create (bronze, silver, gold)
datasets = [f"{PROJECT}.nyc_tlc_{layer}" for layer in ("bronze", "silver", "gold")]

# Create each dataset with its location and a description
for ds_id in datasets:
    ds = bigquery.Dataset(ds_id)
    # The description uses the text after the last underscore of the id
    ds.description = f"Camada {ds_id.split('_')[-1].upper()} do case NYC TLC."
    ds.location = LOCATION

    # exists_ok=True makes the call a no-op error-wise when the dataset exists
    bq_client.create_dataset(ds, exists_ok=True)
    print(f"Dataset criado ou existente: {ds_id} (location: {LOCATION})")


3-Criação da Sessão Spark

In [None]:
# Start a serverless Spark session (via Spark Connect on GCP)
from google.cloud.dataproc_spark_connect import DataprocSparkSession

# Obtain the session through the builder's getOrCreate(); later cells in this
# notebook read and write data through the `spark` name bound here.
spark = DataprocSparkSession.builder.getOrCreate()
print("Spark inicializado. Versão:", spark.version)

4-Ingestão Camada Staging

In [None]:
from pyspark.sql import functions as F
from google.cloud import storage

# General settings
PROJECT_ID = "potent-poetry-295317"
BUCKET_NAME = "teste_ifood_nyc"
RAW_PREFIX = "nyc_tlc_raw"      # where the raw files are read from
STG_PREFIX = "nyc_tlc_staging"  # where the normalized files are written

# Processing scope
YEARS = [2023]
MONTHS = list(range(1, 6))
TYPES = [
    "yellow",
    "green",
    "fhv",
    "fhvhv",
]

# GCS client and bucket handle used for object-level operations
gcs = storage.Client(project=PROJECT_ID)
bucket = gcs.bucket(BUCKET_NAME)

# Per-service-type normalization of the raw trip columns
def normalize_by_type(df, t):
    """Project a raw trip DataFrame onto the staging column layout.

    Args:
        df: Raw DataFrame read from one source parquet file.
        t: Service type: "yellow", "green", "fhv" or "fhvhv".

    Returns:
        For "yellow"/"green": ``VendorID`` (int), ``passenger_count`` (int),
        ``total_amount`` (double), ``tpep_pickup_datetime`` and
        ``tpep_dropoff_datetime`` (timestamps).
        For "fhv"/"fhvhv": only the two timestamp columns.
        For any other ``t``: the two timestamp columns filled with NULL.

    Source columns are looked up by exact name first and then
    case-insensitively, so a file whose column casing differs from the
    expected one (e.g. ``vendorid`` / ``dropOff_datetime``) still maps to
    real data instead of silently becoming NULL. A column that is truly
    absent is emitted as NULL.
    """
    # Lower-cased name -> actual name; the first occurrence wins.
    columns_by_lower = {}
    for name in df.columns:
        columns_by_lower.setdefault(name.lower(), name)

    def source(*candidates):
        """Return the first matching source column, or a NULL literal."""
        for candidate in candidates:
            if candidate in df.columns:
                return F.col(candidate)
        for candidate in candidates:
            actual = columns_by_lower.get(candidate.lower())
            if actual is not None:
                return F.col(actual)
        return F.lit(None)

    # Raw (pickup, dropoff) column names for each known service type.
    timestamp_sources = {
        "yellow": ("tpep_pickup_datetime", "tpep_dropoff_datetime"),
        "green": ("lpep_pickup_datetime", "lpep_dropoff_datetime"),
        "fhv": ("pickup_datetime", "dropOff_datetime"),
        "fhvhv": ("pickup_datetime", "dropoff_datetime"),
    }

    if t not in timestamp_sources:
        # Unknown type: keep the row count, expose only NULL timestamps.
        return df.select(
            F.lit(None).cast("timestamp").alias("tpep_pickup_datetime"),
            F.lit(None).cast("timestamp").alias("tpep_dropoff_datetime"),
        )

    pickup_name, dropoff_name = timestamp_sources[t]
    timestamps = [
        F.to_timestamp(source(pickup_name)).alias("tpep_pickup_datetime"),
        F.to_timestamp(source(dropoff_name)).alias("tpep_dropoff_datetime"),
    ]

    if t in ("fhv", "fhvhv"):
        return df.select(*timestamps)

    # yellow and green also carry vendor, passenger count and total amount
    return df.select(
        source("VendorID", "vendorid").cast("int").alias("VendorID"),
        source("passenger_count").cast("int").alias("passenger_count"),
        source("total_amount").cast("double").alias("total_amount"),
        *timestamps,
    )

# Promote the file Spark wrote under the temporary prefix to its final name
def promote_tmp_to_single_file(tmp_prefix: str, final_blob_name: str):
    """Copy the first ``part-*.parquet`` object to ``final_blob_name``.

    Objects under ``tmp_prefix`` are listed and the first one whose file name
    starts with ``part-`` and ends with ``.parquet`` is copied, inside the
    same bucket, to ``final_blob_name``. Every object under ``tmp_prefix`` is
    deleted afterwards.

    Raises:
        RuntimeError: when no matching part file exists under ``tmp_prefix``.
    """
    def is_part_file(blob):
        file_name = blob.name.rsplit("/", 1)[-1]
        return file_name.startswith("part-") and file_name.endswith(".parquet")

    candidates = (b for b in bucket.list_blobs(prefix=tmp_prefix) if is_part_file(b))
    part = next(candidates, None)

    if not part:
        raise RuntimeError(f"Arquivo part-*.parquet não encontrado em gs://{BUCKET_NAME}/{tmp_prefix}")

    bucket.copy_blob(part, bucket, new_name=final_blob_name)

    # Clean up everything left under the temporary prefix
    for leftover in bucket.list_blobs(prefix=tmp_prefix):
        leftover.delete()

# Main process: read raw, normalize, compare row counts and write staging
def main():
    """Build one staging parquet file per (year, month, type) combination.

    For each combination the raw file is read with Spark, normalized with
    ``normalize_by_type``, its row count is compared against the raw count
    (a mismatch is only reported), and the result is written as a single
    part file to a temporary prefix that is then promoted to the final
    object name. Any exception is printed and processing continues with the
    next combination.
    """
    combinations = [(y, m, t) for y in YEARS for m in MONTHS for t in TYPES]

    for y, m, t in combinations:
        final_blob = f"{STG_PREFIX}/{y}/{m:02d}/{t}_tripdata_{y}-{m:02d}.parquet"
        tmp_prefix = f"{STG_PREFIX}/{y}/{m:02d}/{t}_tripdata_{y}-{m:02d}.parquet_tmp/"
        raw_path   = f"gs://{BUCKET_NAME}/{RAW_PREFIX}/{y}/{m:02d}/{t}_tripdata_{y}-{m:02d}.parquet"

        try:
            print(f"Processando: {t} {y}-{m:02d}")
            df_raw = spark.read.parquet(raw_path)
            df_stg = normalize_by_type(df_raw, t)

            # Consistency check between raw and staging row counts
            raw_count = df_raw.count()
            stg_count = df_stg.count()

            if raw_count == stg_count:
                print(f"Linhas OK: {t} {y}-{m:02d} - {raw_count} linhas")
            else:
                print(f"Aviso: {t} {y}-{m:02d} - raw: {raw_count} linhas ≠ staging: {stg_count} linhas")

            # Write a single part file in overwrite mode, then promote it
            single_partition = df_stg.coalesce(1)
            single_partition.write.mode("overwrite").parquet(f"gs://{BUCKET_NAME}/{tmp_prefix}")
            promote_tmp_to_single_file(tmp_prefix, final_blob)

        except Exception as e:
            print(f"Erro ao processar {t} {y}-{m:02d}: {e}")

# Run the process
main()

5-Ingestão Camada Bronze

In [None]:
from google.cloud import bigquery
from pyspark.sql import functions as F

# === GENERAL SETTINGS ===
PROJECT_ID = "potent-poetry-295317"
DATASET_ID = "nyc_tlc_bronze"
BUCKET_NAME = "teste_ifood_nyc"
STG_PREFIX = "nyc_tlc_staging"
TYPES = ["yellow", "green", "fhv", "fhvhv"]

bq_client = bigquery.Client(project=PROJECT_ID)

# === SCHEMAS WITH DESCRIPTIONS (shown in the BigQuery table schema) ===
# One (column name, BigQuery type, description) triple per column, per type.
_schema_specs = {
    "yellow": (
        ("VendorID", "INTEGER",
         "Código do provedor: 1=Creative Mobile Technologies, 2=Curb Mobility, 6=Myle Technologies."),
        ("passenger_count", "INTEGER",
         "Número de passageiros na corrida."),
        ("total_amount", "FLOAT",
         "Valor total da corrida (taxas e gorjetas incluídas)."),
        ("tpep_pickup_datetime", "TIMESTAMP",
         "Data/hora de início da corrida."),
        ("tpep_dropoff_datetime", "TIMESTAMP",
         "Data/hora de término da corrida."),
    ),
    "green": (
        ("VendorID", "INTEGER",
         "Código do provedor (1, 2 ou 6)."),
        ("passenger_count", "INTEGER",
         "Quantidade de passageiros."),
        ("total_amount", "FLOAT",
         "Valor total pago."),
        ("tpep_pickup_datetime", "TIMESTAMP",
         "Data/hora de início da corrida."),
        ("tpep_dropoff_datetime", "TIMESTAMP",
         "Data/hora de término da corrida."),
    ),
    "fhv": (
        ("tpep_pickup_datetime", "TIMESTAMP",
         "Data/hora de início da viagem FHV."),
        ("tpep_dropoff_datetime", "TIMESTAMP",
         "Data/hora de término da viagem FHV."),
    ),
    "fhvhv": (
        ("tpep_pickup_datetime", "TIMESTAMP",
         "Data/hora de início da viagem FHVHV."),
        ("tpep_dropoff_datetime", "TIMESTAMP",
         "Data/hora de término da viagem FHVHV."),
    ),
}

# Materialize the triples as SchemaField lists, keyed by service type.
schemas = {
    kind: [
        bigquery.SchemaField(column, field_type, description=text)
        for column, field_type, text in specs
    ]
    for kind, specs in _schema_specs.items()
}

# === MINIMAL PER-TYPE NORMALIZATION (types and names only) ===
def normalize_by_type(df, t: str):
    """Select and cast the bronze columns for service type ``t``.

    "yellow" and "green" keep ``VendorID`` (int), ``passenger_count`` (int)
    and ``total_amount`` (double) followed by the two timestamp columns;
    every other type keeps only the two timestamp columns.
    """
    timestamp_columns = [
        F.col(name).cast("timestamp").alias(name)
        for name in ("tpep_pickup_datetime", "tpep_dropoff_datetime")
    ]

    if t not in ["yellow", "green"]:  # fhv / fhvhv
        return df.select(*timestamp_columns)

    measure_columns = [
        F.col(name).cast(spark_type).alias(name)
        for name, spark_type in (
            ("VendorID", "int"),
            ("passenger_count", "int"),
            ("total_amount", "double"),
        )
    ]
    return df.select(*measure_columns, *timestamp_columns)

# === (RE)CREATE TABLE WITH DESCRIPTIONS, PARTITIONING AND CLUSTERING ===
def create_table_with_schema(t: str):
    """Drop and recreate the bronze table for service type ``t``.

    The table uses ``schemas[t]``, is partitioned by month on
    ``tpep_pickup_datetime`` and, for "yellow" and "green", clustered by
    ``VendorID``.
    """
    table_id = f"{PROJECT_ID}.{DATASET_ID}.tbl_{t}_bronze"
    monthly_by_pickup = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.MONTH,
        field="tpep_pickup_datetime",
    )

    table = bigquery.Table(table_id, schema=schemas[t])
    table.time_partitioning = monthly_by_pickup
    table.description = f"Tabela BRONZE {t.upper()} normalizada a partir da staging (tipos e nomes padronizados)."
    if t in ("yellow", "green"):
        table.clustering_fields = ["VendorID"]

    # Recreate so descriptions/partitioning/clustering are always up to date
    bq_client.delete_table(table_id, not_found_ok=True)
    bq_client.create_table(table)
    print(f"🆕 Tabela criada: {table_id}")

# === PIPELINE: READ STAGING → NORMALIZE → CREATE TABLE → WRITE TO BQ ===
for t in TYPES:
    try:
        print(f"\n🔄 Processando tipo: {t}")
        path = f"gs://{BUCKET_NAME}/{STG_PREFIX}/2023/*/{t}_tripdata_*.parquet"
        df_raw = spark.read.parquet(path)

        df_bronze = normalize_by_type(df_raw, t)
        print(f"Total de linhas ({t}): {df_bronze.count()}")

        create_table_with_schema(t)

        # Base options for every type; mode "overwrite" replaces the data
        writer = df_bronze.write.format("bigquery")
        writer = writer.option("table", f"{PROJECT_ID}.{DATASET_ID}.tbl_{t}_bronze")
        writer = writer.option("writeMethod", "direct")
        writer = writer.mode("overwrite")

        # yellow/green additionally pass clustering and partitioning options
        if t in ("yellow", "green"):
            for option_name, option_value in (
                ("clusteredFields", "VendorID"),
                ("partitionField", "tpep_pickup_datetime"),
                ("partitionType", "MONTH"),
            ):
                writer = writer.option(option_name, option_value)

        writer.save()
        print(f"Dados gravados: {PROJECT_ID}.{DATASET_ID}.tbl_{t}_bronze")

    except Exception as e:
        print(f"Erro ao processar {t}: {e}")


6-Ingestão Camada Silver

In [None]:
from google.cloud import bigquery

client = bigquery.Client(project="potent-poetry-295317")

# Fully-qualified source (bronze) and target (silver) datasets
bronze = "potent-poetry-295317.nyc_tlc_bronze"
silver = "potent-poetry-295317.nyc_tlc_silver"

# Table-level descriptions, keyed by service type
descriptions = dict(
    yellow="Tabela Silver com dados tratados dos táxis amarelos (Yellow Cabs) de NYC.",
    green="Tabela Silver com dados tratados dos táxis verdes (Green Cabs) de NYC.",
    fhv="Tabela Silver com dados tratados dos veículos FHV (For-Hire Vehicles).",
    fhvhv="Tabela Silver com dados tratados dos veículos FHVHV (High Volume FHV).",
)

# Column descriptions shared by several tables, split by column group
_silver_timestamp_columns = {
    "datetime_pickup": "Data e hora de início da corrida.",
    "datetime_dropoff": "Data e hora de término da corrida.",
}
_silver_taxi_columns = {
    "count_passenger": "Quantidade de passageiros transportados na corrida.",
    "id_vendor": "Código da empresa de táxi que providenciou a corrida.",
    "nm_vendor": "Nome da empresa correspondente ao VendorID.",
    "vl_amount": "Valor total pago pela corrida, incluindo taxas e gorjetas.",
}
_silver_duration_column = {
    "minutes_duration_trip": "Duração da corrida em minutos.",
}

# Column descriptions per table; each entry is an independent dict
column_descriptions_by_table = {
    "yellow": {**_silver_timestamp_columns, **_silver_taxi_columns, **_silver_duration_column},
    "green": {**_silver_timestamp_columns, **_silver_taxi_columns, **_silver_duration_column},
    "fhv": {**_silver_timestamp_columns, **_silver_duration_column},
    "fhvhv": {**_silver_timestamp_columns, **_silver_duration_column},
}

# Queries that recreate the Silver tables, keyed by service type.
# Each statement is a CREATE OR REPLACE TABLE ... AS SELECT over the matching
# bronze table, partitioned by the pickup date and restricted to pickups
# between 2023-01-01 and 2023-05-31. The table description comes from the
# `descriptions` dict and is interpolated into OPTIONS(description=...).
queries = {
    # Yellow: renames columns, replaces negative passenger counts with 0,
    # maps VendorID to a vendor name, turns a NULL total amount into 0.0 and
    # derives the trip duration in minutes; clustered by id_vendor.
    "yellow": f"""
        CREATE OR REPLACE TABLE {silver}.tbl_yellow_silver
        PARTITION BY DATE(datetime_pickup)
        CLUSTER BY id_vendor
        OPTIONS(description=\"{descriptions['yellow']}\") AS
        SELECT
            CAST(tpep_pickup_datetime AS TIMESTAMP) AS datetime_pickup,
            CAST(tpep_dropoff_datetime AS TIMESTAMP) AS datetime_dropoff,
            IF(passenger_count < 0, 0, CAST(passenger_count AS INT64)) AS count_passenger,
            CAST(VendorID AS INT64) AS id_vendor,
            CASE
                WHEN VendorID = 1 THEN 'Creative Mobile Technologies, LLC'
                WHEN VendorID = 2 THEN 'Curb Mobility, LLC'
                WHEN VendorID = 6 THEN 'Myle Technologies Inc.'
                ELSE 'Desconhecido'
            END AS nm_vendor,
            IFNULL(CAST(total_amount AS FLOAT64), 0.0) AS vl_amount,
            ROUND(TIMESTAMP_DIFF(tpep_dropoff_datetime, tpep_pickup_datetime, SECOND) / 60, 2) AS minutes_duration_trip
        FROM {bronze}.tbl_yellow_bronze
        WHERE DATE(tpep_pickup_datetime) BETWEEN '2023-01-01' AND '2023-05-31'
    """,

    # Green: same transformation as yellow, reading the green bronze table.
    "green": f"""
        CREATE OR REPLACE TABLE {silver}.tbl_green_silver
        PARTITION BY DATE(datetime_pickup)
        CLUSTER BY id_vendor
        OPTIONS(description=\"{descriptions['green']}\") AS
        SELECT
            CAST(tpep_pickup_datetime AS TIMESTAMP) AS datetime_pickup,
            CAST(tpep_dropoff_datetime AS TIMESTAMP) AS datetime_dropoff,
            IF(passenger_count < 0, 0, CAST(passenger_count AS INT64)) AS count_passenger,
            CAST(VendorID AS INT64) AS id_vendor,
            CASE
                WHEN VendorID = 1 THEN 'Creative Mobile Technologies, LLC'
                WHEN VendorID = 2 THEN 'Curb Mobility, LLC'
                WHEN VendorID = 6 THEN 'Myle Technologies Inc.'
                ELSE 'Desconhecido'
            END AS nm_vendor,
            IFNULL(CAST(total_amount AS FLOAT64), 0.0) AS vl_amount,
            ROUND(TIMESTAMP_DIFF(tpep_dropoff_datetime, tpep_pickup_datetime, SECOND) / 60, 2) AS minutes_duration_trip
        FROM {bronze}.tbl_green_bronze
        WHERE DATE(tpep_pickup_datetime) BETWEEN '2023-01-01' AND '2023-05-31'
    """,

    # FHV: only the two timestamps plus the derived duration; no clustering.
    "fhv": f"""
        CREATE OR REPLACE TABLE {silver}.tbl_fhv_silver
        PARTITION BY DATE(datetime_pickup)
        OPTIONS(description=\"{descriptions['fhv']}\") AS
        SELECT
            CAST(tpep_pickup_datetime AS TIMESTAMP) AS datetime_pickup,
            CAST(tpep_dropoff_datetime AS TIMESTAMP) AS datetime_dropoff,
            ROUND(TIMESTAMP_DIFF(tpep_dropoff_datetime, tpep_pickup_datetime, SECOND) / 60, 2) AS minutes_duration_trip
        FROM {bronze}.tbl_fhv_bronze
        WHERE DATE(tpep_pickup_datetime) BETWEEN '2023-01-01' AND '2023-05-31'
    """,

    # FHVHV: same shape as FHV, reading the fhvhv bronze table.
    "fhvhv": f"""
        CREATE OR REPLACE TABLE {silver}.tbl_fhvhv_silver
        PARTITION BY DATE(datetime_pickup)
        OPTIONS(description=\"{descriptions['fhvhv']}\") AS
        SELECT
            CAST(tpep_pickup_datetime AS TIMESTAMP) AS datetime_pickup,
            CAST(tpep_dropoff_datetime AS TIMESTAMP) AS datetime_dropoff,
            ROUND(TIMESTAMP_DIFF(tpep_dropoff_datetime, tpep_pickup_datetime, SECOND) / 60, 2) AS minutes_duration_trip
        FROM {bronze}.tbl_fhvhv_bronze
        WHERE DATE(tpep_pickup_datetime) BETWEEN '2023-01-01' AND '2023-05-31'
    """
}

# Run each CREATE statement, then attach the column descriptions
for tipo, query in queries.items():
    table_id = f"{silver}.tbl_{tipo}_silver"
    print(f"Recriando tabela Silver para: {tipo}")

    # Drop first, then let the query build the table from scratch
    client.delete_table(table_id, not_found_ok=True)
    print(f"Tabela deletada (se existia): {table_id}")

    client.query(query).result()
    print(f"Tabela {table_id} criada com sucesso.")

    # Rebuild the schema field by field; a column without a configured
    # description keeps whatever description it already has.
    table = client.get_table(table_id)
    column_descriptions = column_descriptions_by_table.get(tipo, {})
    new_schema = [
        bigquery.SchemaField(
            field.name,
            field.field_type,
            mode=field.mode,
            description=column_descriptions.get(field.name, field.description),
        )
        for field in table.schema
    ]
    table.schema = new_schema
    client.update_table(table, ["schema"])
    print(f"Descrições de colunas aplicadas para {table_id}")


7-Ingestão Camada Gold

In [None]:
from google.cloud import bigquery

client = bigquery.Client(project="potent-poetry-295317")

# Fully-qualified source (silver) and target (gold) datasets
silver = "potent-poetry-295317.nyc_tlc_silver"
gold = "potent-poetry-295317.nyc_tlc_gold"

# Column descriptions shared by the per-service gold tables, by column group
_gold_period_columns = {
    "datetime_pickup": "Data e hora de início da corrida.",
    "month_reference": "Mês e ano da corrida (primeiro dia do mês).",
}
_gold_amount_column = {
    "avg_vl_amount": "Valor médio total pago por corrida.",
}
_gold_ride_columns = {
    "count_rides": "Número total de corridas no dia.",
    "avg_minutes_duration": "Duração média das corridas no dia (em minutos).",
}

# Column descriptions per table; each entry is an independent dict
column_descriptions_by_table = {
    "tbl_yellow_gold": {**_gold_period_columns, **_gold_amount_column, **_gold_ride_columns},
    "tbl_green_gold": {**_gold_period_columns, **_gold_amount_column, **_gold_ride_columns},
    "tbl_fhv_gold": {**_gold_period_columns, **_gold_ride_columns},
    "tbl_fhvhv_gold": {**_gold_period_columns, **_gold_ride_columns},
    "tbl_passenger_hour_gold": dict(
        datetime_pickup="Data e hora de início da corrida.",
        date_reference="Data da corrida.",
        month_reference="Mês e ano de referência (primeiro dia do mês).",
        hour_trip="Hora do dia formatada no formato HH:00 (24h) em que a corrida iniciou.",
        type_service="Tipo de serviço (yellow ou green).",
        avg_count_passenger="Média de passageiros por corrida naquela hora.",
    ),
}

# Table-level descriptions, keyed by table name
descriptions = dict(
    tbl_yellow_gold="Tabela Gold com métricas agregadas por corrida dos táxis amarelos.",
    tbl_green_gold="Tabela Gold com métricas agregadas por corrida dos táxis verdes.",
    tbl_fhv_gold="Tabela Gold com métricas agregadas dos veículos FHV.",
    tbl_fhvhv_gold="Tabela Gold com métricas agregadas dos veículos FHVHV.",
    tbl_passenger_hour_gold="Tabela Gold com média de passageiros por hora no mês de maio.",
)

# Queries that build the Gold tables, keyed by target table name.
# Each statement is a CREATE OR REPLACE TABLE ... AS SELECT over the silver
# tables; the table description is interpolated from `descriptions`.
#
# The four per-service tables are partitioned by month. `datetime_pickup` is a
# TIMESTAMP column in silver, so the partition clause uses TIMESTAMP_TRUNC,
# the partition expression BigQuery documents for TIMESTAMP columns
# (DATE_TRUNC is documented there for DATE columns).
#
# NOTE(review): every query groups by the full `datetime_pickup` value, so the
# aggregates are computed per distinct pickup timestamp, while the column
# descriptions speak of per-day figures — confirm the intended grain.
queries = {
    # Yellow: average amount, ride count and average duration.
    "tbl_yellow_gold": f"""
        CREATE OR REPLACE TABLE {gold}.tbl_yellow_gold
        PARTITION BY TIMESTAMP_TRUNC(datetime_pickup, MONTH)
        OPTIONS(description=\"{descriptions['tbl_yellow_gold']}\") AS
        SELECT
            datetime_pickup,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            ROUND(AVG(vl_amount), 2) AS avg_vl_amount,
            COUNT(*) AS count_rides,
            ROUND(AVG(minutes_duration_trip), 2) AS avg_minutes_duration
        FROM {silver}.tbl_yellow_silver
        GROUP BY datetime_pickup, month_reference
    """,

    # Green: same metrics as yellow, reading the green silver table.
    "tbl_green_gold": f"""
        CREATE OR REPLACE TABLE {gold}.tbl_green_gold
        PARTITION BY TIMESTAMP_TRUNC(datetime_pickup, MONTH)
        OPTIONS(description=\"{descriptions['tbl_green_gold']}\") AS
        SELECT
            datetime_pickup,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            ROUND(AVG(vl_amount), 2) AS avg_vl_amount,
            COUNT(*) AS count_rides,
            ROUND(AVG(minutes_duration_trip), 2) AS avg_minutes_duration
        FROM {silver}.tbl_green_silver
        GROUP BY datetime_pickup, month_reference
    """,

    # FHV: ride count and average duration only (no amount column in silver).
    "tbl_fhv_gold": f"""
        CREATE OR REPLACE TABLE {gold}.tbl_fhv_gold
        PARTITION BY TIMESTAMP_TRUNC(datetime_pickup, MONTH)
        OPTIONS(description=\"{descriptions['tbl_fhv_gold']}\") AS
        SELECT
            datetime_pickup,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            COUNT(*) AS count_rides,
            ROUND(AVG(minutes_duration_trip), 2) AS avg_minutes_duration
        FROM {silver}.tbl_fhv_silver
        GROUP BY datetime_pickup, month_reference
    """,

    # FHVHV: same shape as FHV, reading the fhvhv silver table.
    "tbl_fhvhv_gold": f"""
        CREATE OR REPLACE TABLE {gold}.tbl_fhvhv_gold
        PARTITION BY TIMESTAMP_TRUNC(datetime_pickup, MONTH)
        OPTIONS(description=\"{descriptions['tbl_fhvhv_gold']}\") AS
        SELECT
            datetime_pickup,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            COUNT(*) AS count_rides,
            ROUND(AVG(minutes_duration_trip), 2) AS avg_minutes_duration
        FROM {silver}.tbl_fhvhv_silver
        GROUP BY datetime_pickup, month_reference
    """,

    # Average passenger count for pickups in month 5, yellow and green
    # combined with UNION ALL; partitioned by pickup date and clustered by
    # service type and hour label.
    "tbl_passenger_hour_gold": f"""
        CREATE OR REPLACE TABLE {gold}.tbl_passenger_hour_gold
        PARTITION BY DATE(datetime_pickup)
        CLUSTER BY type_service, hour_trip
        OPTIONS(description=\"{descriptions['tbl_passenger_hour_gold']}\") AS
        SELECT
            datetime_pickup,
            DATE(datetime_pickup) AS date_reference,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            LPAD(CAST(EXTRACT(HOUR FROM datetime_pickup) AS STRING), 2, '0') || ':00' AS hour_trip,
            'yellow' AS type_service,
            ROUND(AVG(count_passenger), 2) AS avg_count_passenger
        FROM {silver}.tbl_yellow_silver
        WHERE EXTRACT(MONTH FROM datetime_pickup) = 5
        GROUP BY datetime_pickup, date_reference, month_reference, hour_trip

        UNION ALL

        SELECT
            datetime_pickup,
            DATE(datetime_pickup) AS date_reference,
            DATE_TRUNC(datetime_pickup, MONTH) AS month_reference,
            LPAD(CAST(EXTRACT(HOUR FROM datetime_pickup) AS STRING), 2, '0') || ':00' AS hour_trip,
            'green' AS type_service,
            ROUND(AVG(count_passenger), 2) AS avg_count_passenger
        FROM {silver}.tbl_green_silver
        WHERE EXTRACT(MONTH FROM datetime_pickup) = 5
        GROUP BY datetime_pickup, date_reference, month_reference, hour_trip
    """
}

# Run each CREATE statement, then attach the column descriptions
for tbl, query in queries.items():
    table_id = f"{gold}.{tbl}"
    print(f"Criando tabela GOLD: {table_id}")

    # Drop first, then let the query build the table from scratch
    client.delete_table(table_id, not_found_ok=True)
    print(f"Tabela deletada (se existia): {table_id}")

    client.query(query).result()
    print(f"Tabela criada com sucesso: {table_id}")

    # Rebuild the schema field by field; a column without a configured
    # description keeps whatever description it already has.
    table = client.get_table(table_id)
    col_descriptions = column_descriptions_by_table.get(tbl, {})
    schema = [
        bigquery.SchemaField(
            field.name,
            field.field_type,
            mode=field.mode,
            description=col_descriptions.get(field.name, field.description),
        )
        for field in table.schema
    ]
    table.schema = schema
    client.update_table(table, ["schema"])
    print(f"Descrições de colunas atualizadas: {table_id}")
