In [0]:
from pyspark.sql.functions import col, year as y, month as m, current_timestamp

In [0]:
# controle de ingestão para teste de job de ingestao diária em caso de dados atualizados. Nesse caso foi usada a coluna de ano para testes 
controle = spark.sql("""
    SELECT MAX(last_year) as last_year
    FROM nyc_taxi_catalog.bronze.tb_ctrl_ingestao_raw_bronze
    WHERE dataset = 'yellow_taxi'
    """).collect()[0]

ano_atual = controle.last_year
proximo_ano = ano_atual + 1

In [0]:
# Origem dos dados
raw_path = f"/Volumes/nyc_taxi_catalog/raw/yellow_taxi/*.parquet"

#transformacao em um dataframe 
df_raw = spark.read.parquet(raw_path)

In [0]:
# Criacao da coluna de controle e particionamento de periodo dos dados

df_filtrado = (
    df_raw
    .withColumn("year", y(col("tpep_pickup_datetime")))
    .withColumn("month", m(col("tpep_pickup_datetime")))
    .filter(col("year") == proximo_ano)
)


In [0]:
if df_filtrado.limit(1).count() == 0:
    print(f"Nenhum dado encontrado para o ano {proximo_ano}. Encerrando.")
    dbutils.notebook.exit("NO_DATA")

print(f"Dados encontrados para o ano {proximo_ano}. Iniciando gravação.")


In [0]:
df_filtrado.groupBy("year").count().orderBy("year").show(20)


In [0]:
(
    df_filtrado
    .write
    .format("delta")
    .mode("append")
    .partitionBy("year", "month")
    .saveAsTable("nyc_taxi_catalog.bronze.yellow_taxi")
)

spark.sql(
    f"""
    UPDATE nyc_taxi_catalog.bronze.tb_ctrl_ingestao_raw_bronze
    SET last_year = {proximo_ano},
        update_at = current_timestamp()
    WHERE dataset = 'yellow_taxi'
    """
)

print(f"Ano {proximo_ano} processado com sucesso.")