# Pipeline Silver — E-commerce Brasileiro

**Objetivo:** realizar cleansing, normalização e enriquecimento dos dados da camada Bronze e persistir os resultados na camada Silver (Delta tables).

**Entradas:** tabelas na camada `workspace.bronze` (ft_* e dm_*).

**Saídas:** tabelas transformadas na camada `workspace.silver` prontas para consumo analítico / downstream.

**Como usar:** executar as células sequencialmente em um ambiente com Spark/Delta configurado.

---

In [0]:
# imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, row_number, desc, when, datediff, current_timestamp, to_timestamp, to_date, min, max, expr, explode, sequence, last, round, sum
from pyspark.sql.window import Window

# criação da sessão do Spark
spark = SparkSession.builder.appName("SilverLayer").getOrCreate()

In [0]:
# paths dos schemas
bronze_path = "workspace.bronze"
silver_path = "workspace.silver"

## Tratamento dos Dados

Nesta seção realizamos:
- Tradução de nomes e valores (EN -> PT),
- Normalização de textos (uppercase),
- Conversão de tipos (timestamp, decimal, int),
- Cálculos e colunas derivadas (ex.: tempo de entrega),
- Deduplicação (Last-In-Wins) quando aplicável.

Cada bloco carrega a tabela da camada Bronze, aplica transformações e grava a tabela resultante na camada Silver em formato Delta.


### ft_consumidores

Descrição: normaliza informações do cliente (cep, cidade, estado) e garante uma única versão por consumidor (ultima ingestão).

In [0]:
df_consumidores = spark.read.table(f"{bronze_path}.ft_consumidores")

In [0]:
# seleção dos dados necessários
df_transformed = df_consumidores.select(
    col("customer_id").alias("id_consumidor"),
    col("customer_zip_code_prefix").alias("prefixo_cep"),
    col("customer_city").alias("cidade"),
    col("customer_state").alias("estado"),
    col("ingestion_timestamp")
)


In [0]:
# colocando a cidade e estado em upper case
df_transformed = (
        df_transformed
        .withColumn("cidade", upper(col("cidade")))
        .withColumn("estado", upper(col("estado")))
    )

In [0]:
# aplica lógica Last-In-Wins por consumidor: mantemos apenas a linha de maior ingestion_timestamp
windowSpec = Window.partitionBy("id_consumidor").orderBy(col("ingestion_timestamp").desc())

# rn = 1 => registro mais recente, que é o registro que queremos manter
df_deduplicado = (
    df_transformed
    .withColumn("rn", row_number().over(windowSpec))
    .filter(col("rn") == 1)
    .drop("rn", "ingestion_timestamp") # removendo coluna auxiliar
    )

# aplicar transformações de texto (Upper Case)
df_final = df_deduplicado.withColumn("cidade", upper(col("cidade"))) \
                         .withColumn("estado", upper(col("estado")))

df_final.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_consumidores")

display(df_final.limit(10))

id_consumidor,prefixo_cep,cidade,estado
00012a2ce6f8dcda20d059ce98491703,6273,OSASCO,SP
000161a058600d5901f007fab4c27140,35550,ITAPECERICA,MG
0001fd6190edaaf884bcaf3d49edf079,29830,NOVA VENECIA,ES
0002414f95344307404f0ace7a26f1d5,39664,MENDONCA,MG
000379cdec625522490c315e70c7a9fb,4841,SAO PAULO,SP
0004164d20a9e969af783496f3408652,13272,VALINHOS,SP
000419c5494106c306a97b5635748086,24220,NITEROI,RJ
00046a560d407e99b969756e0b10f282,20540,RIO DE JANEIRO,RJ
00050bf6e01e69d5c0fd612f1bcfb69c,98700,IJUI,RS
000598caf2ef4117407665ac33275130,35540,OLIVEIRA,MG


### ft_pedidos

Descrição: trazemos timestamps, traduzimos status, calculamos tempos de entrega e sinalizamos se a entrega ocorreu no prazo.

In [0]:

df_pedidos = spark.read.table(f"{bronze_path}.ft_pedidos")

