In [0]:
from pyspark.sql.functions import col, regexp_replace, when, lit, max as spark_max, round
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType, DecimalType, TimestampType

path = "/Volumes/workspace/raw-zone/taxi_amarelo/"
arquivos = dbutils.fs.ls(path)

# Dicionário para renomear colunas
renomear_colunas = {
    "vendorid": "cod_motorista",
    "tpep_pickup_datetime": "dt_hr_inicio",
    "tpep_dropoff_datetime": "dt_hr_fim",
    "passenger_count": "qtd_pessoas",
    "trip_distance": "dist_percorrida",
    "ratecodeid": "cod_taxa",
    "store_and_fwd_flag": "ind_armazenamento",
    "pulocationid": "cod_bairro_origem",
    "dolocationid": "cod_bairro_destino",
    "payment_type": "forma_pagamento",
    "fare_amount": "vlr_taxa_corrida",
    "extra": "vlr_taxa_extra",
    "mta_tax": "vlr_taxa_mta",
    "tip_amount": "vlr_troco",
    "tolls_amount": "vlr_pedagio",
    "improvement_surcharge": "cod_taxa_melhoria",
    "total_amount": "vlr_total",
    "congestion_surcharge": "vlr_taxa_congestao",
    "airport_fee": "vlr_taxa_aeroporto",
}

# Dicionário de tipos das colunas
tipos_colunas = {
    "cod_motorista": StringType(),
    "dt_hr_inicio": TimestampType(),
    "dt_hr_fim": TimestampType(),
    "qtd_pessoas": IntegerType(),
    "dist_percorrida": DecimalType(10, 2),
    "cod_taxa": IntegerType(),
    "ind_armazenamento": StringType(),
    "cod_bairro_origem": StringType(),
    "cod_bairro_destino": StringType(),
    "forma_pagamento": StringType(),
    "vlr_taxa_corrida": DecimalType(10, 2),
    "vlr_taxa_extra": DecimalType(10, 2),
    "vlr_taxa_mta": DecimalType(10, 2),
    "vlr_troco": DecimalType(10, 2),
    "vlr_pedagio": DecimalType(10, 2),
    "cod_taxa_melhoria": IntegerType(),
    "vlr_total": DecimalType(10, 2),
    "vlr_taxa_congestao": DecimalType(10, 2),
    "vlr_taxa_aeroporto": DecimalType(10, 2),
}

# Garantir que não haverá duplicatas - sempre reprocessa todos os dados
spark.sql("DROP TABLE IF EXISTS `trusted-zone`.tb_corrida_taxi_amarelo")

# Lista para armazenar DataFrames processados
dfs = []

# Processar cada arquivo parquet encontrado
for arquivo in arquivos:
    if arquivo.name.endswith(".parquet"):
        # Ler o arquivo
        df = spark.read.parquet(arquivo.path)

        # Padronizar nomes das colunas para minúsculo
        df = df.toDF(*[c.lower() for c in df.columns])

        # Renomear colunas usando dicionário
        for coluna_antiga, coluna_nova in renomear_colunas.items():
            if coluna_antiga in df.columns:
                df = df.withColumnRenamed(coluna_antiga, coluna_nova)

        # Converter todas as colunas para string para facilitar limpeza
        df = df.select([col(c).cast(StringType()).alias(c) for c in df.columns])

        # Substituir vírgulas por pontos nos valores decimais
        df = df.select([regexp_replace(col(c), ",", ".").alias(c) for c in df.columns])

        # Converter para os tipos corretos usando conversão segura
        for coluna in df.columns:
            if coluna in tipos_colunas:
                tipo_desejado = tipos_colunas[coluna]
                
                if isinstance(tipo_desejado, (IntegerType, LongType)):
                    # Para inteiros: converte via double primeiro para aceitar "1.0"
                    df = df.withColumn(
                        coluna,
                        when(col(coluna).rlike(r"^[0-9]+(\.[0-9]+)?$"), 
                             col(coluna).cast("double").cast(tipo_desejado))
                        .otherwise(None)
                    )
                elif isinstance(tipo_desejado, (DecimalType, DoubleType)):
                    # Para decimais: converte e força arredondamento para 2 casas
                    df = df.withColumn(
                        coluna,
                        when(col(coluna).rlike(r"^[0-9]+(\.[0-9]+)?$"), 
                             round(col(coluna).cast("double"), 2).cast(tipo_desejado))
                        .otherwise(None)
                    )
                elif isinstance(tipo_desejado, TimestampType):
                    # Para timestamps
                    df = df.withColumn(coluna, col(coluna).cast(TimestampType()))
                else:
                    # Para strings
                    df = df.withColumn(coluna, col(coluna).cast(StringType()))

        # Adicionar coluna identificadora da origem
        df = df.withColumn("origem_taxi", lit("taxi_amarelo"))

        # Adicionar DataFrame processado à lista
        dfs.append(df)

