## Atividade 3 - Camada Gold

In [0]:
catalogo = "medalhao"
schema_gold  = "gold"

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

df_pedido_total_silver = spark.table("medalhao.silver.pedido_total_silver")
df_consumidores_silver = spark.table("medalhao.silver.consumidores_silver")
df_pedidos_silver = spark.table("medalhao.silver.pedidos_silver")
df_itens_pedidos_silver = spark.table("medalhao.silver.pedidos_itens_silver")
df_produtos_silver = spark.table("medalhao.silver.produtos_silver")
df_vendedores_silver = spark.table("medalhao.silver.vendedores_silver")
df_avaliacao_silver = spark.table("medalhao.silver.avaliacoes_pedidos_validos_silver")
df_cotacao_silver = spark.table("medalhao.silver.cotacao_dolar_silver")

### Função para reutilização de escrita
- Todas as tabelas a serem salvas nessa camada vão chamar essa função para se aproveitar de reutilização de código

In [0]:
def write_to_delta_table(
    df,
    table_name,
    mode="overwrite",
    overwrite_schema=True
):
    df.write \
        .format("delta") \
        .mode(mode) \
        .option("overwriteSchema", str(overwrite_schema).lower()) \
        .saveAsTable(f"{catalogo}.{schema_gold}.{table_name}")
    print(f"✅ {schema_gold}.{table_name} criada com sucesso!\n")

### Atividade 1.1 - gold.ft_vendas_consumidor_local
- silver_pedido_total já contém essa associação de pedido e consumidor. Portanto, é uma tabela essencial para essa fato
- Já silver_consumidor contem informações sobre o consumidor que vão ser relevantes para essa fato como: estado, cidade e etc.
- silver_pedido_total já tem o valor total do pedido em valor_total_pago_brl

In [0]:
ft_vendas_consumidor_local = (df_pedido_total_silver
                                .join
                                (df_consumidores_silver, on="id_consumidor")
                                .select
                                (
                                    F.col("id_consumidor"),
                                    F.col("id_pedido"),
                                    F.col("data_pedido"),
                                    F.col("valor_total_pago_brl").alias("valor_total_pedido_brl"),
                                    F.col("estado"),
                                    F.col("cidade"),
                                )
                            )

ft_vendas_consumidor_local.printSchema()


write_to_delta_table(ft_vendas_consumidor_local, "ft_vendas_consumidor_local")

### Atividade 1.2 - gold.view_total_compras_por_consumidor
- Utilizei a fato ft_vendas_consumidor_local para agrupar cidade e estado em combinações únicos, permitindo encontrar a quantidade de vendas de cada localidade

In [0]:
df_total_compras_por_consumidor = (ft_vendas_consumidor_local.
                                   groupBy("cidade", "estado")
                                   .agg(
                                        F.sum("valor_total_pedido_brl").alias("valor_total_localidade"),
                                        F.count("id_pedido").alias("quantidade_vendas")
                                   ))


display(df_total_compras_por_consumidor.limit(10))

df_total_compras_por_consumidor.createOrReplaceTempView("view_total_compras_por_consumidor")

- A consulta agrupa por estado e faz um ORDER BY DESC, permitindo visualizar os estados com maiores quantidade de vendas

In [0]:
df_total_vendas_por_estado = spark.sql("SELECT estado, COUNT(quantidade_vendas) FROM view_total_compras_por_consumidor GROUP BY estado ORDER BY 2 DESC")

### Atividade 2.1 - gold.ft_atrasos_pedidos_local_vendedor
- Juntando as tabelas silver_pedidos, silver_itens_pedidos e silver_consumidores para criar uma fato com dados lógisticas sobre pedidos, que vai ajudar nas criação das views consolidadas sobre o projeto 2

In [0]:
ft_atrasos_pedidos_local_vendedor = (
    df_consumidores_silver
    .join(df_pedidos_silver, on="id_consumidor")
    .join(df_itens_pedidos_silver, on="id_pedido")
    .select(
        F.col("id_pedido"),
        F.col("id_consumidor"),
        F.col("id_vendedor"),
        F.col("entrega_no_prazo"),
        F.col("tempo_entrega_dias"),
        F.col("tempo_entrega_estimado_dias"),
        F.col("cidade"),
        F.col("estado")
    )
)

