In [None]:
from unittest.mock import inplace
from nt import rename
import pandas as pd
import numpy as np
import re, unicodedata
from datetime import datetime, timedelta
from numpy.ma.extras import column_stack
from pathlib import Path

In [None]:
# CONFIG: input and output file paths.
# NOTE(review): the path values are truncated placeholders - set each one to the
# real workbook / CSV location before running the notebook.
TEST_TIME_XLSX = r"C:\Users\.."          # test-time workbook read by pd.read_excel
SERVICE_TICKETS_XLSX = r"C:\Users\.."    # service-ticket workbook read by pd.read_excel
OUTPUT_ROWLEVEL_CSV = r"C:\Users\.."     # not referenced by the cells shown here
OUTPUT_CSV = r"C:\Users\.."              # destination of the daily summary CSV

In [None]:
# Sheet selectors passed as `sheet_name` to pd.read_excel when the two
# workbooks are loaded (an integer selects a sheet by zero-based position).
TESTS_SHEET = 0
SERVICE_SHEET = 0

In [None]:
# Column names assignment
# Presumably the header names of the test-time sheet - confirm against the workbook.
# NOTE(review): none of these COL_* constants are referenced by the cells shown
# in this notebook; the later cells repeat the header strings as literals.
COL_TS = "TIME STAMP"
COL_PN = "PN"
COL_SN = "SN"
COL_SYMPTOM = "BIN_DESCRIPTION"
# NOTE(review): the header-rename mapping and the extra-columns selection in this
# notebook use "TEST_TYPE", not "TESTER_TYPE" - confirm which header is correct.
COL_TEST_TYPE = "TESTER_TYPE"
COL_TESTER = "TESTER"
COL_TEST_SEC = "Sum of TEST_TIME in SECONDS"
COL_TEST_HHMMSS = "Sum of TEST_TIME [h]:mm:ss"

In [None]:
# Presumably the header names of the service-ticket sheet - confirm against the workbook.
# NOTE(review): these constants are not referenced by the cells shown in this
# notebook; the service-sheet rename mapping uses lower-cased literals instead.
COL_HOSTNAME = "HostName"
COL_STATION_TYPE = "Station Type"
COL_LOCK = "LockDate"
COL_UNLOCK = "UnlockDate"
COL_DURATION = "Duration"
COL_FAILED_SN = "FailedSNList"

In [None]:
# Length of one reporting window: a full day (24 h x 60 min x 60 s), in seconds.
WINDOW_SECONDS = 24 * 60 * 60

In [None]:
# Helpers
def safe_to_datetime(series_or_value):
    """Parse a scalar or a Series into pandas datetimes.

    Anything that cannot be parsed comes back as ``NaT`` instead of raising,
    because parsing is delegated to ``pd.to_datetime(..., errors="coerce")``.
    """
    parsed = pd.to_datetime(series_or_value, errors="coerce")
    return parsed

In [None]:
def coerce_seconds(x):
    """Convert a duration-like value to float seconds.

    Accepted inputs:
      * missing values (None / NaN / NaT / pd.NA)        -> 0.0
      * ``timedelta`` / ``pd.Timedelta`` / ``np.timedelta64`` -> total seconds
      * ``datetime.time`` (how Excel time cells are often read) -> seconds since 00:00
      * plain numbers                                   -> taken as seconds already
      * ``"hh:mm:ss"`` strings (optionally signed)      -> h*3600 + m*60 + s
      * other numeric strings                           -> taken as a day fraction (x 86400)

    Anything else, including unparseable strings, yields 0.0.
    """
    # Local import: the notebook's import cell only brings in datetime/timedelta.
    from datetime import time as _time

    if pd.isna(x):
        return 0.0
    # Duration objects are checked before the numeric branch because
    # np.timedelta64 is a numpy integer subclass and would otherwise be
    # returned as a raw, unit-dependent count.
    if isinstance(x, np.timedelta64):
        return float(pd.Timedelta(x).total_seconds())
    if isinstance(x, timedelta):  # also covers pd.Timedelta
        return float(x.total_seconds())
    if isinstance(x, _time):
        return x.hour * 3600.0 + x.minute * 60.0 + x.second + x.microsecond / 1e6
    if isinstance(x, (int, float, np.integer, np.floating)):
        return float(x)
    if isinstance(x, str):
        s = x.strip()
        if ":" in s:
            # Apply the sign to the whole duration so "-1:30:00" is -5400 s,
            # not -3600 + 1800.
            sign = -1.0 if s.startswith("-") else 1.0
            unsigned = s[1:] if s[:1] in ("+", "-") else s
            parts = unsigned.split(":")
            if len(parts) == 3:
                try:
                    h, m, sec = map(float, parts)
                except ValueError:
                    pass  # not hh:mm:ss after all; fall through to the numeric attempt
                else:
                    return sign * (h * 3600 + m * 60 + sec)
        # Numeric day-fraction
        try:
            return float(s) * 86400.0
        except ValueError:
            return 0.0
    return 0.0
