# 🔎 Loki (multi-tenant) → Parquet → Episodes

This notebook pulls logs from **application**, **infrastructure**, and **audit** tenants on an OpenShift Loki gateway,
normalizes them with a JSON-first projector, writes `data/unified_logs/latest.parquet` & CSV, and builds 10‑minute episodes.
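It includes bearer-token / basic-auth handling, an SSL-verification toggle (`LOKI_INSECURE`, which defaults to `true`, i.e. certificate verification is **off** unless you set it to `false`), nanosecond timestamp parsing, and tenant-specific helpers.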


## 0) Setup

In [None]:
# If needed, install deps (uncomment once)
# %pip install --quiet pandas numpy requests pyarrow


In [None]:
from pathlib import Path
import os, pandas as pd, numpy as np

# --- Storage locations (directories are created eagerly so later cells can write)
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
UNIFIED_DIR = DATA_DIR / "unified_logs"
UNIFIED_DIR.mkdir(parents=True, exist_ok=True)
INCIDENTS_DIR = Path("incidents")
INCIDENTS_DIR.mkdir(parents=True, exist_ok=True)
RULES_DIR = Path("rules")
RULES_DIR.mkdir(parents=True, exist_ok=True)

# --- Time window (adjust as needed)
# Timestamp.utcnow() is deprecated since pandas 2.2; now(tz="UTC") yields the
# same timezone-aware UTC timestamp without the deprecation warning.
END = pd.Timestamp.now(tz="UTC")
START = END - pd.Timedelta("90min")
print("Window:", START, "→", END)


## 1) Loki helpers (tenant‑aware + token + SSL toggle)

In [None]:
# Cell 1.1 — config + session + diagnostics
import pandas as pd, requests, urllib3

# ---- Config (export the env vars below, or edit the fallbacks in place) ----
LOKI_BASE = os.getenv("LOKI_BASE", "https://logging-loki-openshift-logging.apps.example.com")
LOKI_TOKEN = os.getenv("LOKI_TOKEN")  # e.g. export LOKI_TOKEN="$(oc whoami -t)"
# NOTE: defaults to "true", i.e. TLS certificate verification is skipped unless
# LOKI_INSECURE is set to something other than 1/true/yes.
LOKI_INSECURE = os.getenv("LOKI_INSECURE", "true").lower() in {"1", "true", "yes"}
LOKI_ORG_ID = os.getenv("LOKI_ORG_ID")  # some gateways require X-Scope-OrgID
LOKI_BASIC_USER = os.getenv("LOKI_BASIC_USER")
LOKI_BASIC_PASS = os.getenv("LOKI_BASIC_PASS")

# Unverified HTTPS requests emit a warning per call; silence it only when the
# user explicitly opted into insecure mode.
if LOKI_INSECURE:
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# One shared session for every helper; the optional headers are added only
# when the corresponding setting is non-empty.
_session = requests.Session()
_default_headers = {
    "Accept": "application/json",
    **({"Authorization": f"Bearer {LOKI_TOKEN}"} if LOKI_TOKEN else {}),
    **({"X-Scope-OrgID": LOKI_ORG_ID} if LOKI_ORG_ID else {}),
}

def _debug_response(resp):
    """Summarise an HTTP response for error messages.

    Returns a string with the status code, Content-Type header, final URL and
    the first 500 characters of the body (empty when the body is falsy).
    """
    content_type = resp.headers.get("Content-Type", "")
    body = resp.text or ""
    summary = f"HTTP {resp.status_code} CT={content_type} URL={resp.url}"
    return f"{summary}\nBody (first 500):\n{body[:500]}"


In [None]:
# Cell 1.2 — tenant ping / labels / query_range (nanosecond safe)
def loki_ping_tenant(tenant: str):
    """Check that *tenant* is reachable by requesting its label list.

    Sends the same credentials as ``loki_query_range_tenant`` (default headers
    plus optional basic auth) so that a successful ping predicts a successful
    query.

    Returns:
        The decoded JSON body of the labels endpoint.

    Raises:
        RuntimeError: if the response is not OK or is not JSON (for example an
            HTML login page); the message includes a response summary.
    """
    url = f"{LOKI_BASE.rstrip('/')}/api/logs/v1/{tenant}/loki/api/v1/labels"
    # Basic auth is only used when both user and password are configured.
    auth = (LOKI_BASIC_USER, LOKI_BASIC_PASS) if (LOKI_BASIC_USER and LOKI_BASIC_PASS) else None
    r = _session.get(url, headers=_default_headers, timeout=30,
                     verify=not LOKI_INSECURE, auth=auth, allow_redirects=False)
    # Redirects are not followed, so an auth redirect surfaces here as a
    # non-JSON response instead of silently landing on a login page.
    if not r.ok or "application/json" not in r.headers.get("Content-Type", "").lower():
        raise RuntimeError(f"Ping failed for tenant={tenant}:\n" + _debug_response(r))
    return r.json()

def loki_labels_tenant(tenant: str):
    """Return the list of label names known to *tenant*.

    Uses the same credentials as ``loki_query_range_tenant`` (default headers
    plus optional basic auth).

    Returns:
        The ``data`` list of the labels response, or ``[]`` when absent.

    Raises:
        RuntimeError: on a 3xx answer (redirects are not followed; this is
            reported the same way as in ``loki_query_range_tenant``).
        requests.HTTPError: on a 4xx/5xx answer.
    """
    url = f"{LOKI_BASE.rstrip('/')}/api/logs/v1/{tenant}/loki/api/v1/labels"
    # Basic auth is only used when both user and password are configured.
    auth = (LOKI_BASIC_USER, LOKI_BASIC_PASS) if (LOKI_BASIC_USER and LOKI_BASIC_PASS) else None
    r = _session.get(url, headers=_default_headers, timeout=30,
                     verify=not LOKI_INSECURE, auth=auth, allow_redirects=False)
    # raise_for_status() lets 3xx through, which would otherwise fail later
    # with an opaque JSON decode error on the redirect body.
    if r.is_redirect or r.status_code in (301, 302, 303, 307, 308):
        raise RuntimeError(f"Auth redirect for tenant={tenant}. Token/headers likely missing.\n" + _debug_response(r))
    r.raise_for_status()
    return r.json().get("data", [])

def loki_query_range_tenant(tenant: str, expr, start_ts, end_ts, step='15s', limit=5000, direction='forward'):
    """Run a ``query_range`` request against one tenant and flatten the result.

    Args:
        tenant: Tenant path segment of the gateway URL.
        expr: LogQL expression sent as ``query``.
        start_ts, end_ts: Anything ``pd.Timestamp`` accepts; sent as integer
            nanoseconds.
        step: Passed through as the ``step`` parameter.
        limit: Maximum number of entries requested.
        direction: Passed through as the ``direction`` parameter.

    Returns:
        A DataFrame with one row per returned value: ``ts`` (UTC timestamp),
        ``line`` and one column per label of the originating series. When the
        result is empty the frame still has the ``ts`` and ``line`` columns.

    Raises:
        RuntimeError: on a redirect, a non-OK status, or a non-JSON response.
    """
    url = f"{LOKI_BASE.rstrip('/')}/api/logs/v1/{tenant}/loki/api/v1/query_range"
    params = {
        "query": expr,
        "start": int(pd.Timestamp(start_ts).value),  # ns
        "end": int(pd.Timestamp(end_ts).value),
        "step": step,
        "limit": str(limit),
        "direction": direction,
    }
    auth = (LOKI_BASIC_USER, LOKI_BASIC_PASS) if (LOKI_BASIC_USER and LOKI_BASIC_PASS) else None
    r = _session.get(url, params=params, headers=_default_headers, timeout=60,
                     verify=not LOKI_INSECURE, auth=auth, allow_redirects=False)
    if r.is_redirect or r.status_code in (301, 302, 303, 307, 308):
        raise RuntimeError(f"Auth redirect for tenant={tenant}. Token/headers likely missing.\n" + _debug_response(r))
    if not r.ok:
        raise RuntimeError(f"Loki query_range failed for tenant={tenant}:\n" + _debug_response(r))
    if "application/json" not in r.headers.get("Content-Type", "").lower():
        raise RuntimeError(f"Non-JSON response for tenant={tenant}:\n" + _debug_response(r))

    payload = r.json()
    data = payload.get("data", {}).get("result", [])

    def _parse_ns(ts_val):
        # Timestamps arrive as strings; tolerate a float-formatted value too.
        s = str(ts_val)
        try:
            ns = int(s)
        except ValueError:
            ns = int(float(s))
        return pd.to_datetime(ns, unit="ns", utc=True)

    rows = []
    for series in data:
        # Log ("streams") results keep their labels under "stream"; only
        # metric ("matrix") results use "metric". Reading just "metric"
        # silently dropped every label column for log queries.
        labels = series.get("stream") or series.get("metric") or {}
        for ts, line in series.get("values", []):
            rows.append({"ts": _parse_ns(ts), "line": line, **labels})
    if not rows:
        # Keep the core columns so callers can index df["line"] on empty results.
        return pd.DataFrame(columns=["ts", "line"])
    return pd.DataFrame(rows)

# Quick tenant sanity: ping each tenant once and report the label count.
# Failures are printed rather than raised so one bad tenant does not stop the loop.
for t in ("application", "infrastructure", "audit"):
    try:
        info = loki_ping_tenant(t)
        label_count = len(info.get("data", []))
        print(f"tenant={t} OK, ~{label_count} labels")
    except Exception as e:
        print(f"tenant={t} ping failed:", e)


## 2) Pull logs from all tenants

In [None]:
# Start wide, then narrow with label filters once you confirm data exists.
# Loki rejects a bare `{}` selector: every query needs at least one matcher
# that cannot match the empty string, hence `=~".+"` rather than `=~".*"`.
# NOTE(review): assumes every stream carries a `log_type` label — confirm with
# loki_labels_tenant(<tenant>) and swap in another label if it is missing.
LOGQL_ANY = r'{log_type=~".+"}'
# Timestamp.utcnow() is deprecated since pandas 2.2; now(tz="UTC") is equivalent.
END   = pd.Timestamp.now(tz="UTC")
START = END - pd.Timedelta("90min")

# Each call returns at most `limit` entries; a busy tenant may be truncated.
df_app   = loki_query_range_tenant("application",   LOGQL_ANY, START, END, step="30s", limit=5000)
df_infra = loki_query_range_tenant("infrastructure", LOGQL_ANY, START, END, step="30s", limit=5000)
df_audit = loki_query_range_tenant("audit",          LOGQL_ANY, START, END, step="30s", limit=5000)

print("Rows -> app/infra/audit:", len(df_app), len(df_infra), len(df_audit))
# Uncomment to preview columns:
# print("app columns:", list(df_app.columns))
# print("infra columns:", list(df_infra.columns))
# print("audit columns:", list(df_audit.columns))


## 3) JSON‑first projector (namespace/pod/node from labels or JSON body)

In [None]:
import json, re

def _maybe_json(s: str):
    """Decode *s* as JSON if it looks like an object or array, else return None.

    Non-string input, blank text, text not starting with ``{`` or ``[`` (after
    stripping whitespace) and text that fails to decode all yield ``None``.
    """
    if not isinstance(s, str):
        return None
    candidate = s.strip()
    # An empty string fails this check too, so no separate emptiness test is needed.
    if not candidate.startswith(("{", "[")):
        return None
    try:
        parsed = json.loads(candidate)
    except Exception:
        return None
    return parsed

def _get_any(obj, keys):
    """Return the value at the first dotted path in *keys* that exists in *obj*.

    Each key is split on ``.`` and walked through nested dicts. The first path
    that resolves completely wins, even if its value is ``None``. Returns
    ``None`` when no path resolves.
    """
    for dotted in keys:
        try:
            parts = dotted.split(".")
        except Exception:
            # A key that cannot be split is skipped, like any other miss.
            continue
        node = obj
        for part in parts:
            if not (isinstance(node, dict) and part in node):
                break
            node = node[part]
        else:
            # Loop finished without a break: every path segment was found.
            return node
    return None

# Spelling variants folded onto the canonical names produced by the text
# heuristic below ("error" / "warn" / "info"). Other values pass through.
_LEVEL_ALIASES = {
    "warning": "warn",
    "err": "error",
    "information": "info",
    "informational": "info",
}


def _normalize_level(obj, line: str):
    """Return a lower-case log level for one record.

    A level field in the parsed JSON body (*obj*) wins; it is stripped,
    lower-cased and common spelling variants are mapped to the canonical
    ``error`` / ``warn`` / ``info`` so they compare equal to the heuristic
    results. Without such a field the level is guessed from keywords in *line*
    and defaults to ``"info"``.
    """
    if isinstance(obj, dict):
        v = _get_any(obj, ["level", "severity", "loglevel", "lvl", "logger_level"])
        if v is not None:
            level = str(v).strip().lower()
            return _LEVEL_ALIASES.get(level, level)
    s = (line or "").lower()
    if any(w in s for w in ["error", "exception", "fail", "backoff", "oomkilled", "notready"]):
        return "error"
    if "warn" in s or "throttle" in s:
        return "warn"
    return "info"

def _extract_code(obj, line: str):
    """Return an integer status code for one record, or None.

    A status field in the parsed JSON body (*obj*) is used when it converts to
    ``int``. Otherwise the first whitespace-delimited three-digit number
    starting with 1-5 in *line* is returned.
    """
    if isinstance(obj, dict):
        raw = _get_any(obj, ["status","status_code","code","http.status","response.status"])
        if raw is not None:
            try:
                return int(raw)
            except Exception:
                pass  # not numeric: fall back to scanning the raw line
    # Pad with spaces so a code at the very start or end of the line still matches.
    padded = " " + (line or "") + " "
    hit = re.search(r"\s(1\d{2}|2\d{2}|3\d{2}|4\d{2}|5\d{2})\s", padded)
    if hit is None:
        return None
    return int(hit.group(1))

def _extract_route(obj, line: str):
    if isinstance(obj, dict):
        v = _get_any(obj, ["path","route","url","request_path","http.path","request.url","endpoint"])
        if isinstance(v, str): return v.split("?")[0]
    import re
    m = re.search(r"\s(?:GET|POST|PUT|PATCH|DELETE)\s+(\S+)", " " + (line or "") + " ")
    return m.group(1) if m else None

def _label_or_json(series_or_none, objs, json_keys):
    """Return a label column if one was found, else look the value up in JSON.

    Args:
        series_or_none: Label column to use as-is, or ``None``.
        objs: Iterable of parsed log bodies (dicts or anything else).
        json_keys: Dotted paths tried in order on each dict in *objs*.

    Returns:
        *series_or_none* unchanged when it is not ``None``; otherwise a Series
        with one value per element of *objs* (``None`` for non-dict elements).
        When *objs* is a Series its index is reused so the result lines up
        with the source frame even if that frame is not indexed 0..n-1.
    """
    if series_or_none is not None:
        return series_or_none
    vals = [_get_any(o, json_keys) if isinstance(o, dict) else None for o in objs]
    # Explicit isinstance check: getattr(objs, "index") on a list would return
    # the list.index method instead of a pandas index.
    index = objs.index if isinstance(objs, pd.Series) else None
    return pd.Series(vals, index=index)

def _first_label_column(df, names):
    """Return the first column of *df* named in *names*, or None if none exists."""
    for name in names:
        if name in df.columns:
            return df[name]
    return None


def project_unified_stronger(df: pd.DataFrame, source_guess: str) -> pd.DataFrame:
    """Project raw Loki rows onto the unified log schema.

    Args:
        df: Frame with at least ``ts`` and ``line`` columns, plus optional
            label columns.
        source_guess: Value written to the ``source`` column of every row.

    Returns:
        A frame with the columns ts, source, namespace, pod, node, level,
        verb, code, route, msg, container_restart and rollout_in_window.
    """
    objs = df["line"].map(_maybe_json)

    # Prefer k8s_* label columns; else read from JSON.
    # The columns are picked by name: chaining Series with `or` raises
    # "truth value of a Series is ambiguous" as soon as one of them exists.
    ns_series   = _first_label_column(df, ["k8s_namespace_name", "kubernetes_namespace_name", "namespace"])
    pod_series  = _first_label_column(df, ["k8s_pod_name", "kubernetes_pod_name", "pod"])
    node_series = _first_label_column(df, ["k8s_node_name", "kubernetes_host", "node"])

    namespace = _label_or_json(ns_series,   objs, ["kubernetes.namespace_name","k8s.namespace.name","k8s.ns","namespace"])
    pod       = _label_or_json(pod_series,  objs, ["kubernetes.pod_name","k8s.pod.name","pod"])
    node      = _label_or_json(node_series, objs, ["kubernetes.host","kubernetes.node_name","k8s.node.name","node"])

    level = [_normalize_level(o, ln) for o, ln in zip(objs, df["line"])]
    code  = [_extract_code(o, ln)    for o, ln in zip(objs, df["line"])]
    route = [_extract_route(o, ln)   for o, ln in zip(objs, df["line"])]

    container_restart = df["line"].str.contains(r"\bRestarted container\b", case=False, na=False).astype(int)
    # Non-capturing group: same matches, but str.contains no longer warns
    # about match groups in the pattern.
    rollout_hit = df["line"].str.contains(
        r"Scaled up replica set|deployment (?:created|updated|rolled out)|\brollout\b",
        case=False, na=False, regex=True
    ).astype(float)

    return pd.DataFrame({
        "ts": df["ts"],
        "source": source_guess,
        "namespace": namespace,
        "pod": pod,
        "node": node,
        "level": level,
        "verb": None,
        "code": code,
        "route": route,
        "msg": df["line"].astype(str).str.slice(0, 400),  # cap message length at 400 chars
        "container_restart": container_restart,
        "rollout_in_window": rollout_hit,
    })


## 4) Concat, write `latest.parquet` and `latest.csv`, and show quick stats

In [None]:
from pathlib import Path

print("Sizes -> app/infra/audit:", len(df_app), len(df_infra), len(df_audit))

# Project every non-empty tenant frame onto the unified schema.
parts = [
    project_unified_stronger(frame, source)
    for frame, source in ((df_app, "app"), (df_infra, "infra"), (df_audit, "audit"))
    if not frame.empty
]

# With no data at all, fall back to an empty frame that still has the schema.
unified = (
    pd.concat(parts, ignore_index=True)
    if parts
    else pd.DataFrame(columns=[
        "ts","source","namespace","pod","node","level","verb","code","route","msg",
        "container_restart","rollout_in_window"
    ])
)

# Dtypes & cleanup: rows whose timestamp cannot be parsed are dropped,
# restart flags become int64 (missing -> 0), codes become numeric (bad -> NaN).
unified["ts"] = pd.to_datetime(unified["ts"], utc=True, errors="coerce")
unified = (
    unified
    .dropna(subset=["ts"])
    .sort_values("ts")
    .reset_index(drop=True)
)
unified["container_restart"] = (
    pd.to_numeric(unified["container_restart"], errors="coerce")
    .fillna(0)
    .astype("int64")
)
unified["code"] = pd.to_numeric(unified["code"], errors="coerce")

# Write the same frame as Parquet and CSV side by side.
unified_path = UNIFIED_DIR / "latest.parquet"
unified.to_parquet(unified_path, index=False)
unified_csv = unified_path.with_suffix(".csv")
unified.to_csv(unified_csv, index=False)

# Quick stats
print("Unified rows:", len(unified))
print("Nulls by column:\n", unified.isna().mean().round(3))
print("Level distribution:\n", unified["level"].value_counts(dropna=False).head(10))
print("HTTP status sample:\n", unified["code"].dropna().astype(int).value_counts().head(10))
unified.head(8)


## 5) Build 10‑minute episodes (namespace/pod/node groups)

In [None]:
def build_episodes(df: pd.DataFrame, window="10min", keys=("namespace","pod","node")):
    """Aggregate unified log rows into fixed-width, per-entity episodes.

    Rows are bucketed by *window* on ``ts`` and, inside each bucket, grouped
    by whichever of *keys* exist as columns (missing values form their own
    group).

    Args:
        df: Frame with ``ts`` and ``level`` columns; ``code``,
            ``container_restart``, ``rollout_in_window`` and the entity
            columns are optional.
        window: Bucket width as a pandas frequency string.
        keys: Column names to group by inside each bucket.

    Returns:
        A list of dicts with ``episode_id``, ``start``, ``end``, ``entities``
        (column -> list of distinct non-missing values) and ``features``
        (count, error_ratio, restarts, http5xx, rollout_in_window as floats).
    """
    import zlib  # function-local so the block does not depend on a new file-level import

    df = df.copy()
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    df = df.set_index("ts")
    span = pd.to_timedelta(window)  # loop-invariant
    episodes = []
    for wstart, wdf in df.groupby(pd.Grouper(freq=window)):
        if wdf.empty:
            continue
        wend = wstart + span
        grp_cols = [k for k in keys if k in wdf.columns]
        groups = wdf.groupby(grp_cols, dropna=False) if grp_cols else [("_", wdf)]
        for gkey, gdf in groups:
            total = len(gdf)
            errors = (gdf["level"] == "error").sum()
            err_ratio = (errors / total) if total else 0.0
            restarts = gdf.get("container_restart", pd.Series([0] * total, index=gdf.index)).sum()
            http5xx = (gdf.get("code", pd.Series(dtype=float)) >= 500).sum()
            rollout = 1.0 if (gdf.get("rollout_in_window", pd.Series(dtype=float)) > 0).any() else 0.0
            entities = {}
            for col in ("namespace", "pod", "node"):
                if col not in gdf.columns:
                    continue
                # Drop missing values BEFORE stringifying; astype(str) first
                # turned NaN into the literal entity "nan".
                vals = [v for v in gdf[col].dropna().astype(str).unique().tolist() if v and v != "None"]
                if vals:
                    entities[col] = vals
            # crc32 is stable across interpreter runs, whereas hash() of a str
            # is salted per process and made ids unreproducible.
            key_hash = zlib.crc32(str(gkey).encode("utf-8")) & 0xfffffff
            episodes.append({
                "episode_id": f"{int(wstart.value)}::{key_hash:07x}",
                "start": wstart, "end": wend,
                "entities": entities,
                "features": {"count": float(total), "error_ratio": float(err_ratio), "restarts": float(restarts), "http5xx": float(http5xx), "rollout_in_window": rollout},
            })
    return episodes

eps = build_episodes(unified, window="10min")
print("Episodes:", len(eps))
import pandas as pd
# Flatten each episode into one row for eyeballing. The columns are listed
# explicitly so the frame has an "id" column even when `eps` is empty;
# otherwise sort_values("id") raises KeyError on a column-less frame.
epi_dbg = pd.DataFrame(
    [
        {"id": e["episode_id"], **e["features"],
         "ent_namespace": ",".join(e["entities"].get("namespace", [])),
         "ent_pod": ",".join(e["entities"].get("pod", [])),
         "ent_node": ",".join(e["entities"].get("node", [])),}
        for e in eps
    ],
    columns=[
        "id", "count", "error_ratio", "restarts", "http5xx", "rollout_in_window",
        "ent_namespace", "ent_pod", "ent_node",
    ],
).sort_values("id")
epi_dbg.head(12)
