In [0]:
import os

from pyspark.sql.functions import col, lit, coalesce, when, year, month, date_format, regexp_extract
from pyspark.sql.types import TimestampType, DoubleType, IntegerType, StringType



# S3 credentials come from the standard AWS environment variables so that real
# secrets never have to be written into the notebook. The literal fallback is
# the original placeholder value, so behaviour is unchanged when the variables
# are not set.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "suacredencial")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "suacredencial")
s3_bucket_name = "ifood-case-data-lake-vitor"

# Root locations of the bronze (input) and silver (output) layers in the bucket.
bronze_path = f"s3a://{s3_bucket_name}/bronze_layer/"
silver_path = f"s3a://{s3_bucket_name}/silver_layer/"

# S3A credential options handed to every reader and writer below. The two
# dicts carry the same content but are kept as separate objects so that
# changing one does not affect the other.
read_options = {"fs.s3a.access.key": aws_access_key_id, "fs.s3a.secret.key": aws_secret_access_key}
write_options = dict(read_options)


In [0]:
# Yellow taxis: read the 2023 monthly bronze partitions, unify them,
# standardize the schema and write the result to the Silver layer.
def _read_yellow_month(month, year=2023):
    """Read one monthly Yellow partition from the bronze layer.

    Loads the parquet files under ``yellow/year=<year>/month=<month>`` and
    adds ``year``/``month`` literal columns matching the path that was read.
    """
    return (spark.read.options(**read_options)
        .parquet(f"{bronze_path}yellow/year={year}/month={month}")
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
    )


# Months 1-5 (January to May). The per-month names are still bound in case
# other cells reference them.
df_yellow_jan, df_yellow_feb, df_yellow_mar, df_yellow_apr, df_yellow_may = (
    _read_yellow_month(month_number) for month_number in range(1, 6)
)

# Union by column name; a column missing from one month is filled with nulls.
df_yellow_unified = df_yellow_jan
for df_yellow_month in (df_yellow_feb, df_yellow_mar, df_yellow_apr, df_yellow_may):
    df_yellow_unified = df_yellow_unified.unionByName(df_yellow_month, allowMissingColumns=True)

df_yellow_silver = (df_yellow_unified
    .withColumn("taxi_type", lit("yellow"))
    # Rename the Yellow-specific columns to the common Silver names.
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .withColumnRenamed("VendorID", "vendor_id")
    .select(
        col("taxi_type").cast(StringType()),
        col("vendor_id").cast(StringType()),
        # A null passenger count becomes 0, which the validity flag below marks as invalid.
        coalesce(col("passenger_count").cast(DoubleType()), lit(0.0)).cast(IntegerType()).alias("passenger_count"),
        col("total_amount").cast(DoubleType()),
        col("pickup_datetime").cast(TimestampType()),
        col("dropoff_datetime").cast(TimestampType()),
        col("year").cast(IntegerType()),
        col("month").cast(IntegerType())
    )
    # Data-quality flags. when/otherwise (instead of the bare comparison) makes
    # a comparison against null evaluate to False rather than null.
    .withColumn("is_valid_total_amount", when(col("total_amount") > 0, True).otherwise(False))
    .withColumn("is_valid_passenger_count", when(col("passenger_count") > 0, True).otherwise(False))
    .withColumn("is_valid_trip_time", when(col("dropoff_datetime") > col("pickup_datetime"), True).otherwise(False))
)

# Write to the Silver layer as its own Delta table, partitioned by year/month.
# Repartitioning by the partition columns sends every (year, month) to a single
# task, so each partition directory is still written by one task, while the
# upstream read/transform keeps running in parallel (coalesce(1) would squeeze
# the whole job into one task).
(df_yellow_silver.repartition("year", "month").write.options(**write_options)
    .option("overwriteSchema", "true")
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .save(f"{silver_path}yellow/")
)
print("Dados de yellow padronizados e salvos na camada Silver.")


Dados de yellow padronizados e salvos na camada Silver.


In [0]:
# Green taxis: read the 2023 monthly bronze partitions, unify them,
# standardize the schema and write the result to the Silver layer.
def _read_green_month(month, year=2023):
    """Read one monthly Green partition from the bronze layer.

    Loads the parquet files under ``green/year=<year>/month=<month>`` and
    adds ``year``/``month`` literal columns matching the path that was read.
    """
    return (spark.read.options(**read_options)
        .parquet(f"{bronze_path}green/year={year}/month={month}")
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
    )