# selecionando os dados necessarios, além de convertendo timestamps para garantir o tipo correto
df_mapeado = df_pedidos.select(
    col("order_id").alias("id_pedido"),
    col("customer_id").alias("id_consumidor"),
    col("order_status").alias("status_original"),
    col("order_purchase_timestamp").cast("timestamp").alias("pedido_compra_timestamp"),
    col("order_approved_at").cast("timestamp").alias("pedido_aprovado_timestamp"),
    col("order_delivered_carrier_date").cast("timestamp").alias("pedido_carregado_timestamp"),
    col("order_delivered_customer_date").cast("timestamp").alias("pedido_entregue_timestamp"),
    col("order_estimated_delivery_date").cast("timestamp").alias("pedido_estimativa_entrega_timestamp")
)

# tradução dos valores de status para português, mantendo valor original como fallback
df_traduzido = df_mapeado.withColumn(
    "status",
    when(col("status_original") == "delivered", "entregue")
    .when(col("status_original") == "invoiced", "faturado")
    .when(col("status_original") == "shipped", "enviado")
    .when(col("status_original") == "processing", "em processamento")
    .when(col("status_original") == "unavailable", "indisponível")
    .when(col("status_original") == "canceled", "cancelado")
    .when(col("status_original") == "created", "criado")
    .when(col("status_original") == "approved", "aprovado")
    .otherwise(col("status_original"))
).drop("status_original")

# cálculo de métricas de tempo entre eventos (em dias)
df_calculado = (
    df_traduzido
    # diff em dias entre compra e entrega
    .withColumn("tempo_entrega_dias", datediff(col("pedido_entregue_timestamp"), col("pedido_compra_timestamp")))

    # diff em dias entre compra e estimativa de entrega
    .withColumn("tempo_entrega_estimado_dias", datediff(col("pedido_estimativa_entrega_timestamp"), col("pedido_compra_timestamp")))

    # diff em dias entre entrega e estimativa de entrega
    .withColumn("diferenca_entrega_dias", datediff(col("pedido_entregue_timestamp"), col("pedido_estimativa_entrega_timestamp")))
    )

# flag se entrega ocorreu no prazo (Não Entregue / Sim / Não)
df_final = df_calculado.withColumn(
    "entrega_no_prazo",
    when(col("pedido_entregue_timestamp").isNull(), "Não Entregue")
    .when(col("diferenca_entrega_dias") <= 0, "Sim")
    .otherwise("Não")
)

df_final.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_pedidos")

display(df_final.limit(10))

id_pedido,id_consumidor,pedido_compra_timestamp,pedido_aprovado_timestamp,pedido_carregado_timestamp,pedido_entregue_timestamp,pedido_estimativa_entrega_timestamp,status,tempo_entrega_dias,tempo_entrega_estimado_dias,diferenca_entrega_dias,entrega_no_prazo
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,2017-10-02T10:56:33.000Z,2017-10-02T11:07:15.000Z,2017-10-04T19:55:00.000Z,2017-10-10T21:25:13.000Z,2017-10-18T00:00:00.000Z,entregue,8.0,16,-8.0,Sim
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,2018-07-24T20:41:37.000Z,2018-07-26T03:24:27.000Z,2018-07-26T14:31:00.000Z,2018-08-07T15:27:45.000Z,2018-08-13T00:00:00.000Z,entregue,14.0,20,-6.0,Sim
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,2018-08-08T08:38:49.000Z,2018-08-08T08:55:23.000Z,2018-08-08T13:50:00.000Z,2018-08-17T18:06:29.000Z,2018-09-04T00:00:00.000Z,entregue,9.0,27,-18.0,Sim
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,2017-11-18T19:28:06.000Z,2017-11-18T19:45:59.000Z,2017-11-22T13:39:59.000Z,2017-12-02T00:28:42.000Z,2017-12-15T00:00:00.000Z,entregue,14.0,27,-13.0,Sim
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,2018-02-13T21:18:39.000Z,2018-02-13T22:20:29.000Z,2018-02-14T19:46:34.000Z,2018-02-16T18:17:02.000Z,2018-02-26T00:00:00.000Z,entregue,3.0,13,-10.0,Sim
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,2017-07-09T21:57:05.000Z,2017-07-09T22:10:13.000Z,2017-07-11T14:58:04.000Z,2017-07-26T10:57:55.000Z,2017-08-01T00:00:00.000Z,entregue,17.0,23,-6.0,Sim
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,2017-04-11T12:22:08.000Z,2017-04-13T13:25:17.000Z,,,2017-05-09T00:00:00.000Z,faturado,,28,,Não Entregue
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,2017-05-16T13:10:30.000Z,2017-05-16T13:22:11.000Z,2017-05-22T10:07:46.000Z,2017-05-26T12:55:51.000Z,2017-06-07T00:00:00.000Z,entregue,10.0,22,-12.0,Sim
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,2017-01-23T18:29:09.000Z,2017-01-25T02:50:47.000Z,2017-01-26T14:16:31.000Z,2017-02-02T14:08:10.000Z,2017-03-06T00:00:00.000Z,entregue,10.0,42,-32.0,Sim
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,2017-07-29T11:55:02.000Z,2017-07-29T12:05:32.000Z,2017-08-10T19:45:24.000Z,2017-08-16T17:14:30.000Z,2017-08-23T00:00:00.000Z,entregue,18.0,25,-7.0,Sim