ft_atrasos_pedidos_local_vendedor.printSchema()

write_to_delta_table(ft_atrasos_pedidos_local_vendedor, "ft_atrasos_pedidos_local_vendedor")

### Atividade 2.2.1 - gold.view_tempo_medio_entrega_localidade
- `filter(F.col("entrega_no_prazo") != "Não entregue")` é um filtro de extrema importância para a visualização pois "elimina" valores nulos e indejesados, analisando as ocorrências que entregaram e se tiveram atraso
- Agrupamento por cidade e estado para que a visualização se refira as localidades únicas, que é como o enunciado está pedindo
- Efetuando a média de _tempo_entrega_dias_ e _tempo_esitmado_dias_. Após isso comparamos os dois valores em _entrega_maior_que_estimado_ para indicar se houve ou não atraso

In [0]:
df_tempo_medio_entrega_localidade = (ft_atrasos_pedidos_local_vendedor.
                                     filter(F.col("entrega_no_prazo") != "Não entregue").
                                     groupBy("cidade", "estado").
                                      agg(
                                          F.avg("tempo_entrega_dias").alias("tempo_medio_entrega"),
                                          F.avg("tempo_entrega_estimado_dias").alias("tempo_medio_entrega_estimado")
                                      )
                                      .withColumn("entrega_maior_que_estimado", F.when(F.col("tempo_medio_entrega") > F.col("tempo_medio_entrega_estimado"), "Sim").otherwise("Não")))
display(df_tempo_medio_entrega_localidade.limit(10))
df_tempo_medio_entrega_localidade.createOrReplaceTempView("view_tempo_medio_entrega_localidade")

- Revelando quis foram as localidades que tiveram um maior tempo de atraso

In [0]:
df_localidades_com_maior_atraso = spark.sql(
    """
    SELECT 
        AVG(tempo_medio_entrega) - AVG(tempo_medio_entrega_estimado) AS tempo_atraso,
        cidade,
        estado
    FROM view_tempo_medio_entrega_localidade
    WHERE entrega_maior_que_estimado = 'Sim'
    GROUP BY cidade, estado
    ORDER BY tempo_atraso DESC
    """
)
display(df_localidades_com_maior_atraso)

### Atividade 2.2.2 - gold.view_tempo_medio_entrega_localidade
- Faça um agrupamento pelos vendedores da fato _ft_atrasos_pedidos_local_vendedor_
- `F.count(F.when(F.col("entrega_no_prazo") == "Não", F.col("entrega_no_prazo")))` é o filtro aplicado para que _total_atrasados_ se refira a quantidade de pedidos que atrasaram
- relaciono _total_atrasados_ com _total_pedidos_ encontrando a porcentagem de pedidos atrasados em relação ao total de pedidos do vendedor

In [0]:
df_vendedor_pontualidade = (ft_atrasos_pedidos_local_vendedor
                            .groupBy("id_vendedor")
                            .agg(
                                F.count(F.when(F.col("entrega_no_prazo") == "Não", F.col("entrega_no_prazo"))).alias("total_atrasados"),
                                F.count(F.col("id_pedido")).alias("total_pedidos")
                            )
                            .withColumn("percentual_atraso", (F.col("total_atrasados") * 100 / F.col("total_pedidos")))
                            .withColumn("percentual_atraso", F.concat(F.col("percentual_atraso").cast("string"), F.lit("%")))
                            )

display(df_vendedor_pontualidade)
df_vendedor_pontualidade.createOrReplaceTempView("view_df_vendedor_pontualidade")

- Visualizando os vendedores com maior percentual de atraso

In [0]:
df_vendedores_mais_atrasados = spark.sql("SELECT * FROM view_df_vendedor_pontualidade ORDER BY percentual_atraso DESC")
display(df_vendedores_mais_atrasados)

### Atividade 3.1 - gold.dm_tempo
- `data_inicio = dbutils.widgets.get("data_inicio")`
- `data_fim = dbutils.widgets.get("data_fim")`
- Esses dois parâmetros representam o intervalo que vamos "explodir" para criar as ocorrências da tabela
- OBS: de preferência, selecionar uma data_inicio que seja de 2016 pois é onde possuem o maior concentração de dados do dataset
- `dia_da_semana_nome` e `mes_nome` vão ter tratamento para valores em pt-br por meio do "CASE-WHEN"