def seconds_to_hhmmss(total_seconds: float) -> str:
    """Render seconds as an ``[h]:mm:ss`` string whose hours may exceed 24.

    Missing input (None / NaN / pd.NA) is treated as 0 and negative input is
    clamped to 0, which matches the later redefinition of this helper in the
    notebook. The value is rounded to the nearest whole second before formatting.
    """
    if pd.isna(total_seconds):
        total_seconds = 0.0
    secs = int(round(max(0.0, float(total_seconds))))
    h, remainder = divmod(secs, 3600)
    m, s = divmod(remainder, 60)
    return f"{h}:{m:02d}:{s:02d}"

In [None]:
def s_strip(s):
    """Return ``str(s)`` with leading and trailing whitespace removed."""
    as_text = str(s)
    return as_text.strip()

In [None]:
def merge_overlapping(intervals):
    """Collapse overlapping or touching ``(start, end)`` pairs.

    The pairs are ordered by start; each pair whose start is not after the end
    of the previously kept pair is folded into it (keeping the later end).
    Returns a new list; an empty input gives ``[]``.
    """
    if not intervals:
        return []
    ordered = sorted(intervals, key=lambda pair: pair[0])
    result = [ordered[0]]
    for start, end in ordered[1:]:
        prev_start, prev_end = result[-1]
        if start <= prev_end:  # overlap or touch: extend the kept interval
            result[-1] = (prev_start, max(prev_end, end))
            continue
        result.append((start, end))
    return result

In [None]:
# Loading
# Read one sheet from each workbook; the sheet is chosen by the *_SHEET constants.
tests = pd.read_excel(io=TEST_TIME_XLSX, sheet_name=TESTS_SHEET)      # test-time records
svc = pd.read_excel(io=SERVICE_TICKETS_XLSX, sheet_name=SERVICE_SHEET)  # service tickets

In [None]:
# Force rename column header names
def normalize_headers(df):
    """Normalise the column labels of ``df`` in place and return ``df``.

    Each label is converted to text, stripped, has back/forward slashes unified
    to ``/``, has whitespace runs collapsed to a single space, and is lower-cased.

    Labels are cast to ``str`` first so that non-string headers (for example
    numeric or date headers read from a spreadsheet) are normalised too,
    instead of turning into NaN or making the ``.str`` accessor raise.
    """
    df.columns = (
        pd.Index(df.columns)
        .astype(str)
        .str.strip()
        .str.replace(r"[/\\]", "/", regex=True)
        .str.replace(r"\s+", " ", regex=True)
        .str.lower()
    )
    return df

In [None]:
# Normalise the headers of both frames. normalize_headers assigns df.columns on
# the frame it is given and returns that same frame, so the re-binding here
# keeps the existing names pointing at the updated objects.
tests = normalize_headers(tests)
svc = normalize_headers(svc)

In [None]:
# Expected header mappings
# Map the normalised (lower-cased) test-sheet headers back to the canonical
# names used by the rest of the notebook. Headers not listed here keep their
# normalised form. Re-assigning instead of using inplace=True keeps this cell
# a plain "new frame from old frame" step.
tests = tests.rename(
    columns={
        "time stamp": "TIME STAMP",
        "pn": "PN",
        "sn": "SN",
        "bin_description": "BIN_DESCRIPTION",
        "test_type": "TEST_TYPE",
        "tester": "TESTER",
        "sum of test_time in seconds": "Sum of TEST_TIME in SECONDS",
    }
)