### ft_itens_pedidos

Descrição: padroniza tipos numéricos e renomeia colunas (preço e frete em decimal).

In [0]:
df_itens = spark.read.table(f"{bronze_path}.ft_itens_pedidos")

# selecionando colunas necessarias
df_transformed = df_itens.select(
    col("order_id").alias("id_pedido"),
    col("order_item_id").cast("int").alias("id_item"),
    col("product_id").alias("id_produto"),
    col("seller_id").alias("id_vendedor"),
    col("price").cast("decimal(12,2)").alias("preco_BRL"),
    col("freight_value").cast("decimal(12,2)").alias("preco_frete")
)

df_transformed.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_itens_pedidos")

display(df_transformed.limit(10))

id_pedido,id_item,id_produto,id_vendedor,preco_BRL,preco_frete
00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,58.9,13.29
00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,239.9,19.93
000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,199.0,17.87
00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,12.99,12.79
00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,199.9,18.14
00048cc3ae777c65dbb7d2a0634bc1ea,1,ef92defde845ab8450f9d70c526ef70f,6426d21aca402a131fc0a5d0960a3c90,21.9,12.69
00054e8431b9d7675808bcb819fb4a32,1,8d4f2bb7e93e6710a28f34fa83ee7d28,7040e82f899a04d1b434b795a43b4617,19.9,11.85
000576fe39319847cbb9d288c5617fa6,1,557d850972a7d6f792fd18ae1400d9b6,5996cddab893a4652a15592fb58ab8db,810.0,70.75
0005a1a1728c9d785b8e2b08b904576c,1,310ae3c140ff94b03219ad0adc3c778f,a416b6a846a11724393025641d4edd5e,145.95,11.65
0005f50442cb953dcd1d21e1fb923495,1,4535b0e1091c278dfd193e5a1d63b39f,ba143b05f0110f0dc71ad71b4466ce92,53.99,11.4


### ft_pagamentos

Descrição: padroniza formas de pagamento e tipos numéricos (parcelas e valor).

In [0]:
df_pagamentos = spark.read.table(f"{bronze_path}.ft_pagamentos_pedidos")

df_transformed = df_pagamentos.select(
    col("order_id").alias("id_pedido"),
    col("payment_sequential").alias("codigo_pagamento"),
    
    # tradução e padronização do tipo de pagamento
    when(col("payment_type") == "credit_card", "Cartão de Crédito")
    .when(col("payment_type") == "boleto", "Boleto")
    .when(col("payment_type") == "voucher", "Voucher")
    .when(col("payment_type") == "debit_card", "Cartão de Débito")
    .otherwise("Outro")
    .alias("forma_pagamento"),
    col("payment_installments").cast("int").alias("parcelas"),
    col("payment_value").cast("decimal(12,2)").alias("valor_pagamento")
)

df_transformed.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_pagamentos_pedidos")

display(df_transformed.limit(10))