In [0]:
from pyspark.sql import functions as F

data_inicio = dbutils.widgets.get("data_inicio")
data_fim = dbutils.widgets.get("data_fim")

df = spark.sql(
    f"SELECT DATE('{data_inicio}') AS data_inicio, DATE('{data_fim}') AS data_fim"
)

dm_tempo = (
    df
    .withColumn("array_de_dias_a_adicionar", F.sequence(F.lit(1), F.date_diff("data_fim", "data_inicio")))
    .withColumn("dias_a_adicionar", F.explode("array_de_dias_a_adicionar"))
    .withColumn("sk_tempo", F.date_add("data_inicio", "dias_a_adicionar"))
    .withColumn("ano", F.year("sk_tempo"))
    .withColumn("trimestre", F.quarter("sk_tempo"))
    .withColumn("mes", F.month("sk_tempo"))
    .withColumn("semana_do_ano", F.weekofyear("sk_tempo"))
    .withColumn("dia", F.dayofmonth("sk_tempo"))
    .withColumn("dia_da_semana_num", F.dayofweek("sk_tempo"))
    .withColumn("dia_da_semana_nome", F.expr("""
        CASE dayofweek(sk_tempo)
            WHEN 1 THEN 'Domingo'
            WHEN 2 THEN 'Segunda-feira'
            WHEN 3 THEN 'Terça-feira'
            WHEN 4 THEN 'Quarta-feira'
            WHEN 5 THEN 'Quinta-feira'
            WHEN 6 THEN 'Sexta-feira'
            WHEN 7 THEN 'Sábado'
        END
    """))
    .withColumn("mes_nome", F.expr("""
        CASE month(sk_tempo)
            WHEN 1 THEN 'Janeiro'
            WHEN 2 THEN 'Fevereiro'
            WHEN 3 THEN 'Março'
            WHEN 4 THEN 'Abril'
            WHEN 5 THEN 'Maio'
            WHEN 6 THEN 'Junho'
            WHEN 7 THEN 'Julho'
            WHEN 8 THEN 'Agosto'
            WHEN 9 THEN 'Setembro'
            WHEN 10 THEN 'Outubro'
            WHEN 11 THEN 'Novembro'
            WHEN 12 THEN 'Dezembro'
        END
    """))
    .withColumn("eh_fim_de_semana", F.when((F.col("dia_da_semana_num") == 1) | (F.col("dia_da_semana_num") == 7), "Sim").otherwise("Não"))
    .drop("dias_a_adicionar", "data_fim", "array_de_dias_a_adicionar", "data_inicio")
)

dm_tempo.printSchema()

write_to_delta_table(dm_tempo, "dm_tempo")

- criação de dimensões para cliente, produtos e vendedores
- isso é essencial para que a fato _ft_vendas_geral_ possua uma chave estrangeira para essas dimensões, criando uma modelagem dimensional

In [0]:
dim_clientes_df = (df_consumidores_silver
                   .select(
                       F.col("id_consumidor").alias("id_cliente"),
                       F.col("cidade"),
                       F.col("prefixo_cep"),
                       F.col("estado")
                   ))

write_to_delta_table(dim_clientes_df, "dim_clientes")

dim_produtos_df = (df_produtos_silver
                   .select(
                       F.col("id_produto").alias("id_produto"),
                       F.col("categoria_produto"),
                       F.col("comprimento_centimetros"),
                       F.col("altura_centimetros"),
                       F.col("largura_centimetros"),
                       F.col("peso_produto_gramas"),
                   ))

write_to_delta_table(dim_produtos_df, "dim_produtos")

dim_vendedores_df = (df_vendedores_silver
                     .select(
                         F.col("id_vendedor").alias("id_vendedor"),
                         F.col("cidade"),
                         F.col("estado"),
                         F.col("prefixo_cep"),
                     ))

write_to_delta_table(dim_vendedores_df, "dim_vendedores")

- A dimensão de média avaliação vai ajudar a reduzir a granularidade da consulta de crição da fato _ft_vendas_geral_, encontrando logo a média para o _id_pedido_ de _ft_vendas_geral_

In [0]:
dim_media_avaliacao = (
    df_pedido_total_silver
    .join(df_avaliacao_silver, on="id_pedido")
    .groupBy("id_pedido")
    .agg(
        F.avg("avaliacao").alias("media_avaliacao_pedido")
    )
)

