In [0]:
from pyspark.sql import functions as F

In [0]:
dbutils.widgets.text("dataset_prefix", "")
dbutils.widgets.text("month", "")

In [0]:
dataset_prefix = dbutils.widgets.get("dataset_prefix") #"yellow_tripdata"
month = dbutils.widgets.get("month") # "YYYY-MM"
source_table = f"workspace.ifood_bronze.{dataset_prefix}"
target_table = f"workspace.ifood_silver.{dataset_prefix}"

In [0]:
df_bronze = (spark
             .table(source_table)
             .where((F.col("source_month") == month) & (F.col("source_prefix") == dataset_prefix))
)

In [0]:
keys = ["VendorID","tpep_pickup_datetime","tpep_dropoff_datetime",
        "PULocationID","DOLocationID","total_amount"]

df_duplicates = df_bronze.dropDuplicates(keys)

In [0]:
df_sanity_clean = (
    df_duplicates
    # data de embarque e desembarque != null
    .where(F.col("tpep_pickup_datetime").isNotNull() & F.col("tpep_dropoff_datetime").isNotNull())
    # data de desembarque >= data de embarque
    .where(F.col("tpep_dropoff_datetime") >= F.col("tpep_pickup_datetime"))
    # duração da viagem > 0 e <= 12h
    .withColumn("trip_minutes", (F.col("tpep_dropoff_datetime").cast("long") - F.col("tpep_pickup_datetime").cast("long"))/60.0)
    .where((F.col("trip_minutes") > 0) & (F.col("trip_minutes") <= 12*60))
    # valor da corrida >= 0
    .where((F.col("total_amount") >= 0))
    # distância da corrida > 0
    .where(F.col("trip_distance") > 0)
    # total de passageiros > 0
    .where(F.col("passenger_count") > 0)
)

In [0]:
month_int = int(month[-2:])
df_time_window_clean = (df_sanity_clean
                        .where(F.month(F.col("tpep_pickup_datetime")) == month_int)
                )

In [0]:
df_time_window_clean.select("trip_distance", "fare_amount", "trip_minutes", "passenger_count").display()

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
df_new_cols = (
    df_time_window_clean
    .withColumn("pickup_date", F.to_date("tpep_pickup_datetime"))
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
)


In [0]:
df_new_cols.write.mode("append").saveAsTable(target_table)