id_pedido,codigo_pagamento,forma_pagamento,parcelas,valor_pagamento
b81ef226f3fe1789b1e8b2acac839d17,1,Cartão de Crédito,8,99.33
a9810da82917af2d9aefd1278f1dcfa0,1,Cartão de Crédito,1,24.39
25e8ea4e93396b6fa0d3dd708e76c1bd,1,Cartão de Crédito,1,65.71
ba78997921bbcdc1373bb41e913ab953,1,Cartão de Crédito,8,107.78
42fdf880ba16b47b59251dd489d4441a,1,Cartão de Crédito,2,128.45
298fcdf1f73eb413e4d26d01b25bc1cd,1,Cartão de Crédito,2,96.12
771ee386b001f06208a7419e4fc1bbd7,1,Cartão de Crédito,1,81.16
3d7239c394a212faae122962df514ac7,1,Cartão de Crédito,3,51.84
1f78449c87a54faf9e96e88ba1491fa9,1,Cartão de Crédito,6,341.09
0573b5e23cbd798006520e1d5b4c6714,1,Boleto,1,51.95


### ft_avaliacoes_pedidos

Descrição: valida e converte timestamps de avaliações, remove registros inválidos e normaliza campos de texto.

In [0]:
df_avaliacoes = spark.read.table(f"{bronze_path}.ft_avaliacoes_pedidos")
total_linhas_inicial = df_avaliacoes.count()

# tenta converter strings para timestamp sem falhar (no caso de falha, o valor vira null)
df_typed = (
    df_avaliacoes
    .withColumn("review_creation_date", expr("try_cast(review_creation_date as timestamp)"))
    .withColumn("review_answer_timestamp", expr("try_cast(review_answer_timestamp as timestamp)"))
)

# filtra registros essencialmente válidos (order_id e data de criação presentes e não futura)
df_clean = df_typed.filter(
    (col("order_id").isNotNull()) &                        
    (col("review_creation_date").isNotNull()) &            
    (col("review_creation_date") <= current_timestamp())   
)

total_linhas_final = df_clean.count()
linhas_removidas = total_linhas_inicial - total_linhas_final

# relatório simples de limpeza, mostrando quantos registros foram removidos
print(f"--- Relatório de Limpeza ---")
print(f"Total inicial de registros: {total_linhas_inicial}")
print(f"Total após limpeza: {total_linhas_final}")
print(f"Registros removidos (Dados Incorretos/Nulos): {linhas_removidas}")

# seleciona e renomeia colunas de saída
df_final = df_clean.select(
    col("review_id").alias("id_avaliacao"),
    col("order_id").alias("id_pedido"),
    expr("try_cast(review_score as int)").alias("avaliacao"), 
    col("review_comment_title").alias("titulo_comentario"),
    col("review_comment_message").alias("comentario"),
    col("review_creation_date").alias("data_comentario"),
    col("review_answer_timestamp").alias("data_resposta")
)

df_final.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_avaliacoes_pedidos")

display(df_final.limit(10))

--- Relatório de Limpeza ---
Total inicial de registros: 104162
Total após limpeza: 95330
Registros removidos (Dados Incorretos/Nulos): 8832


id_avaliacao,id_pedido,avaliacao,titulo_comentario,comentario,data_comentario,data_resposta
7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,,,2018-01-18T00:00:00.000Z,2018-01-18T21:46:59.000Z
80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,,,2018-03-10T00:00:00.000Z,2018-03-11T03:05:13.000Z
228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,,,2018-02-17T00:00:00.000Z,2018-02-18T14:36:24.000Z
e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,,Recebi bem antes do prazo estipulado.,2017-04-21T00:00:00.000Z,2017-04-21T22:02:06.000Z
f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,,Parabéns lojas lannister adorei comprar pela Internet seguro e prático Parabéns a todos feliz Páscoa,2018-03-01T00:00:00.000Z,2018-03-02T10:26:53.000Z
15197aa66ff4d0650b5434f1b46cda19,b18dcdf73be66366873cd26c5724d1dc,1,,,2018-04-13T00:00:00.000Z,2018-04-16T00:39:37.000Z
07f9bee5d1b850860defd761afa7ff16,e48aa0d2dcec3a2e87348811bcfdf22b,5,,,2017-07-16T00:00:00.000Z,2017-07-18T19:30:34.000Z
7c6400515c67679fbee952a7525281ef,c31a859e34e3adac22f376954e19b39d,5,,,2018-08-14T00:00:00.000Z,2018-08-14T21:36:06.000Z
a3f6f7f6f433de0aefbb97da197c554c,9c214ac970e84273583ab523dfafd09b,5,,,2017-05-17T00:00:00.000Z,2017-05-18T12:05:37.000Z
8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,recomendo,aparelho eficiente. no site a marca do aparelho esta impresso como 3desinfector e ao chegar esta com outro nome...atualizar com a marca correta uma vez que é o mesmo aparelho,2018-05-22T00:00:00.000Z,2018-05-23T16:45:47.000Z


