### DHL qualitative evaluation

In [None]:
# Imports
import re
from pathlib import Path

import numpy as np
import pandas as pd

In [None]:
# 1) Input file
# The path is assembled from individual components so it resolves on every OS.
# A backslash-separated literal is only split into directories on Windows;
# on POSIX it would be treated as one long file name.
csv_path = Path("5_analysis") / "random" / "DHL" / "dhl_features" / "k3" / "combined_sorted.csv"

# 2) Load data
df = pd.read_csv(csv_path)

# 3) Apply rule-quality filter: keep rules with LB OR > 1 and Conviction > 1
# `filtered` is an independent copy, so it can be modified without touching `df`.
filtered = df[(df["LB odds ratio"] > 1) & (df["Conviction"] > 1)].copy()

print(f"Loaded {len(df)} rules, kept {len(filtered)} after LB OR > 1 and Conviction > 1 filter.")
filtered.head()


### Ranking per encoding

In [None]:
# Start from the quality-filtered rules when they exist in the notebook namespace;
# otherwise fall back to the raw frame `df`.
_ns = globals()
_base = _ns["filtered"] if "filtered" in _ns else _ns.get("df")
if _base is None:
    raise RuntimeError("No DataFrame named 'filtered' or 'df' found. Run the previous cell first.")
data = _base.copy()  # work on a copy so rank columns never leak into the source frame

# 1) Configuration
ENC_COL = "Feature Encoding"  # column that identifies the encoding a rule belongs to
METRICS = [
    "LB odds ratio",
    "Support LHS",
    "Confidence",
    "Lift",
    "Conviction",
]
TOP_K = 2  # number of top rules to keep per encoding
TIE_BREAKERS = ["Support LHS", "LB odds ratio"]  # used to resolve ties (higher is better)

# 2) Sanity checks: every configured column has to be present before ranking
required_cols = [ENC_COL, *METRICS]
missing = [name for name in required_cols if name not in data.columns]
if missing:
    raise ValueError(f"Missing expected columns: {missing}")

def _rank_within_group(g: pd.DataFrame, col: str) -> pd.Series:
    """
    Dense-rank the values of ``col`` inside one group, largest value first.

    Parameters:
        g: rows belonging to a single group.
        col: name of the metric column to rank.

    Returns:
        A Series indexed like ``g``; rank 1 is the largest value, and rows whose
        metric is NaN are given ``len(g) + 1`` so they sort behind every real rank.
    """
    worst_rank = len(g) + 1
    dense_ranks = g[col].rank(method="dense", ascending=False)
    # Keep computed ranks; substitute the sentinel only where rank() produced NaN.
    return dense_ranks.where(dense_ranks.notna(), worst_rank)

# 3) Rank each metric within its encoding group (higher value = better, rank 1 is best).
# The ranks come from a column-wise groupby rather than DataFrame.groupby(...).apply(...):
# when only one encoding is present, apply() stacks the returned Series into a one-row
# DataFrame that cannot be assigned to a single column.
# NaN metric values receive the worst rank of their group (group_size + 1).
_group_sizes = data[ENC_COL].map(data[ENC_COL].value_counts())
rank_cols = []
for m in METRICS:
    rcol = f"rank::{m}"
    _ranks = data.groupby(ENC_COL)[m].rank(method="dense", ascending=False)
    data[rcol] = _ranks.fillna(_group_sizes + 1)  # fillna aligns on the row index
    rank_cols.append(rcol)

# 4) Aggregate ranks (lower = better): unweighted mean over the per-metric ranks
data["rank_agg"] = data[rank_cols].mean(axis=1)

# 5) Sort primarily by aggregate rank (ascending), then by tie-breakers (descending)
sort_cols = ["rank_agg"] + [c for c in TIE_BREAKERS if c in data.columns]
ascending = [True] + [False] * (len(sort_cols) - 1)

scored = data.sort_values(sort_cols, ascending=ascending)

