Notebook responsável por ler as tabelas bronze de táxi do tipo YELLOW, realizar as limpezas e transformações, consolidar os dados em uma única tabela fato e salvar na camada silver.

As tabelas "dimensão" são criadas em outros notebooks dentro desta mesma pasta.

In [0]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import yaml

In [0]:
def create_update_column_metadata(table_name, file_path):
    """Apply column comments from a YAML file to an existing table.

    The YAML file must have a top-level ``columns`` mapping of
    ``column name -> comment``. One
    ``ALTER TABLE ... ALTER COLUMN ... COMMENT`` statement is issued per entry
    through the notebook-global ``spark`` session.

    The update is best-effort: every failure is printed and never raised.
    A file that cannot be read or parsed aborts the whole update; a failure on
    a single column is reported and the remaining columns are still processed.

    Args:
        table_name: Name of the table to alter (interpolated into the SQL as-is).
        file_path: Path of the YAML file holding the column comments.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            # An empty `columns:` key loads as None; treat it as "nothing to do".
            # dict(...) also rejects non-mapping content inside the try block.
            column_comments = dict(yaml.safe_load(file)["columns"] or {})
    except Exception as e:
        print(f"Error during update column metadata: {e}")
        return

    for column, comment in column_comments.items():
        # The comment is embedded in a single-quoted SQL literal, so backslashes
        # and quotes must be escaped or a comment such as "driver's tip" breaks
        # the statement. Assumes Spark's default (backslash-escaped) string
        # literal parsing.
        escaped_comment = str(comment).replace("\\", "\\\\").replace("'", "\\'")
        try:
            spark.sql(
                f"ALTER TABLE {table_name} ALTER COLUMN {column} COMMENT '{escaped_comment}'"
            )
        except Exception as e:
            # Keep going: one bad column must not block the others.
            print(f"Error during update column metadata: {e}")

def remove_out_range_date(df, month_str):
    """Keep trips picked up in the given month whose drop-off is not before pickup.

    Each monthly source may contain stray records from other months (for
    example, a 2022-01 pickup inside the 2023-02 table); those are removed
    here, together with records whose drop-off precedes the pickup.

    Args:
        df: DataFrame with ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime`` columns.
        month_str: Expected pickup month formatted as ``yyyy-MM``
            (e.g. ``"2023-02"``).

    Returns:
        The filtered DataFrame.
    """
    pickup = F.col("tpep_pickup_datetime")
    dropoff = F.col("tpep_dropoff_datetime")

    pickup_month = F.date_format(F.date_trunc("month", pickup), "yyyy-MM")
    picked_up_in_month = pickup_month == month_str
    dropoff_not_before_pickup = dropoff >= pickup

    return df.filter(picked_up_in_month & dropoff_not_before_pickup)

In [0]:
# Destination: silver-layer fact table for yellow-taxi trips.
table_name = "tb_ft_trip_yellow"
silver_table_name = f"ifood_case.silver.{table_name}"

# Sources: one bronze table per month, January through May 2023.
source_table_name_2023_01 = "ifood_case.bronze.tb_yellow_2023_01"
source_table_name_2023_02 = "ifood_case.bronze.tb_yellow_2023_02"
source_table_name_2023_03 = "ifood_case.bronze.tb_yellow_2023_03"
source_table_name_2023_04 = "ifood_case.bronze.tb_yellow_2023_04"
source_table_name_2023_05 = "ifood_case.bronze.tb_yellow_2023_05"

In [0]:
# Load each monthly bronze table into its own DataFrame.
df_202301 = spark.read.table(tableName=source_table_name_2023_01)
df_202302 = spark.read.table(tableName=source_table_name_2023_02)
df_202303 = spark.read.table(tableName=source_table_name_2023_03)
df_202304 = spark.read.table(tableName=source_table_name_2023_04)
df_202305 = spark.read.table(tableName=source_table_name_2023_05)

In [0]:
# Drop records whose pickup falls outside the table's own month or whose
# drop-off precedes the pickup.
df_202301 = remove_out_range_date(df=df_202301, month_str="2023-01")
df_202302 = remove_out_range_date(df=df_202302, month_str="2023-02")
df_202303 = remove_out_range_date(df=df_202303, month_str="2023-03")
df_202304 = remove_out_range_date(df=df_202304, month_str="2023-04")
df_202305 = remove_out_range_date(df=df_202305, month_str="2023-05")

In [0]:
# Maps the raw "N"/"Y" store-and-forward flag to False/True; any other value
# becomes null.
_raw_store_and_fwd_flag = F.col("store_and_fwd_flag")
column_store_and_fwd_flag_expression = (
    F.when(_raw_store_and_fwd_flag == "N", False)
    .when(_raw_store_and_fwd_flag == "Y", True)
    .otherwise(None)
)

In [0]:
# The 2023-01 data has a different schema from the following months, so it is
# projected separately. The difference visible in this cell is the airport-fee
# column name: "airport_fee" for 2023-01, "Airport_fee" afterwards.
def _to_silver_schema(df, airport_fee_column):
    """Project a bronze yellow-taxi DataFrame onto the silver fact-table columns.

    Args:
        df: Bronze DataFrame holding the raw yellow-taxi columns.
        airport_fee_column: Name of the airport-fee column in ``df``.

    Returns:
        A DataFrame with the renamed silver columns, in a fixed order so the
        results can be combined with a positional ``union``.
    """
    return (
        df.withColumn("fl_store_and_fwd", column_store_and_fwd_flag_expression)
        .select(
            # Cast first, alias last, so the output column name is explicit.
            F.col("VendorID").cast(T.IntegerType()).alias("id_vendor"),
            F.col("tpep_pickup_datetime").alias("ts_pickup"),
            F.col("tpep_dropoff_datetime").alias("ts_dropoff"),
            F.col("passenger_count").alias("nb_passenger_count"),
            F.col("trip_distance").alias("vl_trip_distance"),
            F.col("RatecodeID").cast(T.IntegerType()).alias("id_rate_code"),
            F.col("fl_store_and_fwd"),
            F.col("PULocationID").cast(T.IntegerType()).alias("id_pickup_location"),
            F.col("DOLocationID").cast(T.IntegerType()).alias("id_dropoff_location"),
            F.col("payment_type").cast(T.IntegerType()).alias("id_payment_type"),
            F.col("fare_amount").alias("vl_fare_amount"),
            F.col("extra").alias("vl_extra"),
            F.col("mta_tax").alias("vl_mta_tax"),
            F.col("tip_amount").alias("vl_tip_amount"),
            F.col("tolls_amount").alias("vl_tolls_amount"),
            F.col("improvement_surcharge").alias("vl_improvement_surcharge"),
            F.col("total_amount").alias("vl_total_amount"),
            F.col("congestion_surcharge").alias("vl_congestion_surcharge"),
            F.col(airport_fee_column).alias("vl_airport_fee"),
        )
    )


df_202301 = _to_silver_schema(df_202301, "airport_fee")

# union() matches columns by position, so the 2023-02..05 tables are assumed to
# share the same column order.
df_after_202301 = _to_silver_schema(
    df_202302.union(df_202303).union(df_202304).union(df_202305),
    "Airport_fee",
)

# No coalesce(1) here: collapsing to a single partition would force the whole
# upstream computation (scans, filters, projection) to run in one task.
df_silver = df_after_202301.union(df_202301)

In [0]:
# Remove exact duplicate rows, then rows in which every column is null.
df_silver = df_silver.dropDuplicates()
df_silver = df_silver.dropna(how="all")

In [0]:
# Replace the silver table's contents with the consolidated trips (Delta format).
silver_writer = df_silver.write.format("delta").mode("overwrite")
silver_writer.saveAsTable(silver_table_name)

In [0]:
# Attach the column comments declared in the table's YAML metadata file.
metadata_file_path = f"./metadata/{table_name}.yaml"
create_update_column_metadata(silver_table_name, metadata_file_path)