# Months 1-5 (January to May). The per-month names are still bound in case
# other cells reference them.
df_green_jan, df_green_feb, df_green_mar, df_green_apr, df_green_may = (
    _read_green_month(month_number) for month_number in range(1, 6)
)

# Union by column name; a column missing from one month is filled with nulls.
df_green_unified = df_green_jan
for df_green_month in (df_green_feb, df_green_mar, df_green_apr, df_green_may):
    df_green_unified = df_green_unified.unionByName(df_green_month, allowMissingColumns=True)

df_green_silver = (df_green_unified
    .withColumn("taxi_type", lit("green"))
    # Rename the Green-specific columns to the common Silver names.
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    .withColumnRenamed("VendorID", "vendor_id")
    .select(
        col("taxi_type").cast(StringType()),
        col("vendor_id").cast(StringType()),
        # A null passenger count becomes 0, which the validity flag below marks as invalid.
        coalesce(col("passenger_count").cast(DoubleType()), lit(0.0)).cast(IntegerType()).alias("passenger_count"),
        col("total_amount").cast(DoubleType()),
        col("pickup_datetime").cast(TimestampType()),
        col("dropoff_datetime").cast(TimestampType()),
        col("year").cast(IntegerType()),
        col("month").cast(IntegerType())
    )
    # Data-quality flags. when/otherwise (instead of the bare comparison) makes
    # a comparison against null evaluate to False rather than null.
    .withColumn("is_valid_total_amount", when(col("total_amount") > 0, True).otherwise(False))
    .withColumn("is_valid_passenger_count", when(col("passenger_count") > 0, True).otherwise(False))
    .withColumn("is_valid_trip_time", when(col("dropoff_datetime") > col("pickup_datetime"), True).otherwise(False))
)

# Write to the Silver layer as its own Delta table, partitioned by year/month.
# Repartitioning by the partition columns sends every (year, month) to a single
# task, so each partition directory is still written by one task, while the
# upstream read/transform keeps running in parallel (coalesce(1) would squeeze
# the whole job into one task).
(df_green_silver.repartition("year", "month").write.options(**write_options)
    .option("overwriteSchema", "true")
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .save(f"{silver_path}green/")
)
print("Dados de green padronizados e salvos na camada Silver.")


Dados de green padronizados e salvos na camada Silver.


In [0]:
# FHV (for-hire vehicles): read the 2023 monthly bronze partitions, unify them,
# standardize the schema and write the result to the Silver layer.
def _read_fhv_month(month, year=2023):
    """Read one monthly FHV partition from the bronze layer.

    Loads the parquet files under ``fhv/year=<year>/month=<month>`` and
    adds ``year``/``month`` literal columns matching the path that was read.
    """
    return (spark.read.options(**read_options)
        .parquet(f"{bronze_path}fhv/year={year}/month={month}")
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
    )


# Months 1-5 (January to May). The per-month names are still bound in case
# other cells reference them.
df_fhv_jan, df_fhv_feb, df_fhv_mar, df_fhv_apr, df_fhv_may = (
    _read_fhv_month(month_number) for month_number in range(1, 6)
)

# Union by column name; a column missing from one month is filled with nulls.
df_fhv_unified = df_fhv_jan
for df_fhv_month in (df_fhv_feb, df_fhv_mar, df_fhv_apr, df_fhv_may):
    df_fhv_unified = df_fhv_unified.unionByName(df_fhv_month, allowMissingColumns=True)

df_fhv_silver = (df_fhv_unified
    .withColumn("taxi_type", lit("fhv"))
    .withColumn("pickup_datetime", col("pickup_datetime").cast(TimestampType()))
    # The source column is referenced as `dropOff_datetime`; the result is
    # stored under the common Silver name `dropoff_datetime`.
    .withColumn("dropoff_datetime", col("dropOff_datetime").cast(TimestampType()))
    .withColumnRenamed("dispatching_base_num", "vendor_id")
    # This cell does not read passenger count or total amount from the source;
    # both are emitted as typed nulls so the schema matches the other tables.
    .withColumn("passenger_count", lit(None).cast(IntegerType()))
    .withColumn("total_amount", lit(None).cast(DoubleType()))
    .select(
        col("taxi_type"),
        col("vendor_id"),
        col("passenger_count"),
        col("total_amount"),
        col("pickup_datetime"),
        col("dropoff_datetime"),
        col("year").cast(IntegerType()),
        col("month").cast(IntegerType())
    )
    # The amount / passenger flags are null because their inputs are null here.
    .withColumn("is_valid_total_amount", lit(None).cast("boolean"))
    .withColumn("is_valid_passenger_count", lit(None).cast("boolean"))
    # when/otherwise makes a comparison against a null timestamp yield False
    # rather than null.
    .withColumn("is_valid_trip_time", when(col("dropoff_datetime") > col("pickup_datetime"), True).otherwise(False))
)

