# Weekly SLA Calculation Engine — Google Colab Version

This notebook is **ready for Google Colab**.

## What it does
- Prompts you to **upload** the weekly Salesforce Case History Excel export from your computer
- Computes weekly **handle time** and **SLA compliance** using the agreed rules
- Exports an output Excel file with the exact required columns/order
- Lets you either:
  - **Save to Google Drive**, or
  - **Download** to your computer

## Rules (locked)
- **Timezone:** Qatar (Asia/Qatar, UTC+3)
- **Reporting cohort:** cases with **Date/Time Opened** within **Sunday 00:00:00 → Saturday 23:59:59** (Qatar time)
- **Counted statuses:** `New`, `To Do`, `In Progress`, `Reopen`
- **Stop statuses:** `Resolved`, `Closed`
- **Stop counting at:** `Resolved` (confirmed)
- **Status events only:** rows where `Field / Event == "status"` (case-insensitive)
- **Week selection:** derived from the dataset (anchor = max `Date/Time Opened`) unless overridden


In [None]:
# --- Imports (Colab) ---
import pandas as pd
import numpy as np
from zoneinfo import ZoneInfo
from google.colab import files, drive
import os
import pathlib


## Step 1 — Upload weekly input file + set config

In [None]:
# Upload the weekly Salesforce export from your computer
uploaded = files.upload()

# Use the first uploaded file
INPUT_XLSX = next(iter(uploaded))
print("Uploaded:", INPUT_XLSX)

# Output filename
OUTPUT_XLSX = "Case_SLA_Output.xlsx"

# Timezone + optional week override (Qatar time)
TZ_NAME = "Asia/Qatar"

# Optional override (set to None to auto-derive from anchor max created):
# Example:
# WEEK_START_OVERRIDE = "2025-12-07 00:00:00"
# WEEK_END_OVERRIDE   = "2025-12-13 23:59:59"
WEEK_START_OVERRIDE = None
WEEK_END_OVERRIDE = None

# Cap open cases at a fixed "as of" timestamp (optional).
# If None, uses the runtime current time in TZ_NAME.
# Example:
# AS_OF_OVERRIDE = "2025-12-31 23:59:59"
AS_OF_OVERRIDE = None


Saving report1767177384032.csv to report1767177384032.csv
Uploaded: report1767177384032.csv


## Step 2 — SLA engine (deterministic)

In [None]:
STOP_STATES = {"Resolved", "Closed"}
OPEN_STATES = {"New", "To Do", "In Progress", "Reopen"}

def localize_qatar(dt_series: pd.Series) -> pd.Series:
    s = pd.to_datetime(dt_series, errors="coerce")
    # If naive -> localize; if aware -> convert
    if getattr(s.dt, "tz", None) is None:
        return s.dt.tz_localize(TZ_NAME)
    return s.dt.tz_convert(TZ_NAME)

def week_bounds_from_anchor(anchor_ts: pd.Timestamp):
    """Compute Sun 00:00:00 → Sat 23:59:59 week bounds in Qatar time."""
    anchor_day = anchor_ts.floor("D")
    days_since_sun = (anchor_day.weekday() + 1) % 7  # Sunday->0, Monday->1, ..., Saturday->6
    week_start = anchor_day - pd.Timedelta(days=days_since_sun)
    as_of_ts = week_start + pd.Timedelta(days=6, hours=23, minutes=59, seconds=59)
    return week_start, as_of_ts

def sla_hours_for_reason(reason: str, product_count):
    """SLA policy mapping (as agreed in your master prompt)."""
    if reason == "Merchant Updates - With Translation":
        try:
            pc = float(product_count)
        except Exception:
            pc = np.nan
        if pd.isna(pc):
            return 24
        if pc <= 100: return 24
        if pc <= 200: return 36
        if pc <= 1000: return 48
        if pc <= 2000: return 60
        if pc <= 5000: return 84
        return 120
    if reason == "Promotion Request":
        return 12
    return 24