In [None]:
# Map the normalised (lower-cased) service-sheet headers to the names used
# later in the notebook; the host name becomes "TESTER" so it can be lined up
# with the test sheet. Re-assigning instead of using inplace=True keeps this
# cell a plain "new frame from old frame" step.
svc = svc.rename(
    columns={
        "pn (sfg/sa)": "PN",
        "failedsnlist": "SN",
        "hostname": "TESTER",
        "lockdate": "LockDate",
        "unlockdate": "UnlockDate",
        "duration": "Duration",
    }
)

In [None]:
# Make sure svc has a numeric seconds column to merge
if "Duration_sec" not in svc.columns:
    if "Duration" in svc.columns:
        # coerce_seconds returns floats; the cast keeps the dtype numeric even
        # when svc has no rows.
        svc["Duration_sec"] = svc["Duration"].apply(coerce_seconds).astype("float64")
        print("Built svc['Duration_sec'] from svc['Duration']")
    else:
        # Float NaN keeps the column numeric; assigning pd.NA would create an
        # object-dtype column instead.
        svc["Duration_sec"] = np.nan
        print("svc lacks 'Duration' and 'Duration_sec' - created empty seconds column.")

In [None]:
# Build join (merge) keys (Normalized)
# Unparseable timestamps become NaT; "day" is the timestamp floored to midnight.
tests["TIME STAMP"] = pd.to_datetime(tests["TIME STAMP"], errors="coerce")
tests["day"] = tests["TIME STAMP"].dt.floor("D")
# astype(str) would turn a missing TESTER into the literal text "nan" and so
# create a phantom station; .where() puts the missing marker back so those
# rows drop out of the station/day grouping instead.
tester_raw = tests["TESTER"]
tests["station_name"] = tester_raw.astype(str).str.strip().where(tester_raw.notna())
# Non-numeric or missing test times count as 0 seconds.
tests["run_seconds"] = pd.to_numeric(tests["Sum of TEST_TIME in SECONDS"], errors="coerce").fillna(0)

In [None]:
# Per station and day: total run seconds and the number of test rows.
station_day_groups = tests.groupby(["station_name", "day"], as_index=False)
run_daily = station_day_groups.agg(
    run_seconds=("run_seconds", "sum"),
    test_count=("run_seconds", "size"),
)

In [None]:
# Secondary prep (Service Tickets) merge key
# astype(str) would turn a missing TESTER into the literal text "nan" and so
# create a phantom station; .where() puts the missing marker back.
svc_tester_raw = svc["TESTER"]
svc["station_name"] = svc_tester_raw.astype(str).str.strip().where(svc_tester_raw.notna())
# Unparseable lock/unlock values become NaT.
svc["LockDate"] = pd.to_datetime(svc["LockDate"], errors="coerce")
svc["UnlockDate"] = pd.to_datetime(svc["UnlockDate"], errors="coerce")

In [None]:
# Keep only tickets that have both timestamps and a strictly positive span.
has_lock = svc["LockDate"].notna()
has_unlock = svc["UnlockDate"].notna()
unlock_after_lock = svc["UnlockDate"] > svc["LockDate"]
svc_valid = svc.loc[has_lock & has_unlock & unlock_after_lock].copy()

In [None]:
# Cut every valid ticket into per-calendar-day pieces so that service time can
# be totalled per station and day.
rows = []
one_day = timedelta(days=1)
midnight = datetime.min.time()
ticket_fields = zip(svc_valid["LockDate"], svc_valid["UnlockDate"], svc_valid["station_name"])
for lock_ts, unlock_ts, station in ticket_fields:
    final_day = unlock_ts.normalize()
    cursor = lock_ts.normalize()
    while cursor <= final_day:
        # [window_open, window_close) is the calendar day the cursor sits on.
        window_open = pd.Timestamp.combine(cursor.date(), midnight)
        window_close = window_open + one_day
        piece_start = max(lock_ts, window_open)
        piece_end = min(unlock_ts, window_close)
        # Zero-length pieces (e.g. a ticket ending exactly at midnight) are skipped.
        if piece_end > piece_start:
            rows.append(
                {"station_name": station, "day": cursor, "start": piece_start, "end": piece_end}
            )
        cursor += one_day

In [None]:
# Name the columns explicitly: with no valid tickets `rows` is empty, and a
# column-less frame would make the later groupby on station_name/day raise KeyError.
svc_day = pd.DataFrame(rows, columns=["station_name", "day", "start", "end"])