display(dim_media_avaliacao.limit(10))

### Atividade 3.2 - gold.ft_vendas_geral
- a fato _ft_vendas_geral_ tem valor quantitativo e agrega várias informações da empresa
- fk_cliente,
fk_produto,
fk_vendedor &
fk_tempo são chaves estrangeiras para as respectivas dimensões: dim_clientes, dim_produtos, dim_vendedores e dm_tempo.
- _valor_total_item_brl_ é a soma de _valor_produto_brl_ com _valor_frete_brl_
- valor_total_item_url_ possui o mesmo cálculo, com o final da conversão dividindo pela _cotacao_dolar_

In [0]:
df_itens_pedidos_silver_alias = df_itens_pedidos_silver.alias("itens")
dim_media_avaliacao_alias = dim_media_avaliacao.alias("avaliacao")
df_pedidos_silver_alias = df_pedidos_silver.alias("pedidos")
df_pedido_total_silver_alias = df_pedido_total_silver.alias("pedido_total")
df_cotacao_silver_alias = df_cotacao_silver.alias("cotacao")

ft_vendas_geral = (
    df_itens_pedidos_silver_alias
    .join(dim_media_avaliacao_alias, df_itens_pedidos_silver_alias.id_pedido == dim_media_avaliacao_alias.id_pedido)
    .join(df_pedidos_silver_alias, df_itens_pedidos_silver_alias.id_pedido == df_pedidos_silver_alias.id_pedido)
    .join(df_pedido_total_silver_alias, df_itens_pedidos_silver_alias.id_pedido == df_pedido_total_silver_alias.id_pedido)
    .join(df_cotacao_silver_alias, F.to_date(df_pedido_total_silver_alias.data_pedido) == F.to_date(df_cotacao_silver_alias.dataHoraCotacao))
    .select(
        F.col("itens.id_pedido"),
        F.col("itens.id_item"),
        F.col("pedido_total.id_consumidor").alias("fk_cliente"),
        F.col("itens.id_produto").alias("fk_produto"),
        F.col("itens.id_vendedor").alias("fk_vendedor"),
        F.col("pedido_total.data_pedido").alias("fk_tempo"),
        F.col("pedidos.status").alias("status_pedido"),
        F.col("pedidos.tempo_entrega_dias"),
        F.col("pedidos.entrega_no_prazo"),
        F.col("preco_BRL").alias("valor_produto_brl"),
        F.col("preco_frete").alias("valor_frete_brl"),
        F.col("cotacao.cotacaoCompra").cast("decimal(8,4)").alias("cotacao_dolar"),
        F.col("avaliacao.media_avaliacao_pedido").cast("decimal(3,2)").alias("avaliacao_pedido")
    )
    .withColumn("valor_total_item_brl", (F.col("valor_produto_brl") + F.col("valor_frete_brl")).cast("decimal(12,2)"))
    .withColumn("valor_produto_usd", (F.col("valor_produto_brl") / F.col("cotacao_dolar")).cast("decimal(12,2)"))
    .withColumn("valor_frete_usd", (F.col("valor_frete_brl") / F.col("cotacao_dolar")).cast("decimal(12,2)"))
    .withColumn("valor_total_item_usd", (F.col("valor_produto_usd") + F.col("valor_frete_usd")).cast("decimal(12,2)"))
)

ft_vendas_geral.printSchema()

write_to_delta_table(ft_vendas_geral, "ft_vendas_geral")

In [0]:
df_vendas_por_periodo = (
    ft_vendas_geral
    .join(
        dm_tempo,
        ft_vendas_geral["fk_tempo"] == dm_tempo["sk_tempo"]
    )
    .groupBy(
        dm_tempo["ano"],
        dm_tempo["mes"],
        dm_tempo["trimestre"],
        dm_tempo["semana_do_ano"],
        dm_tempo["dia"],
        dm_tempo["dia_da_semana_num"],
        dm_tempo["dia_da_semana_nome"],
        dm_tempo["mes_nome"],
        dm_tempo["eh_fim_de_semana"],
    )
    .agg(
        F.countDistinct("id_pedido").cast("bigint").alias("total_pedidos"),
        F.count("id_item").cast("bigint").alias("total_itens"),
        F.sum("valor_total_item_brl").cast("decimal(12,2)").alias("receita_total_brl"),
        F.sum("valor_total_item_usd").cast("decimal(12,2)").alias("receita_total_usd"),
        (F.sum("valor_total_item_brl") / F.countDistinct("id_pedido")).cast("decimal(12,2)").alias("ticket_medio_brl"),
        F.avg("avaliacao_pedido").cast("decimal(3,2)").alias("avaliacao_media")
    )
)