# 6) Select top-K rules per encoding
# head() keeps the first TOP_K rows of each group in the already-sorted order.
topk_per_encoding = (
    scored.groupby(ENC_COL, group_keys=False)
          .head(TOP_K)
          .reset_index(drop=True)
)

print(f"Selected top-{TOP_K} rules per encoding using rank aggregation over: {', '.join(METRICS)}")
topk_per_encoding

### Ranking over all encodings

In [None]:
# Prefer the previously filtered DataFrame; otherwise use the raw 'df'
_base = globals().get("filtered", globals().get("df"))
if _base is None:
    raise RuntimeError("No DataFrame named 'filtered' or 'df' found. Run the previous cell first.")
data = _base.copy()  # work on a copy so rank columns are not added to the source frame

# Configuration
METRICS = ["Odds ratio", "Support LHS", "Confidence", "Lift", "Conviction"]
TOP_K = 5  # number of top rules to return overall
TIE_BREAKERS = ["Support LHS", "Odds ratio"]  # applied when aggregate ranks tie
ENC_COL = "Feature Encoding"
RULE_COL = "Rule"

# Validate inputs
missing = [c for c in METRICS if c not in data.columns]
if missing:
    raise ValueError(f"Missing expected metric columns: {missing}")

# 1) Rank each metric globally (higher value → better rank)
# na_option="bottom" gives missing metric values the worst rank. With the default
# ("keep") they would stay NaN and be skipped by the row-wise mean below, so a rule
# with missing metrics would be averaged over fewer ranks and could look better.
rank_cols = []
for m in METRICS:
    rcol = f"rank::{m}"
    data[rcol] = data[m].rank(method="dense", ascending=False, na_option="bottom")
    rank_cols.append(rcol)

# 2) Aggregate ranks (lower is better because rank 1 is best)
data["rank_agg"] = data[rank_cols].mean(axis=1)

# 3) Sort by aggregate rank (ascending), then apply tie-breakers (descending)
sort_cols = ["rank_agg"] + [c for c in TIE_BREAKERS if c in data.columns]
ascending = [True] + [False] * (len(sort_cols) - 1)
scored_overall = data.sort_values(sort_cols, ascending=ascending).reset_index(drop=True)

# 4) Select top-K overall
topk_overall = scored_overall.head(TOP_K).copy()
print(f"Selected top-{TOP_K} rules overall using rank aggregation over: {', '.join(METRICS)}")

# 5) Arrange columns for display: rule text and encoding first (when present),
#    then the metrics, then the aggregate rank
display_cols = [c for c in (RULE_COL, ENC_COL) if c in topk_overall.columns]
display_cols += [c for c in METRICS if c in topk_overall.columns] + ["rank_agg"]

topk_overall.loc[:, display_cols]


### Global ranking, restricted to positive deviance (RHS = Label)

In [None]:
# Prefer the previously filtered DataFrame; otherwise use 'df'
_base = globals().get("filtered", globals().get("df"))
if _base is None:
    raise RuntimeError("No DataFrame named 'filtered' or 'df' found. Run the previous cell first.")
data = _base.copy()  # work on a copy so rank columns are not added to the source frame

# Configuration (defined before the RHS filter so the filter can use RULE_COL)
METRICS = ["Odds ratio", "Support LHS", "Confidence", "Lift", "Conviction"]
TOP_K = 50  # number of top rules to return overall
TIE_BREAKERS = ["Support LHS", "Odds ratio"]  # applied when aggregate ranks tie
ENC_COL = "Feature Encoding"
RULE_COL = "Rule"

# Validate inputs: the rule text is required for the RHS filter, the metrics for ranking
missing = [c for c in [RULE_COL] + METRICS if c not in data.columns]
if missing:
    raise ValueError(f"Missing expected columns: {missing}")