In [None]:
def merge_overlapping(intervals):
    """Merge overlapping or touching ``(start, end)`` pairs into disjoint spans.

    Pairs are processed in order of their start value; a pair that begins at or
    before the end of the last kept span extends that span, otherwise it opens
    a new one. An empty input returns ``[]``.
    """
    if not intervals:
        return []
    by_start = sorted(intervals, key=lambda span: span[0])
    merged = [by_start[0]]
    for position in range(1, len(by_start)):
        begin, finish = by_start[position]
        kept_begin, kept_finish = merged[-1]
        if begin <= kept_finish:
            merged[-1] = (kept_begin, max(kept_finish, finish))
        else:
            merged.append((begin, finish))
    return merged

In [None]:
# For each station/day, merge that day's ticket pieces so overlapping tickets
# are not double counted, then total the covered seconds.
svc_daily_rows = []
for station_day, pieces in svc_day.groupby(["station_name", "day"]):
    station, service_day = station_day
    spans = merge_overlapping(list(zip(pieces["start"], pieces["end"])))
    covered_seconds = 0
    for span_start, span_end in spans:
        covered_seconds += (span_end - span_start).total_seconds()
    svc_daily_rows.append(
        {"station_name": station, "day": service_day, "service_seconds": covered_seconds}
    )

In [None]:
# Name the columns explicitly: with no service rows the list is empty, and a
# column-less frame would make the later merge on station_name/day raise KeyError.
svc_daily = pd.DataFrame(svc_daily_rows, columns=["station_name", "day", "service_seconds"])
# Pin the dtypes so the frame has the same schema whether or not it has rows.
svc_daily["day"] = pd.to_datetime(svc_daily["day"])
svc_daily["service_seconds"] = svc_daily["service_seconds"].astype("float64")

In [None]:
# Summary calc
# Left join from run_daily: only station/days that have test rows appear in the
# summary; those without a matching service row get 0 service seconds.
summary = run_daily.merge(svc_daily, how="left", on=["station_name", "day"])
summary = summary.assign(
    service_seconds=summary["service_seconds"].fillna(0.0),
    window_seconds=float(WINDOW_SECONDS),
)

In [None]:
# Shares of the daily window. Idle time is whatever is left after run and
# service time, floored at zero.
window = summary["window_seconds"]
idle_raw = window - summary["run_seconds"] - summary["service_seconds"]
summary["uptime_pct"] = summary["run_seconds"].div(window).clip(0, 1)
summary["service_pct"] = summary["service_seconds"].div(window).clip(lower=0)
summary["idle_seconds"] = idle_raw.clip(lower=0)
summary["idle_pct"] = summary["idle_seconds"].div(window).clip(0, 1)

In [None]:
def seconds_to_hhmmss(x):
    """Format seconds as ``h:mm:ss`` with an unbounded hour field.

    Missing values count as 0, negative values are clamped to 0, and the value
    is rounded to whole seconds before being split into hours/minutes/seconds.
    """
    value = float(x) if not pd.isna(x) else 0
    whole = int(round(max(0, value)))
    hours = whole // 3600
    minutes = (whole % 3600) // 60
    seconds = whole % 60
    return f"{hours}:{minutes:02d}:{seconds:02d}"

In [None]:
# Add a human-readable h:mm:ss companion for each seconds column.
for seconds_col, label_col in (
    ("run_seconds", "run_hhmmss"),
    ("service_seconds", "service_hhmmss"),
    ("idle_seconds", "idle_hhmmss"),
):
    summary[label_col] = summary[seconds_col].apply(seconds_to_hhmmss)

In [None]:
# Back extra columns
# Attach the distinct PN / SN / bin / test-type combinations seen on each
# station/day.
# NOTE(review): a station/day with several distinct combinations yields several
# summary rows that each repeat the same daily totals - confirm this fan-out is
# intended before summing the saved CSV.
detail_columns = ["station_name", "day", "PN", "SN", "BIN_DESCRIPTION", "TEST_TYPE"]
extra_cols = tests.loc[:, detail_columns].drop_duplicates()
summary = summary.merge(extra_cols, how="left", on=["station_name", "day"])

In [None]:
# Sort and save
# Re-assign rather than sort in place, so the cell reads as "sorted frame from
# the merged frame"; the sorted result is what gets written and previewed.
summary = summary.sort_values(["station_name", "day"])
summary.to_csv(OUTPUT_CSV, index=False)
print(f"[SUMMARY SAVED] {OUTPUT_CSV} rows={len(summary)}")
print(summary.head(10))