def compute_case_metrics(case_df: pd.DataFrame, as_of_ts: pd.Timestamp):
    opened = case_df["Date/Time Opened"].dropna().iloc[0] if case_df["Date/Time Opened"].notna().any() else pd.NaT
    if pd.isna(opened):
        return None

    # Status-only history
    status_rows = case_df[case_df["Field / Event"].astype(str).str.strip().str.lower() == "status"].copy()
    status_rows = status_rows[status_rows["Edit Date"].notna()]

    # Build events with deterministic tie-break
    events = [{"time": opened, "old": None, "new": "_INITIAL_OPEN_", "priority": 0}]
    for _, r in status_rows.iterrows():
        if pd.isna(r["New Value"]):
            continue
        new_status = str(r["New Value"]).strip()
        old_status = None if pd.isna(r["Old Value"]) else str(r["Old Value"]).strip()
        priority = 2 if new_status in STOP_STATES else 1
        events.append({"time": r["Edit Date"], "old": old_status, "new": new_status, "priority": priority})

    events.sort(key=lambda e: (e["time"], e["priority"]))

    is_open = True
    open_time = opened
    total_open_seconds = 0.0
    times_reopened = 0

    for ev in events:
        t = ev["time"]
        if pd.isna(t):
            continue

        # Ignore events after the reporting window end
        if t > as_of_ts:
            break

        if ev["new"] == "_INITIAL_OPEN_":
            is_open = True
            open_time = t
            continue

        # Stop clock at Resolved/Closed
        if ev["new"] in STOP_STATES:
            if is_open:
                delta = (t - open_time).total_seconds()
                if delta > 0:
                    total_open_seconds += delta
                is_open = False
            continue

        # Reopen transitions: Resolved -> any counted/open state (Closed can't reopen in your org)
        if (ev["old"] == "Resolved") and (ev["new"] not in STOP_STATES):
            times_reopened += 1
            is_open = True
            open_time = t

    # Cap open cases at the report 'as_of' timestamp
    if is_open and as_of_ts > open_time:
        delta = (as_of_ts - open_time).total_seconds()
        if delta > 0:
            total_open_seconds += delta

    handle_time_hours = round(total_open_seconds / 3600.0, 2)

    reason = case_df["Main Contact Reason"].dropna().astype(str).iloc[0] if case_df["Main Contact Reason"].notna().any() else ""
    product_count = case_df["Product Count"].dropna().iloc[0] if case_df["Product Count"].notna().any() else np.nan
    sla_hours = sla_hours_for_reason(reason, product_count)
    within_sla = handle_time_hours <= sla_hours

    return {
        "Case Number": case_df["Case Number"].iloc[0],
        "Created Week": case_df["Created Week"].dropna().iloc[0] if ("Created Week" in case_df.columns and case_df["Created Week"].notna().any()) else np.nan,
        "handle_time_hours": handle_time_hours,
        "sla_hours": sla_hours,
        "Within SLA": bool(within_sla),
        "Times Reopened": int(times_reopened),
        "Main Contact Reason": reason,
        "Product Count": product_count,
        "Product Count.1": case_df["Product Count.1"].dropna().iloc[0] if case_df["Product Count.1"].notna().any() else np.nan,
        "Case Owner": case_df["Case Owner"].dropna().astype(str).iloc[0] if case_df["Case Owner"].notna().any() else ""
    }


## Step 3 — Run weekly computation + export

In [None]:
# --- LOAD DATA ---
# --- Robust input loader (supports .xlsx/.xls/.csv) ---
def load_input_table(path: str) -> pd.DataFrame:
    """Load Salesforce export from Excel or CSV with clear, deterministic behavior."""
    if not isinstance(path, str) or not path.strip():
        raise ValueError(f"INPUT file path is empty/invalid: {path!r}")
    path = path.strip()
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")

    suffix = pathlib.Path(path).suffix.lower()

    # 1) Deterministic path based on extension (preferred)
    if suffix in [".xlsx", ".xlsm", ".xltx", ".xltm"]:
        return pd.read_excel(path, engine="openpyxl")
    if suffix == ".xls":
        try:
            return pd.read_excel(path, engine="xlrd")
        except Exception as e:
            raise ValueError(
                "Input appears to be .xls but required engine 'xlrd' is unavailable. "
                "Either convert to .xlsx or install xlrd.\n"
                f"Original error: {repr(e)}"
            )
    if suffix == ".csv":
        return pd.read_csv(path)

    # 2) Fallback: content sniffing (when extension is missing/wrong)
    with open(path, "rb") as f:
        head = f.read(8)

    if head[:2] == b"PK":
        return pd.read_excel(path, engine="openpyxl")

    if head == b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1":
        try:
            return pd.read_excel(path, engine="xlrd")
        except Exception as e:
            raise ValueError(
                "Input appears to be legacy .xls but required engine 'xlrd' is unavailable. "
                "Either convert to .xlsx or install xlrd.\n"
                f"Original error: {repr(e)}"
            )

    try:
        return pd.read_csv(path)
    except Exception as e:
        raise ValueError(
            f"Excel/CSV format could not be determined for: {path}\n"
            f"Extension: '{suffix or '[none]'}'\n"
            f"Header bytes: {head!r}\n"
            f"CSV read failed with: {repr(e)}"
        )

df = load_input_table(INPUT_XLSX)

# Normalize datetimes in Qatar time
df["Date/Time Opened"] = localize_qatar(df["Date/Time Opened"])
df["Edit Date"] = localize_qatar(df["Edit Date"])

df = df[df["Case Number"].notna()].copy()

# --- LEGACY WEEK BOUNDS (informational unless you filter by week) ---
if WEEK_START_OVERRIDE and WEEK_END_OVERRIDE:
    week_start = pd.Timestamp(WEEK_START_OVERRIDE, tz=TZ_NAME)
    week_end = pd.Timestamp(WEEK_END_OVERRIDE, tz=TZ_NAME)
else:
    anchor = df["Date/Time Opened"].max()
    week_start, week_end = week_bounds_from_anchor(anchor)

print("Report window (Qatar):", week_start, "→", week_end)
print("NOTE: This week window is informational only; handle time is computed for the full cohort unless you filter.")