display(df_vendas_por_periodo)

df_vendas_por_periodo.printSchema()

df_vendas_por_periodo.createOrReplaceTempView("view_vendas_por_periodo")

In [0]:
%sql
SELECT mes_nome FROM view_vendas_por_periodo WHERE avaliacao_media IN (
  SELECT MAX(avaliacao_media) FROM view_vendas_por_periodo
);

SELECT dia_da_semana_nome FROM view_vendas_por_periodo WHERE receita_total_brl IN (
  SELECT MAX(receita_total_brl) FROM view_vendas_por_periodo
);

### Atividade 3.4 - gold.view_top_produto
- Agrupamento de produtos e categoria para relacionar o desempenho de um produto "x" tal que tenha categoria "y"
-  

In [0]:
df_top_produto = (dim_produtos_df.
                  join(
                      ft_vendas_geral,
                      dim_produtos_df["id_produto"] == ft_vendas_geral["fk_produto"]
                  )
                  .groupBy(
                      dim_produtos_df["id_produto"],
                      dim_produtos_df["categoria_produto"],
                  )
                  .agg(
                        F.count("id_produto").cast("bigint").alias("quantidade_vendida"),
                        F.count("id_pedido").cast("bigint").alias('total_pedidos'),
                        F.avg("peso_produto_gramas").cast("decimal(8,2)").alias("peso_medio_gramas"),
                        F.sum("valor_total_item_brl").cast("decimal(12,2)").alias("receita_brl"),
                        F.sum("valor_total_item_usd").cast("decimal(12,2)").alias("receita_usd"),
                        F.avg("valor_produto_brl").cast("decimal(12,2)").alias("preco_medio_brl"),
                        F.avg("avaliacao_pedido").cast("decimal(3,2)").alias("avaliacao_media")
                  ))

df_top_produto.printSchema()

df_top_produto.createOrReplaceTempView("view_top_produto")

### Atividade 3.5 - gold.view_vendas_produtos_esteticos
- Agrupando por ano e mês para visualizar a performace de vendas do E-commerce de fashion em cada mês
- `HAVING categoria_produto LIKE 'fashion%'` filtrando as categorias que começam com "fashion"
- Aplicando junção com _ft_vendas_geral_ e _dim_produtos_ para obter os dados de desempenho

In [0]:
spark.sql("USE CATALOG medalhao")
spark.sql("USE SCHEMA gold")

view_vendas_produtos_esteticos = spark.sql(
    """
    WITH view_vendas_produtos_esteticos AS (
        SELECT 
            ano, 
            mes, 
            categoria_produto, 
            COUNT(id_produto) AS total_pedidos, 
            COUNT(id_item) AS total_itens_vendidos,
            CAST(SUM(valor_total_item_brl) AS DECIMAL(12,2)) AS receita_total_brl,
            CAST(SUM(valor_produto_usd) AS DECIMAL(12,2)) AS receita_total_usd,
            CAST(SUM(valor_total_item_brl)/COUNT(id_item) AS DECIMAL(12,2)) AS ticket_medio_brl,
            CAST(SUM(valor_produto_usd)/COUNT(id_item) AS DECIMAL(12,2)) AS ticket_medio_usd,
            CAST(AVG(avaliacao_pedido) AS DECIMAL(3,2)) AS avaliacao_media
        FROM ft_vendas_geral v 
        JOIN dim_produtos p ON fk_produto = p.id_produto
        JOIN dm_tempo t ON fk_tempo = t.sk_tempo 
        GROUP BY ano, mes, categoria_produto
        HAVING categoria_produto LIKE 'fashion%'
    )
    SELECT * FROM view_vendas_produtos_esteticos;
    """
)

view_vendas_produtos_esteticos.printSchema()
display(view_vendas_produtos_esteticos)