### ft_produtos

Padronização de dimensões físicas e categorias do produto (casts para int/decimal e renomeação).

In [0]:
df_produtos = spark.read.table(f"{bronze_path}.ft_produtos")

# seleciona e renomeia colunas de saída, além de converter dimensões para inteiros
df_transformed = df_produtos.select(
    col("product_id").alias("id_produto"),
    col("product_category_name").alias("categoria_produto"),
    col("product_weight_g").cast("int").alias("peso_produto_gramas"),
    col("product_length_cm").cast("int").alias("comprimento_centimetros"),
    col("product_height_cm").cast("int").alias("altura_centimetros"),
    col("product_width_cm").cast("int").alias("largura_centimetros")
)

df_transformed.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_produtos")

display(df_transformed.limit(10))

id_produto,categoria_produto,peso_produto_gramas,comprimento_centimetros,altura_centimetros,largura_centimetros
1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,225,16,10,14
3aa071139cb16b67ca9e5dea641aaa2f,artes,1000,30,18,20
96bd76ec8810374ed1b65e291975717f,esporte_lazer,154,18,9,15
cef67bcfe19066a932b7673e239eb23d,bebes,371,26,4,26
9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,625,20,17,13
41d3672d4792049fa1779bb35283ed13,instrumentos_musicais,200,38,5,11
732bd381ad09e530fe0a5f457d81becb,cool_stuff,18350,70,24,44
2548af3e6e77a690cf3eb6368e9ab61e,moveis_decoracao,900,40,8,40
37cc742be07708b53a98702e77a21a02,eletrodomesticos,400,27,13,17
8c92109888e8cdf9d66dc7e463025574,brinquedos,600,17,10,12


### ft_vendedores

Padroniza informações dos vendedores (cidade/estado em uppercase e renomeação de colunas).

In [0]:
df_vendedores = spark.read.table(f"{bronze_path}.ft_vendedores")

df_mapeado = df_vendedores.select(
    col("seller_id").alias("id_vendedor"),
    col("seller_zip_code_prefix").alias("prefixo_cep"),
    col("seller_city").alias("cidade"),
    col("seller_state").alias("estado")
)

# aplica transformações de texto (Upper Case)
df_final = (
    df_mapeado
    .withColumn("cidade", upper(col("cidade")))
    .withColumn("estado", upper(col("estado")))
)

df_final.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_vendedores")

display(df_final.limit(10))

id_vendedor,prefixo_cep,cidade,estado
3442f8959a84dea7ee197c632cb2df15,13023,CAMPINAS,SP
d1b65fc7debc3361ea86b5f14c68d2e2,13844,MOGI GUACU,SP
ce3ad9de960102d0677a81f5d0bb7b2d,20031,RIO DE JANEIRO,RJ
c0f3eea2e14555b6faeea3dd58c1b1c3,4195,SAO PAULO,SP
51a04a8a6bdcb23deccc82b0b80742cf,12914,BRAGANCA PAULISTA,SP
c240c4061717ac1806ae6ee72be3533b,20920,RIO DE JANEIRO,RJ
e49c26c3edfa46d227d5121a6b6e4d37,55325,BREJAO,PE
1b938a7ec6ac5061a66a3766e0e75f90,16304,PENAPOLIS,SP
768a86e36ad6aae3d03ee3c6433d61df,1529,SAO PAULO,SP
ccc4bbb5f32a6ab2b7066a4130f114e3,80310,CURITIBA,PR