# --- COHORT FILTER (by creation time) ---
cohort = df.copy()
# Cap timestamp for still-open cases
if AS_OF_OVERRIDE:
    as_of_ts = pd.Timestamp(AS_OF_OVERRIDE)
    if as_of_ts.tzinfo is None:
        as_of_ts = as_of_ts.tz_localize(TZ_NAME)
    else:
        as_of_ts = as_of_ts.tz_convert(TZ_NAME)
else:
    as_of_ts = pd.Timestamp.now(tz=TZ_NAME)
print("Capping open cases at (as_of):", as_of_ts)
print("Unique cases in cohort:", cohort["Case Number"].nunique())

# --- COMPUTE OUTPUT ---
outputs = []
for _, cdf in cohort.groupby("Case Number", sort=False):
    out = compute_case_metrics(cdf, as_of_ts)
    if out is not None:
        outputs.append(out)

out_df = pd.DataFrame(outputs)

# Ensure exact column order
cols = ["Case Number","Created Week","handle_time_hours","sla_hours","Within SLA","Times Reopened",
        "Main Contact Reason","Product Count","Product Count.1","Case Owner"]
out_df = out_df.reindex(columns=cols)

# --- EXPORT ---
# Excel does not support tz-aware datetimes; store run metadata as tz-naive.
with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl") as writer:
    out_df.to_excel(writer, sheet_name="Case_SLA", index=False)
    meta = pd.DataFrame([
        {"key": "as_of_timestamp", "value": as_of_ts.tz_localize(None) if getattr(as_of_ts, "tzinfo", None) is not None else as_of_ts},
        {"key": "timezone", "value": TZ_NAME},
        {"key": "input_file", "value": INPUT_XLSX},
        {"key": "history_rows", "value": int(len(df))},
        {"key": "unique_cases", "value": int(cohort["Case Number"].nunique())},
    ])
    meta.to_excel(writer, sheet_name="Run_Metadata", index=False)
print("Saved output:", OUTPUT_XLSX)
out_df.head()


  return pd.read_csv(path)
  s = pd.to_datetime(dt_series, errors="coerce")


Report window (Qatar): 2025-12-28 00:00:00+03:00 → 2026-01-03 23:59:59+03:00
NOTE: This week window is informational only; handle time is computed for the full cohort unless you filter.
Capping open cases at (as_of): 2025-12-31 14:10:21.344436+03:00
Unique cases in cohort: 111874
Saved output: Case_SLA_Output.xlsx


Unnamed: 0,Case Number,Created Week,handle_time_hours,sla_hours,Within SLA,Times Reopened,Main Contact Reason,Product Count,Product Count.1,Case Owner
0,7709540,1,17.77,24,True,2,Activation Requests,,,Neriya William Bhatti
1,7710529,1,0.72,12,True,1,Promotion Request,,,Promotion Team
2,7661679,52,1.12,24,True,0,Merchant Updates - Without Translation,,,Khizar Mehmood
3,7716348,1,3.09,24,True,1,Credentials Request / Issue,,,Shaheer Abid
4,7712607,1,0.43,24,True,2,Device Request/ Issues,,,Partnerships Queue


## Step 4A — Save output to Google Drive (optional)

Run these cells if you want the Excel output saved to your Google Drive.


In [None]:
# Mount Google Drive (only needed once per session)
drive.mount("/content/drive")


Mounted at /content/drive


In [None]:
# Choose a folder path in Drive
DRIVE_FOLDER = "/content/drive/MyDrive/SLA_Reports"
os.makedirs(DRIVE_FOLDER, exist_ok=True)

drive_path = os.path.join(DRIVE_FOLDER, OUTPUT_XLSX)
!cp "{OUTPUT_XLSX}" "{drive_path}"
print("Saved to Drive:", drive_path)


Saved to Drive: /content/drive/MyDrive/SLA_Reports/Case_SLA_Output.xlsx


## Step 4B — Download output to your computer (optional)

Run this cell if you want to download the output Excel file locally.


In [None]:
files.download(OUTPUT_XLSX)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

## Optional — Manual validation spot-checks

If you want, add a few known cases and expected `handle_time_hours` values.
This will show mismatches (if any).


In [None]:
# Example:
# manual_checks = {7418194: 46.38, 7402136: 0.07}
manual_checks = {}

if manual_checks:
    rows = []
    for case_num, expected in manual_checks.items():
        cdf = cohort[cohort["Case Number"] == case_num]
        if cdf.empty:
            rows.append({"Case Number": case_num, "expected": expected, "engine": None, "match": False, "note": "Not in cohort"})
            continue
        out = compute_case_metrics(cdf, week_end)
        engine_val = out["handle_time_hours"] if out else None
        rows.append({
            "Case Number": case_num,
            "expected": expected,
            "engine": engine_val,
            "match": (engine_val == expected),
            "diff_hours": None if engine_val is None else round(engine_val - expected, 4),
        })
    pd.DataFrame(rows)
else:
    print("No manual checks provided.")


No manual checks provided.
