# Análise de Vendas Retail com Arquitetura Medallion

Este notebook demonstra como utilizar a arquitetura Medallion (Bronze, Silver, Gold) do DataFlow Lab para uma análise de dados de varejo.

## 1. Configuração do Ambiente

Primeiro, vamos configurar nossa sessão Spark com suporte ao Delta Lake.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    year,
    month,
    dayofmonth,
    hour,
    date_format,
    to_date,
    lit,
    count,
    sum,
    avg,
    when,
    expr,
)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

# Configuração com suporte ao Delta Lake
spark = (
    SparkSession.builder.appName("Retail-Sales-Analysis")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

print("Sessão Spark iniciada com sucesso!")

## 2. Geração de Dados de Exemplo

Vamos criar um conjunto de dados simulado de vendas de varejo para nossa análise.

In [None]:
# Criando dados de exemplo
from datetime import datetime, timedelta
import random

# Lista de produtos
products = [
    {"id": 1, "name": "Smartphone", "category": "Eletrônicos", "price": 1999.90},
    {"id": 2, "name": "Notebook", "category": "Eletrônicos", "price": 3499.90},
    {"id": 3, "name": "Tênis", "category": "Vestuário", "price": 299.90},
    {"id": 4, "name": "Camiseta", "category": "Vestuário", "price": 99.90},
    {"id": 5, "name": "Panela Elétrica", "category": "Casa", "price": 399.90},
    {"id": 6, "name": "Liquidificador", "category": "Casa", "price": 149.90},
    {"id": 7, "name": "Livro", "category": "Cultura", "price": 49.90},
    {"id": 8, "name": "Fones de Ouvido", "category": "Eletrônicos", "price": 199.90},
]

# Lista de lojas
stores = [
    {"id": 1, "name": "Loja Centro", "city": "São Paulo", "region": "Sudeste"},
    {"id": 2, "name": "Loja Shopping", "city": "Rio de Janeiro", "region": "Sudeste"},
    {"id": 3, "name": "Loja Norte", "city": "Salvador", "region": "Nordeste"},
    {"id": 4, "name": "Loja Sul", "city": "Porto Alegre", "region": "Sul"},
]

# Gerar dados de vendas
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)
sales_data = []

# Adicionar ID para facilitar a simulação de problemas de qualidade de dados
sale_id = 1

current_date = start_date
while current_date <= end_date:
    # Número de vendas por dia varia entre 50 e 200
    daily_sales = random.randint(50, 200)

    for _ in range(daily_sales):
        product = random.choice(products)
        store = random.choice(stores)
        quantity = random.randint(1, 5)

        # Adicionar ruído nos dados (valores nulos, formatação incorreta etc.)
        if random.random() < 0.05:  # 5% de chance de ter um problema de qualidade
            problem_type = random.choice(
                ["null_price", "negative_quantity", "wrong_date", "null_store"]
            )

            if problem_type == "null_price":
                price = None
            elif problem_type == "negative_quantity":
                quantity = -quantity
            elif problem_type == "wrong_date":
                sale_date = "INVALID_DATE"
            elif problem_type == "null_store":
                store_id = None
                store_name = None
                store_city = None
                store_region = None
            else:
                price = product["price"]
                sale_date = current_date.strftime("%Y-%m-%d")
                store_id = store["id"]
                store_name = store["name"]
                store_city = store["city"]
                store_region = store["region"]
        else:
            price = product["price"]
            sale_date = current_date.strftime("%Y-%m-%d")
            store_id = store["id"]
            store_name = store["name"]
            store_city = store["city"]
            store_region = store["region"]

        # Criar registro de venda
        sale = {
            "sale_id": sale_id,
            "sale_date": sale_date,
            "product_id": product["id"],
            "product_name": product["name"],
            "product_category": product["category"],
            "price": price,
            "quantity": quantity,
            "store_id": store_id,
            "store_name": store_name,
            "store_city": store_city,
            "store_region": store_region,
            "total_amount": None if price is None else price * quantity,
        }

        sales_data.append(sale)
        sale_id += 1

    current_date += timedelta(days=1)

# Converter para DataFrame do Pandas
sales_df_pd = pd.DataFrame(sales_data)

# Converter para DataFrame do Spark
sales_df = spark.createDataFrame(sales_df_pd)

print(f"Dados gerados: {sales_df.count()} registros de vendas")
sales_df.printSchema()

## 3. Camada Bronze - Armazenamento de Dados Brutos

Na camada Bronze, salvamos os dados brutos sem transformações além das necessárias para conformidade de formato. Esta camada serve como o principal ponto de entrada de dados no lakehouse.

In [None]:
# Definir o caminho para armazenamento
bronze_path = "s3a://bronze/retail/sales"

