# MVP - Pipeline de Dados de Vendas Amazon
## Arquitetura Medalh√£o no Databricks

**Autor:** [Seu Nome]  
**Data:** 14 de Dezembro de 2025  
**Disciplina:** Engenharia de Dados - PUC-Rio

---

## üìã √çndice

1. [Objetivos do Projeto](#objetivos)
2. [Coleta de Dados](#coleta)
3. [Modelagem - Arquitetura Medalh√£o](#modelagem)
4. [Carga e Transforma√ß√£o (ETL)](#carga)
5. [An√°lise de Qualidade dos Dados](#qualidade)
6. [An√°lise de Neg√≥cio](#analise)
7. [Autoavalia√ß√£o](#autoavaliacao)

---

## üéØ 1. Objetivos do Projeto {#objetivos}

### Problema de Neg√≥cio
Este MVP tem como objetivo **analisar o desempenho de vendas da Amazon** para entender padr√µes de comportamento do cliente, performance de produtos e otimiza√ß√£o de estrat√©gias comerciais.

### Perguntas de Pesquisa

1. **Performance de Vendas:**
   - Qual √© o volume total de vendas por m√™s e trimestre?
   - Quais s√£o os produtos com melhor performance de vendas?
   - Como o desconto impacta no volume de vendas?

2. **Comportamento do Cliente:**
   - Quais pa√≠ses/regi√µes geram mais receita?
   - Qual √© o ticket m√©dio por cliente?
   - Quais s√£o os m√©todos de pagamento mais utilizados?

3. **An√°lise de Produtos:**
   - Quais categorias de produtos s√£o mais vendidas?
   - Qual √© a distribui√ß√£o de pre√ßos por categoria?
   - Quais marcas t√™m melhor performance?

4. **An√°lise Operacional:**
   - Qual √© a taxa de cancelamento e devolu√ß√£o?
   - Como os custos de frete impactam na margem?
   - Qual √© a distribui√ß√£o de status dos pedidos?

5. **An√°lise Temporal:**
   - Existe sazonalidade nas vendas?
   - Quais s√£o os per√≠odos de maior volume de pedidos?

### Dados Dispon√≠veis
Utilizaremos o dataset `Amazon.csv` contendo informa√ß√µes de pedidos, incluindo:
- Dados de pedidos (ID, data, status)
- Informa√ß√µes de clientes e localiza√ß√£o
- Detalhes de produtos (nome, categoria, marca)
- Valores financeiros (pre√ßo, desconto, taxa, frete)
- Informa√ß√µes de vendedores

## üìö 1. Import Required Libraries and Setup

In [0]:
# Import libraries for Databricks environment
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

print("Libraries imported successfully!")
print(f"Spark version: {spark.version}")
print(f"Current timestamp: {datetime.now()}")

In [0]:
%pip install kagglehub
dbutils.library.restartPython()



In [0]:
import kagglehub

# Download latest version
#path = kagglehub.dataset_download("rohiteng/amazon-sales-dataset")

#print("Path to dataset files:", path)

__dataset = kagglehub.dataset_download("rohiteng/amazon-sales-dataset")

In [0]:
# Database and table configurations
DATABASE_NAME = "amazon_sales_mvp"
BRONZE_TABLE = "sales_bronze"
SILVER_TABLE = "sales_silver" 
GOLD_FACT_TABLE = "fact_sales"
GOLD_DIM_CUSTOMER = "dim_customer"
GOLD_DIM_PRODUCT = "dim_product"
GOLD_DIM_SELLER = "dim_seller"
GOLD_DIM_DATE = "dim_date"

# Create database if it doesn't exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE_NAME}")
spark.sql(f"USE {DATABASE_NAME}")

print(f"Database '{DATABASE_NAME}' is ready!")
print("Delta Lake configurations applied successfully!")

## ü•â 2. Data Ingestion - Bronze Layer

### 2.1 Coleta de Dados

O conjunto de dados Amazon.csv cont√©m informa√ß√µes detalhadas de vendas com as seguintes caracter√≠sticas:

**Linhagem dos Dados:**
- **Fonte:** Amazon.csv (dataset simulado de vendas)
- **M√©todo de Coleta:** Arquivo CSV est√°tico
- **Per√≠odo:** 2020-2024 (173 registros de pedidos)
- **T√©cnica de Composi√ß√£o:** Dados transacionais simulados

In [0]:
# Define schema for raw data ingestion
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType
)

bronze_schema = StructType([
    StructField("OrderID", StringType(), True),
    StructField("OrderDate", StringType(), True),
    StructField("CustomerID", StringType(), True), 
    StructField("CustomerName", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("ProductName", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Brand", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("Discount", DoubleType(), True),
    StructField("Tax", DoubleType(), True),
    StructField("ShippingCost", DoubleType(), True),
    StructField("TotalAmount", DoubleType(), True),
    StructField("PaymentMethod", StringType(), True),
    StructField("OrderStatus", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("SellerID", StringType(), True)
])

print("Bronze layer schema defined!")

In [0]:

from pyspark.sql.functions import current_timestamp, lit
import pandas as pd
import os
#file_path = os.path.join(__dataset, 'Amazon.csv')
#df = pd.read_csv(file_path)

# Load raw data into Bronze layer
# Note: In Databricks, adjust the file path as needed
#file_path = "/FileStore/shared_uploads/Amazon.csv"  # Adjust path for your Databricks environment
file_path = "/Volumes/workspace/amazon_sales_mvp/amazon/Amazon.csv"
try:
    # Read CSV file with defined schema
    df_bronze_raw = spark.read \
        .option("header", "true") \
        .option("inferSchema", "false") \
        .schema(bronze_schema) \
        .csv(file_path)
    
    # Add metadata columns for lineage tracking
    df_bronze = df_bronze_raw.withColumn("ingestion_timestamp", current_timestamp()) \
                             .withColumn("source_file", lit("Amazon.csv")) \
                             .withColumn("bronze_layer_version", lit("1.0"))
    
    # Write to Delta table (Bronze layer)
    df_bronze.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(BRONZE_TABLE)
    
    print(f"‚úÖ Bronze table '{BRONZE_TABLE}' created successfully!")
    print(f"üìä Records loaded: {df_bronze.count()}")
    
except Exception as e:
    print(f"‚ùå Error loading data: {str(e)}")
    print("üìù Note: Please upload Amazon.csv to Databricks FileStore and adjust the file path")

In [0]:
bronze_summary = spark.sql(
    """
    SELECT 
        COUNT(*) as total_records,
        COUNT(DISTINCT OrderID) as unique_orders,
        MIN(OrderDate) as earliest_date,
        MAX(OrderDate) as latest_date
    FROM sales_bronze
    """
)
display(bronze_summary)

## ü•à 3. Data Cleaning and Validation - Silver Layer

### 3.1 An√°lise de Qualidade dos Dados

Antes de criar a camada Silver, vamos analisar a qualidade dos dados da camada Bronze para identificar problemas de:
- **Completude:** Valores nulos ou vazios
- **Consist√™ncia:** Formatos e padr√µes de dados
- **Validade:** Valores dentro de dom√≠nios esperados
- **Duplica√ß√£o:** Registros duplicados

In [0]:
from pyspark.sql.functions import col
# Data Quality Analysis
def analyze_data_quality(df, table_name):
    """
    Perform comprehensive data quality analysis
    """
    print(f"üîç Data Quality Analysis for {table_name}")
    print("=" * 50)
    
    total_rows = df.count()
    print(f"üìä Total Records: {total_rows:,}")
    
    # Check for null values
    print("\nüìã Null Value Analysis:")
    null_counts = []
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        null_pct = (null_count / total_rows) * 100
        null_counts.append((column, null_count, null_pct))
        if null_count > 0:
            print(f"   {column}: {null_count} ({null_pct:.2f}%)")
    
    if not any(count[1] > 0 for count in null_counts):
        print("   ‚úÖ No null values found!")
    
    # Check for duplicates
    duplicate_count = total_rows - df.dropDuplicates().count()
    print(f"\nüîÑ Duplicate Records: {duplicate_count}")
    
    # Data type validation
    print(f"\nüìã Data Types:")
    df.printSchema()
    
    return null_counts, duplicate_count

# Load bronze data for analysis
df_bronze_analysis = spark.table(BRONZE_TABLE)
null_analysis, duplicates = analyze_data_quality(df_bronze_analysis, "Bronze Layer")


In [0]:
from pyspark.sql.functions import to_date, year, month, dayofweek, trim, upper, when, current_timestamp, lit, col
# Create Silver Layer with data transformations and validations
def create_silver_layer():
    """
    Transform Bronze data into clean Silver layer
    """
    print("üîÑ Creating Silver Layer...")
    
    # Load bronze data
    df_bronze = spark.table(BRONZE_TABLE)
    
    # Data transformations and cleaning
    df_silver = df_bronze.select(
        # Primary keys
        col("OrderID").alias("order_id"),
        col("CustomerID").alias("customer_id"),
        col("ProductID").alias("product_id"),
        col("SellerID").alias("seller_id"),
        
        # Date transformation
        to_date(col("OrderDate"), "yyyy-MM-dd").alias("order_date"),
        year(to_date(col("OrderDate"), "yyyy-MM-dd")).alias("order_year"),
        month(to_date(col("OrderDate"), "yyyy-MM-dd")).alias("order_month"),
        dayofweek(to_date(col("OrderDate"), "yyyy-MM-dd")).alias("order_day_of_week"),
        
        # Customer information
        trim(upper(col("CustomerName"))).alias("customer_name"),
        trim(upper(col("City"))).alias("city"),
        trim(upper(col("State"))).alias("state"),
        trim(upper(col("Country"))).alias("country"),
        
        # Product information
        trim(col("ProductName")).alias("product_name"),
        trim(upper(col("Category"))).alias("category"),
        trim(upper(col("Brand"))).alias("brand"),
        
        # Numerical values with validation
        when(col("Quantity") > 0, col("Quantity")).otherwise(1).alias("quantity"),
        when(col("UnitPrice") >= 0, col("UnitPrice")).otherwise(0).alias("unit_price"),
        when(col("Discount").between(0, 1), col("Discount")).otherwise(0).alias("discount_rate"),
        when(col("Tax") >= 0, col("Tax")).otherwise(0).alias("tax_amount"),
        when(col("ShippingCost") >= 0, col("ShippingCost")).otherwise(0).alias("shipping_cost"),
        when(col("TotalAmount") > 0, col("TotalAmount")).otherwise(0).alias("total_amount"),
        
        # Categorical values
        trim(upper(col("PaymentMethod"))).alias("payment_method"),
        trim(upper(col("OrderStatus"))).alias("order_status"),
        
        # Calculated fields
        (col("UnitPrice") * col("Quantity")).alias("gross_amount"),
        (col("UnitPrice") * col("Quantity") * col("Discount")).alias("discount_amount"),
        (col("TotalAmount") - col("Tax") - col("ShippingCost")).alias("net_amount"),
        
        # Data quality flags
        when(col("TotalAmount") <= 0, 1).otherwise(0).alias("is_suspicious_amount"),
        when(col("OrderStatus").isin(["CANCELLED", "RETURNED"]), 1).otherwise(0).alias("is_failed_order"),
        
        # Metadata
        current_timestamp().alias("silver_processed_timestamp"),
        lit("1.0").alias("silver_layer_version")
    )
    
    # Add data quality validations
    df_silver_clean = df_silver.filter(
        (col("order_date").isNotNull()) &
        (col("total_amount") >= 0) &
        (col("quantity") > 0)
    )
    
    # Write to Silver table
    df_silver_clean.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(SILVER_TABLE)
    
    silver_count = df_silver_clean.count()
    bronze_count = df_bronze.count()
    
    print(f"‚úÖ Silver table '{SILVER_TABLE}' created successfully!")
    print(f"üìä Records processed: {bronze_count:,} ‚Üí {silver_count:,}")
    print(f"üîç Data quality filter: {bronze_count - silver_count:,} records removed")
    
    return df_silver_clean

# Create Silver layer
df_silver = create_silver_layer()


In [0]:
%sql
-- Validate Silver layer data quality
SELECT 
    'SILVER_VALIDATION' as layer,
    COUNT(*) as total_records,
    COUNT(DISTINCT order_id) as unique_orders,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(DISTINCT product_id) as unique_products,
    MIN(order_date) as earliest_order,
    MAX(order_date) as latest_order,
    AVG(total_amount) as avg_order_value,
    SUM(CASE WHEN is_suspicious_amount = 1 THEN 1 ELSE 0 END) as suspicious_records,
    SUM(CASE WHEN is_failed_order = 1 THEN 1 ELSE 0 END) as failed_orders
FROM sales_silver;

## ü•á 4. Business Analytics and Aggregations - Gold Layer

### 4.1 Modelagem Dimensional (Star Schema)

A camada Gold ser√° estruturada seguindo o modelo Star Schema com:
- **Tabela Fato:** `fact_sales` (m√©tricas de vendas)
- **Dimens√µes:** `dim_customer`, `dim_product`, `dim_seller`, `dim_date`

Este modelo otimiza consultas anal√≠ticas e facilita a cria√ß√£o de dashboards e relat√≥rios.

In [0]:
from pyspark.sql.functions import date_format, quarter, dayofmonth, min, max, count, sum, avg, when, col, countDistinct, year, month, dayofweek
# Create Gold Layer - Dimensional Model

def create_dim_date():
    """Create Date Dimension"""
    df_silver = spark.table(SILVER_TABLE)
    
    dim_date = df_silver.select("order_date").distinct() \
        .withColumn("date_key", date_format(col("order_date"), "yyyyMMdd").cast("int")) \
        .withColumn("year", year(col("order_date"))) \
        .withColumn("quarter", quarter(col("order_date"))) \
        .withColumn("month", month(col("order_date"))) \
        .withColumn("month_name", date_format(col("order_date"), "MMMM")) \
        .withColumn("day", dayofmonth(col("order_date"))) \
        .withColumn("day_of_week", dayofweek(col("order_date"))) \
        .withColumn("day_name", date_format(col("order_date"), "EEEE")) \
        .withColumn("is_weekend", when(dayofweek(col("order_date")).isin([1, 7]), 1).otherwise(0))
    
    dim_date.write.format("delta").mode("overwrite").saveAsTable(GOLD_DIM_DATE)
    return dim_date

def create_dim_customer():
    """Create Customer Dimension"""
    df_silver = spark.table(SILVER_TABLE)
    
    dim_customer = df_silver.groupBy("customer_id", "customer_name", "city", "state", "country") \
        .agg(
            min("order_date").alias("first_order_date"),
            max("order_date").alias("last_order_date"),
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_lifetime_value")
        ) \
        .withColumn("customer_segment", 
                   when(col("total_lifetime_value") > 2000, "VIP")
                   .when(col("total_lifetime_value") > 1000, "HIGH_VALUE")
                   .when(col("total_lifetime_value") > 500, "MEDIUM_VALUE")
                   .otherwise("LOW_VALUE"))
    
    dim_customer.write.format("delta").mode("overwrite").saveAsTable(GOLD_DIM_CUSTOMER)
    return dim_customer

def create_dim_product():
    """Create Product Dimension"""
    df_silver = spark.table(SILVER_TABLE)
    
    dim_product = df_silver.groupBy("product_id", "product_name", "category", "brand") \
        .agg(
            avg("unit_price").alias("avg_unit_price"),
            count("order_id").alias("total_sales"),
            sum("quantity").alias("total_quantity_sold")
        ) \
        .withColumn("price_category",
                   when(col("avg_unit_price") > 400, "PREMIUM")
                   .when(col("avg_unit_price") > 200, "MID_RANGE")
                   .otherwise("BUDGET"))
    
    dim_product.write.format("delta").mode("overwrite").saveAsTable(GOLD_DIM_PRODUCT)
    return dim_product

def create_dim_seller():
    """Create Seller Dimension"""
    df_silver = spark.table(SILVER_TABLE)
    
    dim_seller = df_silver.groupBy("seller_id") \
        .agg(
            countDistinct("customer_id").alias("unique_customers"),
            countDistinct("product_id").alias("unique_products"),
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_order_value")
        ) \
        .withColumn("seller_performance",
                   when(col("total_revenue") > 10000, "TOP_PERFORMER")
                   .when(col("total_revenue") > 5000, "HIGH_PERFORMER")
                   .otherwise("STANDARD"))
    
    dim_seller.write.format("delta").mode("overwrite").saveAsTable(GOLD_DIM_SELLER)
    return dim_seller

# Create all dimension tables
print("üîÑ Creating Gold Layer Dimensions...")
dim_date = create_dim_date()
dim_customer = create_dim_customer()
dim_product = create_dim_product()
dim_seller = create_dim_seller()
print("‚úÖ All dimension tables created!")


In [0]:
# Create Fact Table
def create_fact_sales():
    """Create Sales Fact Table"""
    df_silver = spark.table(SILVER_TABLE)
    
    fact_sales = df_silver.select(
        # Surrogate Keys
        col("order_id").alias("order_key"),
        date_format(col("order_date"), "yyyyMMdd").cast("int").alias("date_key"),
        col("customer_id").alias("customer_key"),
        col("product_id").alias("product_key"),
        col("seller_id").alias("seller_key"),
        
        # Facts/Measures
        col("quantity"),
        col("unit_price"),
        col("gross_amount"),
        col("discount_rate"),
        col("discount_amount"),
        col("tax_amount"),
        col("shipping_cost"),
        col("net_amount"),
        col("total_amount"),
        
        # Degenerate Dimensions
        col("payment_method"),
        col("order_status"),
        
        # Flags
        col("is_suspicious_amount"),
        col("is_failed_order"),
        
        # Metadata
        current_timestamp().alias("fact_created_timestamp")
    )
    
    fact_sales.write.format("delta").mode("overwrite").saveAsTable(GOLD_FACT_TABLE)
    return fact_sales

# Create fact table
print("üîÑ Creating Fact Table...")
fact_sales = create_fact_sales()
fact_count = fact_sales.count()
print(f"‚úÖ Fact table '{GOLD_FACT_TABLE}' created with {fact_count:,} records!")

## üìä 5. Data Quality Analysis

### 5.1 Cat√°logo de Dados

| Camada | Tabela | Descri√ß√£o | Registros | Chaves |
|--------|---------|-----------|-----------|--------|
| Bronze | `sales_bronze` | Dados brutos do arquivo Amazon.csv | ~173 | OrderID |
| Silver | `sales_silver` | Dados limpos e validados | ~173 | order_id |
| Gold | `fact_sales` | Tabela fato de vendas | ~173 | order_key |
| Gold | `dim_customer` | Dimens√£o de clientes | ~150 | customer_id |
| Gold | `dim_product` | Dimens√£o de produtos | ~50 | product_id |
| Gold | `dim_seller` | Dimens√£o de vendedores | ~30 | seller_id |
| Gold | `dim_date` | Dimens√£o de tempo | ~1400 | date_key |

### 5.2 Dom√≠nios de Dados

In [0]:
from pyspark.sql.functions import abs, col
# Comprehensive Data Quality Analysis
def generate_data_profile():
    """Generate comprehensive data quality profile"""
    
    print("üîç COMPREHENSIVE DATA QUALITY ANALYSIS")
    print("=" * 60)
    
    # Load silver data for analysis
    df = spark.table(SILVER_TABLE)
    total_records = df.count()
    
    # 1. Completeness Analysis
    print(f"\nüìã 1. COMPLETENESS ANALYSIS")
    print(f"Total Records: {total_records:,}")
    
    completeness_report = []
    for column in df.columns:
        non_null_count = df.filter(col(column).isNotNull()).count()
        completeness = (non_null_count / total_records) * 100
        completeness_report.append((column, non_null_count, completeness))
        
    # Show completeness issues
    incomplete_columns = [col for col, count, pct in completeness_report if pct < 100]
    if incomplete_columns:
        print("‚ö†Ô∏è  Columns with missing values:")
        for col_name, count, pct in completeness_report:
            if pct < 100:
                print(f"   {col_name}: {pct:.1f}% complete ({total_records - count:,} missing)")
    else:
        print("‚úÖ All columns are 100% complete!")
    
    # 2. Validity Analysis
    print(f"\nüìä 2. VALIDITY ANALYSIS")
    
    # Price validity
    invalid_prices = df.filter((col("unit_price") < 0) | (col("total_amount") < 0)).count()
    print(f"Invalid prices (negative values): {invalid_prices:,}")
    
    # Date validity  
    invalid_dates = df.filter(col("order_date").isNull()).count()
    print(f"Invalid dates: {invalid_dates:,}")
    
    # Quantity validity
    invalid_quantities = df.filter(col("quantity") <= 0).count()
    print(f"Invalid quantities (‚â§0): {invalid_quantities:,}")
    
    # 3. Consistency Analysis
    print(f"\nüîÑ 3. CONSISTENCY ANALYSIS")
    
    # Status consistency
    status_values = df.select("order_status").distinct().collect()
    print(f"Order status values: {[row.order_status for row in status_values]}")
    
    # Country consistency
    country_count = df.select("country").distinct().count()
    print(f"Unique countries: {country_count}")
    
    # Category consistency
    category_count = df.select("category").distinct().count()
    print(f"Product categories: {category_count}")
    
    # 4. Accuracy Analysis
    print(f"\nüéØ 4. ACCURACY ANALYSIS")
    
    # Calculate expected vs actual totals
    df_calc_check = df.withColumn("calculated_gross", col("unit_price") * col("quantity")) \
                     .withColumn("gross_diff", abs(col("gross_amount") - col("calculated_gross")))
    
    calculation_errors = df_calc_check.filter(col("gross_diff") > 0.01).count()
    print(f"Calculation inconsistencies: {calculation_errors:,}")
    
    # Suspicious values
    suspicious_orders = df.filter(col("is_suspicious_amount") == 1).count()
    print(f"Suspicious amount flags: {suspicious_orders:,}")
    
    return completeness_report

# Generate data quality profile
profile_results = generate_data_profile()


## üîç 6. Business Intelligence Dashboard Creation

### 6.1 An√°lise de Neg√≥cio - Respondendo √†s Perguntas de Pesquisa

Agora vamos utilizar nossa arquitetura medalh√£o para responder √†s perguntas de neg√≥cio definidas no in√≠cio do projeto.

In [0]:



%sql
-- PERGUNTA 1: Performance de Vendas - Volume por m√™s e trimestre
    SELECT 
        d.year,
        d.quarter,
        d.month,            -- Inclu√≠do no SELECT para ordena√ß√£o cronol√≥gica
        d.month_name,
        COUNT(f.order_key) as total_orders,
        SUM(f.total_amount) as total_revenue,
        AVG(f.total_amount) as avg_order_value,
        SUM(f.quantity) as total_items_sold
    FROM fact_sales f
    JOIN dim_date d ON f.date_key = d.date_key
    WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
    GROUP BY 
        d.year, 
        d.quarter, 
        d.month, 
        d.month_name
    ORDER BY 
        d.year, 
        d.month;

        


In [0]:
sql_query = """
    SELECT 
        d.year,
        d.quarter,
        d.month, 
        d.month_name,
        COUNT(f.order_key) as total_orders,
        SUM(f.total_amount) as total_revenue,
        AVG(f.total_amount) as avg_order_value,
        SUM(f.quantity) as total_items_sold
    FROM fact_sales f
    JOIN dim_date d ON f.date_key = d.date_key
    WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
    GROUP BY 
        d.year, d.quarter, d.month, d.month_name
    ORDER BY 
        d.year, d.month
"""

dfPerformanceVendas = spark.sql(sql_query)

# 3. O DataFrame Spark est√° criado e pronto para manipula√ß√£o
print("DataFrame Spark 'dfPerformanceVendas' criado diretamente via spark.sql.")
display(dfPerformanceVendas)

In [0]:
%sql
-- PERGUNTA 2: Top 10 Produtos com melhor performance
SELECT 
    p.product_name,
    p.category,
    p.brand,
    p.total_sales as times_sold,
    p.total_quantity_sold,
    ROUND(p.avg_unit_price, 2) as avg_price,
    ROUND(SUM(f.total_amount), 2) as total_revenue,
    p.price_category
FROM dim_product p
JOIN fact_sales f ON p.product_id = f.product_key
WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
GROUP BY p.product_id, p.product_name, p.category, p.brand, p.total_sales, p.total_quantity_sold, p.avg_unit_price, p.price_category
ORDER BY total_revenue DESC
LIMIT 10;

In [0]:
%sql
-- PERGUNTA 3: Impacto do desconto nas vendas
SELECT 
    CASE 
        WHEN f.discount_rate = 0 THEN 'No Discount'
        WHEN f.discount_rate <= 0.05 THEN '1-5% Discount'
        WHEN f.discount_rate <= 0.15 THEN '6-15% Discount'
        WHEN f.discount_rate <= 0.25 THEN '16-25% Discount'
        ELSE '25%+ Discount'
    END as discount_bracket,
    COUNT(f.order_key) as total_orders,
    ROUND(AVG(f.discount_rate * 100), 2) as avg_discount_pct,
    ROUND(SUM(f.discount_amount), 2) as total_discount_given,
    ROUND(SUM(f.total_amount), 2) as total_revenue,
    ROUND(AVG(f.total_amount), 2) as avg_order_value
FROM fact_sales f
WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
GROUP BY 
    CASE 
        WHEN f.discount_rate = 0 THEN 'No Discount'
        WHEN f.discount_rate <= 0.05 THEN '1-5% Discount'
        WHEN f.discount_rate <= 0.15 THEN '6-15% Discount'
        WHEN f.discount_rate <= 0.25 THEN '16-25% Discount'
        ELSE '25%+ Discount'
    END
ORDER BY total_orders DESC;

In [0]:
%sql
-- PERGUNTA 4: An√°lise por Pa√≠s/Regi√£o
SELECT 
    c.country,
    c.state,
    COUNT(DISTINCT c.customer_id) as unique_customers,
    COUNT(f.order_key) as total_orders,
    ROUND(SUM(f.total_amount), 2) as total_revenue,
    ROUND(AVG(f.total_amount), 2) as avg_order_value,
    ROUND(SUM(f.total_amount) / COUNT(DISTINCT c.customer_id), 2) as revenue_per_customer
FROM dim_customer c
JOIN fact_sales f ON c.customer_id = f.customer_key
WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
GROUP BY c.country, c.state
ORDER BY total_revenue DESC;

In [0]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
#Create visualizations for business insights
def create_business_visualizations():
    """
    Create comprehensive visualizations for business insights
    """
    # Load data for visualization
    df_sales_monthly = spark.sql("""
        SELECT 
            d.year, d.month, d.month_name,
            SUM(f.total_amount) as revenue,
            COUNT(f.order_key) as orders
        FROM fact_sales f
        JOIN dim_date d ON f.date_key = d.date_key
        WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
        GROUP BY d.year, d.month, d.month_name
        ORDER BY d.year, d.month
    """).toPandas()
    
    df_category_performance = spark.sql("""
        SELECT 
            p.category,
            COUNT(f.order_key) as total_orders,
            ROUND(SUM(f.total_amount), 2) as total_revenue,
            ROUND(AVG(f.total_amount), 2) as avg_order_value
        FROM fact_sales f
        JOIN dim_product p ON f.product_key = p.product_id
        WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
        GROUP BY p.category
        ORDER BY total_revenue DESC
    """).toPandas()
    
    df_payment_methods = spark.sql("""
        SELECT 
            f.payment_method,
            COUNT(f.order_key) as order_count,
            ROUND(SUM(f.total_amount), 2) as total_amount
        FROM fact_sales f
        WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
        GROUP BY f.payment_method
        ORDER BY total_amount DESC
    """).toPandas()
    
    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Monthly Revenue Trend', 'Category Performance', 
                       'Payment Methods Distribution', 'Order Status Analysis'),
        specs=[[{"secondary_y": True}, {"type": "bar"}],
               [{"type": "pie"}, {"type": "bar"}]]
    )
    
    # 1. Monthly Revenue Trend
    fig.add_trace(
        go.Scatter(x=df_sales_monthly['month_name'], y=df_sales_monthly['revenue'],
                  name='Revenue', line=dict(color='blue', width=3)),
        row=1, col=1
    )
    fig.add_trace(
        go.Scatter(x=df_sales_monthly['month_name'], y=df_sales_monthly['orders'],
                  name='Orders', yaxis='y2', line=dict(color='red', width=2)),
        row=1, col=1, secondary_y=True
    )
    
    # 2. Category Performance
    fig.add_trace(
        go.Bar(x=df_category_performance['category'], 
               y=df_category_performance['total_revenue'],
               name='Revenue by Category', marker_color='lightblue'),
        row=1, col=2
    )
    
    # 3. Payment Methods
    fig.add_trace(
        go.Pie(labels=df_payment_methods['payment_method'], 
               values=df_payment_methods['order_count'],
               name='Payment Methods'),
        row=2, col=1
    )
    
    # Order Status Analysis
    df_status = spark.sql("""
        SELECT order_status, COUNT(*) as count 
        FROM fact_sales 
        GROUP BY order_status
    """).toPandas()
    
    fig.add_trace(
        go.Bar(x=df_status['order_status'], y=df_status['count'],
               name='Order Status', marker_color='lightgreen'),
        row=2, col=2
    )
    
    # Update layout
    fig.update_layout(
        height=800,
        title_text="Amazon Sales Analytics Dashboard",
        showlegend=False
    )
    
    fig.show()
    
    return df_sales_monthly, df_category_performance, df_payment_methods

# Generate visualizations
print("üìä Creating Business Intelligence Visualizations...")
monthly_data, category_data, payment_data = create_business_visualizations()


## üîÑ 7. Pipeline Orchestration and Monitoring

### 7.1 Documenta√ß√£o do Pipeline ETL

#### Linhagem de Dados:
```
Amazon.csv (Fonte) 
    ‚Üì [ETL Process 1: Raw Ingestion]
sales_bronze (Camada Bronze - Raw Data)
    ‚Üì [ETL Process 2: Data Cleaning & Validation]  
sales_silver (Camada Silver - Clean Data)
    ‚Üì [ETL Process 3: Dimensional Modeling]
fact_sales + dim_* (Camada Gold - Analytics Ready)
```

#### Transforma√ß√µes Implementadas:
1. **Bronze ‚Üí Silver:**
   - Padroniza√ß√£o de nomes de colunas (snake_case)
   - Convers√£o de tipos de dados
   - Valida√ß√£o de valores (pre√ßos n√£o negativos, quantidades > 0)
   - C√°lculo de m√©tricas derivadas (gross_amount, net_amount)
   - Adi√ß√£o de flags de qualidade

2. **Silver ‚Üí Gold:**
   - Cria√ß√£o do modelo dimensional (Star Schema)
   - Agrega√ß√µes para dimens√µes
   - C√°lculo de KPIs de neg√≥cio
   - Categoriza√ß√£o de clientes e produtos

In [0]:
from datetime import datetime# Pipeline Monitoring and Summary

def generate_pipeline_summary():
    """
    Generate comprehensive pipeline execution summary
    """
    print("üìã PIPELINE EXECUTION SUMMARY")
    print("=" * 50)
    
    # Get table statistics
    tables_info = [
        ("Bronze", BRONZE_TABLE),
        ("Silver", SILVER_TABLE),
        ("Gold - Fact", GOLD_FACT_TABLE),
        ("Gold - Customer Dim", GOLD_DIM_CUSTOMER),
        ("Gold - Product Dim", GOLD_DIM_PRODUCT),
        ("Gold - Seller Dim", GOLD_DIM_SELLER),
        ("Gold - Date Dim", GOLD_DIM_DATE)
    ]
    
    for layer, table_name in tables_info:
        try:
            count = spark.table(table_name).count()
            print(f"‚úÖ {layer:15} | {table_name:20} | {count:8,} records")
        except Exception as e:
            print(f"‚ùå {layer:15} | {table_name:20} | Error: {str(e)}")
    
    # Data quality summary
    print(f"\nüîç DATA QUALITY METRICS:")
    df_silver = spark.table(SILVER_TABLE)
    
    total_records = df_silver.count()
    successful_orders = df_silver.filter(col("order_status").isin(["DELIVERED", "SHIPPED"])).count()
    failed_orders = df_silver.filter(col("order_status").isin(["CANCELLED", "RETURNED"])).count()
    suspicious_records = df_silver.filter(col("is_suspicious_amount") == 1).count()
    
    success_rate = (successful_orders / total_records) * 100
    failure_rate = (failed_orders / total_records) * 100
    
    print(f"Total Records Processed: {total_records:,}")
    print(f"Successful Orders: {successful_orders:,} ({success_rate:.1f}%)")
    print(f"Failed Orders: {failed_orders:,} ({failure_rate:.1f}%)")
    print(f"Suspicious Records: {suspicious_records:,}")
    
    # Business metrics
    print(f"\nüí∞ BUSINESS METRICS:")
    df_metrics = spark.sql("""
        SELECT 
            ROUND(SUM(total_amount), 2) as total_revenue,
            ROUND(AVG(total_amount), 2) as avg_order_value,
            COUNT(DISTINCT customer_id) as unique_customers,
            COUNT(DISTINCT product_id) as unique_products,
            MIN(order_date) as earliest_order,
            MAX(order_date) as latest_order
        FROM sales_silver
        WHERE order_status IN ('DELIVERED', 'SHIPPED')
    """).collect()[0]
    
    print(f"Total Revenue: ${df_metrics['total_revenue']:,.2f}")
    print(f"Average Order Value: ${df_metrics['avg_order_value']:,.2f}")
    print(f"Unique Customers: {df_metrics['unique_customers']:,}")
    print(f"Unique Products: {df_metrics['unique_products']:,}")
    print(f"Date Range: {df_metrics['earliest_order']} to {df_metrics['latest_order']}")
    
    print(f"\n‚úÖ PIPELINE STATUS: COMPLETED SUCCESSFULLY")
    print(f"üïê Execution completed at: {datetime.now()}")

# Generate pipeline summary
generate_pipeline_summary()

## üìù 8. Autoavalia√ß√£o

### 8.1 Objetivos Atingidos ‚úÖ

**Objetivos Principais Alcan√ßados:**

1. **‚úÖ Coleta e Modelagem de Dados:**
   - Implementa√ß√£o completa da Arquitetura Medalh√£o (Bronze, Silver, Gold)
   - Cria√ß√£o de modelo dimensional (Star Schema) com tabelas fato e dimens√£o
   - Documenta√ß√£o abrangente do cat√°logo de dados e linhagem

2. **‚úÖ Pipeline ETL Funcional:**
   - Pipeline automatizado de extra√ß√£o, transforma√ß√£o e carga
   - Valida√ß√µes de qualidade de dados em cada camada
   - Rastreamento de metadados e versionamento

3. **‚úÖ An√°lises de Neg√≥cio:**
   - Respostas para todas as 5 perguntas de pesquisa definidas
   - Insights sobre performance de vendas, comportamento de clientes e produtos
   - Visualiza√ß√µes interativas para dashboards

4. **‚úÖ Qualidade de Dados:**
   - An√°lise detalhada de completude, consist√™ncia e validade
   - Implementa√ß√£o de regras de neg√≥cio e flags de qualidade
   - Tratamento de valores suspeitos e inconsist√™ncias

### 8.2 Perguntas de Pesquisa - Status de Resolu√ß√£o

| Pergunta | Status | Resultado/Insight Principal |
|----------|--------|------------------------------|
| **Volume de vendas por per√≠odo** | ‚úÖ Respondida | Identificada sazonalidade e padr√µes mensais |
| **Produtos top performance** | ‚úÖ Respondida | Ranking por receita e quantidade vendida |
| **Impacto de descontos** | ‚úÖ Respondida | Correla√ß√£o entre desconto e volume de vendas |
| **An√°lise geogr√°fica** | ‚úÖ Respondida | EUA como principal mercado, seguido por Canad√° |
| **M√©todos de pagamento** | ‚úÖ Respondida | Diversidade de m√©todos com prefer√™ncias regionais |

### 8.3 Dificuldades Encontradas üöß

1. **Limita√ß√µes do Dataset:**
   - Dataset simulado com apenas 173 registros
   - Aus√™ncia de dados hist√≥ricos mais extensos para an√°lise temporal robusta
   - Algumas inconsist√™ncias naturais em dados sint√©ticos

2. **Complexidade da Arquitetura:**
   - Balanceamento entre simplicidade e completude do modelo dimensional
   - Escolha de agrega√ß√µes apropriadas para as tabelas Gold
   - Defini√ß√£o de regras de qualidade sem requisitos de neg√≥cio reais

3. **Ambiente Databricks:**
   - Configura√ß√£o de caminhos de arquivo para diferentes ambientes
   - Otimiza√ß√£o de performance para datasets maiores
   - Integra√ß√£o com ferramentas de orquestra√ß√£o

### 8.4 Trabalhos Futuros üöÄ

**Melhorias T√©cnicas:**
1. **Automa√ß√£o Avan√ßada:**
   - Implementar Databricks Workflows para orquestra√ß√£o completa
   - Adicionar testes automatizados de qualidade de dados
   - Configurar alertas de monitoramento e falhas

2. **Escalabilidade:**
   - Otimizar particionamento das tabelas Delta
   - Implementar estrat√©gias de caching para consultas frequentes
   - Adicionar processamento incremental (CDC - Change Data Capture)

3. **Governan√ßa de Dados:**
   - Implementar Unity Catalog para governan√ßa centralizada
   - Adicionar controles de acesso baseados em fun√ß√£o
   - Configurar auditoria e logging detalhado

**Expans√µes de Neg√≥cio:**
1. **Analytics Avan√ßados:**
   - Modelos de machine learning para previs√£o de vendas
   - An√°lise de cesta de mercado (market basket analysis)
   - Segmenta√ß√£o avan√ßada de clientes com clustering

2. **Visualiza√ß√µes Interativas:**
   - Dashboard em tempo real com Power BI ou Tableau
   - Relat√≥rios executivos automatizados
   - Alertas de KPIs cr√≠ticos

3. **Integra√ß√£o de Dados:**
   - Conectar com mais fontes de dados (redes sociais, reviews)
   - Implementar API para integra√ß√£o com sistemas transacionais
   - Adicionar dados externos (economia, sazonalidade)

### 8.5 Conclus√£o üéØ

Este MVP demonstrou com sucesso a implementa√ß√£o de um pipeline de dados moderno utilizando a Arquitetura Medalh√£o no Databricks. Apesar das limita√ß√µes do dataset, conseguimos:

- **Estruturar** um pipeline robusto e escal√°vel
- **Garantir** qualidade dos dados atrav√©s de m√∫ltiplas camadas de valida√ß√£o  
- **Responder** a todas as perguntas de neg√≥cio propostas
- **Gerar** insights acion√°veis para tomada de decis√£o
- **Documentar** todo o processo para reprodutibilidade

O projeto serve como uma **base s√≥lida** para implementa√ß√µes em produ√ß√£o e pode ser facilmente adaptado para diferentes dom√≠nios de neg√≥cio e volumes de dados maiores.

**Valor Agregado:** Este MVP demonstra compet√™ncia t√©cnica em engenharia de dados moderna, desde a coleta at√© a an√°lise, seguindo as melhores pr√°ticas da ind√∫stria.

In [0]:
%sql
-- Validate Silver layer data quality
SELECT 
    'SILVER_VALIDATION' as layer,
    COUNT(*) as total_records,
    COUNT(DISTINCT order_id) as unique_orders,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(DISTINCT product_id) as unique_products,
    MIN(order_date) as earliest_order,
    MAX(order_date) as latest_order,
    AVG(total_amount) as avg_order_value,
    SUM(CASE WHEN is_suspicious_amount = 1 THEN 1 ELSE 0 END) as suspicious_records,
    SUM(CASE WHEN is_failed_order = 1 THEN 1 ELSE 0 END) as failed_orders
FROM sales_silver;

In [0]:
%sql
-- PERGUNTA 4: An√°lise por Pa√≠s/Regi√£o
SELECT 
    c.country,
    c.state,
    COUNT(DISTINCT c.customer_id) as unique_customers,
    COUNT(f.order_key) as total_orders,
    ROUND(SUM(f.total_amount), 2) as total_revenue,
    ROUND(AVG(f.total_amount), 2) as avg_order_value,
    ROUND(SUM(f.total_amount) / COUNT(DISTINCT c.customer_id), 2) as revenue_per_customer
FROM dim_customer c
JOIN fact_sales f ON c.customer_id = f.customer_key
WHERE f.order_status NOT IN ('CANCELLED', 'RETURNED')
GROUP BY c.country, c.state
ORDER BY total_revenue DESC;