O objetivo deste notebook é gerar a base Silver enriquecida, usada para calcular os indicadores da camada Gold, como:

Receita total por mês

Número de pedidos por canal

Ticket médio por loja

Distância média de entrega

Total de pedidos por cidade

Entregas por motorista

In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_timestamp, concat_ws

# Build the enriched Silver table: one row per order, carrying its payment,
# store, channel, delivery, driver and hub attributes.

# Base locations of the Bronze inputs and of the enriched Silver output.
bronze_base = "/Volumes/workspace/default/ifood-files/bronze/"
silver_base = "/Volumes/workspace/default/ifood-files/silver/delivery_center_enriched"

# Load every Bronze Delta table used by the enrichment.
channels   = spark.read.format("delta").load(f"{bronze_base}channels")
deliveries = spark.read.format("delta").load(f"{bronze_base}deliveries")
drivers    = spark.read.format("delta").load(f"{bronze_base}drivers")
hubs       = spark.read.format("delta").load(f"{bronze_base}hubs")
orders     = spark.read.format("delta").load(f"{bronze_base}orders")
payments   = spark.read.format("delta").load(f"{bronze_base}payments")
stores     = spark.read.format("delta").load(f"{bronze_base}stores")

# Derive the order timestamp from the separate year / month / day columns.
orders = orders.withColumn(
    "order_purchase_timestamp",
    to_timestamp(concat_ws("-", "order_created_year", "order_created_month", "order_created_day"))
)

# Collapse payments to ONE row per order *before* joining.
# Joining the raw table fans out orders that have several payments, and the
# later dropDuplicates(["order_id"]) would then keep an arbitrary payment row,
# silently discarding the other amounts and giving run-to-run different output.
#   * payment_amount -> sum of all payments of the order, cast back to the
#     Bronze column type so the Silver schema does not change.
#   * payment_method -> method of the largest payment; comparing
#     (amount, method) structs makes ties resolve deterministically.
# NOTE(review): all payment rows are summed regardless of any status column,
# as the original code applied no filter either - confirm this is intended.
payment_amount_type = payments.schema["payment_amount"].dataType
payments_per_order = payments.groupBy("payment_order_id").agg(
    F.sum("payment_amount").cast(payment_amount_type).alias("payment_amount"),
    F.max(
        F.struct(
            col("payment_amount").cast("double").alias("amount"),
            col("payment_method"),
        )
    ).getField("payment_method").alias("payment_method"),
)

# Keep ONE delivery per order with an explicit, stable ordering instead of
# relying on whichever row dropDuplicates happens to see first.
# NOTE(review): the highest delivery_id is used as the tie-break; confirm
# whether a different rule (e.g. based on delivery status) is preferred.
delivery_rank = Window.partitionBy("delivery_order_id").orderBy(col("delivery_id").desc())
delivery_per_order = (
    deliveries
    .withColumn("_delivery_rank", F.row_number().over(delivery_rank))
    .filter(col("_delivery_rank") == 1)
    .drop("_delivery_rank")
)

# Left joins keep every order even when a related record is missing.
# Aliases disambiguate columns that exist on both sides (store_id, hub_id, ...).
df = orders.alias("o") \
    .join(payments_per_order.alias("p"), col("o.order_id") == col("p.payment_order_id"), "left") \
    .join(stores.alias("s"), col("o.store_id") == col("s.store_id"), "left") \
    .join(channels.alias("c"), col("o.channel_id") == col("c.channel_id"), "left") \
    .join(delivery_per_order.alias("d"), col("o.order_id") == col("d.delivery_order_id"), "left") \
    .join(drivers.alias("dr"), col("d.driver_id") == col("dr.driver_id"), "left") \
    .join(hubs.alias("h"), col("s.hub_id") == col("h.hub_id"), "left")

# Project only the columns exposed by the Silver table.
df = df.select(
    col("o.order_id").alias("order_id"),
    col("o.order_status").alias("order_status"),
    col("order_purchase_timestamp"),
    col("p.payment_method").alias("payment_method"),
    col("p.payment_amount").alias("payment_amount"),
    col("s.store_id").alias("store_id"),
    col("s.store_name").alias("store_name"),
    col("s.hub_id").alias("hub_id"),
    col("c.channel_id").alias("channel_id"),
    col("c.channel_type").alias("channel_type"),
    col("d.delivery_id").alias("delivery_id"),
    col("d.delivery_distance_meters").alias("delivery_distance_meters"),
    col("dr.driver_id").alias("driver_id"),
    col("dr.driver_type").alias("driver_type"),  # used in place of driver_name
    col("h.hub_city").alias("hub_city"),
    col("h.hub_state").alias("hub_state")
)

# Safety net: payments and deliveries are already one-per-order, so this only
# triggers if orders or a dimension table contains duplicated keys.
df = df.dropDuplicates(["order_id"])

# Overwrite the Silver Delta table, allowing new columns to be merged in.
df.write \
  .format("delta") \
  .option("mergeSchema", "true") \
  .mode("overwrite") \
  .save(silver_base)
