# ABInBev Case - Interactive Queries

Use this notebook to query the data produced by the pipeline. Each Delta table is loaded and registered as a Spark temporary view, so it can be queried directly with Spark SQL.

In [None]:
import os

from pyspark.errors import AnalysisException
from pyspark.sql import SparkSession

# Spark settings that enable Delta Lake: the Delta package, its SQL extension, and the
# Delta catalog as the session catalog. Applied to the builder in insertion order.
DELTA_SESSION_CONFIGS = {
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.2.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

# Build (or reuse, via getOrCreate) the SparkSession with Delta Lake enabled.
session_builder = SparkSession.builder.appName("ABInBev_Interactive_Query")
for config_key, config_value in DELTA_SESSION_CONFIGS.items():
    session_builder = session_builder.config(config_key, config_value)
spark = session_builder.getOrCreate()

# Base data directory: "../data" when it exists relative to the working directory
# (e.g. running from a notebooks/ folder), otherwise "data" (running from the project root).
DATA_DIR = "../data" if os.path.exists("../data") else "data"

print(f"Spark Version: {spark.version}")
print(f"Data Directory: {DATA_DIR}")

## 1. Load Tables
Load the Delta tables from the Silver, Gold, and Consumption layers and register each one as a temporary view.

In [None]:
# Temp-view name -> Delta table path relative to DATA_DIR, grouped by pipeline layer.
# Views are registered in this order.
VIEW_SOURCES = {
    # Silver
    "silver_sales": "silver/silver_sales_enriched",
    "silver_channels": "silver/silver_channel_features",
    # Gold
    "gold_sales": "gold/gold_sales_enriched",
    # Consumption (dimensions & facts)
    "dim_date": "consumption/dim_date",
    "dim_product": "consumption/dim_product",
    "dim_region": "consumption/dim_region",
    "dim_channel": "consumption/dim_channel",
    "fact_sales": "consumption/fact_sales",
}

# Read each Delta table and expose it to Spark SQL under its view name.
for view_name, relative_path in VIEW_SOURCES.items():
    delta_df = spark.read.format("delta").load(f"{DATA_DIR}/{relative_path}")
    delta_df.createOrReplaceTempView(view_name)

print("Tables loaded and registered as Temp Views!")

## 2. Business Questions
### 2.1 Top 3 Trade Groups by Region

In [None]:
# Number of trade groups to keep per region.
TOP_N_TRADE_GROUPS = 3

# Total dollar volume per (region, trade group), ranked within each region; only the
# top TOP_N_TRADE_GROUPS rows per region are returned.
# ROW_NUMBER() returns exactly N rows per region: ties on total_dollar_volume are broken
# alphabetically by trade_group_desc. Use RANK() instead if tied groups should all be kept.
query_1 = f"""
WITH trade_group_totals AS (
    SELECT
        r.region_name,
        c.trade_group_desc,
        SUM(f.dollar_volume) AS total_dollar_volume
    FROM fact_sales f
    JOIN dim_region r ON f.region_key = r.region_key
    JOIN dim_channel c ON f.channel_key = c.channel_key
    GROUP BY r.region_name, c.trade_group_desc
),
ranked_trade_groups AS (
    SELECT
        region_name,
        trade_group_desc,
        total_dollar_volume,
        ROW_NUMBER() OVER (
            PARTITION BY region_name
            ORDER BY total_dollar_volume DESC, trade_group_desc
        ) AS rank_in_region
    FROM trade_group_totals
)
SELECT
    region_name,
    trade_group_desc,
    total_dollar_volume,
    rank_in_region
FROM ranked_trade_groups
WHERE rank_in_region <= {TOP_N_TRADE_GROUPS}
ORDER BY region_name, rank_in_region
"""
spark.sql(query_1).show()

### 2.2 Sales by Brand per Month

In [None]:
# Monthly dollar volume per brand, ordered by brand and then chronologically.
# The aggregate is named total_dollar_volume: it sums dollar_volume, and the same name
# is used for the same measure in the trade-group query.
query_2 = """
SELECT 
    p.brand_nm,
    d.year,
    d.month,
    SUM(f.dollar_volume) as total_dollar_volume
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_date d ON f.date_key = d.date_key
GROUP BY p.brand_nm, d.year, d.month
ORDER BY p.brand_nm, d.year, d.month
"""
# NOTE: show() prints only the first 20 rows; pass n=... to see more brand/month combinations.
spark.sql(query_2).show()

## 3. Observability & Control
Analysis of pipeline execution metadata and data quality issues.

In [None]:
# Load the process-control (pipeline run metadata) table; the cells below query it directly.
spark.read.format("delta").load(f"{DATA_DIR}/control/process_control").createOrReplaceTempView("process_control")

# Load the quarantine table. If Spark cannot resolve it as a Delta table (for example, the
# path does not exist), register an empty view with the expected schema instead, so the
# quarantine queries below still run. Presumably the pipeline only skips writing this
# table when no record was quarantined — TODO confirm.
# Only AnalysisException is handled here. Any other error propagates instead of being
# misreported as a clean run.
QUARANTINE_PATH = f"{DATA_DIR}/control/quarantine"
try:
    spark.read.format("delta").load(QUARANTINE_PATH).createOrReplaceTempView("quarantine")
    print("Control & Quarantine tables loaded.")
except AnalysisException as exc:
    # The first line of the Spark error is enough to show why the table could not be read.
    reason = str(exc).split("\n", 1)[0]
    print(f"Control table loaded. Quarantine table not found at {QUARANTINE_PATH} ({reason}).")
    print("Assuming a clean run: creating empty view for demo compatibility.")
    # Empty DataFrame with the quarantine schema, so queries against the view still resolve columns.
    schema_ddl = "quarantine_id STRING, batch_id STRING, source_table STRING, target_table STRING, record_data STRING, error_type STRING, error_code STRING, error_description STRING, dq_rule_name STRING, is_known_rule BOOLEAN, reprocessed BOOLEAN, reprocess_batch_id STRING, created_at TIMESTAMP, updated_at TIMESTAMP"
    spark.createDataFrame([], schema=schema_ddl).createOrReplaceTempView("quarantine")

### 3.1 Recent Pipeline Executions

In [None]:
# The 20 most recently started pipeline runs with their record counts.
# start_timestamp is included in the output so the ordering, and each run's recency,
# is visible to the reader rather than implied by a hidden sort column.
query_control = """
SELECT 
    batch_id,
    start_timestamp,
    layer,
    table_name,
    status,
    duration_seconds,
    records_read,
    records_written,
    records_quarantined
FROM process_control
ORDER BY start_timestamp DESC
LIMIT 20
"""
spark.sql(query_control).show(truncate=False)

### 3.2 Quarantine Analysis (if any)

In [None]:
# Preview up to 10 quarantined records. The view may be empty if no quarantine table was found.
quarantine_sample = spark.table("quarantine").limit(10)
quarantine_sample.show()

### 3.3 Visual Observability
Graphical representation of execution metadata.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Record-count metrics compared side by side for each table in the latest batch.
RECORD_METRICS = ["records_read", "records_written", "records_quarantined"]

# Fetch process-control rows for the latest batch. "Latest" means the batch of the most
# recently started run (by start_timestamp). MAX(batch_id) is only correct if batch ids
# happen to sort chronologically as strings.
pdf_control = spark.sql("""
    SELECT table_name, records_read, records_written, records_quarantined
    FROM process_control
    WHERE batch_id = (
        SELECT batch_id
        FROM process_control
        ORDER BY start_timestamp DESC
        LIMIT 1
    )
""").toPandas()

# Grouped bar chart: one group per table, one bar per metric.
if pdf_control.empty:
    print("No process data found to plot.")
else:
    # Create the figure explicitly and pass its axes to pandas. Calling plt.figure() and
    # then DataFrame.plot() without ax= makes pandas open a second figure, leaving an
    # empty one in the output.
    fig, ax = plt.subplots(figsize=(14, 7))
    pdf_control.set_index("table_name")[RECORD_METRICS].plot(kind="bar", ax=ax)
    ax.set_title("Records Processed per Table (Latest Batch)")
    ax.set_ylabel("Count")
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
    ax.grid(axis="y", linestyle="--", alpha=0.7)
    ax.legend(title="Metrics")
    fig.tight_layout()
    plt.show()

## 4. Governance & Architecture
Evidence of the engineering architecture that underpins Data Governance and Observability.
Although this notebook runs locally, these configurations define the pipeline's behavior in a production cloud environment.

### 4.1 Access Policies & Governance
Configuration defining access control and data sensitivity levels (e.g., PII handling).

In [None]:
import os

# Locate the project root so config files resolve both when the notebook runs from a
# directory next to config/ (root is "..") and when it runs from the project root (".").
PROJECT_ROOT = ".." if os.path.exists("../config") else "."

policy_path = f"{PROJECT_ROOT}/config/governance_policies.yaml"

if os.path.exists(policy_path):
    print(f"--- Loading Policy Configuration: {policy_path} ---\n")
    # The context manager closes the handle right away instead of leaking it until GC.
    with open(policy_path, encoding="utf-8") as policy_file:
        print(policy_file.read())
else:
    print(f"Config file not found at {policy_path}. Checked at: {os.path.abspath(policy_path)}")

### 4.2 Observability & Alerting Rules
Prometheus alert definitions for pipeline health monitoring.

In [None]:
# Prometheus alert rules file, resolved against PROJECT_ROOT (defined in the governance cell above).
alert_path = f"{PROJECT_ROOT}/config/alert_rules.yml"

if os.path.exists(alert_path):
    print(f"--- Loading Alert Rules: {alert_path} ---\n")
    # The context manager closes the handle right away instead of leaking it until GC.
    with open(alert_path, encoding="utf-8") as alert_file:
        print(alert_file.read())
else:
    print(f"Config file not found at {alert_path}")

### 4.3 Data Catalog Integration (OpenMetadata)
Source code responsible for registering table metadata and lineage.

In [None]:
# Metadata-ingestion source file, resolved against PROJECT_ROOT (defined in the governance cell above).
ingest_path = f"{PROJECT_ROOT}/src/governance/ingest_metadata.py"

# Number of leading source lines to display.
MAX_SOURCE_PREVIEW_LINES = 50

if os.path.exists(ingest_path):
    print(f"--- Source: {ingest_path} ---\n")
    # Show only the head of the file, intended to cover the imports and the start of the
    # module's definitions.
    with open(ingest_path, encoding="utf-8") as source_file:
        source_lines = source_file.readlines()
    print("".join(source_lines[:MAX_SOURCE_PREVIEW_LINES]))
    # Only claim truncation when lines were actually omitted.
    if len(source_lines) > MAX_SOURCE_PREVIEW_LINES:
        print("... (truncated)")
else:
    print(f"Source file not found at {ingest_path}")