### dm_categoria_produtos_traducao

Descrição: tabela de mapeamento de categorias de produtos entre Português (PT) e Inglês (EN)

In [0]:
df_categorias = spark.read.table(f"{bronze_path}.dm_categoria_produtos_traducao")

# seleciona e renomeia colunas de saída
df_transformed = df_categorias.select(
    col("product_category_name").alias("nome_produto_pt"),
    col("product_category_name_english").alias("nome_produto_en")
)

df_transformed.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.dm_categoria_produtos_traducao")

display(df_transformed.limit(10))

nome_produto_pt,nome_produto_en
beleza_saude,health_beauty
informatica_acessorios,computers_accessories
automotivo,auto
cama_mesa_banho,bed_bath_table
moveis_decoracao,furniture_decor
esporte_lazer,sports_leisure
perfumaria,perfumery
utilidades_domesticas,housewares
telefonia,telephony
relogios_presentes,watches_gifts


### dm_cotacao_dolar

Gera um calendário de datas entre a primeira e a última cotação disponível e preenche valores nulos com o último valor conhecido (forward-fill).

In [0]:
# carrega cotações e gera calendário completo com forward-fill (preenchimento dos valores faltantes)
df_cotacao = (
    spark.read.table(f"{bronze_path}.dm_cotacao_dolar")
    .select(
        to_date(col("dataHoraCotacao")).alias("data"),
        col("cotacaoCompra").cast("decimal(10,4)").alias("cotacao_dolar")
    )
)

# identifica intervalo de datas disponível, para criar calendário completo
datas_limite = df_cotacao.select(min("data").alias("min_data"), max("data").alias("max_data")).collect()[0]

# extrai datas para variáveis
data_inicio, data_fim = datas_limite["min_data"], datas_limite["max_data"]

# cria sequência diária entre as datas e faz left join com as cotações
df_calendario = spark.sql(f"""
    SELECT explode(sequence(to_date('{data_inicio}'), to_date('{data_fim}'), interval 1 day)) as data
""")

# junta calendário com cotações, mantendo todas as datas do calendário
df_completo = df_calendario.join(df_cotacao, "data", "left")

# window para forward-fill (ultimo valor não-nulo até a linha atual)
window_spec = Window.orderBy("data").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# aplica forward-fill
df_silver = df_completo.withColumn(
    "cotacao_dolar", 
    last("cotacao_dolar", ignorenulls=True).over(window_spec)
)

df_silver.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.dm_cotacao_dolar")

display(df_silver.orderBy("data").limit(10))



data,cotacao_dolar
1984-12-03,2814.0
1984-12-03,2814.0
1984-12-04,2867.0
1984-12-05,2908.0
1984-12-05,2867.0
1984-12-05,2867.0
1984-12-06,2908.0
1984-12-07,2908.0
1984-12-08,2908.0
1984-12-09,2908.0


## Validações

Nesta seção executamos checks simples para garantir integridade referencial entre as tabelas Silver (ex.: pedidos com consumidores existentes, itens com pedidos existentes).
Em casos detectados, preferimos limpar as tabelas afetadas e persistir a versão corrigida.

In [0]:
# carrega tabelas Silver para validações
df_pedidos = spark.read.table(f"{silver_path}.ft_pedidos")
df_consumidores = spark.read.table(f"{silver_path}.ft_consumidores")
df_itens = spark.read.table(f"{silver_path}.ft_itens_pedidos")

In [0]:
# CASO 1: Pedidos sem consumidor correspondente (left_anti identifica órfãos)
df_pedidos_orfaos = df_pedidos.join(df_consumidores, "id_consumidor", "left_anti")
qtd_pedidos_orfaos = df_pedidos_orfaos.count()

print(f"1. Verificação de Pedidos Órfãos:")
print(f"   Quantidade encontrada: {qtd_pedidos_orfaos}")

if qtd_pedidos_orfaos > 0:
    print("   Removendo registros órfãos da tabela ft_pedidos...")

    # left_semi mantém apenas registros com correspondência (filtra órfãos)
    df_pedidos_limpo = df_pedidos.join(df_consumidores, "id_consumidor", "left_semi")

    # sobrescreve tabela com dados limpos
    df_pedidos_limpo.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_pedidos")
    df_pedidos = df_pedidos_limpo 