# Unir todos os DataFrames se houver arquivos processados
if dfs:
    df_final = dfs[0]
    for df_individual in dfs[1:]:
        df_final = df_final.unionByName(df_individual)

    # Salvar tabela final com todos os dados
    df_final.write.mode("overwrite").option("mergeSchema", "true").saveAsTable("`trusted-zone`.tb_corrida_taxi_amarelo")

   

cod_motorista,dt_hr_inicio,dt_hr_fim,ind_armazenamento,cod_taxa,cod_bairro_origem,cod_bairro_destino,qtd_pessoas,dist_percorrida,vlr_taxa_corrida,vlr_taxa_extra,vlr_taxa_mta,vlr_troco,vlr_pedagio,vlr_gorjeta,cod_taxa_melhoria,vlr_total,tipo_pagamento,tipo_viagem,vlr_taxa_congestao,origem_taxi
2,2023-04-01T00:01:46.000Z,2023-04-01T00:11:42.000Z,N,1,75,42,1,2.62,13.5,1.0,0.5,3.0,0.0,,1.0,19.0,1,1,0.0,taxi_verde
2,2023-04-01T00:21:32.000Z,2023-04-01T00:53:16.000Z,N,1,166,107,1,6.51,33.8,1.0,0.5,7.81,0.0,,1.0,46.86,1,1,2.75,taxi_verde
2,2023-04-01T00:03:14.000Z,2023-04-01T00:16:54.000Z,N,1,74,238,1,2.59,15.6,1.0,0.5,2.5,0.0,,1.0,20.6,1,1,0.0,taxi_verde
1,2023-04-01T00:22:17.000Z,2023-04-01T00:27:34.000Z,N,1,66,33,1,0.7,6.5,1.0,1.5,2.65,0.0,,1.0,11.65,1,1,0.0,taxi_verde
2,2023-04-01T00:58:41.000Z,2023-04-01T01:21:20.000Z,N,1,255,225,1,4.96,25.4,1.0,0.5,5.58,0.0,,1.0,33.48,1,1,0.0,taxi_verde
1,2023-04-01T00:36:23.000Z,2023-04-01T00:57:57.000Z,N,1,112,140,1,7.5,32.4,3.75,1.5,5.0,0.0,,1.0,42.65,1,1,2.75,taxi_verde
2,2023-04-01T00:20:50.000Z,2023-04-01T00:45:07.000Z,N,1,66,158,1,4.26,24.7,1.0,0.5,3.0,0.0,,1.0,32.95,1,1,2.75,taxi_verde
2,2023-04-01T00:31:54.000Z,2023-04-01T00:36:29.000Z,N,1,75,263,1,0.63,6.5,1.0,0.5,1.8,0.0,,1.0,10.8,1,1,0.0,taxi_verde
2,2023-04-01T00:08:35.000Z,2023-04-01T00:11:14.000Z,N,1,7,7,1,0.5,5.1,1.0,0.5,1.52,0.0,,1.0,9.12,1,1,0.0,taxi_verde
2,2023-04-01T00:30:42.000Z,2023-04-01T00:37:11.000Z,N,1,7,179,1,1.11,7.9,1.0,0.5,2.08,0.0,,1.0,12.48,1,1,0.0,taxi_verde


In [0]:
from pyspark.sql.functions import col, regexp_replace, when, lit, max as spark_max
from pyspark.sql.types import StringType, IntegerType, DecimalType, TimestampType

# Configurações centralizadas
CONFIG = {
    "path": "/Volumes/workspace/raw-zone/taxi_verde/",
    "tabela_destino": "`trusted-zone`.tb_corrida_taxi_verde_2",
    "tabela_controle": "controle_arquivos_processados",
    "origem": "taxi_verde"
}