# Escrever na camada Bronze no formato Delta
sales_df.write.format("delta").mode("overwrite").save(bronze_path)

print(f"Dados salvos na camada Bronze: {bronze_path}")

# Verificar os dados armazenados
bronze_df = spark.read.format("delta").load(bronze_path)
print(f"Registros na camada Bronze: {bronze_df.count()}")
bronze_df.show(5)

## 4. Camada Silver - Dados Limpos e Validados

Na camada Silver, aplicamos transformações para limpar e validar os dados. Esta camada fornece dados confiáveis de alta qualidade.

In [None]:
# Processamento para camada Silver
silver_df = (
    bronze_df.withColumn("sale_date", to_date(col("sale_date")))
    .filter(col("quantity") > 0)
    .filter(col("price").isNotNull())
    .filter(col("store_id").isNotNull())
    .withColumn("year", year(col("sale_date")))
    .withColumn("month", month(col("sale_date")))
    .withColumn("day", dayofmonth(col("sale_date")))
    .withColumn("total_amount", col("price") * col("quantity"))
)

# Definir o caminho para armazenamento Silver
silver_path = "s3a://silver/retail/sales"

# Escrever na camada Silver
silver_df.write.format("delta").mode("overwrite").save(silver_path)

print(f"Dados salvos na camada Silver: {silver_path}")

# Verificar os dados Silver
silver_df = spark.read.format("delta").load(silver_path)
print(f"Registros na camada Silver: {silver_df.count()}")
silver_df.show(5)

## 5. Camada Gold - Dados Agregados e Prontos para Negócios

Na camada Gold, criamos visões agregadas e preparadas para análise de negócios e ciência de dados.

In [None]:
# Criar visões agregadas para a camada Gold

# Vendas por categoria e região
sales_by_category_region = silver_df.groupBy(
    "product_category", "store_region", "year", "month"
).agg(
    count("sale_id").alias("total_sales"),
    sum("quantity").alias("total_quantity"),
    sum("total_amount").alias("total_revenue"),
    avg("price").alias("avg_price"),
)

# Vendas por loja e dia
sales_by_store_daily = silver_df.groupBy("store_id", "store_name", "sale_date").agg(
    count("sale_id").alias("daily_sales"), sum("total_amount").alias("daily_revenue")
)

# Top produtos por vendas
top_products = (
    silver_df.groupBy("product_id", "product_name", "product_category")
    .agg(
        sum("quantity").alias("total_quantity"),
        sum("total_amount").alias("total_revenue"),
    )
    .orderBy(col("total_revenue").desc())
)

# Definir caminhos para armazenamento Gold
gold_path_category_region = "s3a://gold/retail/sales_by_category_region"
gold_path_store_daily = "s3a://gold/retail/sales_by_store_daily"
gold_path_top_products = "s3a://gold/retail/top_products"

# Escrever na camada Gold
sales_by_category_region.write.format("delta").mode("overwrite").save(
    gold_path_category_region
)
sales_by_store_daily.write.format("delta").mode("overwrite").save(gold_path_store_daily)
top_products.write.format("delta").mode("overwrite").save(gold_path_top_products)

print("Dados agregados salvos na camada Gold")

# Visualizar alguns dados Gold
print("\nVendas por Categoria e Região:")
spark.read.format("delta").load(gold_path_category_region).show(5)

print("\nTop Produtos:")
spark.read.format("delta").load(gold_path_top_products).show(5)

## 6. Análise e Visualização dos Dados

Vamos criar algumas visualizações utilizando os dados da camada Gold.

In [None]:
# Converter para Pandas para visualização
category_region_pd = (
    spark.read.format("delta").load(gold_path_category_region).toPandas()
)
top_products_pd = (
    spark.read.format("delta").load(gold_path_top_products).toPandas().head(10)
)
store_daily_pd = spark.read.format("delta").load(gold_path_store_daily).toPandas()

# Configuração de visualização
plt.style.use("ggplot")
plt.rcParams["figure.figsize"] = (14, 8)

# 1. Vendas por Categoria
category_sales = (
    category_region_pd.groupby("product_category")["total_revenue"]
    .sum()
    .sort_values(ascending=False)
)

plt.figure(figsize=(12, 6))
sns.barplot(x=category_sales.index, y=category_sales.values)
plt.title("Receita Total por Categoria de Produto", fontsize=16)
plt.xlabel("Categoria")
plt.ylabel("Receita Total (R$)")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 2. Vendas por Região
region_sales = (
    category_region_pd.groupby("store_region")["total_revenue"]
    .sum()
    .sort_values(ascending=False)
)

plt.figure(figsize=(10, 6))
sns.barplot(x=region_sales.index, y=region_sales.values, palette="viridis")
plt.title("Receita Total por Região", fontsize=16)
plt.xlabel("Região")
plt.ylabel("Receita Total (R$)")
plt.tight_layout()
plt.show()

