# Gold Layer - Agregacoes e Analises

Este notebook cria metricas agregadas e analises para business intelligence.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

spark = SparkSession.builder \
    .appName("Gold Layer") \
    .getOrCreate()

df = spark.read.parquet("data/silver/dados_limpos.parquet")

os.makedirs("data/gold", exist_ok=True)

df = df.withColumn("data_compra", to_date(col("InvoiceDate")))


## Criando metricas diarias


In [None]:
metricas_diarias = df.groupBy("data_compra").agg(
    countDistinct("InvoiceNo").alias("total_pedidos"),
    sum("valor_total").alias("receita_total"),
    countDistinct("CustomerID").alias("clientes_unicos")
)

metricas_diarias = metricas_diarias.withColumn(
    "ticket_medio",
    col("receita_total") / col("total_pedidos")
)

metricas_diarias.write \
    .mode("overwrite") \
    .parquet("data/gold/metricas_diarias.parquet")

print(f"Metricas diarias: {metricas_diarias.count()} dias")


## Analise RFM de clientes


In [None]:
data_referencia = df.agg(max("InvoiceDate")).collect()[0][0]

analise_clientes = df.groupBy("CustomerID").agg(
    max("InvoiceDate").alias("ultima_compra"),
    countDistinct("InvoiceNo").alias("frequencia_pedidos"),
    sum("valor_total").alias("valor_total_gasto")
)

analise_clientes = analise_clientes.withColumn(
    "dias_ultima_compra",
    datediff(lit(data_referencia), col("ultima_compra"))
)

analise_clientes.write \
    .mode("overwrite") \
    .parquet("data/gold/analise_clientes.parquet")

print(f"Analise de clientes: {analise_clientes.count()} clientes")


## Desempenho de produtos


In [None]:
desempenho_produtos = df.groupBy("StockCode", "Description").agg(
    sum("Quantity").alias("quantidade_vendida"),
    sum("valor_total").alias("receita_total"),
    countDistinct("InvoiceNo").alias("num_pedidos")
)

desempenho_produtos.write \
    .mode("overwrite") \
    .parquet("data/gold/desempenho_produtos.parquet")

print(f"Desempenho de produtos: {desempenho_produtos.count()} produtos")


## Analise de vendas por pais


In [None]:
analise_paises = df.groupBy("Country").agg(
    countDistinct("InvoiceNo").alias("total_pedidos"),
    sum("valor_total").alias("receita_total"),
    countDistinct("CustomerID").alias("clientes_unicos")
)

analise_paises.write \
    .mode("overwrite") \
    .parquet("data/gold/analise_paises.parquet")

print(f"Analise de paises: {analise_paises.count()} paises")


## Metricas mensais


In [None]:
df = df.withColumn("ano_mes", date_format(col("InvoiceDate"), "yyyy-MM"))

metricas_mensais = df.groupBy("ano_mes").agg(
    countDistinct("InvoiceNo").alias("total_pedidos"),
    sum("valor_total").alias("receita_total"),
    countDistinct("CustomerID").alias("clientes_unicos")
)

metricas_mensais = metricas_mensais.withColumn(
    "ticket_medio",
    col("receita_total") / col("total_pedidos")
)

metricas_mensais.write \
    .mode("overwrite") \
    .parquet("data/gold/metricas_mensais.parquet")

print(f"Metricas mensais: {metricas_mensais.count()} meses")


## Analise de devolucoes


In [None]:
devolucoes = df.filter(col("eh_devolucao") == True)

if devolucoes.count() > 0:
    analise_devolucoes = devolucoes.groupBy("StockCode", "Description").agg(
        sum("Quantity").alias("quantidade_devolvida"),
        sum("valor_total").alias("valor_devolvido"),
        countDistinct("InvoiceNo").alias("num_devolucoes")
    )
    
    analise_devolucoes.write \
        .mode("overwrite") \
        .parquet("data/gold/analise_devolucoes.parquet")
    
    print(f"Analise de devolucoes: {analise_devolucoes.count()} produtos")
else:
    print("Nao ha devolucoes nos dados")

spark.stop()