# Write to the Silver layer as its own Delta table, partitioned by year/month.
# Repartitioning by the partition columns sends every (year, month) to a single
# task, so each partition directory is still written by one task, while the
# upstream read/transform keeps running in parallel (coalesce(1) would squeeze
# the whole job into one task).
(df_fhv_silver.repartition("year", "month").write.options(**write_options)
    .option("overwriteSchema", "true")
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .save(f"{silver_path}fhv/")
)
print("Dados de fhv padronizados e salvos na camada Silver.")



Dados de fhv padronizados e salvos na camada Silver.


In [0]:
# FHVHV (high-volume for-hire vehicles): read the 2023 monthly bronze
# partitions, unify them, standardize the schema and write to the Silver layer.
def _read_fhvhv_month(month, year=2023):
    """Read one monthly FHVHV partition from the bronze layer.

    Loads the parquet files under ``fhvhv/year=<year>/month=<month>`` and
    adds ``year``/``month`` literal columns matching the path that was read.
    """
    return (spark.read.options(**read_options)
        .parquet(f"{bronze_path}fhvhv/year={year}/month={month}")
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
    )


# Months 1-5 (January to May). The per-month names are still bound in case
# other cells reference them.
df_fhvhv_jan, df_fhvhv_feb, df_fhvhv_mar, df_fhvhv_apr, df_fhvhv_may = (
    _read_fhvhv_month(month_number) for month_number in range(1, 6)
)

# Union by column name; a column missing from one month is filled with nulls.
df_fhvhv_unified = df_fhvhv_jan
for df_fhvhv_month in (df_fhvhv_feb, df_fhvhv_mar, df_fhvhv_apr, df_fhvhv_may):
    df_fhvhv_unified = df_fhvhv_unified.unionByName(df_fhvhv_month, allowMissingColumns=True)

df_fhvhv_silver = (df_fhvhv_unified
    .withColumn("taxi_type", lit("fhvhv"))
    .withColumn("pickup_datetime", col("pickup_datetime").cast(TimestampType()))
    # The source column is referenced as `dropOff_datetime`; the result is
    # stored under the common Silver name `dropoff_datetime`.
    # NOTE(review): if the source spells it `dropoff_datetime`, this relies on
    # case-insensitive column resolution - confirm against the bronze schema.
    .withColumn("dropoff_datetime", col("dropOff_datetime").cast(TimestampType()))
    .withColumnRenamed("dispatching_base_num", "vendor_id")
    # This cell does not read passenger count or total amount from the source;
    # both are emitted as typed nulls so the schema matches the other tables.
    # NOTE(review): if the fhvhv source carries fare columns, total_amount
    # could be derived from them instead of being null - confirm intent.
    .withColumn("passenger_count", lit(None).cast(IntegerType()))
    .withColumn("total_amount", lit(None).cast(DoubleType()))
    .select(
        col("taxi_type"),
        col("vendor_id"),
        col("passenger_count"),
        col("total_amount"),
        col("pickup_datetime"),
        col("dropoff_datetime"),
        col("year").cast(IntegerType()),
        col("month").cast(IntegerType())
    )
    # The amount / passenger flags are null because their inputs are null here.
    .withColumn("is_valid_total_amount", lit(None).cast("boolean"))
    .withColumn("is_valid_passenger_count", lit(None).cast("boolean"))
    # when/otherwise makes a comparison against a null timestamp yield False
    # rather than null.
    .withColumn("is_valid_trip_time", when(col("dropoff_datetime") > col("pickup_datetime"), True).otherwise(False))
)

# Write to the Silver layer as its own Delta table, partitioned by year/month.
# Repartitioning by the partition columns sends every (year, month) to a single
# task, so each partition directory is still written by one task, while the
# upstream read/transform keeps running in parallel (coalesce(1) would squeeze
# the whole job into one task).
(df_fhvhv_silver.repartition("year", "month").write.options(**write_options)
    .option("overwriteSchema", "true")
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .save(f"{silver_path}fhvhv/")
)
print("Dados de fhvhv padronizados e salvos na camada Silver.")

Dados de fhvhv padronizados e salvos na camada Silver.