SCHEMA_MAPPING = {
    "vendorid": ("cod_motorista", StringType()),
    "lpep_pickup_datetime": ("dt_hr_inicio", TimestampType()),
    "lpep_dropoff_datetime": ("dt_hr_fim", TimestampType()),
    "passenger_count": ("qtd_pessoas", IntegerType()),
    "trip_distance": ("dist_percorrida", DecimalType(10, 2)),
    "ratecodeid": ("cod_taxa", IntegerType()),
    "store_and_fwd_flag": ("ind_armazenamento", StringType()),
    "pulocationid": ("cod_bairro_origem", StringType()),
    "dolocationid": ("cod_bairro_destino", StringType()),
    "payment_type": ("tipo_pagamento", StringType()),
    "trip_type": ("tipo_viagem", IntegerType()),
    "fare_amount": ("vlr_taxa_corrida", DecimalType(10, 2)),
    "extra": ("vlr_taxa_extra", DecimalType(10, 2)),
    "mta_tax": ("vlr_taxa_mta", DecimalType(10, 2)),
    "tip_amount": ("vlr_troco", DecimalType(10, 2)),
    "tolls_amount": ("vlr_pedagio", DecimalType(10, 2)),
    "ehail_fee": ("vlr_gorjeta", DecimalType(10, 2)),
    "improvement_surcharge": ("cod_taxa_melhoria", IntegerType()),
    "total_amount": ("vlr_total", DecimalType(10, 2)),
    "congestion_surcharge": ("vlr_taxa_congestao", DecimalType(10, 2))
}

def padronizar_df(df):
    """Padroniza DataFrame: renomeia colunas, converte tipos e limpa dados"""
    # Lowercase e renomeação
    df = df.toDF(*[c.lower() for c in df.columns])
    for old_name, (new_name, _) in SCHEMA_MAPPING.items():
        if old_name in df.columns:
            df = df.withColumnRenamed(old_name, new_name)
    
    # Limpeza e conversão de tipos
    for c in df.columns:
        if c in {new_name for new_name, _ in SCHEMA_MAPPING.values()}:
            tipo = next((t for old, (new, t) in SCHEMA_MAPPING.items() if new == c), StringType())
            
            if isinstance(tipo, IntegerType):
                df = df.withColumn(c, 
                    regexp_replace(col(c), ",", ".").cast(StringType())
                ).withColumn(c, 
                    when(col(c).rlike(r"^[0-9]+(\.[0-9]+)?$"), 
                         col(c).cast("double").cast(IntegerType())).otherwise(None)
                )
            elif isinstance(tipo, DecimalType):
                df = df.withColumn(c, 
                    regexp_replace(col(c), ",", ".").cast(StringType())
                ).withColumn(c, 
                    when(col(c).rlike(r"^[0-9]+(\.[0-9]+)?$"), col(c).cast(tipo)).otherwise(None)
                )
            elif isinstance(tipo, TimestampType):
                df = df.withColumn(c, col(c).cast(TimestampType()))
            else:
                df = df.withColumn(c, col(c).cast(StringType()))
    
    # Adiciona origem se não existir
    if "origem_taxi" not in df.columns:
        df = df.withColumn("origem_taxi", lit(CONFIG["origem"]))
    
    return df

def obter_arquivos_processados():
    """Retorna lista de arquivos já processados"""
    try:
        return spark.table(CONFIG["tabela_controle"]).select("arquivo_nome").rdd.flatMap(lambda x: x).collect()
    except:
        return []

def atualizar_controle(novos_arquivos, arquivos_processados):
    """Atualiza tabela de controle com novos arquivos processados"""
    novos_df = spark.createDataFrame([(f.name,) for f in novos_arquivos], ["arquivo_nome"])
    
    if arquivos_processados:
        controle_atual = spark.table(CONFIG["tabela_controle"])
        controle_final = controle_atual.unionByName(novos_df).dropDuplicates()
    else:
        controle_final = novos_df
    
    controle_final.write.mode("overwrite").saveAsTable(CONFIG["tabela_controle"])

def processar_taxi_verde():
    """Função principal que executa todo o processamento"""
    # Listar arquivos e filtrar novos
    arquivos = dbutils.fs.ls(CONFIG["path"])
    arquivos_processados = obter_arquivos_processados()
    arquivos_novos = [f for f in arquivos if f.name.endswith(".parquet") and f.name not in arquivos_processados]
    
    if not arquivos_novos:
        return
    
    # Processar arquivos novos
    dfs_novos = []
    for arquivo in arquivos_novos:
        df = spark.read.parquet(arquivo.path)
        df_padronizado = padronizar_df(df)
        dfs_novos.append(df_padronizado)
    
    # União dos novos dados
    df_novos = dfs_novos[0]
    for df in dfs_novos[1:]:
        df_novos = df_novos.unionByName(df)
    
    # Combinar com dados existentes
    try:
        df_existente = spark.table(CONFIG["tabela_destino"])
        df_existente_padronizado = padronizar_df(df_existente)
        df_final = df_existente_padronizado.unionByName(df_novos)
    except:
        df_final = df_novos
    
    # Salvar dados e atualizar controle
    df_final.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(CONFIG["tabela_destino"])
    atualizar_controle(arquivos_novos, arquivos_processados)

# Executar processamento
processar_taxi_verde()