# Fabric SKU Advisor
> **Version 1.1.0** | Updated: 2026-02-06 | [Data Nova](https://www.data-nova.io)
## Throttling & carryforward analysis

**By [Prathy Kamasani](https://www.linkedin.com/in/prathy/) | [Data Nova](https://www.data-nova.io)**

*Microsoft Fabric Training & Consulting*

---

This notebook extracts metrics from the Fabric Capacity Metrics semantic model, including:

- **Utilisation**: Interactive vs Background CU consumption
- **Throttling**: Delay and rejection percentages, risk levels
- **Carryforward/Overage**: Accumulated debt and recovery projections
- **Health score**: Composite metric (0–100)
- **SKU recommendation**: Based on usage and target utilisation

### Prerequisites
1. Install the Fabric Capacity Metrics app in your tenant
2. Have capacity admin or app access permissions
3. Move the Capacity Metrics workspace to a Fabric capacity (for XMLA access)
4. Ensure XMLA endpoint is enabled on the capacity

### Expected runtime
**15–30+ minutes** – The notebook runs many DAX queries (one per day for timepoints and one per day for item/operation data). If you run via Fabric CLI, use a timeout of at least **1800 seconds (30 min)** or **3600 (1 hour)**.

### How to Use
1. **Run All** – Install/import runs first; if the kernel restarts after pip, run **Run All** again (configuration will re-run).
2. Set **WORKSPACE_ID** and **DATASET_ID** in the Configuration section (from your Capacity Metrics app URL or semantic model Properties).
3. Set DEBUG_MODE = True for detailed logging (recommended for first run).
4. Review the report and SKU recommendation.

---
## 1. Import Libraries & Optional Install

**Import libraries first** (no pip, no kernel restart). Then the optional cell checks your Semantic Link version and installs/upgrades **semantic-link-labs** only. If that cell restarts the kernel, run **Run All** again – imports run first again, then configuration.

Ref: [Keep Your Semantic Link Libraries Up to Date](https://www.data-nova.io/post/keep-your-semantic-link-libraries-up-to-date-in-fabric-notebooks) (Data Nova)

In [None]:
# =============================================================================
# Import libraries – run this first (no pip, no kernel restart)
# =============================================================================
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import warnings
warnings.filterwarnings('ignore')

import sempy.fabric as fabric

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from plotly.io import to_html

print("[SUCCESS] All libraries loaded (SemPy, pandas, plotly)")

In [None]:
# =============================================================================
# Semantic Link version check + semantic-link-labs install (optional)
# =============================================================================
# We do NOT upgrade semantic-link (pre-installed). We only install/upgrade
# semantic-link-labs. If the kernel restarts after this, run Run All again.
#
import subprocess
import sys
import json
import urllib.request

def get_latest_version(package_name):
    """Fetch latest version from PyPI"""
    try:
        url = f"https://pypi.org/pypi/{package_name}/json"
        with urllib.request.urlopen(url, timeout=5) as response:
            data = json.loads(response.read())
            return data['info']['version']
    except Exception as e:
        print(f"[WARNING] Could not fetch latest version: {e}")
        return None

# Check semantic-link version (pre-installed – we do not reinstall)
try:
    from importlib.metadata import version, PackageNotFoundError
    installed_version = version('semantic-link-sempy')
    print(f"[INFO] semantic-link (pre-installed): {installed_version}")
    latest_version = get_latest_version('semantic-link')
    if latest_version:
        try:
            from packaging.version import Version
            if Version(installed_version) < Version(latest_version):
                print(f"[INFO] Newer on PyPI: {latest_version} (Fabric manages this; no action needed)")
        except ImportError:
            pass
except PackageNotFoundError:
    print("[INFO] semantic-link not found (unexpected in Fabric)")

# Install/upgrade semantic-link-labs only
print("\n[INFO] Installing/upgrading semantic-link-labs...")
result = subprocess.run(
    [sys.executable, "-m", "pip", "install", "--upgrade", "--quiet", "semantic-link-labs"],
    capture_output=True,
    text=True
)
if result.returncode == 0:
    print("[SUCCESS] semantic-link-labs installed/upgraded")
else:
    print(f"[WARNING] pip message (often safe to ignore): {result.stderr[:300] if result.stderr else 'none'}")

---
## 2. Configuration

Set your Capacity Metrics workspace/dataset IDs and options here. If the kernel restarted after the install cell, run **Run All** again – config will re-run.

In [None]:
# =============================================================================
# CONFIGURATION - Update these values for your environment
# =============================================================================

# ── Capacity Metrics Connection ──
# Workspace, semantic model, and capacity IDs (replace with placeholders when replicating to public).
WORKSPACE_ID = "your-workspace-guid"   # Workspace containing Capacity Metrics
DATASET_ID   = "your-dataset-guid"   # Capacity Metrics semantic model ID

# ── Analysis Mode ──
# "single" = analyse one capacity (CAPACITY_ID required)
# "multi"  = auto-discover and analyse all active capacities
ANALYSIS_MODE = "single"      # "single" or "multi"
CAPACITY_ID = "your-capacity-guid"   # Capacity to analyse (single mode)
CAPACITY_IDS = None           # Optional for multi mode: list of GUIDs to filter, or None for all active

# ── Analysis Settings ──
DAYS_TO_ANALYZE = 14          # Number of days (max 14 for Capacity Metrics retention)
NEEDS_FREE_VIEWERS = False    # Set True if you need F64+ for free viewers

# ── Advanced Analysis ──
WEEKDAY_WEEKEND_SPLIT = True  # Separate weekday vs weekend analysis for more accurate sizing
TREND_ANALYSIS = True         # Show consumption trend and growth projection
RESERVED_VS_PAYG = True       # Include reserved vs PAYG cost comparison
SPIKE_FILTERING = True        # Filter settlement/pause catch-up spikes from P80 calculations

# ── Output ──
DEBUG_MODE = True             # True for detailed logging, False for clean output
SAVE_TO_LAKEHOUSE = True      # Save results to Lakehouse Delta tables
LAKEHOUSE_NAME = "LH_Capacity_Advisor"
SAVE_HTML_REPORT = True       # Generate one HTML report per capacity
REPLACE_EXISTING_OUTPUTS = True  # True: deterministic names (replace). False: keep history with timestamp suffix.

# ── Validation ──
if not WORKSPACE_ID or not WORKSPACE_ID.strip():
    raise ValueError("WORKSPACE_ID is empty. Provide the workspace GUID containing Capacity Metrics.")
if not DATASET_ID or not DATASET_ID.strip():
    raise ValueError("DATASET_ID is empty. Provide the Capacity Metrics semantic model GUID.")

WORKSPACE_ID = WORKSPACE_ID.strip()
DATASET_ID = DATASET_ID.strip()

_mode = ANALYSIS_MODE.strip().lower()
if _mode not in ("single", "multi"):
    raise ValueError(f"ANALYSIS_MODE must be 'single' or 'multi', got: '{ANALYSIS_MODE}'")

if _mode == "single" and (not CAPACITY_ID or not CAPACITY_ID.strip()):
    raise ValueError("ANALYSIS_MODE is 'single' but CAPACITY_ID is empty. Please provide a capacity GUID.")

if _mode == "single":
    CAPACITY_ID = CAPACITY_ID.strip()
    print(f"[INFO] Mode: SINGLE capacity")
    print(f"[INFO] Capacity ID: {CAPACITY_ID}")
else:
    print(f"[INFO] Mode: MULTI capacity (auto-discover all active)")
    if CAPACITY_IDS:
        print(f"[INFO] Filtered to {len(CAPACITY_IDS)} specific capacities")

print(f"[INFO] Workspace: {WORKSPACE_ID}")
print(f"[INFO] Dataset:   {DATASET_ID}")
print(f"[INFO] Days: {DAYS_TO_ANALYZE} | Debug: {'ON' if DEBUG_MODE else 'OFF'}")
print(f"[INFO] Lakehouse file policy: {'REPLACE existing files' if REPLACE_EXISTING_OUTPUTS else 'KEEP history (timestamped suffixes)'}")


---
## 3. Reference Data & Helper Functions

SKU definitions and thresholds used for recommendations.

### Fabric SKU Tiers
Each SKU has a fixed CU (Capacity Unit) rate per second. In a 30-second evaluation window:
- F2 = 2 CUs/sec = 60 CUs per 30-sec window
- F64 = 64 CUs/sec = 1,920 CUs per 30-sec window (minimum for free viewers)

### Throttling Thresholds
Microsoft applies throttling when capacity is overloaded:
- **Interactive Delay**: Starts after 10 minutes of overload
- **Interactive Rejection**: Starts after 60 minutes of overload
- **Background Rejection**: Starts after 24 hours of overload

In [None]:
# =============================================================================
# SKU Definitions - Microsoft Fabric capacity tiers
# =============================================================================
# CU rates and budgets from Microsoft documentation.
# Pricing: published PAYG list prices (USD) and estimated 1-year reserved prices.
# Actual costs vary by region, currency, and agreement type.
# Check https://azure.microsoft.com/pricing/details/microsoft-fabric/
# Prices last updated: 2025. Verify current pricing before making decisions.
#
SKUS = [
    {"name": "F2",    "cus_per_second": 2,    "budget_30s": 60,    "monthly_usd": 262,    "monthly_reserved_usd": 155},
    {"name": "F4",    "cus_per_second": 4,    "budget_30s": 120,   "monthly_usd": 525,    "monthly_reserved_usd": 310},
    {"name": "F8",    "cus_per_second": 8,    "budget_30s": 240,   "monthly_usd": 1049,   "monthly_reserved_usd": 619},
    {"name": "F16",   "cus_per_second": 16,   "budget_30s": 480,   "monthly_usd": 2099,   "monthly_reserved_usd": 1239},
    {"name": "F32",   "cus_per_second": 32,   "budget_30s": 960,   "monthly_usd": 4198,   "monthly_reserved_usd": 2477},
    {"name": "F64",   "cus_per_second": 64,   "budget_30s": 1920,  "monthly_usd": 8395,   "monthly_reserved_usd": 4954},   # Min for free viewers
    {"name": "F128",  "cus_per_second": 128,  "budget_30s": 3840,  "monthly_usd": 16790,  "monthly_reserved_usd": 9907},
    {"name": "F256",  "cus_per_second": 256,  "budget_30s": 7680,  "monthly_usd": 33580,  "monthly_reserved_usd": 19813},
    {"name": "F512",  "cus_per_second": 512,  "budget_30s": 15360, "monthly_usd": 67161,  "monthly_reserved_usd": 39625},
    {"name": "F1024", "cus_per_second": 1024, "budget_30s": 30720, "monthly_usd": 134321, "monthly_reserved_usd": 79250},
    {"name": "F2048", "cus_per_second": 2048, "budget_30s": 61440, "monthly_usd": 268643, "monthly_reserved_usd": 158499},
]

# Target utilisation: 80% leaves headroom for peaks
TARGET_UTILISATION = 0.80

# Reserved pricing discount (approx 41% savings vs PAYG)
RESERVED_DISCOUNT_PCT = 41
RESERVED_BREAKEVEN_UTIL = 0.60  # Below this, PAYG + pause/resume may be cheaper

print(f"[INFO] {len(SKUS)} SKU tiers loaded (F2 to F2048)")
print("[INFO] Pricing: published PAYG and 1-year reserved list prices (USD). Your actual costs may differ.")


In [None]:
# =============================================================================
# Helper Functions
# =============================================================================
import numpy as np

def calculate_utilisation(daily_cus: float, sku: dict) -> float:
    """
    Calculate utilisation ratio for a given daily CU consumption on a SKU.
    Returns a decimal (0.75 = 75%).
    """
    cus_per_window = daily_cus / 2880  # 2880 = 30-second windows per 24 hours
    return cus_per_window / sku["budget_30s"]


def calculate_required_budget(daily_cus: float, target_utilisation: float = 0.80) -> float:
    """
    Calculate the required 30-second CU budget to handle a daily load
    at the target utilisation level.
    """
    cus_per_window = daily_cus / 2880
    return cus_per_window / target_utilisation


def get_sku_status(avg_util: float, max_util: float, needs_free_viewers: bool, sku: dict) -> str:
    """
    Classify a SKU's fit based on utilisation.
    avg_util and max_util are decimals (0.75 = 75%).
    """
    if needs_free_viewers and sku["cus_per_second"] < 64:
        return "NO FREE VIEWERS"
    if max_util > 1.0:
        return "THROTTLING RISK"
    if avg_util > 0.95:
        return "TOO SMALL"
    if avg_util > 0.85:
        return "TIGHT"
    if avg_util > 0.60:
        return "GOOD FIT"
    if avg_util > 0.40:
        return "COMFORTABLE"
    return "OVERSIZED"


def calculate_health_score(avg_util_pct: float, throttle_pct: float, carryover_pct: float) -> tuple:
    """
    Composite capacity health score (0-100).

    Weights: utilisation 40%, throttling 40%, carryforward 20%.
    Returns (score, rating).
    """
    # Utilisation component (40%)
    if avg_util_pct <= 70:
        util_score = 100
    elif avg_util_pct <= 85:
        util_score = 100 - ((avg_util_pct - 70) * 2)
    elif avg_util_pct <= 100:
        util_score = 70 - ((avg_util_pct - 85) * 3)
    else:
        util_score = max(0, 25 - ((avg_util_pct - 100) * 5))

    # Throttling component (40%)
    if throttle_pct == 0:
        throttle_score = 100
    elif throttle_pct < 5:
        throttle_score = 80
    elif throttle_pct < 15:
        throttle_score = 50
    elif throttle_pct < 30:
        throttle_score = 25
    else:
        throttle_score = 0

    # Carryforward component (20%)
    if carryover_pct == 0:
        carryover_score = 100
    elif carryover_pct < 10:
        carryover_score = 80
    elif carryover_pct < 30:
        carryover_score = 50
    elif carryover_pct < 50:
        carryover_score = 25
    else:
        carryover_score = 0

    score = (util_score * 0.4) + (throttle_score * 0.4) + (carryover_score * 0.2)

    if score >= 90:
        rating = "EXCELLENT"
    elif score >= 75:
        rating = "GOOD"
    elif score >= 50:
        rating = "FAIR"
    elif score >= 25:
        rating = "POOR"
    else:
        rating = "CRITICAL"

    return round(score, 1), rating


def format_duration(minutes: float) -> str:
    """Format minutes into a readable duration."""
    if minutes < 60:
        return f"{minutes:.0f} min"
    elif minutes < 1440:
        return f"{minutes / 60:.1f} hours"
    else:
        return f"{minutes / 1440:.1f} days"


print("[INFO] Core SKU helper functions loaded")


### Trend Analysis & Spike Detection
Linear regression on daily CU consumption to detect growth/decline, plus spike filtering for pause/resume catch-up days.


In [None]:
# =============================================================================
# Trend Analysis & Spike Detection
# =============================================================================

def calculate_trend(daily_summary, column='ActualCUs_sum'):
    """
    Calculate linear trend and growth rate for a daily metric.
    Returns dict with slope, weekly_growth_pct, forecast_weeks_to_exceed (if applicable).
    """
    if len(daily_summary) < 3:
        return {'has_trend': False}

    y = daily_summary[column].values.astype(float)
    x = np.arange(len(y), dtype=float)

    # Linear regression
    n = len(x)
    sum_x = np.sum(x)
    sum_y = np.sum(y)
    sum_xy = np.sum(x * y)
    sum_x2 = np.sum(x * x)

    denom = (n * sum_x2 - sum_x * sum_x)
    if denom == 0:
        return {'has_trend': False}

    slope = (n * sum_xy - sum_x * sum_y) / denom
    intercept = (sum_y - slope * sum_x) / n

    # R-squared
    y_pred = slope * x + intercept
    ss_res = np.sum((y - y_pred) ** 2)
    ss_tot = np.sum((y - np.mean(y)) ** 2)
    r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0

    # Weekly growth rate
    avg_val = np.mean(y) if np.mean(y) != 0 else 1
    daily_growth_pct = (slope / avg_val) * 100
    weekly_growth_pct = daily_growth_pct * 7

    # Direction
    if weekly_growth_pct > 2:
        direction = "GROWING"
    elif weekly_growth_pct < -2:
        direction = "DECLINING"
    else:
        direction = "STABLE"

    return {
        'has_trend': True,
        'slope': slope,
        'intercept': intercept,
        'r_squared': round(r_squared, 3),
        'daily_growth_pct': round(daily_growth_pct, 2),
        'weekly_growth_pct': round(weekly_growth_pct, 2),
        'direction': direction,
    }


def detect_pause_spikes(daily_summary, threshold_factor=2.0):
    """
    Detect days that are likely settlement or pause/resume catch-up spikes.
    A spike is a day where peak CUs exceed the median by threshold_factor times.
    Returns a boolean mask of spike days.
    """
    if len(daily_summary) < 3:
        return pd.Series([False] * len(daily_summary), index=daily_summary.index)

    median_peak = daily_summary['ActualCUs_max'].median()
    if median_peak == 0:
        return pd.Series([False] * len(daily_summary), index=daily_summary.index)

    return daily_summary['ActualCUs_max'] > (median_peak * threshold_factor)

print("[INFO] Trend analysis & spike detection loaded")


### Workspace Breakdown
Groups item-level CU data by workspace, with optional billable vs non-billable split.


In [None]:
# =============================================================================
# Workspace Breakdown
# =============================================================================

def get_workspace_breakdown(df_items_cap):
    """
    Extract workspace-level CU breakdown from item data.
    Prefers Workspace Name over Workspace Id for display.
    Returns a DataFrame grouped by workspace.
    """
    if len(df_items_cap) == 0:
        return pd.DataFrame()

    # Prefer Workspace Name, fall back to Workspace Id
    ws_name_cols = [c for c in df_items_cap.columns if 'workspace' in c.lower() and 'name' in c.lower()]
    ws_id_cols = [c for c in df_items_cap.columns if 'workspace' in c.lower() and 'id' in c.lower()]

    # Use name column only if it exists AND has non-empty values
    ws_col = None
    if ws_name_cols:
        _name_col = ws_name_cols[0]
        _valid = df_items_cap[_name_col].dropna().astype(str).str.strip().replace('', pd.NA).dropna()
        if len(_valid) > 0:
            ws_col = _name_col
    if ws_col is None:
        ws_col = ws_id_cols[0] if ws_id_cols else None

    cu_cols = [c for c in df_items_cap.columns if 'CU' in c.upper() and 'Total' in c]
    if not ws_col or not cu_cols:
        return pd.DataFrame()

    cu_col = cu_cols[0]

    # Check for billing type column
    billing_cols = [c for c in df_items_cap.columns if 'billing' in c.lower() and 'type' in c.lower()]

    ws_summary = df_items_cap.groupby(ws_col).agg(
        TotalCUs=(cu_col, 'sum'),
        Operations=(cu_col, 'count'),
    ).reset_index()
    ws_summary.columns = ['Workspace', 'Total CUs', 'Operations']

    # Add billable vs non-billable if available
    if billing_cols:
        bill_col = billing_cols[0]
        billable = df_items_cap[df_items_cap[bill_col].astype(str).str.lower().str.contains('billable')]
        nonbillable = df_items_cap[~df_items_cap[bill_col].astype(str).str.lower().str.contains('billable')]
        bill_by_ws = billable.groupby(ws_col)[cu_col].sum().reset_index()
        bill_by_ws.columns = ['Workspace', 'Billable CUs']
        ws_summary = ws_summary.merge(bill_by_ws, on='Workspace', how='left')
        ws_summary['Billable CUs'] = ws_summary['Billable CUs'].fillna(0)

    ws_summary = ws_summary.sort_values('Total CUs', ascending=False)
    total = ws_summary['Total CUs'].sum()
    ws_summary['Share %'] = (ws_summary['Total CUs'] / total * 100).round(1) if total > 0 else 0
    return ws_summary

print("[INFO] Workspace breakdown loaded")


---
## 4. Connect & Detect Version

The Capacity Metrics semantic model has multiple versions (v37, v40, v44, v47+) with different measure names. This section detects the version to choose the correct DAX queries.

This approach ensures compatibility across different Capacity Metrics app versions.

In [None]:
# =============================================================================
# Connect to Capacity Metrics & Detect Version
# =============================================================================
# The Capacity Metrics semantic model has multiple versions with different
# measure names. We detect the version to use the correct DAX queries.
#
print("[INFO] Connecting to Capacity Metrics semantic model...")

start_time = time.time()
version = None

# Test for v53 (newer Capacity Metrics / FUAM; uses Capacity Id with capital C)
try:
    test_query_v53 = """EVALUATE ROW("Test", 'All Measures'[Blocked workspaces (Day)])"""
    test_df = fabric.evaluate_dax(
        workspace=WORKSPACE_ID, 
        dataset=DATASET_ID, 
        dax_string=test_query_v53
    )
    version = 'v53'
    print("[SUCCESS] Detected Capacity Metrics version: v53 (latest)")
except Exception as e:
    if DEBUG_MODE:
        print(f"[DEBUG] v53 test failed: {str(e)[:100]}")

# Test for v47 (CapacitiesList parameter; capacity Id lowercase)
if version is None:
    try:
        test_query_v47 = """DEFINE MPARAMETER 'DefaultCapacityID' = "00000000-0000-0000-0000-000000000000"
        EVALUATE SUMMARIZECOLUMNS("Test", [Background billable CU %])"""
        test_df = fabric.evaluate_dax(
            workspace=WORKSPACE_ID, 
            dataset=DATASET_ID, 
            dax_string=test_query_v47
        )
        version = 'v47'
        print("[SUCCESS] Detected Capacity Metrics version: v47+")
    except Exception as e:
        if DEBUG_MODE:
            print(f"[DEBUG] v47 test failed: {str(e)[:100]}")

# Test for v40-v44
if version is None:
    try:
        test_query_v40 = """EVALUATE ROW("Test", 'All Measures'[Background billable CU %])"""
        test_df = fabric.evaluate_dax(
            workspace=WORKSPACE_ID, 
            dataset=DATASET_ID, 
            dax_string=test_query_v40
        )
        version = 'v40'
        print("[SUCCESS] Detected Capacity Metrics version: v40-v44")
    except Exception as e:
        if DEBUG_MODE:
            print(f"[DEBUG] v40 test failed: {str(e)[:100]}")

# Test for v37 (older version)
if version is None:
    try:
        test_query_v37 = """EVALUATE ROW("Test", 'All Measures'[xBackground %])"""
        test_df = fabric.evaluate_dax(
            workspace=WORKSPACE_ID, 
            dataset=DATASET_ID, 
            dax_string=test_query_v37
        )
        version = 'v37'
        print("[SUCCESS] Detected Capacity Metrics version: v37 (older)")
    except Exception as e:
        if DEBUG_MODE:
            print(f"[DEBUG] v37 test failed: {str(e)[:100]}")


def _probe_column_exists(column_ref: str) -> tuple:
    """Return (exists, error_message). Uses a tiny DAX query to validate column metadata."""
    probe = f"""
    EVALUATE
    TOPN(1, SUMMARIZECOLUMNS({column_ref}))
    """
    try:
        fabric.evaluate_dax(
            workspace=WORKSPACE_ID,
            dataset=DATASET_ID,
            dax_string=probe
        )
        return True, None
    except Exception as ex:
        return False, str(ex)


def _validate_schema_for_version(ver: str) -> None:
    """Validate required/optional columns for detected Capacity Metrics version."""

    # Each required entry is: (label, [acceptable column references])
    required_by_version = {
        'v53': [
            ("Capacity id", ["Capacities[Capacity Id]"]),
            ("Capacity name", ["Capacities[Capacity name]"]),
            ("Capacity state", ["Capacities[State]", "Capacities[state]"]),
            ("Timepoint date", ["TimePoints[Date]"]),
            ("Timepoint key", ["TimePoints[TimePoint]"]),
            ("Item operation date", ["'Metrics By Item Operation And Day'[Date]"]),
            ("Item operation item id", ["'Metrics By Item Operation And Day'[Item Id]"]),
            ("Item operation CU", ["'Metrics By Item Operation And Day'[CU (s)]"]),
            ("Item name", ["'Items'[Item Name]"]),
            ("Item kind", ["'Items'[Item Kind]"]),
            ("Workspace name", ["'Items'[Workspace Name]"]),
        ],
        'v47': [
            ("Capacity id", ["Capacities[capacity Id]"]),
            ("Capacity name", ["Capacities[Capacity name]"]),
            ("Capacity state", ["Capacities[State]", "Capacities[state]"]),
            ("Timepoint date", ["TimePoints[Date]"]),
            ("Timepoint key", ["TimePoints[TimePoint]"]),
            ("Item operation date", ["'Metrics By Item Operation And Day'[Date]"]),
            ("Item operation item id", ["'Metrics By Item Operation And Day'[Item Id]"]),
            ("Item operation CU", ["'Metrics By Item Operation And Day'[CU (s)]"]),
            ("Item name", ["'Items'[Item Name]"]),
            ("Item kind", ["'Items'[Item Kind]"]),
            ("Workspace name", ["'Items'[Workspace Name]"]),
        ],
        'v40': [
            ("Capacity id", ["Capacities[capacity Id]"]),
            ("Capacity name", ["Capacities[Capacity name]"]),
            ("Capacity state", ["Capacities[State]", "Capacities[state]"]),
            ("Timepoint date", ["TimePoints[Date]"]),
            ("Timepoint key", ["TimePoints[TimePoint]"]),
            ("Item operation date", ["'Metrics By Item Operation And Day'[Date]"]),
            ("Item operation item id", ["'Metrics By Item Operation And Day'[Item Id]"]),
            ("Item operation CU", ["'Metrics By Item Operation And Day'[CU (s)]"]),
            ("Item name", ["'Items'[Item Name]"]),
            ("Item kind", ["'Items'[Item Kind]"]),
            ("Workspace name", ["'Items'[Workspace Name]"]),
        ],
        'v37': [
            ("Capacity id", ["Capacities[capacityId]"]),
            ("Capacity state", ["Capacities[state]"]),
            ("Timepoint date", ["TimePoints[Date]"]),
            ("Timepoint key", ["TimePoints[TimePoint]"]),
            ("Item operation date", ["'MetricsByItemandOperationandDay'[Date]"]),
            ("Item operation item id", ["'MetricsByItemandOperationandDay'[ItemId]"]),
            ("Item operation CU", ["'MetricsByItemandOperationandDay'[sum_CU]"]),
            ("Item name", ["'Items'[Item Name]"]),
            ("Item kind", ["'Items'[ItemKind]"]),
            ("Workspace name", ["'Items'[Workspace Name]"]),
        ]
    }

    optional_by_version = {
        'v53': [
            ("Billing type", "'Metrics By Item Operation And Day'[Billing type]"),
            ("Overloaded minutes", "'Metrics By Item Operation And Day'[Overloaded minutes]"),
            ("Throttling minutes", "'Metrics By Item Operation And Day'[Throttling (min)]"),
        ],
        'v47': [
            ("Billing type", "'Metrics By Item Operation And Day'[Billing type]"),
            ("Overloaded minutes", "'Metrics By Item Operation And Day'[Overloaded minutes]"),
            ("Throttling minutes", "'Metrics By Item Operation And Day'[Throttling (min)]"),
        ],
        'v40': [
            ("Billing type", "'Metrics By Item Operation And Day'[Billing type]"),
            ("Overloaded minutes", "'Metrics By Item Operation And Day'[Overloaded minutes]"),
            ("Throttling minutes", "'Metrics By Item Operation And Day'[Throttling (min)]"),
        ],
        'v37': [
            ("Throttling minutes", "'MetricsByItemandOperationandDay'[Throttling (min)]"),
        ]
    }

    required_specs = required_by_version.get(ver, [])
    optional_specs = optional_by_version.get(ver, [])

    missing_required = []
    missing_optional = []

    for label, alternatives in required_specs:
        matched = False
        errors = []
        for col in alternatives:
            ok, err = _probe_column_exists(col)
            if ok:
                matched = True
                break
            if err:
                errors.append(err)

        if not matched:
            missing_required.append((label, alternatives, errors[0] if errors else "Unknown error"))

    for label, col in optional_specs:
        ok, err = _probe_column_exists(col)
        if not ok:
            missing_optional.append((label, col, err if err else "Unknown error"))

    if missing_optional:
        print("[WARNING] Optional columns not found (continuing):")
        for label, col, _ in missing_optional:
            print(f"  - {label}: {col}")

    if missing_required:
        msg_lines = [
            f"Schema validation failed for version {ver}.",
            "Missing REQUIRED columns:",
        ]
        for label, alternatives, err in missing_required:
            msg_lines.append(f"- {label}: expected one of {alternatives}")
            if DEBUG_MODE:
                msg_lines.append(f"  First error: {err[:240]}")

        raise Exception("\n".join(msg_lines))

    print(f"[SUCCESS] Schema validation passed for version {ver}")


# Validate connection and schema
elapsed = time.time() - start_time
if version is None:
    print("\n[ERROR] Could not connect to Capacity Metrics semantic model")
    print("\nTroubleshooting:")
    print("1. Verify XMLA endpoint is enabled (Capacity Settings > Power BI Workloads)")
    print("2. Ensure you have Build/Read permissions on the semantic model")
    print("3. Confirm the workspace is on a Fabric or Premium capacity")
    print("4. Check WORKSPACE_ID and DATASET_ID in the Configuration cell")
    raise Exception("Connection failed - see troubleshooting tips above")
else:
    print(f"[INFO] Connection established in {elapsed:.1f}s")
    _validate_schema_for_version(version)


---
## 5. Extract Capacities

Capacities are read from the Capacity Metrics app. Only **Active** capacities are included (Suspended are ignored). Set CAPACITY_ID for single capacity, or use multi to analyse all active.

In [None]:
# =============================================================================
# Extract Available Capacities
# =============================================================================
print("[INFO] Extracting capacity list...")
start_time = time.time()

# Capacities table schema sourced from FUAM (Fabric Unified Admin Monitoring)
# https://github.com/microsoft/fabric-toolbox/tree/main/monitoring/fabric-unified-admin-monitoring
# Columns: Capacity Id, Capacity name, SKU, State, Owners, Region, Source
capacity_query = """EVALUATE SELECTCOLUMNS(
    Capacities,
    "CapacityId", Capacities[Capacity Id],
    "CapacityName", Capacities[Capacity name],
    "SKU", Capacities[SKU],
    "State", Capacities[State]
)"""

try:
    df_capacities = fabric.evaluate_dax(
        workspace=WORKSPACE_ID,
        dataset=DATASET_ID,
        dax_string=capacity_query
    )
    elapsed = time.time() - start_time

    # Standardise column names
    df_capacities.columns = ['CapacityId', 'CapacityName', 'SKU', 'State']

    # Clean up names: fall back to first 8 chars of ID if name is empty
    df_capacities['CapacityName'] = df_capacities['CapacityName'].fillna('').astype(str).str.strip()
    df_capacities.loc[df_capacities['CapacityName'] == '', 'CapacityName'] = df_capacities['CapacityId'].str[:8]

    # Only Active capacities
    df_active = df_capacities[df_capacities['State'].str.strip().str.lower() == 'active'].copy()
    n_suspended = len(df_capacities) - len(df_active)
    if n_suspended > 0:
        print(f"[INFO] Ignoring {n_suspended} suspended capacit{'y' if n_suspended == 1 else 'ies'}")

    if len(df_active) == 0:
        raise ValueError("No Active capacities found in Capacity Metrics.")

    print(f"[SUCCESS] Found {len(df_active)} active capacit{'y' if len(df_active) == 1 else 'ies'} ({elapsed:.1f}s)")
    for idx, row in df_active.iterrows():
        print(f"  \u2713 {row['CapacityName']} ({row['SKU']}) - {row['CapacityId'][:8]}...")

    # Build lookups
    CAPACITY_NAMES = dict(zip(df_active['CapacityId'], df_active['CapacityName']))
    CAPACITY_SKUS  = dict(zip(df_active['CapacityId'], df_active['SKU']))

    # -- Build CAPACITY_ID_LIST based on ANALYSIS_MODE --
    _mode = ANALYSIS_MODE.strip().lower()

    if _mode == "single":
        if CAPACITY_ID not in df_active['CapacityId'].values:
            available = [(r['CapacityName'], r['SKU'], r['CapacityId']) for _, r in df_active.iterrows()]
            raise ValueError(
                f"CAPACITY_ID '{CAPACITY_ID}' is not found or not Active.\n"
                f"Available active capacities: {available}"
            )
        CAPACITY_ID_LIST = [CAPACITY_ID]
        print(f"[INFO] Single mode: {CAPACITY_NAMES[CAPACITY_ID]} ({CAPACITY_SKUS[CAPACITY_ID]})")

    else:  # multi
        if CAPACITY_IDS:
            df_filtered = df_active[df_active['CapacityId'].isin(CAPACITY_IDS)].copy()
            missing = set(CAPACITY_IDS) - set(df_filtered['CapacityId'].tolist())
            if missing:
                print(f"[WARNING] Not found or not Active: {missing}")
            if len(df_filtered) == 0:
                raise ValueError("None of the specified CAPACITY_IDS are Active.")
            CAPACITY_ID_LIST = df_filtered['CapacityId'].tolist()
        else:
            CAPACITY_ID_LIST = df_active['CapacityId'].tolist()
        print(f"[INFO] Multi mode: analysing {len(CAPACITY_ID_LIST)} capacities")

    print(f"[INFO] Capacities to analyse:")
    for cid in CAPACITY_ID_LIST:
        print(f"  - {CAPACITY_NAMES.get(cid, cid[:8])} ({CAPACITY_SKUS.get(cid, '?')})")

    if DEBUG_MODE:
        display(df_active[['CapacityName', 'SKU', 'CapacityId', 'State']])

except Exception as e:
    print(f"[ERROR] {e}")
    raise


---
## 6. Extract Timepoint Data (30-second granularity)

Timepoint data (30-second granularity) is used for consumption, throttling, and carryforward, including:
- **Utilisation %**: How much of your capacity is being used
- **Throttling %**: Operations being delayed or rejected
- **Carryforward %**: Accumulated "debt" from overages

This data is extracted day-by-day due to the volume of records (2,880 timepoints per day).

In [None]:
# =============================================================================
# Extract Timepoint-Level Metrics
# =============================================================================
# This is the core data extraction - 30-second granularity metrics including
# utilisation, throttling percentages, and carryforward data.
#
# We iterate day-by-day to manage data volume and avoid timeouts.
#
print("[INFO] Extracting timepoint-level metrics...")
print(f"[INFO] Analyzing {DAYS_TO_ANALYZE} days of data")

# Calculate date range
end_date = datetime.now()
start_date = end_date - timedelta(days=DAYS_TO_ANALYZE)

all_timepoints = []
extraction_start = time.time()

# Build the DAX query template based on version
def build_timepoint_query(capacity_id: str, year: int, month: int, day: int, ver: str) -> str:
    """
    Build DAX query for timepoint data based on semantic model version.
    
    This query extracts metrics for each 30-second timepoint:
    - Background/Interactive billable CU %
    - Throttling metrics (delay %, rejection %)
    - Carryforward metrics (add %, burndown %, cumulative %)
    - Recovery estimates (expected burndown in minutes)
    """
    
    # v53 uses Capacity Id (capital C); v47 uses capacity Id (lowercase c)
    cap_col = "Capacities[Capacity Id]" if ver == 'v53' else "Capacities[capacity Id]"
    if ver in ['v53', 'v47']:
        return f"""
        DEFINE
            MPARAMETER 'CapacitiesList' = {{ "{capacity_id}" }}
        
        VAR __Data = 
            SUMMARIZECOLUMNS(
                {cap_col},
                'TimePoints'[TimePoint],
                FILTER(Capacities, {cap_col} = "{capacity_id}"),
                FILTER(TimePoints, 'TimePoints'[Date] = DATE({year}, {month}, {day})),
                "BackgroundPct", 'All Measures'[Background billable CU %],
                "InteractivePct", 'All Measures'[Interactive billable CU %],
                "SKUPct", 'All Measures'[SKU CU by timepoint %],
                "CULimit", 'All Measures'[CU limit],
                "SKUCUByTimepoint", 'All Measures'[SKU CU by timepoint],
                "InteractiveDelayPct", 'All Measures'[Dynamic interactive delay %],
                "InteractiveRejectPct", 'All Measures'[Dynamic interactive rejection %],
                "InteractiveRejectThreshold", 'All Measures'[Interactive rejection threshold],
                "BackgroundRejectPct", 'All Measures'[Dynamic background rejection %],
                "BackgroundRejectThreshold", 'All Measures'[Background rejection threshold],
                "CarryoverAddPct", 'All Measures'[Carry over add %],
                "CarryoverBurndownPct", 'All Measures'[Carry over burndown %],
                "CarryoverCumulativePct", 'All Measures'[Cumulative carry over %],
                "OverageRefLine", 'All Measures'[Overage reference line],
                "ExpectedBurndownMin", 'All Measures'[Expected burndown in minutes]
            )
        EVALUATE __Data
        """
    
    elif ver == 'v40':
        return f"""
        DEFINE
            MPARAMETER 'CapacityID' = "{capacity_id}"
        
        VAR __Data = 
            SUMMARIZECOLUMNS(
                Capacities[capacity Id],
                'TimePoints'[TimePoint],
                FILTER(Capacities, Capacities[capacity Id] = "{capacity_id}"),
                FILTER(TimePoints, 'TimePoints'[Date] = DATE({year}, {month}, {day})),
                "BackgroundPct", 'All Measures'[Background billable CU %],
                "InteractivePct", 'All Measures'[Interactive billable CU %],
                "SKUPct", 'All Measures'[SKU CU by timepoint %],
                "CULimit", 'All Measures'[CU limit],
                "SKUCUByTimepoint", 'All Measures'[SKU CU by timepoint],
                "InteractiveDelayPct", 'All Measures'[Dynamic interactive delay %],
                "InteractiveRejectPct", 'All Measures'[Dynamic interactive rejection %],
                "InteractiveRejectThreshold", 'All Measures'[Interactive rejection threshold],
                "BackgroundRejectPct", 'All Measures'[Dynamic background rejection %],
                "BackgroundRejectThreshold", 'All Measures'[Background rejection threshold],
                "CarryoverAddPct", 'All Measures'[Carry over add %],
                "CarryoverBurndownPct", 'All Measures'[Carry over burndown %],
                "CarryoverCumulativePct", 'All Measures'[Cumulative carry over %],
                "OverageRefLine", 'All Measures'[Overage reference line],
                "ExpectedBurndownMin", 'All Measures'[Expected burndown in minutes]
            )
        EVALUATE __Data
        """
    
    else:  # v37
        return f"""
        DEFINE
            MPARAMETER 'CapacityID' = "{capacity_id}"
        
        VAR __Data = 
            SUMMARIZECOLUMNS(
                Capacities[capacityId],
                'TimePoints'[TimePoint],
                FILTER(Capacities, Capacities[capacityId] = "{capacity_id}"),
                FILTER(TimePoints, 'TimePoints'[Date] = DATE({year}, {month}, {day})),
                "BackgroundPct", 'All Measures'[xBackground %],
                "InteractivePct", 'All Measures'[xInteractive %],
                "SKUPct", 'All Measures'[SKU CU by TimePoint %],
                "CULimit", 'All Measures'[CU Limit],
                "SKUCUByTimepoint", 'All Measures'[SKU CU by TimePoint],
                "InteractiveDelayPct", 'All Measures'[Dynamic InteractiveDelay %],
                "InteractiveRejectPct", 'All Measures'[Dynamic InteractiveRejection %],
                "InteractiveRejectThreshold", 'All Measures'[Interactive rejection threshold],
                "BackgroundRejectPct", 'All Measures'[Dynamic BackgroundRejection %],
                "BackgroundRejectThreshold", 'All Measures'[Background rejection threshold],
                "CarryoverAddPct", 'All Measures'[xCarryOver_added %],
                "CarryoverBurndownPct", 'All Measures'[xCarryOver_burndown %],
                "CarryoverCumulativePct", 'All Measures'[xCarryOver_Cumulative %],
                "OverageRefLine", 'All Measures'[Overage reference line],
                "ExpectedBurndownMin", 'All Measures'[Expected burndown in minutes]
            )
        EVALUATE __Data
        """


# Iterate through each capacity and day
for cap_idx, cap_row in df_capacities.iterrows():
    capacity_id = cap_row['CapacityId']
    print(f"\n[INFO] Processing capacity: {capacity_id}")
    
    # Generate date list
    current_date = start_date
    while current_date <= end_date:
        year = current_date.year
        month = current_date.month
        day = current_date.day
        date_label = current_date.strftime('%Y-%m-%d')
        
        try:
            query = build_timepoint_query(capacity_id, year, month, day, version)
            df_day = fabric.evaluate_dax(
                workspace=WORKSPACE_ID,
                dataset=DATASET_ID,
                dax_string=query
            )
            
            if len(df_day) > 0:
                all_timepoints.append(df_day)
                if DEBUG_MODE:
                    print(f"  {date_label}: {len(df_day)} timepoints")
            else:
                    pass
        except Exception as e:
            print(f"  [WARNING] {date_label}: Query failed - {str(e)[:50]}")
        
        current_date += timedelta(days=1)

# Combine all data
if all_timepoints:
    df_timepoints = pd.concat(all_timepoints, ignore_index=True)
    elapsed = time.time() - extraction_start
    print(f"\n[SUCCESS] Extracted {len(df_timepoints):,} timepoint records ({elapsed:.1f}s)")
    
    if DEBUG_MODE:
        display(df_timepoints.head(3))
else:
    print("\n[ERROR] No timepoint data extracted")
    df_timepoints = pd.DataFrame()

---
## 7. Extract Item/Operation Data

Item-level data shows which workloads (reports, dataflows, notebooks, etc.) are consuming capacity:
- Top consumers that might need optimization
- Failed/rejected operations indicating problems
- Throttling at the item level

**No item-level rows?** Ensure WORKSPACE_ID and DATASET_ID point to the **Capacity Metrics app** (not FUAM's semantic model). See **Context/ITEM-LEVEL-DATA-AND-FUAM.md** for troubleshooting.

In [None]:
# =============================================================================
# Extract Item/Operation Level Metrics
# =============================================================================
# Extracts consumption data per item per operation per day,
# including workspace name, item name, billing type, and throttling minutes.
#
import re
print("[INFO] Extracting item/operation level metrics...")

# Optional measures for v40/v47: (output_name, column_name_in_model, dax_expr).
# If a column is missing (MS schema change), we omit it (use 0) and retry.
ITEM_OPTIONAL_MEASURES_V47 = [
    ("ThrottlingMin", "Throttling (min)", "SUM('{table_name}'[Throttling (min)])"),
    ("OverloadedMin", "Overloaded minutes", "SUM('{table_name}'[Overloaded minutes])"),
    ("UserCount", "Users", "SUM('{table_name}'[Users])"),
    ("SuccessCount", "Successful operations", "SUM('{table_name}'[Successful operations])"),
    ("RejectCount", "Rejected operations", "SUM('{table_name}'[Rejected operations])"),
    ("TotalOps", "Operations", "SUM('{table_name}'[Operations])"),
    ("InvalidCount", "Invalid operations", "SUM('{table_name}'[Invalid operations])"),
    ("FailedCount", "Failed operations", "SUM('{table_name}'[Failed operations])"),
    ("CancelledCount", "Cancelled operations", "SUM('{table_name}'[Cancelled operations])"),
]

def _parse_missing_column_error(exc):
    """Extract column name from 'Column ... cannot be found' DAX error."""
    msg = str(exc)
    if "cannot be found" not in msg and "may not be used" not in msg:
        return None
    m = re.search(r"Column\s+['\"]?(?:<[^>]+>)?([^'\"<>]+)(?:</[^>]+>)?['\"]?", msg)
    return m.group(1).strip() if m else None

def _build_item_measures_v47(skip_columns, table_name):
    """Build measure list for v47/v40 item query; use 0 for columns in skip_columns (schema drift)."""
    lines = [
        ("ThrottlingMin", "Throttling (min)", f"SUM('{table_name}'[Throttling (min)])"),
        ("OverloadedMin", "Overloaded minutes", f"SUM('{table_name}'[Overloaded minutes])"),
        ("UserCount", "Users", f"SUM('{table_name}'[Users])"),
        ("SuccessCount", "Successful operations", f"SUM('{table_name}'[Successful operations])"),
        ("RejectCount", "Rejected operations", f"SUM('{table_name}'[Rejected operations])"),
        ("TotalOps", "Operations", f"SUM('{table_name}'[Operations])"),
        ("InvalidCount", "Invalid operations", f"SUM('{table_name}'[Invalid operations])"),
        ("FailedCount", "Failed operations", f"SUM('{table_name}'[Failed operations])"),
        ("CancelledCount", "Cancelled operations", f"SUM('{table_name}'[Cancelled operations])"),
    ]
    parts = [f'"{out_name}", {expr}' if col_name not in skip_columns else f'"{out_name}", 0' for out_name, col_name, expr in lines]
    return ",\n                ".join(parts)

def build_item_query(capacity_id: str, year: int, month: int, day: int, ver: str, skip_columns=None) -> str:
    """Build DAX query for item-level metrics. skip_columns = set of column names to omit (use 0) when model schema differs."""
    skip_columns = skip_columns or set()

    if ver in ['v53', 'v47', 'v40']:
        param_name = "CapacitiesList" if ver in ['v53', 'v47'] else "CapacityID"
        param_format = f'{{ "{capacity_id}" }}' if ver in ['v53', 'v47'] else f'"{capacity_id}"'
        # v53 uses Capacity Id (capital C); v47/v40 use capacity Id (lowercase c)
        cap_col = "Capacities[Capacity Id]" if ver == 'v53' else "Capacities[capacity Id]"
        table_name = "Metrics By Item Operation And Day"
        measures_str = _build_item_measures_v47(skip_columns, table_name)

        return f"""
        DEFINE
            MPARAMETER '{param_name}' = {param_format}

        VAR __Filter =
            FILTER(
                KEEPFILTERS(VALUES('{table_name}'[Date])),
                '{table_name}'[Date] = DATE({year}, {month}, {day})
            )

        VAR __Data =
            SUMMARIZECOLUMNS(
                {cap_col},
                Items[Workspace Id],
                'Items'[Workspace Name],
                '{table_name}'[Date],
                '{table_name}'[Item Id],
                'Items'[Item Name],
                'Items'[Item Kind],
                '{table_name}'[Operation name],
                '{table_name}'[Billing type],
                FILTER(Capacities, {cap_col} = "{capacity_id}"),
                __Filter,
                "DurationSec", SUM('{table_name}'[Duration (s)]),
                "TotalCUs", SUM('{table_name}'[CU (s)]),
                {measures_str}
            )

        EVALUATE
            FILTER(__Data, [TotalCUs] > 0)
        ORDER BY [TotalCUs] DESC
        """

    else:  # v37
        return f"""
        DEFINE
            MPARAMETER 'CapacityID' = "{capacity_id}"

        VAR __Filter =
            FILTER(
                KEEPFILTERS(VALUES('MetricsByItemandOperationandDay'[Date])),
                'MetricsByItemandOperationandDay'[Date] = DATE({year}, {month}, {day})
            )

        VAR __Data =
            SUMMARIZECOLUMNS(
                Capacities[capacityId],
                Items[WorkspaceId],
                'Items'[Workspace Name],
                'MetricsByItemandOperationandDay'[Date],
                'MetricsByItemandOperationandDay'[ItemId],
                'Items'[Item Name],
                'Items'[ItemKind],
                'MetricsByItemandOperationandDay'[OperationName],
                FILTER(Capacities, Capacities[capacityId] = "{capacity_id}"),
                __Filter,
                "DurationSec", SUM('MetricsByItemandOperationandDay'[sum_duration]),
                "TotalCUs", SUM('MetricsByItemandOperationandDay'[sum_CU]),
                "ThrottlingMin", SUM('MetricsByItemandOperationandDay'[Throttling (min)]),
                "UserCount", SUM('MetricsByItemandOperationandDay'[count_users]),
                "SuccessCount", SUM('MetricsByItemandOperationandDay'[count_successful_operations]),
                "RejectCount", SUM('MetricsByItemandOperationandDay'[count_rejected_operations]),
                "TotalOps", SUM('MetricsByItemandOperationandDay'[count_operations]),
                "InvalidCount", SUM('MetricsByItemandOperationandDay'[count_Invalid_operations]),
                "FailedCount", SUM('MetricsByItemandOperationandDay'[count_failure_operations]),
                "CancelledCount", SUM('MetricsByItemandOperationandDay'[count_cancelled_operations])
            )

        EVALUATE
            FILTER(__Data, [TotalCUs] > 0)
        ORDER BY [TotalCUs] DESC
        """


def build_item_query_range(capacity_id: str, start_dt, end_dt, ver: str, skip_columns=None, include_billing=True, cap_col_override=None) -> str:
    """Build a range-based item query (more robust than day-by-day for schema drift/data sparsity)."""
    skip_columns = skip_columns or set()

    sy, sm, sd = start_dt.year, start_dt.month, start_dt.day
    ey, em, ed = end_dt.year, end_dt.month, end_dt.day

    if ver in ['v53', 'v47', 'v40']:
        param_name = "CapacitiesList" if ver in ['v53', 'v47'] else "CapacityID"
        param_format = f'{{ "{capacity_id}" }}' if ver in ['v53', 'v47'] else f'"{capacity_id}"'
        default_cap_col = "Capacities[Capacity Id]" if ver == 'v53' else "Capacities[capacity Id]"
        cap_col = cap_col_override or default_cap_col
        table_name = "Metrics By Item Operation And Day"
        measures_str = _build_item_measures_v47(skip_columns, table_name)
        billing_line = f"'{table_name}'[Billing type]," if include_billing else ""

        return f"""
        DEFINE
            MPARAMETER '{param_name}' = {param_format}

        VAR __Filter =
            FILTER(
                KEEPFILTERS(VALUES('{table_name}'[Date])),
                '{table_name}'[Date] >= DATE({sy}, {sm}, {sd})
                    && '{table_name}'[Date] <= DATE({ey}, {em}, {ed})
            )

        VAR __Data =
            SUMMARIZECOLUMNS(
                {cap_col},
                Items[Workspace Id],
                'Items'[Workspace Name],
                '{table_name}'[Date],
                '{table_name}'[Item Id],
                'Items'[Item Name],
                'Items'[Item Kind],
                '{table_name}'[Operation name],
                {billing_line}
                FILTER(Capacities, {cap_col} = "{capacity_id}"),
                __Filter,
                "DurationSec", SUM('{table_name}'[Duration (s)]),
                "TotalCUs", SUM('{table_name}'[CU (s)]),
                {measures_str}
            )

        EVALUATE
            FILTER(__Data, [TotalCUs] > 0)
        ORDER BY [TotalCUs] DESC
        """

    # v37
    return f"""
    DEFINE
        MPARAMETER 'CapacityID' = "{capacity_id}"

    VAR __Filter =
        FILTER(
            KEEPFILTERS(VALUES('MetricsByItemandOperationandDay'[Date])),
            'MetricsByItemandOperationandDay'[Date] >= DATE({sy}, {sm}, {sd})
                && 'MetricsByItemandOperationandDay'[Date] <= DATE({ey}, {em}, {ed})
        )

    VAR __Data =
        SUMMARIZECOLUMNS(
            Capacities[capacityId],
            Items[WorkspaceId],
            'Items'[Workspace Name],
            'MetricsByItemandOperationandDay'[Date],
            'MetricsByItemandOperationandDay'[ItemId],
            'Items'[Item Name],
            'Items'[ItemKind],
            'MetricsByItemandOperationandDay'[OperationName],
            FILTER(Capacities, Capacities[capacityId] = "{capacity_id}"),
            __Filter,
            "DurationSec", SUM('MetricsByItemandOperationandDay'[sum_duration]),
            "TotalCUs", SUM('MetricsByItemandOperationandDay'[sum_CU]),
            "ThrottlingMin", SUM('MetricsByItemandOperationandDay'[Throttling (min)]),
            "UserCount", SUM('MetricsByItemandOperationandDay'[count_users]),
            "SuccessCount", SUM('MetricsByItemandOperationandDay'[count_successful_operations]),
            "RejectCount", SUM('MetricsByItemandOperationandDay'[count_rejected_operations]),
            "TotalOps", SUM('MetricsByItemandOperationandDay'[count_operations]),
            "InvalidCount", SUM('MetricsByItemandOperationandDay'[count_Invalid_operations]),
            "FailedCount", SUM('MetricsByItemandOperationandDay'[count_failure_operations]),
            "CancelledCount", SUM('MetricsByItemandOperationandDay'[count_cancelled_operations])
        )

    EVALUATE
        FILTER(__Data, [TotalCUs] > 0)
    ORDER BY [TotalCUs] DESC
    """


all_items = []
item_start = time.time()
item_error_logged = False
item_skip_columns = set()
optional_item_columns = {c for _, c, _ in ITEM_OPTIONAL_MEASURES_V47}

for _, cap_row in df_capacities.iterrows():
    capacity_id = cap_row['CapacityId']
    include_billing = True
    extracted_for_capacity = False

    if version in ['v53', 'v47', 'v40']:
        cap_col_candidates = [
            "Capacities[Capacity Id]",
            "Capacities[capacity Id]",
        ]
        if version in ['v47', 'v40']:
            cap_col_candidates = ["Capacities[capacity Id]", "Capacities[Capacity Id]"]

        for cap_col_try in cap_col_candidates:
            done = False
            max_retries = 15
            while not done and max_retries > 0:
                try:
                    query = build_item_query_range(
                        capacity_id=capacity_id,
                        start_dt=start_date,
                        end_dt=end_date,
                        ver=version,
                        skip_columns=item_skip_columns,
                        include_billing=include_billing,
                        cap_col_override=cap_col_try,
                    )
                    df_items_cap = fabric.evaluate_dax(
                        workspace=WORKSPACE_ID,
                        dataset=DATASET_ID,
                        dax_string=query
                    )

                    if len(df_items_cap) > 0:
                        all_items.append(df_items_cap)
                        extracted_for_capacity = True
                        print(f"  [INFO] Item rows for {capacity_id[:8]} using {cap_col_try}: {len(df_items_cap):,}")
                    done = True

                except Exception as e:
                    missing_col = _parse_missing_column_error(e)

                    if missing_col == "Billing type":
                        include_billing = False
                        print("  [INFO] Omitting column not in model (schema drift): 'Billing type'")
                        max_retries -= 1
                        continue

                    if missing_col in optional_item_columns:
                        if missing_col not in item_skip_columns:
                            print(f"  [INFO] Omitting column not in model (schema drift): '{missing_col}'")
                        item_skip_columns.add(missing_col)
                        max_retries -= 1
                        continue

                    if not item_error_logged:
                        print(f"  [WARNING] Item query failed (first occurrence): {e}")
                        item_error_logged = True
                    if DEBUG_MODE:
                        print(f"  [DEBUG] Item query failed for capacity {capacity_id} using {cap_col_try}")
                    done = True
                    max_retries = 0

            if extracted_for_capacity:
                break

    else:
        try:
            query = build_item_query_range(
                capacity_id=capacity_id,
                start_dt=start_date,
                end_dt=end_date,
                ver=version,
                skip_columns=item_skip_columns,
                include_billing=False,
                cap_col_override=None,
            )
            df_items_cap = fabric.evaluate_dax(
                workspace=WORKSPACE_ID,
                dataset=DATASET_ID,
                dax_string=query
            )
            if len(df_items_cap) > 0:
                all_items.append(df_items_cap)
                extracted_for_capacity = True
                print(f"  [INFO] Item rows for {capacity_id[:8]}: {len(df_items_cap):,}")
        except Exception as e:
            if not item_error_logged:
                print(f"  [WARNING] Item query failed (first occurrence): {e}")
                item_error_logged = True

    if not extracted_for_capacity:
        print(f"  [WARNING] No item rows for capacity {capacity_id[:8]} in selected date range")

# Combine item data
if all_items:
    df_items = pd.concat(all_items, ignore_index=True)
    elapsed = time.time() - item_start
    print(f"[SUCCESS] Extracted {len(df_items):,} item/operation records ({elapsed:.1f}s)")

    # Standardize column names
    col_map = {col: col.replace('[', '').replace(']', '').strip() for col in df_items.columns}
    df_items = df_items.rename(columns=col_map)

    if DEBUG_MODE:
        # Show column names for verification
        print(f"[DEBUG] Columns: {list(df_items.columns)}")
        display(df_items.head(3))
else:
    print("[WARNING] No item-level data extracted")
    df_items = pd.DataFrame()


---
## 8. Process & Analyze Data

This section aggregates the extracted data and calculates:
- Daily aggregates for trending
- Throttling summaries
- Carryforward analysis
- Item-level rankings

---

## Capacity Analysis Pipeline

This section defines reusable functions and runs the full analysis pipeline **once per active capacity**.

In **single** mode it processes one capacity with full inline output. In **multi** mode it loops through all active capacities with compact progress output.

Each capacity gets:

- Timepoint processing and daily summary
- Key metric calculations and health score
- SKU recommendation (weekday-filtered, spike-aware)
- Charts: health gauge, utilisation, throttling, carryforward, SKU comparison, top workloads
- Individual HTML report saved to Lakehouse (if enabled)

Results are collected in `all_capacity_results` for cross-capacity comparison.


In [None]:
# =============================================================================
# Analysis Functions: Processing, Metrics, SKU Recommendation
# =============================================================================

def process_timepoints_for_capacity(df_timepoints_all, capacity_id):
    """
    Filter and process timepoint data for a single capacity.
    Returns (daily_summary DataFrame, current_sku dict or None).
    """
    if len(df_timepoints_all) == 0:
        return pd.DataFrame(), None

    # Find capacity ID column
    cap_col_candidates = [c for c in df_timepoints_all.columns if 'capacity' in c.lower() and 'id' in c.lower()]
    cap_col = cap_col_candidates[0] if cap_col_candidates else df_timepoints_all.columns[0]
    df_tp = df_timepoints_all[df_timepoints_all[cap_col].astype(str).str.strip() == capacity_id].copy()

    if len(df_tp) == 0:
        return pd.DataFrame(), None

    # Standardize column names (DAX may return with brackets)
    col_map = {col: col.replace('[', '').replace(']', '').strip() for col in df_tp.columns}
    df_tp = df_tp.rename(columns=col_map)

    # Find TimePoint column
    tp_candidates = [c for c in df_tp.columns if 'TimePoint' in c or ('Time' in c and 'Point' in c)]
    tp_col = tp_candidates[0] if tp_candidates else (df_tp.columns[1] if len(df_tp.columns) >= 2 else None)
    if tp_col is None:
        raise ValueError(f"Could not find TimePoint column. Columns: {list(df_tp.columns)}")

    df_tp['TimePoint'] = pd.to_datetime(df_tp[tp_col])
    df_tp['Date'] = df_tp['TimePoint'].dt.date

    # Scale percentage columns from 0-1 to 0-100
    pct_cols = [c for c in df_tp.columns if 'Pct' in c or '%' in c]
    for col in pct_cols:
        df_tp[col] = pd.to_numeric(df_tp[col], errors='coerce').fillna(0) * 100

    # Detect current SKU from the CU budget per 30-sec window
    current_sku = None
    sku_budget_30s = None
    if 'SKUCUByTimepoint' in df_tp.columns:
        _sku_vals = pd.to_numeric(df_tp['SKUCUByTimepoint'], errors='coerce').dropna()
        if len(_sku_vals) > 0:
            sku_budget_30s = _sku_vals.mode().iloc[0]
            current_sku = next((s for s in SKUS if s['budget_30s'] == sku_budget_30s), None)

    # Calculate actual CUs per timepoint from utilisation % and SKU budget
    # This is more reliable than the cumulative measure which is a running total
    if sku_budget_30s and sku_budget_30s > 0:
        df_tp['ActualCUs'] = (df_tp['BackgroundPct'] + df_tp['InteractivePct']) / 100.0 * sku_budget_30s
    elif 'SKUCUByTimepoint' in df_tp.columns:
        _sku_col = pd.to_numeric(df_tp['SKUCUByTimepoint'], errors='coerce').fillna(0)
        df_tp['ActualCUs'] = (df_tp['BackgroundPct'] + df_tp['InteractivePct']) / 100.0 * _sku_col
    else:
        df_tp['ActualCUs'] = 0
        print("  [WARNING] Cannot calculate CUs: no SKU budget data available")

    # Daily summaries
    daily_summary = df_tp.groupby('Date').agg({
        'BackgroundPct': ['mean', 'max'],
        'InteractivePct': ['mean', 'max'],
        'InteractiveDelayPct': ['mean', 'max'],
        'InteractiveRejectPct': ['mean', 'max'],
        'BackgroundRejectPct': ['mean', 'max'],
        'CarryoverCumulativePct': ['mean', 'max'],
        'ExpectedBurndownMin': ['mean', 'max'],
        'ActualCUs': ['sum', 'max'],  # sum = daily total, max = peak window
    }).reset_index()

    daily_summary.columns = ['_'.join(col).strip('_') if col[1] else col[0]
                             for col in daily_summary.columns.values]
    daily_summary['TotalUtilPct_mean'] = daily_summary['BackgroundPct_mean'] + daily_summary['InteractivePct_mean']
    daily_summary['TotalUtilPct_max'] = daily_summary['BackgroundPct_max'] + daily_summary['InteractivePct_max']

    # Add day-of-week for weekday/weekend analysis
    daily_summary['DayOfWeek'] = pd.to_datetime(daily_summary['Date']).dt.dayofweek  # 0=Mon, 6=Sun
    daily_summary['IsWeekday'] = daily_summary['DayOfWeek'] < 5

    # Detect pause/resume spikes
    daily_summary['IsSuspectedSpike'] = detect_pause_spikes(daily_summary)

    return daily_summary, current_sku


def calculate_capacity_metrics(daily_summary):
    """Calculate all key metrics from daily summary. Returns a dict."""
    if len(daily_summary) == 0:
        return None

    m = {}
    m['days_analyzed'] = len(daily_summary)

    # -------------------------------------------------------------------
    # Core metrics (all days)
    # -------------------------------------------------------------------
    m['avg_daily_cus'] = daily_summary['ActualCUs_sum'].mean()
    m['max_daily_cus'] = daily_summary['ActualCUs_sum'].max()
    m['p80_daily_cus'] = daily_summary['ActualCUs_sum'].quantile(0.8)

    # Utilisation
    m['avg_util'] = daily_summary['TotalUtilPct_mean'].mean()
    m['max_util'] = daily_summary['TotalUtilPct_max'].max()
    m['avg_interactive'] = daily_summary['InteractivePct_mean'].mean()
    m['avg_background'] = daily_summary['BackgroundPct_mean'].mean()

    # Throttling
    m['avg_delay_pct'] = daily_summary['InteractiveDelayPct_mean'].mean()
    m['max_delay_pct'] = daily_summary['InteractiveDelayPct_max'].max()
    m['avg_int_reject_pct'] = daily_summary['InteractiveRejectPct_mean'].mean()
    m['max_int_reject_pct'] = daily_summary['InteractiveRejectPct_max'].max()
    m['avg_bg_reject_pct'] = daily_summary['BackgroundRejectPct_mean'].mean()
    m['max_bg_reject_pct'] = daily_summary['BackgroundRejectPct_max'].max()

    # Carryforward
    m['avg_carryover_pct'] = daily_summary['CarryoverCumulativePct_mean'].mean()
    m['max_carryover_pct'] = daily_summary['CarryoverCumulativePct_max'].max()
    m['avg_burndown_min'] = daily_summary['ExpectedBurndownMin_mean'].mean()
    m['max_burndown_min'] = daily_summary['ExpectedBurndownMin_max'].max()

    # Days with issues
    m['days_with_delay'] = int((daily_summary['InteractiveDelayPct_max'] > 0).sum())
    m['days_with_rejection'] = int((daily_summary['InteractiveRejectPct_max'] > 0).sum())
    m['days_with_carryover'] = int((daily_summary['CarryoverCumulativePct_max'] > 0).sum())

    # Health score
    m['health_score'], m['health_rating'] = calculate_health_score(
        m['avg_util'],
        max(m['avg_delay_pct'], m['avg_int_reject_pct']),
        m['avg_carryover_pct']
    )

    # -------------------------------------------------------------------
    # Weekday vs Weekend split
    # -------------------------------------------------------------------
    weekdays = daily_summary[daily_summary['IsWeekday']]
    weekends = daily_summary[~daily_summary['IsWeekday']]

    if len(weekdays) > 0:
        m['weekday_avg_cus'] = weekdays['ActualCUs_sum'].mean()
        m['weekday_p80_cus'] = weekdays['ActualCUs_sum'].quantile(0.8)
        m['weekday_avg_util'] = weekdays['TotalUtilPct_mean'].mean()
        m['weekday_count'] = len(weekdays)
    else:
        m['weekday_avg_cus'] = m['avg_daily_cus']
        m['weekday_p80_cus'] = m['p80_daily_cus']
        m['weekday_avg_util'] = m['avg_util']
        m['weekday_count'] = 0

    if len(weekends) > 0:
        m['weekend_avg_cus'] = weekends['ActualCUs_sum'].mean()
        m['weekend_p80_cus'] = weekends['ActualCUs_sum'].quantile(0.8)
        m['weekend_avg_util'] = weekends['TotalUtilPct_mean'].mean()
        m['weekend_count'] = len(weekends)
    else:
        m['weekend_avg_cus'] = 0
        m['weekend_p80_cus'] = 0
        m['weekend_avg_util'] = 0
        m['weekend_count'] = 0

    # Weekday/weekend ratio
    if m['weekend_avg_cus'] > 0:
        m['weekday_weekend_ratio'] = round(m['weekday_avg_cus'] / m['weekend_avg_cus'], 1)
    else:
        m['weekday_weekend_ratio'] = float('inf')

    # -------------------------------------------------------------------
    # Spike-filtered P80 (excludes suspected pause/resume spikes)
    # -------------------------------------------------------------------
    non_spike = daily_summary[~daily_summary['IsSuspectedSpike']]
    m['spike_days_detected'] = int(daily_summary['IsSuspectedSpike'].sum())
    if len(non_spike) >= 3:
        m['p80_daily_cus_filtered'] = non_spike['ActualCUs_sum'].quantile(0.8)
        m['weekday_p80_cus_filtered'] = non_spike[non_spike['IsWeekday']]['ActualCUs_sum'].quantile(0.8) if len(non_spike[non_spike['IsWeekday']]) > 0 else m['weekday_p80_cus']
    else:
        m['p80_daily_cus_filtered'] = m['p80_daily_cus']
        m['weekday_p80_cus_filtered'] = m['weekday_p80_cus']

    # -------------------------------------------------------------------
    # Trend analysis
    # -------------------------------------------------------------------
    m['trend'] = calculate_trend(daily_summary, 'ActualCUs_sum')

    return m


def recommend_sku_for_capacity(metrics, skus, needs_free_viewers):
    """
    Recommend SKU using the 80/80 approach:
    find the smallest SKU where P80 daily consumption fits at 80% utilisation.

    When weekday/weekend split is enabled and SPIKE_FILTERING is on,
    uses the weekday P80 (filtered) for sizing, as weekdays typically drive
    the highest sustained load.
    """
    # Use weekday P80 (spike-filtered) if weekday split is enabled
    if WEEKDAY_WEEKEND_SPLIT and SPIKE_FILTERING:
        p80_cus = metrics.get('weekday_p80_cus_filtered', metrics['p80_daily_cus'])
    elif WEEKDAY_WEEKEND_SPLIT:
        p80_cus = metrics.get('weekday_p80_cus', metrics['p80_daily_cus'])
    elif SPIKE_FILTERING:
        p80_cus = metrics.get('p80_daily_cus_filtered', metrics['p80_daily_cus'])
    else:
        p80_cus = metrics['p80_daily_cus']

    required_budget = calculate_required_budget(p80_cus, TARGET_UTILISATION)

    sku_analysis = []
    recommended_sku = None

    for sku in skus:
        avg_util_sku = calculate_utilisation(metrics['avg_daily_cus'], sku)
        max_util_sku = calculate_utilisation(metrics['max_daily_cus'], sku)
        p80_util_sku = calculate_utilisation(p80_cus, sku)
        status = get_sku_status(avg_util_sku, max_util_sku, needs_free_viewers, sku)

        # Reserved vs PAYG
        payg = sku.get("monthly_usd", 0)
        reserved = sku.get("monthly_reserved_usd", 0)
        savings = payg - reserved

        sku_analysis.append({
            "SKU": sku["name"],
            "CUs/sec": sku["cus_per_second"],
            "Daily Budget": sku["budget_30s"] * 2880,
            "Avg Util %": round(avg_util_sku * 100, 1),
            "Peak Util %": round(max_util_sku * 100, 1),
            "P80 Util %": round(p80_util_sku * 100, 1),
            "Status": status,
            "PAYG $/mo": payg,
            "Reserved $/mo": reserved,
            "Savings $/mo": savings,
        })

        # Recommend: smallest SKU whose budget meets the P80-at-80% requirement
        if recommended_sku is None and sku["budget_30s"] >= required_budget:
            if not (needs_free_viewers and sku["cus_per_second"] < 64):
                recommended_sku = sku

    # Fallback: smallest that doesn't throttle on peak
    if recommended_sku is None:
        for sku in skus:
            if calculate_utilisation(metrics['max_daily_cus'], sku) <= 1.0:
                if not (needs_free_viewers and sku["cus_per_second"] < 64):
                    recommended_sku = sku
                    break

    if recommended_sku is None:
        recommended_sku = skus[-1]

    return sku_analysis, recommended_sku


def filter_items_for_capacity(df_items_all, capacity_id):
    """Filter item-level data for a single capacity."""
    if len(df_items_all) == 0:
        return pd.DataFrame()
    cap_col_candidates = [c for c in df_items_all.columns if 'capacity' in c.lower() and 'id' in c.lower()]
    if not cap_col_candidates:
        return df_items_all
    cap_col = cap_col_candidates[0]
    return df_items_all[df_items_all[cap_col].astype(str).str.strip() == capacity_id].copy()


print("[INFO] Analysis functions loaded")


In [None]:
# =============================================================================
# Analysis Functions: Chart Generation
# =============================================================================
from plotly.subplots import make_subplots

def create_capacity_charts(daily_summary, metrics, sku_analysis, recommended_sku, current_sku, df_items_cap):
    """Create all Plotly charts for a capacity. Returns dict of figure objects."""
    charts = {}

    def _apply_chart_spacing(fig):
        """Apply consistent spacing so axis/label text remains visible across charts."""
        fig.update_layout(
            margin=dict(l=90, r=90, t=90, b=90),
            uniformtext_minsize=9,
            uniformtext_mode="hide",
        )
        fig.update_xaxes(automargin=True, title_standoff=16)
        fig.update_yaxes(automargin=True, title_standoff=16)
        fig.update_traces(cliponaxis=False, selector=dict(type="bar"))
        return fig

    # 1. Health Gauge
    fig_health = go.Figure(go.Indicator(
        mode="gauge+number+delta",
        value=metrics['health_score'],
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': f"Capacity Health Score<br><span style='font-size:0.6em;color:gray'>{metrics['health_rating']}</span>"},
        gauge={
            'axis': {'range': [0, 100], 'tickwidth': 1},
            'bar': {'color': "darkblue"},
            'steps': [
                {'range': [0, 25], 'color': '#dc3545'},
                {'range': [25, 50], 'color': '#fd7e14'},
                {'range': [50, 75], 'color': '#ffc107'},
                {'range': [75, 90], 'color': '#28a745'},
                {'range': [90, 100], 'color': '#20c997'}
            ],
            'threshold': {'line': {'color': "black", 'width': 4}, 'thickness': 0.75, 'value': metrics['health_score']}
        }
    ))
    fig_health.update_layout(height=300, margin=dict(l=20, r=20, t=60, b=20), font={'size': 14})
    charts['fig_health'] = _apply_chart_spacing(fig_health)

    # 2. Utilisation Gauges (Average & Peak on recommended SKU)
    title_sku = recommended_sku["name"]
    avg_util_pct = calculate_utilisation(metrics['avg_daily_cus'], recommended_sku) * 100
    peak_util_pct = calculate_utilisation(metrics['max_daily_cus'], recommended_sku) * 100

    steps = [
        {"range": [0, 40], "color": "#EBF2F9"},
        {"range": [40, 80], "color": "#D4EDDA"},
        {"range": [80, 95], "color": "#FFF3CD"},
        {"range": [95, 100], "color": "#F8D7DA"},
    ]
    fig_util = make_subplots(rows=1, cols=2, specs=[[{"type": "indicator"}, {"type": "indicator"}]], horizontal_spacing=0.12)
    fig_util.add_trace(go.Indicator(
        mode="gauge+number", value=round(avg_util_pct, 1),
        title={"text": "Average", "font": {"size": 16, "color": "#333"}},
        number={"suffix": "%", "valueformat": ".1f", "font": {"size": 28, "color": "#2D65BC"}},
        gauge={"axis": {"range": [0, 100], "ticksuffix": "%"}, "bar": {"color": "#6A8DDC"}, "bgcolor": "white", "steps": steps,
               "threshold": {"line": {"color": "#28A745", "width": 3}, "thickness": 0.8, "value": 80}},
    ), row=1, col=1)
    fig_util.add_trace(go.Indicator(
        mode="gauge+number", value=round(peak_util_pct, 1),
        title={"text": "Peak", "font": {"size": 16, "color": "#333"}},
        number={"suffix": "%", "valueformat": ".1f", "font": {"size": 28, "color": "#2D65BC"}},
        gauge={"axis": {"range": [0, 100], "ticksuffix": "%"}, "bar": {"color": "#F59E0B"}, "bgcolor": "white", "steps": steps,
               "threshold": {"line": {"color": "#DC3545", "width": 3}, "thickness": 0.8, "value": 95}},
    ), row=1, col=2)
    fig_util.update_layout(title=None, height=280, margin=dict(l=40, r=40, t=70, b=20), paper_bgcolor="white",
        annotations=[dict(text=f"Utilisation on {title_sku}", x=0.5, y=1.02, xref="paper", yref="paper", showarrow=False, font=dict(size=18), xanchor="center")])
    charts['fig_util_gauges'] = _apply_chart_spacing(fig_util)

    # 3. Daily Utilisation (with weekday/weekend colouring and trend line)
    if len(daily_summary) > 0:
        fig_daily = go.Figure()
        dates_str = pd.to_datetime(daily_summary['Date']).dt.strftime('%d %b')
        day_names = pd.to_datetime(daily_summary['Date']).dt.strftime('%a')

        # Colour weekends differently
        bg_colors = ['rgba(40, 167, 69, 0.5)' if wd else 'rgba(40, 167, 69, 0.25)' for wd in daily_summary['IsWeekday']]
        int_colors = ['rgba(102, 126, 234, 0.8)' if wd else 'rgba(102, 126, 234, 0.4)' for wd in daily_summary['IsWeekday']]

        # Mark spike days
        spike_mask = daily_summary['IsSuspectedSpike']

        fig_daily.add_trace(go.Bar(
            x=dates_str, y=daily_summary['BackgroundPct_mean'],
            name='Background', marker_color=bg_colors,
            text=[f"{d}" for d in day_names], textposition='none',
            hovertemplate='%{x} (%{text})<br>Background: %{y:.1f}%<extra></extra>'
        ))
        fig_daily.add_trace(go.Bar(
            x=dates_str, y=daily_summary['InteractivePct_mean'],
            name='Interactive', marker_color=int_colors,
            hovertemplate='%{x}<br>Interactive: %{y:.1f}%<extra></extra>'
        ))

        # Mark suspected spikes with red markers
        if spike_mask.any():
            spike_dates = dates_str[spike_mask]
            spike_vals = daily_summary.loc[spike_mask, 'TotalUtilPct_mean']
            fig_daily.add_trace(go.Scatter(
                x=spike_dates, y=spike_vals + 3,
                mode='markers+text', name='Suspected Spike',
                marker=dict(symbol='triangle-down', size=12, color='red'),
                text=['SPIKE'] * len(spike_dates), textposition='top center',
                textfont=dict(size=9, color='red'),
                hovertemplate='%{x}<br>Suspected pause/settlement spike<extra></extra>'
            ))

        # Trend line (fit on utilisation %, not raw CU scale)
        if TREND_ANALYSIS:
            t = calculate_trend(daily_summary, 'TotalUtilPct_mean')
            if t.get('has_trend'):
                x_vals = list(range(len(daily_summary)))
                trend_y = [t['intercept'] + t['slope'] * xi for xi in x_vals]
                fig_daily.add_trace(go.Scatter(
                    x=dates_str, y=trend_y,
                    mode='lines', name=f"Trend ({t['direction']}, {t['weekly_growth_pct']:+.1f}%/wk)",
                    line=dict(color='#e74c3c' if t['direction'] == 'GROWING' else '#3498db' if t['direction'] == 'DECLINING' else '#95a5a6',
                              width=2, dash='dash'),
                ))

        fig_daily.add_hline(y=80, line_dash="dash", line_color="orange", annotation_text="80% Target")
        fig_daily.add_hline(y=100, line_dash="solid", line_color="red", annotation_text="100% Capacity")
        y_upper = max(110, float(daily_summary['TotalUtilPct_mean'].max()) * 1.2)
        fig_daily.update_layout(
            title="Daily Capacity Utilisation (lighter bars = weekends)",
            barmode='stack', xaxis_title="Date", yaxis_title="Utilisation %",
            yaxis=dict(range=[0, y_upper]),
            height=420, template="plotly_white",
            legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="center", x=0.5)
        )
        charts['fig_daily'] = _apply_chart_spacing(fig_daily)

    # 4. Throttling Timeline
    if len(daily_summary) > 0:
        fig_throttle = go.Figure()
        fig_throttle.add_trace(go.Scatter(x=pd.to_datetime(daily_summary['Date']), y=daily_summary['InteractiveDelayPct_max'], name='Interactive Delay %', mode='lines+markers', line=dict(color='#fd7e14', width=2), fill='tozeroy', fillcolor='rgba(253, 126, 20, 0.2)'))
        fig_throttle.add_trace(go.Scatter(x=pd.to_datetime(daily_summary['Date']), y=daily_summary['InteractiveRejectPct_max'], name='Interactive Rejection %', mode='lines+markers', line=dict(color='#dc3545', width=2)))
        fig_throttle.add_trace(go.Scatter(x=pd.to_datetime(daily_summary['Date']), y=daily_summary['BackgroundRejectPct_max'], name='Background Rejection %', mode='lines+markers', line=dict(color='#6c757d', width=2, dash='dot')))
        fig_throttle.update_layout(title="Throttling Analysis Over Time", xaxis_title="Date", yaxis_title="Throttling %", height=350, template="plotly_white",
            legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="center", x=0.5))
        charts['fig_throttle'] = _apply_chart_spacing(fig_throttle)

    # 5. Carryforward Analysis
    if len(daily_summary) > 0:
        fig_carry = make_subplots(rows=1, cols=2, subplot_titles=['Cumulative Carryforward %', 'Expected Burndown Time'])
        fig_carry.add_trace(go.Bar(x=pd.to_datetime(daily_summary['Date']).dt.strftime('%d %b'), y=daily_summary['CarryoverCumulativePct_max'],
            marker_color=['#dc3545' if v > 50 else '#fd7e14' if v > 20 else '#28a745' for v in daily_summary['CarryoverCumulativePct_max']], name='Carryforward %'), row=1, col=1)
        fig_carry.add_trace(go.Scatter(x=pd.to_datetime(daily_summary['Date']), y=daily_summary['ExpectedBurndownMin_max'], mode='lines+markers', line=dict(color='#667eea', width=2), name='Burndown (min)'), row=1, col=2)
        fig_carry.update_layout(height=350, showlegend=False, template="plotly_white", title_text="Carryforward / Overage Analysis")
        fig_carry.update_xaxes(tickformat="%d %b", row=1, col=1)
        fig_carry.update_xaxes(tickformat="%d %b", row=1, col=2)
        charts['fig_carryover'] = _apply_chart_spacing(fig_carry)

    # 6. SKU Comparison (now includes Reserved pricing)
    sku_names = [f"{s['SKU']}   " for s in sku_analysis]
    sku_utils = [s['Avg Util %'] for s in sku_analysis]
    sku_colors = ['#dc3545' if s['Status'] in ['THROTTLING RISK', 'TOO SMALL'] else '#fd7e14' if s['Status'] == 'TIGHT' else '#28a745' if s['Status'] == 'GOOD FIT' else '#667eea' if s['Status'] == 'COMFORTABLE' else '#6c757d' for s in sku_analysis]

    fig_sku = go.Figure(go.Bar(y=sku_names, x=[min(u, 100) for u in sku_utils], orientation='h', marker_color=sku_colors,
        text=[f"{u:.0f}%" + (" (over)" if u > 100 else "") for u in sku_utils], textposition='outside', cliponaxis=False))
    rec_idx = [i for i, s in enumerate(sku_analysis) if s['SKU'] == recommended_sku['name']][0]
    rec_util = min(sku_utils[rec_idx], 100)
    fig_sku.add_annotation(x=rec_util, y=sku_names[rec_idx], ax=50, ay=0, text="<b>RECOMMENDED</b>", showarrow=True, arrowhead=2, arrowsize=1.2, arrowwidth=2,
        arrowcolor='#28a745', font=dict(color='#28a745', size=11), xanchor='left', bgcolor='rgba(255,255,255,0.8)', bordercolor='#28a745', borderwidth=1, borderpad=4)
    fig_sku.add_vline(x=80, line_dash="dash", line_color="green", annotation_text="80% Target")
    fig_sku.add_vline(x=100, line_dash="solid", line_color="red", annotation_text="100% Limit")
    fig_sku.update_layout(title="SKU Comparison (Average Utilisation)", xaxis_title="Utilisation %", xaxis=dict(range=[0, 130]), height=450, margin=dict(l=120, r=80), template="plotly_white")
    charts['fig_sku'] = _apply_chart_spacing(fig_sku)

    # 7. Top Workloads
    if len(df_items_cap) > 0:
        name_cols = [c for c in df_items_cap.columns if 'item' in c.lower() and 'name' in c.lower()]
        id_cols = [c for c in df_items_cap.columns if 'Item' in c and ('Id' in c or 'Kind' in c)]
        cu_cols = [c for c in df_items_cap.columns if 'CU' in c.upper() and 'Total' in c]
        label_col = name_cols[0] if name_cols else (id_cols[0] if id_cols else None)

        if label_col and cu_cols:
            cu_col = cu_cols[0]
            item_totals = df_items_cap.groupby(label_col)[cu_col].sum().reset_index()
            item_totals = item_totals.sort_values(cu_col, ascending=True).tail(10)
            fig_items = go.Figure(go.Bar(y=item_totals[label_col], x=item_totals[cu_col], orientation='h', marker_color='#667eea',
                text=[f"{v/1e6:,.1f}M CUs" if v >= 1e6 else f"{v/1e3:,.0f}K CUs" if v >= 1e3 else f"{v:,.0f} CUs" for v in item_totals[cu_col]], textposition='outside', cliponaxis=False))
            max_items_x = float(item_totals[cu_col].max()) if len(item_totals) > 0 else 0
            fig_items.update_layout(title="Top 10 Workloads by CU Consumption", xaxis_title="Total CUs", xaxis=dict(range=[0, max_items_x * 1.2 if max_items_x > 0 else 1]), height=420, margin=dict(l=280, r=180), template="plotly_white")
            charts['fig_items'] = _apply_chart_spacing(fig_items)

    # 8. Weekday vs Weekend comparison (if split is enabled)
    if WEEKDAY_WEEKEND_SPLIT and len(daily_summary) > 0:
        wd = daily_summary[daily_summary['IsWeekday']]
        we = daily_summary[~daily_summary['IsWeekday']]
        if len(wd) > 0 and len(we) > 0:
            categories = ['Avg CUs', 'P80 CUs', 'Avg Util %']
            wd_vals = [metrics['weekday_avg_cus'], metrics['weekday_p80_cus'], metrics['weekday_avg_util']]
            we_vals = [metrics['weekend_avg_cus'], metrics['weekend_p80_cus'], metrics['weekend_avg_util']]

            fig_wdwe = make_subplots(rows=1, cols=3, subplot_titles=categories)
            for i, cat in enumerate(categories):
                fig_wdwe.add_trace(go.Bar(x=['Weekday', 'Weekend'], y=[wd_vals[i], we_vals[i]],
                    marker_color=['#667eea', '#b0c4de'],
                    text=[f"{wd_vals[i]:,.0f}", f"{we_vals[i]:,.0f}"] if i < 2 else [f"{wd_vals[i]:.1f}%", f"{we_vals[i]:.1f}%"],
                    textposition='outside', showlegend=False, cliponaxis=False
                ), row=1, col=i+1)
                _max_col = max(wd_vals[i], we_vals[i])
                fig_wdwe.update_yaxes(range=[0, _max_col * 1.2 if _max_col > 0 else 1], row=1, col=i+1)
            fig_wdwe.update_layout(title="Weekday vs Weekend Comparison", height=360, template="plotly_white")
            charts['fig_weekday_weekend'] = _apply_chart_spacing(fig_wdwe)

    # 9. Workspace Breakdown (if data available)
    ws_breakdown = get_workspace_breakdown(df_items_cap)
    if len(ws_breakdown) > 0:
        top_ws = ws_breakdown.head(10)
        fig_ws = go.Figure(go.Bar(
            y=top_ws['Workspace'], x=top_ws['Total CUs'], orientation='h',
            marker_color='#764ba2',
            text=[f"{s:.0f}%" for s in top_ws['Share %']],
            textposition='outside',
            cliponaxis=False
        ))
        max_ws_x = float(top_ws['Total CUs'].max()) if len(top_ws) > 0 else 0
        fig_ws.update_layout(
            title="Top Workspaces by CU Consumption",
            xaxis_title="Total CUs", xaxis=dict(range=[0, max_ws_x * 1.2 if max_ws_x > 0 else 1]),
            height=max(320, len(top_ws) * 40 + 120),
            margin=dict(l=280, r=130), template="plotly_white"
        )
        charts['fig_workspace'] = _apply_chart_spacing(fig_ws)

    return charts


print("[OK] Chart functions defined")


In [None]:
# =============================================================================
# Analysis Functions: Summary, HTML Report, Lakehouse Export
# =============================================================================

DISCLAIMER_TEXT = (
    "DISCLAIMER: This analysis is based on data from the Capacity Metrics semantic model "
    "over the configured time period. Recommendations are indicative, not prescriptive. "
    "This tool can make mistakes. Key limitations: (1) Spark Autoscale workloads billed "
    "separately are NOT included in these metrics. (2) Pricing shown uses published PAYG/reserved "
    "list prices (USD) which vary by region, currency, and agreement. (3) The analysis window is "
    "limited to the last 14 days of Capacity Metrics retention. (4) If Capacity Overage or Surge "
    "Protection is enabled, throttling metrics may not reflect true overuse. "
    "Always validate recommendations against your own workload knowledge and business context "
    "before making capacity changes."
)

def print_capacity_summary(capacity_id, metrics, current_sku, recommended_sku, sku_analysis, daily_summary=None, capacity_name=None):
    """Print formatted summary for a single capacity."""
    m = metrics
    print(f"""
{'='*70}
FABRIC SKU ADVISOR - ANALYSIS: {capacity_name or capacity_id[:8]}
{'='*70}

CAPACITY HEALTH SCORE: {m['health_score']}/100 ({m['health_rating']})
{'='*50}

UTILISATION SUMMARY
  Days Analyzed:        {m['days_analyzed']}
  Average Daily CUs:    {m['avg_daily_cus']:,.0f}
  Peak Daily CUs:       {m['max_daily_cus']:,.0f}
  80th Percentile:      {m['p80_daily_cus']:,.0f}

  Avg Utilisation:      {m['avg_util']:.1f}%
  Peak Utilisation:     {m['max_util']:.1f}%
  Interactive:          {m['avg_interactive']:.1f}%
  Background:           {m['avg_background']:.1f}%
""")

    # Weekday vs Weekend
    if WEEKDAY_WEEKEND_SPLIT:
        print(f"""WEEKDAY vs WEEKEND
  Weekday Avg CUs:      {m['weekday_avg_cus']:,.0f}  (P80: {m['weekday_p80_cus']:,.0f})
  Weekend Avg CUs:      {m['weekend_avg_cus']:,.0f}  (P80: {m['weekend_p80_cus']:,.0f})
  Weekday Avg Util:     {m['weekday_avg_util']:.1f}%
  Weekend Avg Util:     {m['weekend_avg_util']:.1f}%
  Weekday/Weekend Ratio:{m['weekday_weekend_ratio']}x
  SKU sized on:         Weekday P80 (filtered)
""")

    # Spike filtering
    if SPIKE_FILTERING and m.get('spike_days_detected', 0) > 0:
        print(f"""SPIKE FILTERING
  Suspected spikes:     {m['spike_days_detected']} day(s) flagged as pause/settlement catch-up
  P80 (all days):       {m['p80_daily_cus']:,.0f} CUs
  P80 (filtered):       {m['p80_daily_cus_filtered']:,.0f} CUs
""")

    # Trend
    if TREND_ANALYSIS and m.get('trend', {}).get('has_trend'):
        t = m['trend']
        print(f"""CONSUMPTION TREND
  Direction:            {t['direction']}
  Weekly Growth:        {t['weekly_growth_pct']:+.1f}%
  Confidence (R2):      {t['r_squared']}
""")
        if t['direction'] == 'GROWING' and t['weekly_growth_pct'] > 5:
            print("  WARNING: Consumption is growing significantly. Review capacity plan.")

    print(f"""THROTTLING ASSESSMENT
  Days with delays:     {m['days_with_delay']}/{m['days_analyzed']}
  Days with rejections: {m['days_with_rejection']}/{m['days_analyzed']}
  Avg delay %:          {m['avg_delay_pct']:.2f}%
  Max delay %:          {m['max_delay_pct']:.2f}%

CARRYFORWARD/OVERAGE
  Days with overage:    {m['days_with_carryover']}/{m['days_analyzed']}
  Avg carryover %:      {m['avg_carryover_pct']:.2f}%
  Max carryover %:      {m['max_carryover_pct']:.2f}%
  Avg burndown time:    {format_duration(m['avg_burndown_min'])}
  Max burndown time:    {format_duration(m['max_burndown_min'])}

SKU RECOMMENDATION
  Current SKU:          {current_sku['name'] if current_sku else 'Unknown'}
  Recommended SKU:      {recommended_sku['name']}
  Daily CU Budget:      {recommended_sku['budget_30s'] * 2880:,} CUs
  Expected Utilisation: {calculate_utilisation(m['avg_daily_cus'], recommended_sku)*100:.1f}%
""")

    # Action recommendation
    if current_sku:
        cur_idx = next((i for i, s in enumerate(SKUS) if s['name'] == current_sku['name']), -1)
        rec_idx = next((i for i, s in enumerate(SKUS) if s['name'] == recommended_sku['name']), -1)
        if rec_idx > cur_idx:
            print(f"  ACTION: Upgrade from {current_sku['name']} to {recommended_sku['name']}")
        elif rec_idx < cur_idx:
            print(f"  ACTION: Consider downsizing from {current_sku['name']} to {recommended_sku['name']} to save costs")
        else:
            print(f"  ACTION: Stay on {current_sku['name']} \u2013 it is the recommended fit")

    # Cost comparison: PAYG and Reserved
    if current_sku and recommended_sku:
        cur_payg = current_sku.get("monthly_usd", 0)
        rec_payg = recommended_sku.get("monthly_usd", 0)
        rec_reserved = recommended_sku.get("monthly_reserved_usd", 0)
        diff = cur_payg - rec_payg

        print(f"\nCOST COMPARISON (published list prices, USD)")
        print(f"  Current ({current_sku['name']}) PAYG:       ${cur_payg:,}/mo")
        print(f"  Recommended ({recommended_sku['name']}) PAYG:  ${rec_payg:,}/mo")
        print(f"  Recommended ({recommended_sku['name']}) Reserved (1yr): ${rec_reserved:,}/mo")

        if RESERVED_VS_PAYG:
            saving_vs_payg = rec_payg - rec_reserved
            print(f"\n  RESERVED vs PAYG ANALYSIS")
            print(f"    PAYG to Reserved savings:  ${saving_vs_payg:,}/mo (~{RESERVED_DISCOUNT_PCT}%)")
            avg_util_pct = m['avg_util']
            if avg_util_pct > RESERVED_BREAKEVEN_UTIL * 100:
                print(f"    Your avg utilisation ({avg_util_pct:.0f}%) is ABOVE the {RESERVED_BREAKEVEN_UTIL*100:.0f}% break-even.")
                print(f"    Recommendation: Reserved instance likely offers better value.")
            else:
                print(f"    Your avg utilisation ({avg_util_pct:.0f}%) is BELOW the {RESERVED_BREAKEVEN_UTIL*100:.0f}% break-even.")
                print(f"    Recommendation: PAYG with pause/resume scheduling may be cheaper.")

        if diff > 0:
            print(f"\n  ESTIMATED SAVINGS vs current: ~${diff:,}/mo (PAYG list prices)")
        elif diff < 0:
            print(f"\n  ESTIMATED UPGRADE COST: ~${abs(diff):,}/mo (PAYG list prices)")
        print("  Note: Actual costs depend on your region, currency, and agreement.")

    # Surge protection note
    if m['avg_util'] > 100 and m['days_with_delay'] == 0:
        print(f"\n  NOTE: Utilisation exceeds 100% but no throttling detected.")
        print(f"  This may indicate Capacity Overage or Surge Protection is enabled.")
        print(f"  Check Azure billing for additional consumption costs.")

    print(f"\n{'='*70}")
    print(f"  {DISCLAIMER_TEXT}")
    print(f"{'='*70}")

    print("\nFull SKU Comparison:")
    display(pd.DataFrame(sku_analysis))


def save_capacity_to_lakehouse(capacity_id, html_content, daily_summary, sku_analysis, metrics, recommended_sku, current_sku, html_filename):
    """Save per-capacity outputs to Lakehouse.

    Strategy:
    1) Use attached default Lakehouse when available (/lakehouse/default/Files)
    2) Otherwise get/create LAKEHOUSE_NAME (default: LH_Capacity_Advisor) and write via ABFS path
    3) File naming follows REPLACE_EXISTING_OUTPUTS policy
    """
    cap_short = capacity_id[:8]
    run_suffix = datetime.utcnow().strftime("%Y%m%d_%H%M%S")

    try:
        import io
        try:
            import notebookutils
            has_notebookutils = True
            try:
                from notebookutils import mssparkutils
                has_mssparkutils = True
            except Exception:
                mssparkutils = None
                has_mssparkutils = False
        except ImportError:
            has_notebookutils = False
            has_mssparkutils = False
            notebookutils = None
            mssparkutils = None

        if not has_notebookutils:
            print("  [WARN] notebookutils not available. Cannot save to Lakehouse.")
            return

        workspace_id = fabric.get_notebook_workspace_id()
        default_lakehouse_id = notebookutils.runtime.context.get('defaultLakehouseId')
        default_lakehouse_name = notebookutils.runtime.context.get('defaultLakehouseName')

        target_lakehouse_name = str(LAKEHOUSE_NAME).strip() if LAKEHOUSE_NAME else "LH_Capacity_Advisor"
        use_default_mount = False

        if default_lakehouse_id:
            # Best case: notebook already attached to a default lakehouse.
            lakehouse_path = "/lakehouse/default/Files"
            use_default_mount = True
            print(f"  [INFO] Using attached default Lakehouse: {default_lakehouse_name or default_lakehouse_id[:8]}")
        else:
            # No default lakehouse attached: resolve/create target and write through ABFS.
            if not has_mssparkutils:
                print("  [WARN] No default Lakehouse and mssparkutils unavailable; cannot create/select Lakehouse.")
                return

            lakehouse_obj = None
            try:
                lakehouse_obj = mssparkutils.lakehouse.get(target_lakehouse_name)
                print(f"  [INFO] Using existing Lakehouse: {target_lakehouse_name}")
            except Exception:
                try:
                    print(f"  [INFO] Lakehouse '{target_lakehouse_name}' not found. Creating...")
                    mssparkutils.lakehouse.create(
                        name=target_lakehouse_name,
                        description="Auto-created by Fabric SKU Advisor for report outputs",
                        workspaceId=workspace_id,
                    )
                    lakehouse_obj = mssparkutils.lakehouse.get(target_lakehouse_name)
                    print(f"  [SUCCESS] Created Lakehouse: {target_lakehouse_name}")
                except Exception as create_err:
                    print(f"  [ERROR] Could not create/access Lakehouse '{target_lakehouse_name}': {create_err}")
                    return

            lakehouse_id = lakehouse_obj.get('id') if isinstance(lakehouse_obj, dict) else None
            if not lakehouse_id:
                print("  [ERROR] Could not resolve Lakehouse ID for ABFS export.")
                return

            lakehouse_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files"
            print(f"  [INFO] Writing via ABFS path: {lakehouse_path}")

        def _write_text(path: str, text: str):
            if use_default_mount:
                with open(path, "w", encoding="utf-8") as fh:
                    fh.write(text)
            else:
                if has_mssparkutils:
                    mssparkutils.fs.put(path, text, True)
                else:
                    notebookutils.fs.put(path, text, True)

        def _write_df_csv(df: pd.DataFrame, path: str, index: bool):
            _write_text(path, df.to_csv(index=index))

        def _read_csv_if_exists(path: str):
            if use_default_mount:
                try:
                    return pd.read_csv(path)
                except FileNotFoundError:
                    return None
            if has_mssparkutils and mssparkutils.fs.exists(path):
                try:
                    txt = mssparkutils.fs.head(path, 10 * 1024 * 1024)
                    if txt:
                        return pd.read_csv(io.StringIO(txt))
                except Exception:
                    return None
            return None

        def _build_output_name(prefix: str, ext: str = "csv"):
            if REPLACE_EXISTING_OUTPUTS:
                return f"{prefix}_{cap_short}.{ext}"
            return f"{prefix}_{cap_short}_{run_suffix}.{ext}"

        if html_content:
            html_path = f"{lakehouse_path}/{html_filename}"
            _write_text(html_path, html_content)
            print(f"  [SUCCESS] HTML report: {html_filename}")

        if len(daily_summary) > 0:
            summary_name = _build_output_name("capacity_summary")
            _write_df_csv(daily_summary, f"{lakehouse_path}/{summary_name}", index=True)
            print(f"  [SUCCESS] Summary CSV: {summary_name}")

        if sku_analysis:
            sku_name = _build_output_name("sku_analysis")
            _write_df_csv(pd.DataFrame(sku_analysis), f"{lakehouse_path}/{sku_name}", index=False)
            print(f"  [SUCCESS] SKU analysis: {sku_name}")

        # Recommendation summary (for multi-run comparison)
        rec_data = {
            "capacity_id": [capacity_id],
            "generated_at": [datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")],
            "current_sku": [current_sku["name"] if current_sku else "Unknown"],
            "recommended_sku": [recommended_sku["name"]],
            "health_score": [metrics['health_score']],
            "health_rating": [metrics['health_rating']],
            "avg_daily_cus": [metrics['avg_daily_cus']],
            "p80_daily_cus": [metrics['p80_daily_cus']],
            "avg_utilisation_pct": [metrics['avg_util']],
            "weekday_avg_cus": [metrics.get('weekday_avg_cus', 0)],
            "weekend_avg_cus": [metrics.get('weekend_avg_cus', 0)],
            "trend_direction": [metrics.get('trend', {}).get('direction', 'N/A')],
            "trend_weekly_growth_pct": [metrics.get('trend', {}).get('weekly_growth_pct', 0)],
            "spike_days_filtered": [metrics.get('spike_days_detected', 0)],
        }
        rec_df = pd.DataFrame(rec_data)

        rec_name = _build_output_name("recommendation")
        _write_df_csv(rec_df, f"{lakehouse_path}/{rec_name}", index=False)
        print(f"  [SUCCESS] Recommendation: {rec_name}")

        history_path = f"{lakehouse_path}/run_history.csv"
        existing = _read_csv_if_exists(history_path)
        updated = pd.concat([existing, rec_df], ignore_index=True) if existing is not None else rec_df
        _write_df_csv(updated, history_path, index=False)
        print("  [SUCCESS] Run history updated: run_history.csv")

    except Exception as e:
        print(f"  [ERROR] Lakehouse export failed: {e}")


print("[OK] Summary and export functions defined")


In [None]:
# =============================================================================
# Run Analysis for All Capacities
# =============================================================================
# Single capacity:  full inline display (charts, summary, HTML preview)
# Multiple capacities: compact progress per capacity, HTML file per capacity,
#                       cross-capacity summary table at the end (Cell 26)
#
from datetime import datetime
from plotly.io import to_html as plotly_to_html
from IPython.display import display, HTML

all_capacity_results = {}
is_multi = ANALYSIS_MODE.strip().lower() == 'multi'

print(f"\n{'='*70}")
print(f"MULTI-CAPACITY ANALYSIS: {len(CAPACITY_ID_LIST)} capacit{'y' if len(CAPACITY_ID_LIST) == 1 else 'ies'}")
if is_multi:
    print(f"Mode: MULTI - one HTML report per capacity, compact notebook output.")
else:
    print(f"Mode: SINGLE - full charts and summary displayed inline.")
print(f"{'='*70}")

for cap_num, cap_id in enumerate(CAPACITY_ID_LIST, 1):
    _line = '\u2500' * 70
    print(f"\n{_line}")
    cap_name = CAPACITY_NAMES.get(cap_id, cap_id[:8])
    print(f"\u25B6 Capacity {cap_num}/{len(CAPACITY_ID_LIST)}: {cap_name} ({cap_id[:8]}...)")
    print(f"{_line}")

    # --- 1. Process timepoints ---
    daily_summary, current_sku = process_timepoints_for_capacity(df_timepoints, cap_id)

    # If timepoints didn't reveal the SKU, use the Capacities table lookup
    if current_sku is None and 'CAPACITY_SKUS' in dir():
        _sku_name = CAPACITY_SKUS.get(cap_id, '')
        if _sku_name:
            current_sku = next((s for s in SKUS if s['name'] == _sku_name), None)

    if len(daily_summary) == 0:
        print(f"  [WARNING] No timepoint data for {cap_id}, skipping...")
        continue
    print(f"  [OK] Processed {len(daily_summary)} days of timepoint data")
    if current_sku:
        print(f"  [INFO] Current SKU: {current_sku['name']}")

    # --- 2. Calculate metrics ---
    metrics = calculate_capacity_metrics(daily_summary)
    if metrics is None:
        print(f"  [WARNING] Could not calculate metrics for {cap_id}, skipping...")
        continue
    print(f"  [OK] Health score: {metrics['health_score']}/100 ({metrics['health_rating']})")
    if WEEKDAY_WEEKEND_SPLIT:
        print(f"  [INFO] Weekday avg: {metrics['weekday_avg_cus']:,.0f} CUs | Weekend avg: {metrics['weekend_avg_cus']:,.0f} CUs")
    if metrics.get('spike_days_detected', 0) > 0:
        print(f"  [INFO] {metrics['spike_days_detected']} suspected spike day(s) detected and filtered")
    if metrics.get('trend', {}).get('has_trend'):
        t = metrics['trend']
        print(f"  [INFO] Trend: {t['direction']} ({t['weekly_growth_pct']:+.1f}%/week)")

    # --- 3. SKU recommendation ---
    sku_analysis, recommended_sku = recommend_sku_for_capacity(metrics, SKUS, NEEDS_FREE_VIEWERS)
    print(f"  [OK] Recommended SKU: {recommended_sku['name']}")

    # --- 4. Filter items for this capacity ---
    df_items_cap = filter_items_for_capacity(df_items, cap_id)

    # --- 5. Create charts ---
    charts = create_capacity_charts(daily_summary, metrics, sku_analysis, recommended_sku, current_sku, df_items_cap)
    print(f"  [OK] Created {len(charts)} charts")

    # --- 6. Inline display (single capacity only) ---
    if not is_multi:
        for fig_name, fig in charts.items():
            fig.show()
        print_capacity_summary(cap_id, metrics, current_sku, recommended_sku, sku_analysis, daily_summary, cap_name)
    else:
        # Compact one-line summary for multi-capacity mode
        _action = ""
        if current_sku:
            cur_idx = next((i for i, s in enumerate(SKUS) if s['name'] == current_sku['name']), -1)
            rec_idx = next((i for i, s in enumerate(SKUS) if s['name'] == recommended_sku['name']), -1)
            if rec_idx > cur_idx:
                _action = f"Upgrade {current_sku['name']} -> {recommended_sku['name']}"
            elif rec_idx < cur_idx:
                _action = f"Downsize {current_sku['name']} -> {recommended_sku['name']}"
            else:
                _action = f"Stay on {current_sku['name']}"
        print(f"  [RESULT] {cap_name}: {metrics['health_rating']} ({metrics['health_score']}/100) | Avg util: {metrics['avg_util']:.1f}% | {_action}")

    # --- 7. HTML report (always generated) ---
    html_content = None
    html_filename = None
    if SAVE_HTML_REPORT:
        _safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in CAPACITY_NAMES.get(cap_id, cap_id[:8]))
        if REPLACE_EXISTING_OUTPUTS:
            html_filename = f"sku_advisor_{_safe_name}.html"
        else:
            _run_suffix = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            html_filename = f"sku_advisor_{_safe_name}_{_run_suffix}.html"
        try:
            _cap_display = CAPACITY_NAMES.get(cap_id, cap_id[:8])
            _cur_sku_name = current_sku["name"] if current_sku and isinstance(current_sku, dict) else "Unknown"
            _dates = pd.to_datetime(daily_summary["Date"])
            _date_range = f"{_dates.min().strftime('%d %b %Y')} to {_dates.max().strftime('%d %b %Y')}"

            html_parts = []
            # --- HTML header + CSS ---
            html_parts.append("""<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fabric Capacity Health Report | Data Nova</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
body{font-family:'Segoe UI',Arial,sans-serif;background:linear-gradient(135deg,#f5f7fa 0%,#e4e8ec 100%);min-height:100vh;color:#333;line-height:1.6;padding:24px}
.container{max-width:1200px;margin:0 auto}
.header{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);color:white;padding:40px;border-radius:16px;margin-bottom:24px;box-shadow:0 10px 40px rgba(102,126,234,0.3)}
.header h1{font-size:2.5em;font-weight:700;margin-bottom:8px}
.header .subtitle{opacity:0.9;font-size:1.1em}
.header .meta{margin-top:16px;font-size:0.9em;opacity:0.8}
.section{background:white;border-radius:16px;padding:24px;margin-bottom:24px;box-shadow:0 2px 12px rgba(0,0,0,0.08)}
.section h2{font-size:1.2em;color:#333;margin-bottom:16px;padding-bottom:12px;border-bottom:2px solid #667eea}
.kpi-grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(200px,1fr));gap:16px}
.kpi{background:white;border-radius:12px;padding:24px;text-align:center;box-shadow:0 2px 12px rgba(0,0,0,0.08)}
.kpi .value{font-size:2.2em;font-weight:700;color:#667eea;margin-bottom:4px}
.kpi .label{color:#666;font-size:0.9em;font-weight:500}
.chart-wrap{margin:16px 0}
.section-header{display:flex;align-items:center;justify-content:space-between;gap:12px;margin-bottom:16px;padding-bottom:12px;border-bottom:2px solid #667eea;flex-wrap:wrap}
.section-header h2{font-size:1.2em;color:#333;margin:0;padding-bottom:0;border-bottom:none}
.rec-card{background:#f8f9fa;border-radius:12px;padding:20px;border-left:4px solid #667eea}
.rec-card p{margin-bottom:8px}
.rec-card ul{padding-left:24px;margin-bottom:12px}
.rec-card li{margin-bottom:4px}
.disclaimer{background:#fff3cd;border-radius:12px;padding:16px;margin-top:16px;border-left:4px solid #ffc107;font-size:0.85em;color:#856404}
.insight-box{background:#e8f4f8;border-radius:12px;padding:16px;margin:12px 0;border-left:4px solid #17a2b8}
.cost-table{width:100%;border-collapse:collapse;margin:12px 0}
.cost-table th,.cost-table td{padding:10px 16px;text-align:left;border-bottom:1px solid #eee}
.cost-table th{background:#f8f9fa;font-weight:600;color:#333}
.cost-table .highlight{background:#e8f5e9;font-weight:600}
.footer{text-align:center;padding:32px;color:#666;font-size:0.9em}
.help-btn{background:#f0f4ff;border:1px solid #667eea;color:#667eea;width:28px;height:28px;border-radius:50%;cursor:pointer;font-size:14px;font-weight:600;display:flex;align-items:center;justify-content:center;flex-shrink:0}
.help-btn:hover{background:#667eea;color:white}
.about-link-btn{background:#f0f4ff;border:1px solid #667eea;color:#667eea;padding:8px 14px;border-radius:999px;cursor:pointer;font-size:13px;font-weight:600;display:inline-flex;align-items:center;justify-content:center;text-decoration:none;line-height:1;min-height:34px}
.about-link-btn:hover{background:#667eea;color:white}
.modal-overlay{display:none;position:fixed;top:0;left:0;width:100%;height:100%;background:rgba(0,0,0,0.5);z-index:1000;justify-content:center;align-items:center}
.modal-overlay.active{display:flex}
.modal{background:white;border-radius:16px;padding:32px;max-width:500px;width:90%;max-height:80vh;overflow-y:auto;position:relative;box-shadow:0 20px 60px rgba(0,0,0,0.3)}
.modal h4{color:#667eea;margin-bottom:16px;font-size:1.3em}
.modal p,.modal ul,.modal li{color:#555;line-height:1.7;margin-bottom:8px;padding-left:20px}
.modal-close{position:absolute;top:16px;right:16px;background:#f0f0f0;border:none;width:32px;height:32px;border-radius:50%;cursor:pointer;font-size:18px;color:#666;display:flex;align-items:center;justify-content:center}
</style></head><body><div class="container">
<div class="header">
  <h1>Fabric Capacity Health Report</h1>
  <p class="subtitle">Capacity analysis report</p>
  <p class="meta" style="margin-bottom:8px;"><strong>Capacity:</strong> """ + _cap_display + """ (""" + _cur_sku_name + """) &bull; <strong>Period:</strong> """ + _date_range + """</p>
  <p class="meta">Prathy Kamasani | Data Nova &mdash; Generated """ + datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC") + """</p>
</div>""")

            # --- Disclaimer banner at top ---
            html_parts.append('<div class="disclaimer"><strong>Important:</strong> ' + DISCLAIMER_TEXT + '</div>')

            # --- KPIs ---
            kpi_lines = []
            kpi_lines.append(f'<div class="kpi"><div class="value">{metrics["health_score"]}%</div><span class="label">Health Score \u00b7 {metrics["health_rating"]}</span></div>')

            if current_sku and recommended_sku:
                _c_idx = next((i for i, s in enumerate(SKUS) if s['name'] == current_sku['name']), -1)
                _r_idx = next((i for i, s in enumerate(SKUS) if s['name'] == recommended_sku['name']), -1)
                _arrow = '&uarr;' if _r_idx > _c_idx else ('&darr;' if _r_idx < _c_idx else '&check;')
                _word = 'Upgrade' if _r_idx > _c_idx else ('Downsize' if _r_idx < _c_idx else 'Stay')
                kpi_lines.append(f'<div class="kpi"><div class="value">{current_sku["name"]} {_arrow} {recommended_sku["name"]}</div><span class="label">{_word} \u00b7 SKU Recommendation</span></div>')
            elif recommended_sku:
                kpi_lines.append(f'<div class="kpi"><div class="value">{recommended_sku["name"]}</div><span class="label">Recommended SKU</span></div>')

            if current_sku and recommended_sku:
                _diff = current_sku.get("monthly_usd", 0) - recommended_sku.get("monthly_usd", 0)
                if _diff > 0:
                    kpi_lines.append(f'<div class="kpi"><div class="value" style="color:#28a745;">~${_diff:,}/mo</div><span class="label">Est. Savings (list price)</span></div>')
                elif _diff < 0:
                    kpi_lines.append(f'<div class="kpi"><div class="value" style="color:#fd7e14;">~+${abs(_diff):,}/mo</div><span class="label">Est. Upgrade Cost (list price)</span></div>')

            kpi_lines.append(f'<div class="kpi"><span class="label">Days Analyzed</span><div class="value">{metrics["days_analyzed"]}</div></div>')
            kpi_lines.append(f'<div class="kpi"><span class="label">Avg Daily CUs</span><div class="value">{metrics["avg_daily_cus"]:,.0f}</div></div>')
            kpi_lines.append(f'<div class="kpi"><span class="label">Avg Util %</span><div class="value">{metrics["avg_util"]:.1f}%</div></div>')
            if metrics['days_with_delay'] > 0:
                kpi_lines.append(f'<div class="kpi"><span class="label">Days with Throttling</span><div class="value">{metrics["days_with_delay"]}/{metrics["days_analyzed"]}</div></div>')

            # Trend KPI
            _trend = metrics.get('trend', {})
            if _trend.get('has_trend'):
                _t_color = '#dc3545' if _trend['direction'] == 'GROWING' else '#28a745' if _trend['direction'] == 'DECLINING' else '#6c757d'
                kpi_lines.append(f'<div class="kpi"><div class="value" style="color:{_t_color};">{_trend["weekly_growth_pct"]:+.1f}%/wk</div><span class="label">Consumption Trend \u00b7 {_trend["direction"]}</span></div>')

            html_parts.append('<div class="section"><div class="section-header"><h2>Key Metrics</h2><button class="help-btn" onclick="showHelp(\'metrics\')" title="Help">?</button></div><div class="kpi-grid">' + "".join(kpi_lines) + "</div></div>")

            # --- Weekday vs Weekend insight ---
            if WEEKDAY_WEEKEND_SPLIT and metrics.get('weekday_count', 0) > 0 and metrics.get('weekend_count', 0) > 0:
                _ratio = metrics['weekday_weekend_ratio']
                _insight = f"Weekday consumption is {_ratio}x higher than weekends. " if _ratio > 1.5 else "Weekday and weekend consumption are similar. "
                _insight += f"SKU recommendation is based on weekday P80 ({metrics['weekday_p80_cus']:,.0f} CUs) for accurate working-day sizing."
                html_parts.append(f'<div class="section"><div class="section-header"><h2>Weekday vs Weekend</h2><button class="help-btn" onclick="showHelp(\'weekday\')" title="Help">?</button></div>'
                    f'<div class="insight-box"><strong>Insight:</strong> {_insight}</div>'
                    f'<div class="kpi-grid">'
                    f'<div class="kpi"><div class="value">{metrics["weekday_avg_cus"]:,.0f}</div><span class="label">Weekday Avg CUs</span></div>'
                    f'<div class="kpi"><div class="value">{metrics["weekend_avg_cus"]:,.0f}</div><span class="label">Weekend Avg CUs</span></div>'
                    f'<div class="kpi"><div class="value">{metrics["weekday_avg_util"]:.1f}%</div><span class="label">Weekday Avg Util</span></div>'
                    f'<div class="kpi"><div class="value">{metrics["weekend_avg_util"]:.1f}%</div><span class="label">Weekend Avg Util</span></div>'
                    f'</div></div>')

            # --- Spike filtering note ---
            if SPIKE_FILTERING and metrics.get('spike_days_detected', 0) > 0:
                _n = metrics['spike_days_detected']
                html_parts.append(f'<div class="section"><div class="section-header"><h2>Spike Filtering</h2><button class="help-btn" onclick="showHelp(\'spike\')" title="Help">?</button></div>'
                    f'<div class="insight-box"><strong>Note:</strong> {_n} day(s) showed anomalously high peak CUs, '
                    f'likely from capacity pause/resume or settlement catch-up processing. '
                    f'These were excluded from the P80 calculation used for SKU sizing. '
                    f'P80 (all days): {metrics["p80_daily_cus"]:,.0f} CUs vs P80 (filtered): {metrics["p80_daily_cus_filtered"]:,.0f} CUs.</div></div>')

            # --- Charts ---
            chart_list = [
                ("fig_util_gauges", f"Utilisation on {recommended_sku['name']}", "util"),
                ("fig_daily", "Daily Utilisation (lighter bars = weekends)", "daily"),
                ("fig_weekday_weekend", "Weekday vs Weekend Comparison", "weekday-chart"),
                ("fig_throttle", "Throttling (delay & rejection %)", "throttle"),
                ("fig_carryover", "Carryforward & Expected Burndown", "carryover"),
                ("fig_sku", "SKU Utilisation Comparison", "sku"),
                ("fig_items", "Top Workloads by CUs", "items"),
                ("fig_workspace", "Top Workspaces by CU Consumption", "workspace"),
            ]
            for fig_name, title, help_id in chart_list:
                fig = charts.get(fig_name)
                if fig is not None and hasattr(fig, "to_html"):
                    html_parts.append(f'<div class="section"><div class="section-header"><h2>{title}</h2><button class="help-btn" onclick="showHelp(\'{help_id}\')" title="Help">?</button></div><div class="chart-wrap">')
                    html_parts.append(fig.to_html(full_html=False, include_plotlyjs="cdn"))
                    html_parts.append("</div></div>")

            # --- Reserved vs PAYG comparison ---
            if RESERVED_VS_PAYG and recommended_sku:
                rec_payg = recommended_sku.get("monthly_usd", 0)
                rec_reserved = recommended_sku.get("monthly_reserved_usd", 0)
                rec_saving = rec_payg - rec_reserved
                avg_util_decimal = metrics['avg_util'] / 100.0

                _payg_advice = ""
                if avg_util_decimal > RESERVED_BREAKEVEN_UTIL:
                    _payg_advice = f'<div class="insight-box" style="background:#e8f5e9;border-color:#28a745;"><strong>Recommendation:</strong> Your average utilisation ({metrics["avg_util"]:.0f}%) is above the ~{RESERVED_BREAKEVEN_UTIL*100:.0f}% break-even point. A <strong>reserved instance</strong> would likely save ~${rec_saving:,}/mo vs PAYG for {recommended_sku["name"]}.</div>'
                else:
                    _payg_advice = f'<div class="insight-box" style="background:#fff3cd;border-color:#ffc107;"><strong>Recommendation:</strong> Your average utilisation ({metrics["avg_util"]:.0f}%) is below the ~{RESERVED_BREAKEVEN_UTIL*100:.0f}% break-even point. <strong>PAYG with pause/resume scheduling</strong> may be more cost-effective than a reserved instance.</div>'

                _cost_rows = ""
                for sa in sku_analysis:
                    _hl = ' class="highlight"' if sa['SKU'] == recommended_sku['name'] else ''
                    _cost_rows += f'<tr{_hl}><td>{sa["SKU"]}</td><td>{sa["Avg Util %"]}%</td><td>${sa["PAYG $/mo"]:,}</td><td>${sa["Reserved $/mo"]:,}</td><td>${sa["Savings $/mo"]:,}</td></tr>'

                html_parts.append(f'<div class="section"><div class="section-header"><h2>Reserved vs PAYG Cost Comparison</h2><button class="help-btn" onclick="showHelp(\'reserved\')" title="Help">?</button></div>'
                    f'{_payg_advice}'
                    f'<table class="cost-table"><thead><tr><th>SKU</th><th>Avg Util %</th><th>PAYG $/mo</th><th>Reserved $/mo</th><th>Savings $/mo</th></tr></thead>'
                    f'<tbody>{_cost_rows}</tbody></table>'
                    f'<p style="font-size:0.85em;color:#666;margin-top:8px;">Published list prices (USD). Reserved = 1-year commitment. Actual costs vary by region, currency, and agreement. Break-even at ~{RESERVED_BREAKEVEN_UTIL*100:.0f}% utilisation.</p></div>')

            # --- Recommendations ---
            rec_html = '<div class="section"><div class="section-header"><h2>Recommendations</h2><button class="help-btn" onclick="showHelp(\'rec\')" title="Help">?</button></div><div class="rec-card">'
            _h = metrics['health_rating']
            _rn = recommended_sku['name']
            if _h == 'CRITICAL':
                rec_html += f'<p style="color:#dc3545;font-weight:600;">URGENT: Critically overloaded.</p><ul><li>Upgrade to <strong>{_rn}</strong> immediately</li><li>Reschedule heavy background jobs</li><li>Check if job-level bursting is enabled (default: ON). Disable if single jobs are monopolising capacity.</li></ul>'
            elif _h == 'POOR':
                rec_html += f'<p style="color:#fd7e14;font-weight:600;">WARNING: Significant stress.</p><ul><li>Plan upgrade to <strong>{_rn}</strong></li><li>Spread workloads across off-peak hours</li><li>Review job-level bursting settings if concurrency is a concern</li></ul>'
            elif _h == 'FAIR':
                rec_html += f'<p style="color:#856404;font-weight:600;">ATTENTION: Running warm.</p><ul><li>Consider <strong>{_rn}</strong></li><li>Optimise heavy refresh schedules</li></ul>'
            elif _h == 'GOOD':
                rec_html += '<p style="color:#28a745;font-weight:600;">HEALTHY: Well-sized.</p><ul><li>Continue monitoring for growth trends</li></ul>'
            else:
                rec_html += '<p style="color:#28a745;font-weight:600;">EXCELLENT: Plenty of headroom.</p><ul><li>Consider downsizing to save costs</li></ul>'

            # Trend warning
            _trend = metrics.get('trend', {})
            if _trend.get('has_trend') and _trend['direction'] == 'GROWING' and _trend['weekly_growth_pct'] > 3:
                rec_html += f'<li style="color:#dc3545;">Consumption is growing at {_trend["weekly_growth_pct"]:+.1f}%/week. Plan for the next SKU tier within the coming weeks.</li>'

            # Surge/overage note
            if metrics['avg_util'] > 100 and metrics['days_with_delay'] == 0:
                rec_html += '<li>No throttling detected despite high utilisation. Check if Capacity Overage or Surge Protection is enabled, as these incur additional Azure charges.</li>'

            rec_html += '</ul></div>'

            # Spark autoscale disclaimer
            rec_html += '<div class="insight-box"><strong>Spark Autoscale Note:</strong> If Spark Autoscale Billing is enabled for this capacity, Spark workloads are billed separately on a pay-as-you-go basis and are NOT reflected in these metrics. Factor in those costs separately when evaluating total Fabric spend.</div>'

            rec_html += '</div>'
            html_parts.append(rec_html)

            # --- Help modals ---
            html_parts.append('''<div class="modal-overlay" id="modal-metrics" onclick="closeModal(event, 'metrics')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'metrics')">×</button><h4>Understanding Key Metrics</h4><p>These metrics summarise your Fabric capacity health:</p><ul><li><strong>Health Score:</strong> Composite 0–100 score from utilisation, throttling and carryforward.</li><li><strong>SKU Recommendation:</strong> Upgrade / downsize / stay based on usage.</li><li><strong>Avg Daily CUs, Avg Util %, Days with Throttling:</strong> Core consumption and stress signals.</li><li><strong>Consumption Trend:</strong> Week-over-week growth when available.</li></ul></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-weekday" onclick="closeModal(event, 'weekday')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'weekday')">×</button><h4>Weekday vs Weekend</h4><p>Capacity Metrics can split consumption by weekday vs weekend. SKU recommendation uses weekday P80 for sizing so working-day demand is reflected. If weekend usage is much lower, PAYG with pause/resume may be cost-effective.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-spike" onclick="closeModal(event, 'spike')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'spike')">×</button><h4>Spike Filtering</h4><p>Days with anomalously high peak CUs (e.g. after capacity pause/resume or settlement catch-up) can be excluded from the P80 used for SKU sizing. This avoids one-off spikes from driving an oversized recommendation.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-util" onclick="closeModal(event, 'util')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'util')">×</button><h4>Utilisation Gauges</h4><p>Shows average and peak utilisation on the recommended SKU. Target is typically 60–80% average with peak below 95% to avoid throttling. Green = good fit; red = risk.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-daily" onclick="closeModal(event, 'daily')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'daily')">×</button><h4>Daily Utilisation</h4><p>Daily consumption over time. Lighter bars = weekends. Use this to see spikes, weekday/weekend patterns and outliers.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-weekday-chart" onclick="closeModal(event, 'weekday-chart')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'weekday-chart')">×</button><h4>Weekday vs Weekend Comparison</h4><p>Compares average consumption on weekdays vs weekends. Helps decide if pause/resume or reserved capacity is better.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-throttle" onclick="closeModal(event, 'throttle')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'throttle')">×</button><h4>Throttling (Delay & Rejection %)</h4><p>When capacity is overloaded, Fabric delays or rejects operations. This chart shows interactive delay %, interactive rejection % and background rejection % over time. High or rising values mean the capacity is undersized.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-carryover" onclick="closeModal(event, 'carryover')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'carryover')">×</button><h4>Carryforward & Expected Burndown</h4><p>Carryforward is accumulated capacity debt when consumption exceeds the SKU. Burndown is how long it takes to clear that debt. High carryforward or long burndown indicates sustained overuse.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-sku" onclick="closeModal(event, 'sku')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'sku')">×</button><h4>SKU Utilisation Comparison</h4><p>Compares your workload against all Fabric SKUs. Bar length = average utilisation %. Aim for the recommended SKU in the green zone (around 80% target).</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-items" onclick="closeModal(event, 'items')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'items')">×</button><h4>Top Workloads by CUs</h4><p>Which reports, semantic models, dataflows, etc. consume the most capacity. Use this to target optimisation (schedule, incremental refresh, or right-size).</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-workspace" onclick="closeModal(event, 'workspace')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'workspace')">×</button><h4>Top Workspaces by CU Consumption</h4><p>Which workspaces use the most capacity. Helps with chargeback or prioritising which areas to optimise.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-reserved" onclick="closeModal(event, 'reserved')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'reserved')">×</button><h4>Reserved vs PAYG</h4><p>Reserved capacity (1-year commitment) has a lower effective price at high utilisation. The table shows PAYG vs reserved list prices. Above the break-even utilisation (~60–70%), reserved usually saves money; below it, PAYG with pause/resume may be cheaper.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-rec" onclick="closeModal(event, 'rec')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'rec')">×</button><h4>Recommendations</h4><p>Actionable advice based on health rating (CRITICAL/POOR/FAIR/GOOD/EXCELLENT). Includes upgrade/downsize guidance, throttling and scheduling tips, and notes on trend and Spark Autoscale.</p></div></div>''')
            html_parts.append('''<div class="modal-overlay" id="modal-overview" onclick="closeModal(event, 'overview')"><div class="modal" onclick="event.stopPropagation()"><button class="modal-close" onclick="closeModal(event, 'overview')">×</button><h4>About This Report</h4><p>This Fabric Capacity Health Report is generated from the Capacity Metrics semantic model. It includes health score, SKU recommendation, throttling and carryforward analysis, and cost comparison. Click the <strong>?</strong> next to each section for more detail.</p><p>Data Nova | Microsoft Fabric Training &amp; Consulting</p></div></div>''')

            # --- Footer with disclaimer ---
            html_parts.append(f'<div class="section disclaimer"><strong>Disclaimer:</strong> {DISCLAIMER_TEXT}</div>')
            html_parts.append('<div class="footer"><p>Fabric SKU Advisor \u2013 Advanced | <a href="https://www.data-nova.io">Data Nova</a></p><p><button class="about-link-btn" onclick="showHelp(\'overview\')" style="margin-top:8px">About this report</button></p></div></div><script>function showHelp(type){document.getElementById("modal-"+type).classList.add("active");document.body.style.overflow="hidden"}function closeModal(event,type){document.getElementById("modal-"+type).classList.remove("active");document.body.style.overflow=""}document.addEventListener("keydown",function(e){if(e.key==="Escape"){document.querySelectorAll(".modal-overlay").forEach(function(m){m.classList.remove("active")});document.body.style.overflow=""}});</script></body></html>')

            html_content = "\n".join(html_parts)

            # HTML report generated as a file, not rendered inline
            # (inline would duplicate the charts already shown via fig.show())
            display(HTML(f"<p style='color:#28a745;font-weight:bold;'>\u2713 HTML report generated: {html_filename}</p>"))

            # Save HTML to Lakehouse if enabled
            if SAVE_TO_LAKEHOUSE:
                try:
                    lh_path = f"/lakehouse/default/Files/{html_filename}"
                    with open(lh_path, "w", encoding="utf-8") as f:
                        f.write(html_content)
                    print(f"  [SUCCESS] HTML report saved to Lakehouse: {html_filename}")
                except Exception:
                    print(f"  [WARNING] Could not save to attached Lakehouse.")
            else:
                print(f"  [INFO] Set SAVE_TO_LAKEHOUSE = True to persist HTML reports.")

        except Exception as e:
            print(f"  [ERROR] HTML report generation failed: {e}")

    # --- 8. Lakehouse export ---
    if SAVE_TO_LAKEHOUSE:
        save_capacity_to_lakehouse(cap_id, html_content, daily_summary, sku_analysis, metrics, recommended_sku, current_sku, html_filename)

    # --- 9. Store results ---
    all_capacity_results[cap_id] = {
        'daily_summary': daily_summary,
        'current_sku': current_sku,
        'recommended_sku': recommended_sku,
        'metrics': metrics,
        'sku_analysis': sku_analysis,
        'charts': charts,
        'html_content': html_content,
        'html_filename': html_filename if SAVE_HTML_REPORT else None,
    }

print(f"\n{'='*70}")
print(f"ANALYSIS COMPLETE: {len(all_capacity_results)}/{len(CAPACITY_ID_LIST)} capacities analyzed")
if is_multi and SAVE_HTML_REPORT:
    print(f"\nHTML reports generated:")
    for cid, r in all_capacity_results.items():
        if r.get('html_filename'):
            print(f"  {r['html_filename']}")
    if not SAVE_TO_LAKEHOUSE:
        print(f"\nTo save these reports, set SAVE_TO_LAKEHOUSE = True and re-run.")
print(f"{'='*70}")


In [None]:
# =============================================================================
# Cross-Capacity Summary
# =============================================================================
from IPython.display import display, HTML
from datetime import datetime

if len(all_capacity_results) > 1:

    # Build summary data
    rows = []
    for cap_id, r in all_capacity_results.items():
        m = r['metrics']
        cur = r['current_sku']
        rec = r['recommended_sku']
        t = m.get('trend', {})

        # Action word + arrow
        if cur:
            c_idx = next((i for i, s in enumerate(SKUS) if s['name'] == cur['name']), -1)
            r_idx = next((i for i, s in enumerate(SKUS) if s['name'] == rec['name']), -1)
            if r_idx > c_idx:
                action_html = f"{cur['name']} &rarr; <strong>{rec['name']}</strong>"
                action_class = "badge-upgrade"
            elif r_idx < c_idx:
                action_html = f"{cur['name']} &rarr; <strong>{rec['name']}</strong>"
                action_class = "badge-downsize"
            else:
                action_html = f"<strong>{cur['name']}</strong> (stay)"
                action_class = "badge-stay"
        else:
            action_html = f"<strong>{rec['name']}</strong>"
            action_class = "badge-stay"

        # Health badge colour
        hs = m['health_score']
        if hs >= 90:   h_color = '#20c997'
        elif hs >= 75: h_color = '#28a745'
        elif hs >= 50: h_color = '#ffc107'
        elif hs >= 25: h_color = '#fd7e14'
        else:          h_color = '#dc3545'

        # Trend indicator
        trend_html = ""
        if TREND_ANALYSIS and t.get('has_trend'):
            t_icon = "&#9650;" if t['direction'] == 'GROWING' else "&#9660;" if t['direction'] == 'DECLINING' else "&#9644;"
            t_color = '#dc3545' if t['direction'] == 'GROWING' else '#28a745' if t['direction'] == 'DECLINING' else '#6c757d'
            trend_html = f'<span style="color:{t_color};font-weight:600;">{t_icon} {t["weekly_growth_pct"]:+.1f}%/wk</span>'

        # Monthly cost (recommended SKU)
        payg = rec.get('monthly_usd', 0)
        reserved = rec.get('monthly_reserved_usd', 0)

        rows.append({
            'cap_id': cap_id,
            'cap_short': CAPACITY_NAMES.get(cap_id, cap_id[:8]),
            'action_html': action_html,
            'action_class': action_class,
            'health_score': hs,
            'health_rating': m['health_rating'],
            'h_color': h_color,
            'avg_util': m['avg_util'],
            'trend_html': trend_html,
            'payg': payg,
            'reserved': reserved,
            'html_filename': r.get('html_filename', ''),
        })


In [None]:
# =============================================================================
# Cross-Capacity Comparison Table
# =============================================================================
from IPython.display import display, HTML
from datetime import datetime

if len(all_capacity_results) > 1:

    # Build styled HTML table
    table_rows = ""
    for row in rows:
        avg_util_str = f"{row['avg_util']:.1f}"
        payg_str = f"${row['payg']:,}"
        reserved_str = f"${row['reserved']:,}"
        table_rows += (
            '<tr>'
            f'<td style="font-weight:600;">{row["cap_short"]}</td>'
            f'<td><span class="{row["action_class"]}">{row["action_html"]}</span></td>'
            f'<td style="text-align:center;"><span class="health-badge" style="background:{row["h_color"]};">{row["health_score"]}</span> {row["health_rating"]}</td>'
            f'<td style="text-align:center;">{avg_util_str}%</td>'
            f'<td style="text-align:center;">{row["trend_html"]}</td>'
            f'<td style="text-align:right;">{payg_str}</td>'
            f'<td style="text-align:right;">{reserved_str}</td>'
            '</tr>'
        )

    # File list
    file_list_html = ""
    for row in rows:
        if row['html_filename']:
            file_list_html += f'<li><code>{row["html_filename"]}</code></li>'

    save_note = ""
    if not SAVE_TO_LAKEHOUSE:
        save_note = '<p style="margin-top:8px;color:#856404;">Set <code>SAVE_TO_LAKEHOUSE = True</code> to persist these reports.</p>'

    ts_str = datetime.utcnow().strftime("%d %b %Y %H:%M UTC")
    n_caps = len(rows)

    summary_html = f"""
    <style>
    .xc-table {{font-family:'Segoe UI',Arial,sans-serif;border-collapse:collapse;width:100%;margin:16px 0;font-size:0.95em;}}
    .xc-table th {{background:linear-gradient(135deg,#667eea,#764ba2);color:white;padding:12px 16px;text-align:left;font-weight:600;}}
    .xc-table td {{padding:10px 16px;border-bottom:1px solid #eee;}}
    .xc-table tr:hover {{background:#f8f9fa;}}
    .health-badge {{display:inline-block;color:white;font-weight:700;padding:4px 10px;border-radius:12px;font-size:0.85em;min-width:36px;text-align:center;}}
    .badge-upgrade {{color:#dc3545;}}
    .badge-downsize {{color:#28a745;}}
    .badge-stay {{color:#6c757d;}}
    .xc-header {{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);color:white;padding:24px 32px;border-radius:12px 12px 0 0;}}
    .xc-header h2 {{margin:0;font-size:1.4em;font-weight:700;}}
    .xc-header p {{margin:4px 0 0;opacity:0.85;font-size:0.9em;}}
    .xc-wrap {{background:white;border-radius:12px;box-shadow:0 2px 12px rgba(0,0,0,0.08);margin-bottom:20px;overflow:hidden;}}
    .xc-footer {{padding:12px 24px;background:#f8f9fa;font-size:0.85em;color:#666;}}
    .xc-files {{padding:12px 24px;font-size:0.9em;}}
    .xc-files ul {{margin:8px 0 0 20px;}}
    .xc-files li {{margin-bottom:4px;}}
    </style>

    <div class="xc-wrap">
      <div class="xc-header">
    <h2>Cross-Capacity Comparison</h2>
    <p>{n_caps} capacities analysed &bull; {ts_str}</p>
      </div>
      <table class="xc-table">
    <thead>
      <tr>
        <th>Capacity</th>
        <th>SKU Action</th>
        <th style="text-align:center;">Health</th>
        <th style="text-align:center;">Avg Util</th>
        <th style="text-align:center;">Trend</th>
        <th style="text-align:right;">PAYG $/mo</th>
        <th style="text-align:right;">Reserved $/mo</th>
      </tr>
    </thead>
    <tbody>
      {table_rows}
    </tbody>
      </table>
      <div class="xc-files">
    <strong>Detailed reports (one per capacity):</strong>
    <ul>{file_list_html}</ul>
    {save_note}
      </div>
      <div class="xc-footer">
    Pricing: published list prices (USD). Actual costs vary by region, currency, and agreement.
    &bull; Prathy Kamasani | <a href="https://www.data-nova.io" style="color:#667eea;">Data Nova</a>
      </div>
    </div>
    """

    display(HTML(summary_html))


In [None]:
# =============================================================================
# Save Comparison CSV + Footer
# =============================================================================
from datetime import datetime

if len(all_capacity_results) > 1:

    # Save CSV if Lakehouse is enabled
    if SAVE_TO_LAKEHOUSE:
        try:
            csv_rows = []
            for cap_id, r in all_capacity_results.items():
                m = r['metrics']
                t = m.get('trend', {})
                csv_rows.append({
                    'capacity_id': cap_id,
                    'capacity_name': CAPACITY_NAMES.get(cap_id, cap_id[:8]),
                    'current_sku': r['current_sku']['name'] if r['current_sku'] else 'Unknown',
                    'recommended_sku': r['recommended_sku']['name'],
                    'health_score': m['health_score'],
                    'health_rating': m['health_rating'],
                    'avg_util_pct': round(m['avg_util'], 1),
                    'avg_daily_cus': round(m['avg_daily_cus']),
                    'weekday_avg_cus': round(m.get('weekday_avg_cus', 0)),
                    'weekend_avg_cus': round(m.get('weekend_avg_cus', 0)),
                    'trend_direction': t.get('direction', 'N/A'),
                    'trend_weekly_pct': t.get('weekly_growth_pct', 0),
                    'throttling_days': m['days_with_delay'],
                    'payg_usd': r['recommended_sku'].get('monthly_usd', 0),
                    'reserved_usd': r['recommended_sku'].get('monthly_reserved_usd', 0),
                })
            _ts = datetime.utcnow().strftime("%Y%m%d_%H%M")
            lh_path = f"/lakehouse/default/Files/capacity_comparison_{_ts}.csv"
            pd.DataFrame(csv_rows).to_csv(lh_path, index=False)
            print(f"[SUCCESS] Comparison CSV saved: capacity_comparison_{_ts}.csv")
        except Exception as e:
            print(f"[ERROR] Could not save comparison: {e}")

elif len(all_capacity_results) == 1:
    print("\n[INFO] Single capacity analysed. Full report displayed above.")

else:
    print("\n[WARNING] No capacities were successfully analysed.")

print(f"\nBuilt by Prathy Kamasani | Data Nova")
print("https://www.data-nova.io")