# 0) Keep only rules with RHS 'Label' (exclude '!Label')
# Matches patterns ending with "... --> Label" (trailing spaces allowed);
# a missing rule text counts as "no match" (na=False).
rhs_label_mask = data[RULE_COL].astype(str).str.contains(r"-->\s*Label\s*$", regex=True, na=False)
data = data[rhs_label_mask].copy()

# 1) Rank each metric globally (higher value → better rank)
# na_option="bottom" gives missing metric values the worst rank. With the default
# ("keep") they would stay NaN and be skipped by the row-wise mean below, so a rule
# with missing metrics would be averaged over fewer ranks and could look better.
rank_cols = []
for m in METRICS:
    rcol = f"rank::{m}"
    data[rcol] = data[m].rank(method="dense", ascending=False, na_option="bottom")
    rank_cols.append(rcol)

# 2) Aggregate ranks (lower is better; rank 1 is best)
data["rank_agg"] = data[rank_cols].mean(axis=1)

# 3) Sort by aggregate rank (ascending) and apply tie-breakers (descending)
sort_cols = ["rank_agg"] + [c for c in TIE_BREAKERS if c in data.columns]
ascending = [True] + [False] * (len(sort_cols) - 1)
scored_overall = data.sort_values(sort_cols, ascending=ascending).reset_index(drop=True)

# 4) Select top-K overall
topk_overall = scored_overall.head(TOP_K).copy()
print(
    f"Selected top-{TOP_K} rules overall with RHS == 'Label', "
    f"using rank aggregation over: {', '.join(METRICS)} "
    f"(kept {len(data)} rules after RHS filter)."
)

# 5) Arrange columns for display: rule text and encoding first (when present),
#    then the metrics, then the aggregate rank
display_cols = [c for c in (RULE_COL, ENC_COL) if c in topk_overall.columns]
display_cols += [c for c in METRICS if c in topk_overall.columns] + ["rank_agg"]

topk_overall.loc[:, display_cols]


In [None]:
# 1) Output paths: the target directory is created up front if it does not exist yet
OUT_NAME = "top_deviant_rules_dhl.tex"
OUT_DIR = Path("5_analysis")
OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_PATH = OUT_DIR / OUT_NAME

# 2) Select columns to export
# Only the requested columns that actually exist in `topk_overall` are kept,
# in the order listed here.
wanted_cols = ["Rule", "Feature Encoding"]
wanted_cols += ["Odds ratio", "Support LHS", "Confidence", "Lift", "Conviction"]
available_cols = [name for name in wanted_cols if name in topk_overall.columns]
df_export = topk_overall.loc[:, available_cols].copy()

# 3) LaTeX-safe wrapper for text columns
def detok(series: pd.Series) -> pd.Series:
    r"""
    Wrap every non-empty value of ``series`` in ``\detokenize{...}``.

    Missing values (NaN / None) and empty strings become empty strings, so they
    render as blank table cells.

    Parameters:
        series: column to wrap; non-string values are converted with ``str``.

    Returns:
        A Series with the same index holding the wrapped strings.
    """
    # Blank out missing values BEFORE converting to str: once converted, NaN/None
    # are the literal texts "nan"/"None" and can no longer be detected as missing.
    text = series.astype(object).where(series.notna(), "").astype(str)
    return text.map(lambda x: rf"\detokenize{{{x}}}" if x != "" else "")

# 4) Apply \detokenize to textual fields (only those present in the export frame)
text_columns = [name for name in ("Rule", "Feature Encoding") if name in df_export.columns]
for col in text_columns:
    df_export[col] = detok(df_export[col])

# 5) Render LaTeX (keep \detokenize unescaped)
# escape=False is required here: escaping would mangle the \detokenize{...} wrappers.
latex_options = {
    "index": False,
    "escape": False,
    "longtable": False,
    "float_format": "%.3f",
}
latex_str = df_export.to_latex(**latex_options)

# 6) Write file
OUT_PATH.write_text(latex_str, encoding="utf-8")
print(f"Wrote LaTeX table to: {OUT_PATH}")