else:
    print("   Nenhuma limpeza necessária.")

1. Verificação de Pedidos Órfãos:
   Quantidade encontrada: 0
   Nenhuma limpeza necessária.


In [0]:
# CASO 2: Itens sem pedido correspondente (verificação análoga ao caso anterior)
df_itens_orfaos = df_itens.join(df_pedidos, "id_pedido", "left_anti")
qtd_itens_orfaos = df_itens_orfaos.count()

print(f"\n2. Verificação de Itens Órfãos:")
print(f"   Quantidade encontrada: {qtd_itens_orfaos}")

if qtd_itens_orfaos > 0:
    print("   Removendo registros órfãos da tabela ft_itens_pedidos...")

    # left_semi mantém apenas registros com correspondência (filtra órfãos)
    df_itens_limpo = df_itens.join(df_pedidos, "id_pedido", "left_semi")

    # sobrescreve tabela com dados limpos
    df_itens_limpo.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_itens_pedidos")
else:
    print("   Nenhuma limpeza necessária.")

print("\nValidações concluídas com sucesso.")


2. Verificação de Itens Órfãos:
   Quantidade encontrada: 0
   Nenhuma limpeza necessária.

Validações concluídas com sucesso.


## silver.ft_pedido_total

Agrega pagamentos por pedido, junta com pedidos e aplica conversão BRL -> USD usando a cotação do dia do pedido.

In [0]:
# monta ft_pedido_total agregando pagamentos e convertendo para USD com a cotação do dia
df_pedidos = spark.read.table(f"{silver_path}.ft_pedidos")
df_pagamentos = spark.read.table(f"{silver_path}.ft_pagamentos_pedidos")
df_cotacao = spark.read.table(f"{silver_path}.dm_cotacao_dolar")

# soma pagamentos por pedido (valor em BRL)
df_pagamentos_agrupado = (
    df_pagamentos
    .groupBy("id_pedido")
    .agg(sum("valor_pagamento").alias("valor_total_pago_brl"))
)

# prepara pedidos e extrai data do pedido para casar com a cotação
df_pedidos_prep = df_pedidos.select(
    "id_pedido", 
    "id_consumidor", 
    "status", 
    "pedido_compra_timestamp"
).withColumn("data_pedido", to_date("pedido_compra_timestamp"))

df_join_pagamentos = df_pedidos_prep.join(df_pagamentos_agrupado, "id_pedido", "left")

# junta cotação pela data do pedido (pode haver cotação nula se data fora do intervalo; tratar conforme necessidade)
df_final_join = df_join_pagamentos.join(
    df_cotacao, 
    df_join_pagamentos.data_pedido == df_cotacao.data, 
    "left"
)

# calcula valor em USD (divisão por cotação); resultado cast para decimal(12,2)
df_resultado = df_final_join.select(
    df_join_pagamentos["id_pedido"],
    df_join_pagamentos["id_consumidor"],
    df_join_pagamentos["status"],
    col("valor_total_pago_brl"),
    (col("valor_total_pago_brl") / col("cotacao_dolar")).cast("decimal(12,2)").alias("valor_total_pago_usd"),
    df_join_pagamentos["data_pedido"]
)

df_resultado.write.format("delta").mode("overwrite").saveAsTable(f"{silver_path}.ft_pedido_total")

display(df_resultado.limit(10))

id_pedido,id_consumidor,status,valor_total_pago_brl,valor_total_pago_usd,data_pedido
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,entregue,38.71,12.24,2017-10-02
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,entregue,141.46,37.77,2018-07-24
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,entregue,179.12,47.75,2018-08-08
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,entregue,72.2,22.02,2017-11-18
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,entregue,28.62,8.72,2018-02-13
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,entregue,175.26,53.29,2017-07-09
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,faturado,65.95,20.99,2017-04-11
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,entregue,75.16,24.31,2017-05-16
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,entregue,35.95,11.38,2017-01-23
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,entregue,169.76,53.98,2017-07-29