# 3. Top 10 Produtos
plt.figure(figsize=(14, 7))
sns.barplot(
    x="product_name", y="total_revenue", data=top_products_pd, palette="coolwarm"
)
plt.title("Top 10 Produtos por Receita Total", fontsize=16)
plt.xlabel("Produto")
plt.ylabel("Receita Total (R$)")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.show()

# 4. Tendência de vendas diárias
# Convertendo para o formato de data correto
store_daily_pd["sale_date"] = pd.to_datetime(store_daily_pd["sale_date"])

# Agrupando por data para obter o total de vendas diárias em todas as lojas
daily_sales = store_daily_pd.groupby("sale_date")["daily_revenue"].sum().reset_index()
daily_sales = daily_sales.sort_values("sale_date")

plt.figure(figsize=(16, 6))
plt.plot(
    daily_sales["sale_date"],
    daily_sales["daily_revenue"],
    marker="o",
    markersize=3,
    linestyle="-",
)
plt.title("Tendência de Vendas Diárias", fontsize=16)
plt.xlabel("Data")
plt.ylabel("Receita Diária (R$)")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 7. Aplicação de Machine Learning - Previsão de Vendas

Vamos utilizar os dados da camada Silver para treinar um modelo simples de previsão de vendas.

In [None]:
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Configurar MLflow
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("Previsão de Vendas Varejo")

# Preparar dados para ML a partir da camada Silver
# Vamos prever a quantidade vendida com base nas características do produto e da loja

# Usar dados da camada Silver
ml_data = silver_df.select(
    "product_id",
    "product_category",
    "store_id",
    "store_region",
    "price",
    "month",
    "day",
    "quantity",
).toPandas()

# Separar features e target
X = ml_data.drop("quantity", axis=1)
y = ml_data["quantity"]

# Preprocessamento - Codificar variáveis categóricas
categorical_cols = ["product_category", "store_region"]
numerical_cols = ["product_id", "store_id", "price", "month", "day"]

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_cols),
        ("num", "passthrough", numerical_cols),
    ]
)

# Dividir em treino e teste
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Iniciar experimento MLflow
with mlflow.start_run(run_name="RF_Vendas_Varejo"):

    # Definir hiperparâmetros
    n_estimators = 100
    max_depth = 10
    min_samples_split = 5

    # Logar parâmetros
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("min_samples_split", min_samples_split)

    # Criar pipeline com preprocessamento e modelo
    model = Pipeline(
        steps=[
            ("preprocessor", preprocessor),
            (
                "regressor",
                RandomForestRegressor(
                    n_estimators=n_estimators,
                    max_depth=max_depth,
                    min_samples_split=min_samples_split,
                    random_state=42,
                ),
            ),
        ]
    )

    # Treinar modelo
    model.fit(X_train, y_train)

    # Fazer predições
    y_pred = model.predict(X_test)

    # Calcular métricas
    mae = mean_absolute_error(y_test, y_pred)
    rmse = mean_squared_error(y_test, y_pred, squared=False)
    r2 = r2_score(y_test, y_pred)

    print(f"Métricas do Modelo:")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R²: {r2:.4f}")

    # Logar métricas
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    # Registrar o modelo
    mlflow.sklearn.log_model(model, "retail_sales_model")

    print(f"Modelo registrado no MLflow com sucesso!")
    print(
        f"Acesse a interface do MLflow em http://localhost:5000 para ver os detalhes do experimento."
    )

## 8. Conclusão

Neste notebook, demonstramos o fluxo completo de dados através da arquitetura Medallion do DataFlow Lab:

1. **Bronze**: Armazenamos os dados brutos sem transformações significativas
2. **Silver**: Aplicamos limpeza e validações para garantir qualidade
3. **Gold**: Criamos visões agregadas prontas para análise de negócios

Além disso, utilizamos os dados para:
- Criar visualizações de negócios relevantes
- Treinar um modelo de machine learning para previsão de vendas
- Rastrear experimentos e versões de modelos com MLflow

Esta arquitetura oferece várias vantagens:
- **Rastreabilidade**: Podemos rastrear os dados desde a origem até seu uso final
- **Qualidade**: Validação progressiva em cada camada
- **Performance**: Dados pré-agregados para consultas rápidas
- **Governança**: Separação clara entre dados brutos e processados

Para próximos passos, considere:
- Implementar um pipeline de atualização incremental
- Adicionar validações de qualidade mais rigorosas
- Explorar modelos de machine learning mais avançados
- Criar dashboards no Streamlit conectados à camada Gold

In [None]:
# Encerrar a sessão Spark
spark.stop()
print("Sessão Spark encerrada.")