In [1]:
# %% ultra-clean A/B pipeline: prep -> run (subprocess) -> collect -> plot
from __future__ import annotations

import os, sys, re, json, time, subprocess, textwrap
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import requests
import pandas as pd
import matplotlib.pyplot as plt

# -----------------------
# Project paths
# -----------------------
# The project root is taken to be two levels above the kernel's working directory.
PROJECT_ROOT = Path.cwd().parent.parent  # adjust if needed
DATA_DIR     = PROJECT_ROOT.joinpath("data", "fuzzy")
RESULTS_DIR  = PROJECT_ROOT.joinpath("results", "fuzzy")
# Create both trees up front so later steps can write into them unconditionally.
for p in (DATA_DIR, RESULTS_DIR):
    p.mkdir(parents=True, exist_ok=True)

# -----------------------
# Step 1: Prep (URL + SF)
# -----------------------
def _filename_from_url(url: str) -> str:
    return Path(url.split("?")[0]).name

def _download(url: str, base_dir: Path) -> Path:
    """Fetch ``url`` into ``base_dir/<stem>/<filename>`` and return that path.

    Resolution order:
      1. If the target file already exists it is returned as-is (cache hit).
      2. If a file with the same name sits directly in ``base_dir`` (older
         layout), its bytes are copied to the target path.
      3. Otherwise the URL is downloaded with a 60 s timeout.

    The payload is always staged in a sibling ``<filename>.part`` file and then
    renamed into place, so an interrupted copy or download never leaves a
    truncated file that a later call would mistake for a valid cache entry.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    filename = _filename_from_url(url)
    name_root = Path(filename).stem
    dst_dir = base_dir / name_root
    dst_dir.mkdir(parents=True, exist_ok=True)
    out = dst_dir / filename
    if out.exists():
        print(f"[download] Using cached: {out}")
        return out

    tmp = out.with_name(out.name + ".part")
    try:
        legacy = base_dir / filename
        if legacy.exists():
            tmp.write_bytes(legacy.read_bytes())
            os.replace(tmp, out)
            # The legacy file is left in place, so this is a copy, not a move.
            print(f"[download] Copied legacy file -> {out}")
            return out

        print(f"[download] Fetch {url}")
        # Stream in chunks so large datasets are not held fully in memory.
        with requests.get(url, timeout=60, stream=True) as r:
            r.raise_for_status()
            with open(tmp, "wb") as fh:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    fh.write(chunk)
        os.replace(tmp, out)
    finally:
        # On success the temp file has been renamed away; on failure remove the partial.
        if tmp.exists():
            tmp.unlink()
    print(f"[download] Saved {out}")
    return out

def prepare_dataset(url: str, sf: int) -> Dict[str, Any]:
    """
    Download ``url`` (cached) and make sure the SF-scaled artifacts exist.

    Relies on two repo helpers, imported lazily once PROJECT_ROOT is on sys.path:
      - scripts.fuzzy.replicate_file.replicate_file
      - scripts.fuzzy.fixedpoint_normalize.normalize_file

    ``sf`` is coerced to an int and clamped to at least 1.

    Returns a dict with keys:
      'dataset_name', 'original',
      'floating_text', 'floating_parquet',
      'fixed_parquet', 'quant_mult'
    Paths are returned as strings; 'quant_mult' is the int read from the
    ``*_quant_mult.txt`` file.

    Raises FileNotFoundError if any expected parquet / quant file is still
    missing after normalize_file has been given a chance to create it.
    """
    root = str(PROJECT_ROOT)
    if root not in sys.path:
        sys.path.insert(0, root)

    source_file = _download(url, DATA_DIR)
    dataset_name = source_file.parent.name

    # 1) SF-replicated floating text, reused when already present
    from scripts.fuzzy.replicate_file import replicate_file
    sf = max(1, int(sf))
    replicated = source_file.with_name(
        f"{source_file.stem}_SF{sf}_floating{source_file.suffix}"
    )
    if not replicated.exists():
        replicate_file(str(source_file), sf, str(replicated))
        print(f"[prep] Made: {replicated.name}")
    else:
        print(f"[prep] Using existing: {replicated.name}")

    # 2) fixed parquet + quant_mult + floating parquet, all siblings of the replicated file
    from scripts.fuzzy.fixedpoint_normalize import normalize_file
    base = replicated.stem.replace("_floating", "")
    fixed_pq = replicated.with_name(f"{base}_fixed.parquet")
    quant_txt = replicated.with_name(f"{base}_quant_mult.txt")
    floating_pq = replicated.with_suffix(".parquet")

    if not all(path.exists() for path in (fixed_pq, quant_txt, floating_pq)):
        print("[prep] Running normalize_file to get parquet(s) & quant_mult …")
        normalize_file(str(replicated), write_fixed_text=False)

    # normalize_file is expected to have produced all three; fail loudly otherwise.
    for required in (fixed_pq, floating_pq, quant_txt):
        if not required.exists():
            raise FileNotFoundError(f"Missing: {required}")

    return {
        "dataset_name": dataset_name,
        "original": str(source_file),
        "floating_text": str(replicated),
        "floating_parquet": str(floating_pq),
        "fixed_parquet": str(fixed_pq),
        "quant_mult": int(quant_txt.read_text().strip()),
    }

# -------------------------------------------------
# Step 2: Run miners in separate processes (clean)
# -------------------------------------------------
def _run_subprocess(args: List[str], log_path: Path, err_path: Path, cwd: Path, extra_env: Optional[Dict[str, str]]=None) -> int:
    """Run ``args`` as a child process and return its exit code.

    stdout is written to ``log_path`` and stderr to ``err_path`` (both are
    truncated first; parent directories are created). The child inherits the
    current environment with PROJECT_ROOT prepended to PYTHONPATH, plus any
    ``extra_env`` entries (values are stringified).

    A non-zero exit code is returned, not raised; callers decide how to react.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    err_path.parent.mkdir(parents=True, exist_ok=True)

    env = os.environ.copy()
    # Join only non-empty parts: a trailing separator would add an empty
    # PYTHONPATH entry, which Python resolves to the current directory.
    inherited = env.get("PYTHONPATH", "")
    env["PYTHONPATH"] = os.pathsep.join(part for part in (str(PROJECT_ROOT), inherited) if part)
    if extra_env:
        env.update({k: str(v) for k, v in extra_env.items()})
    print("[run] ", " ".join(args))
    with open(log_path, "w") as out, open(err_path, "w") as err:
        proc = subprocess.run(args, cwd=cwd, env=env, stdout=out, stderr=err, text=True)
    print(f"[run] exit={proc.returncode}  log={log_path}  err={err_path}")
    return proc.returncode

def run_cuffi_cli(
    fixed_parquet: str,
    quant_mult: int,
    sup_int: int,
    out_dir: Path,
    allocator="rmm_managed",
    gds="off",
    pinned=True,
    managed_prefetch=True
) -> Dict[str, Any]:
    """
    Launch ``src.algorithms.fuzzy.cuFFIMiner`` in a subprocess for one support value.

    The miner is given the pre-scaled (fixed-point) parquet, the integer
    support and the quantization multiplier. Patterns go to a file under
    ``out_dir`` whose name encodes allocator / gds / pinning / support;
    stdout and stderr go to ``out_dir/logs/cuffi_sup<sup>.{out,err}``.

    NOTE (from the original author): gds='off' => force cuFile => GDS ON,
    gds='on' => POSIX => GDS OFF. The flag reads inverted; verify against the
    miner's CLI before relying on it.

    Returns a dict with 'rc' (exit code) and the 'patterns', 'stdout',
    'stderr' paths as strings.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    logs_dir = out_dir / "logs"
    pin_tag = "pin" if pinned else "nopin"
    patterns_path = out_dir / f"patterns_cuffi_{allocator}_{gds}_{pin_tag}_sup{sup_int}.txt"
    log_path = logs_dir / f"cuffi_sup{sup_int}.out"
    err_path = logs_dir / f"cuffi_sup{sup_int}.err"

    # Boolean toggles become bare flags, emitted in a fixed order.
    switches = (("--pinned", pinned), ("--managed-prefetch", managed_prefetch))
    optional_flags = [flag for flag, enabled in switches if enabled]

    cmd = [
        sys.executable, "-m", "src.algorithms.fuzzy.cuFFIMiner",
        str(fixed_parquet),
        str(int(sup_int)),
        str(int(quant_mult)),
        "-o", str(patterns_path),
        "--allocator", allocator,
        "--gds", gds,
        *optional_flags,
    ]

    rc = _run_subprocess(cmd, log_path, err_path, cwd=PROJECT_ROOT)
    return dict(rc=rc, patterns=str(patterns_path), stdout=str(log_path), stderr=str(err_path))

def run_naive_cli(
    floating_text_or_parquet: str,
    quant_mult: int,
    sup_int: int,
    out_dir: Path
) -> Dict[str, Any]:
    """
    Launch ``src.algorithms.fuzzy.naiveFFIMiner`` in a subprocess for one support value.

    The miner is passed a float minimum support, obtained by dividing the
    quantized integer support by ``quant_mult`` (a multiplier below 1 is
    treated as 1) and formatting with 12 significant digits.

    Patterns go to ``out_dir/patterns_naive_sup<sup>.txt``; stdout and stderr
    go to ``out_dir/logs/naive_sup<sup>.{out,err}``.

    Returns a dict with 'rc' (exit code) and the 'patterns', 'stdout',
    'stderr' paths as strings.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    logs_dir = out_dir / "logs"

    divisor = max(1, int(quant_mult))
    sup_float = sup_int / divisor

    patterns_path = out_dir / f"patterns_naive_sup{sup_int}.txt"
    log_path = logs_dir / f"naive_sup{sup_int}.out"
    err_path = logs_dir / f"naive_sup{sup_int}.err"

    cmd = [sys.executable, "-m", "src.algorithms.fuzzy.naiveFFIMiner"]
    cmd.append(str(floating_text_or_parquet))
    cmd.append(f"{sup_float:.12g}")
    cmd.extend(["-o", str(patterns_path)])

    rc = _run_subprocess(cmd, log_path, err_path, cwd=PROJECT_ROOT)
    return dict(rc=rc, patterns=str(patterns_path), stdout=str(log_path), stderr=str(err_path))

# ---------------------------------------
# Step 3: Parse logs -> metrics -> plots
# ---------------------------------------
# Each pattern captures one numeric field from a miner's stdout log.
# Every numeric group tolerates thousands separators (e.g. "1,234.56"); the
# commas are stripped by _parse_first_float_mb before conversion.
_METRIC_PATTERNS = {
    "exec_time":          re.compile(r"Execution Time:\s*([0-9.,]+)\s*seconds", re.I),
    "cpu_mem_mb":         re.compile(r"Peak CPU Memory Usage:\s*([0-9.,]+)\s*MB", re.I),
    "gpu_mem_mb":         re.compile(r"Peak GPU \(driver\) Used:\s*([0-9.,]+)\s*MB", re.I),
    "pool_used_mb":       re.compile(r"Peak Pool Used:\s*([0-9.,]+)\s*MB", re.I),
    "pool_total_mb":      re.compile(r"Peak Pool Total:\s*([0-9.,]+)\s*MB", re.I),
    "rmm_peak_mb":        re.compile(r"RMM Statistics Peak:\s*([0-9.,]+)\s*MB", re.I),
    "patterns_found":     re.compile(r"Patterns Found:\s*([0-9,]+)", re.I),
    # Theory metrics printed by the miners' print_results()
    "theory_static_mb":   re.compile(r"Theoretical Static:\s*([0-9.,]+)\s*MB", re.I),
    "theory_peak_mb":     re.compile(r"Theoretical Peak:\s*([0-9.,]+)\s*MB", re.I),
}

def _parse_first_float_mb(text: str, rgx: re.Pattern) -> Optional[float]:
    """Return the first match of ``rgx`` in ``text`` as a float, or None.

    Despite the name this is used for every metric, not only MB values.
    Thousands separators are removed before conversion. A capture that still
    is not a valid number (e.g. "1.2.3" or a lone ".") yields None instead of
    raising, so one malformed line cannot abort collection of a whole run.
    """
    m = rgx.search(text)
    if not m:
        return None
    try:
        # accept 1,234.56 formats
        return float(m.group(1).replace(",", ""))
    except ValueError:
        return None

def parse_metrics_from_log(log_path: Path) -> Dict[str, Optional[float]]:
    """Extract every metric in _METRIC_PATTERNS from the log at ``log_path``.

    The file is read as text with undecodable bytes ignored. The result has
    one entry per pattern key; a metric that is absent from the log (or not
    parseable) maps to None. All present values are floats, including
    'patterns_found'.
    """
    text = Path(log_path).read_text(errors="ignore")
    return {key: _parse_first_float_mb(text, rgx) for key, rgx in _METRIC_PATTERNS.items()}

def collect_results(dataset_name: str, sf: int, quant_mult: int, supports: List[int], ds_dir: Path) -> pd.DataFrame:
    """Gather parsed metrics for every (support, miner) pair that has a log.

    For each support value the cuFFIMiner log is considered first, then the
    naiveFFIMiner log. A log is looked up in ``ds_dir/logs`` and, failing
    that, directly in ``ds_dir`` (layout used by earlier runs). Pairs without
    any log are skipped.

    Each row carries dataset, sf, algorithm, support_quant_int, quant_mult and
    the parsed metrics. Byte-valued convenience columns are appended for
    'theory_peak_mb' and 'gpu_mem_mb' when that column has at least one value.
    """
    logs_dir = ds_dir / "logs"
    miners = (("cuFFIMiner", "cuffi"), ("naiveFFIMiner", "naive"))

    records: List[Dict[str, Any]] = []
    for sup in supports:
        for algorithm, prefix in miners:
            log_name = f"{prefix}_sup{sup}.out"
            log_file = logs_dir / log_name
            if not log_file.exists():
                # Backward-compat: previous runs stored logs in the ds_dir root
                fallback = ds_dir / log_name
                if fallback.exists():
                    log_file = fallback
            if not log_file.exists():
                continue
            record: Dict[str, Any] = {
                "dataset": dataset_name,
                "sf": sf,
                "algorithm": algorithm,
                "support_quant_int": sup,
                "quant_mult": quant_mult,
            }
            record.update(parse_metrics_from_log(log_file))
            records.append(record)

    df = pd.DataFrame(records)

    # Optional convenience columns in bytes (MB here means MiB: 1024**2 bytes)
    for mb_col, bytes_col in (("theory_peak_mb", "theory_peak_bytes"), ("gpu_mem_mb", "gpu_mem_bytes")):
        if mb_col in df.columns and df[mb_col].notna().any():
            df[bytes_col] = df[mb_col] * (1024**2)
    return df

# ---- plotting (PDF, LaTeX-friendly) ----
# Font embedding for PDF/PS export (fonttype 42 is presumably chosen so text
# embeds as TrueType for LaTeX inclusion; confirm against the matplotlib docs).
plt.rcParams.update({"pdf.fonttype": 42, "ps.fonttype": 42})
# Compact sizing for small, single-column figures.
plt.rcParams.update({
    "figure.dpi": 150,
    "font.size": 11,
    "axes.titlesize": 12,
    "axes.labelsize": 11,
    "legend.fontsize": 9,
})

_LABELS = {
    "exec_time": "Execution Time (s)",
    "cpu_mem_mb": "Peak CPU Memory (MB)",
    "gpu_mem_mb": "Peak GPU (driver) Used (MB)",
    "patterns_found": "Patterns Found",
    "theory_static_mb": "Theoretical Static Memory (MB)",
    "theory_peak_mb": "Theoretical Peak Memory (MB)",
}

def _plot_metric(df: pd.DataFrame, metric: str, out_dir: Path, dataset_name: str):
    if metric not in df.columns:
        print(f"[plot] Skip missing metric: {metric}")
        return
    # drop NaNs for this metric
    dfm = df.dropna(subset=[metric])
    if dfm.empty:
        print(f"[plot] No data for {metric}, skipping.")
        return
    fig, ax = plt.subplots(figsize=(5.0, 3.0))
    for algo, sub in dfm.groupby("algorithm", sort=False):
        sub = sub.sort_values("support_quant_int")
        ax.plot(sub["support_quant_int"].values, sub[metric].values, marker="o", label=algo)
    ax.set_xlabel("Support Threshold (quantized int)")
    ax.set_ylabel(_LABELS.get(metric, metric))
    ax.set_title(f"{dataset_name} — {_LABELS.get(metric, metric)}")
    ax.grid(alpha=0.25, linestyle=":")
    ax.legend(loc="best")
    fig.tight_layout()
    out_dir.mkdir(parents=True, exist_ok=True)
    pdf = out_dir / f"{dataset_name}_{metric}.pdf"
    fig.savefig(pdf, format="pdf"); plt.close(fig)
    print(f"[plot] wrote {pdf}")

def plot_all(metrics_df: pd.DataFrame, dataset_name: str, figs_dir: Path, metrics: Optional[List[str]]=None):
    """Plot each requested metric of ``metrics_df`` into ``figs_dir``.

    When ``metrics`` is None or empty, the default set is used: execution
    time, peak CPU memory, theoretical peak memory (rather than driver-level
    GPU memory) and pattern count.
    """
    if metrics:
        selected = metrics
    else:
        selected = ["exec_time", "cpu_mem_mb", "theory_peak_mb", "patterns_found"]
    for metric_name in selected:
        _plot_metric(metrics_df, metric_name, figs_dir, dataset_name)
    print("[plot] done.")

# ----------------------------------------
# Orchestrator (one-liner for your runs)
# ----------------------------------------
def run_pipeline(
    dataset_url: str,
    sf: int,
    supports_quant_int: List[int],
    *,
    # cuFFI toggles
    cuffi_allocator: str = "rmm_managed",
    cuffi_gds: str = "off",      # 'off' => cuFile => GDS ON ; 'on' => POSIX => GDS OFF
    cuffi_pinned: bool = False,
    cuffi_prefetch: bool = True,
    force: bool = False,
) -> pd.DataFrame:
    """
    End-to-end run for one dataset and scale factor.

    1) Prep artifacts (download + SF replication + parquet/quant files).
    2) For every support value, run cuFFIMiner and naiveFFIMiner in
       subprocesses. A miner is skipped when its stdout log already exists,
       unless ``force`` is True.
    3) Parse logs into a DataFrame, save it as ``metrics_SF<sf>.csv`` and
       write the default plots (which use *theoretical* peak memory).

    ``sf`` is coerced to an int >= 1 here, the same way prepare_dataset does,
    so the results directory, CSV name and 'sf' column always match the data
    files that were actually used.

    A miner exiting with a non-zero code does not abort the sweep; a warning
    pointing at its stderr log is printed and its metrics will be missing.

    Returns the metrics DataFrame.
    """
    # Normalize once so "SF{sf}" below agrees with prepare_dataset's file names.
    sf = max(1, int(sf))

    prep = prepare_dataset(dataset_url, sf)
    dataset = prep["dataset_name"]; quant_mult = prep["quant_mult"]
    ds_dir = RESULTS_DIR / dataset / f"SF{sf}"
    logs_dir = ds_dir / "logs"; logs_dir.mkdir(parents=True, exist_ok=True)

    # Run all supports
    for sup in supports_quant_int:
        cuffi_out = logs_dir / f"cuffi_sup{sup}.out"
        naive_out = logs_dir / f"naive_sup{sup}.out"

        if (not cuffi_out.exists()) or force:
            res = run_cuffi_cli(
                fixed_parquet=prep["fixed_parquet"],
                quant_mult=quant_mult,
                sup_int=sup,
                out_dir=ds_dir,
                allocator=cuffi_allocator,
                gds=cuffi_gds,
                pinned=cuffi_pinned,
                managed_prefetch=cuffi_prefetch,
            )
            if res["rc"] != 0:
                print(f"[warn] cuFFI sup={sup} exited with rc={res['rc']} (see {res['stderr']})")
        else:
            print(f"[skip] cuFFI sup={sup} (log exists, use force=True to re-run)")

        if (not naive_out.exists()) or force:
            res = run_naive_cli(
                floating_text_or_parquet=prep["floating_parquet"],
                quant_mult=quant_mult,
                sup_int=sup,
                out_dir=ds_dir,
            )
            if res["rc"] != 0:
                print(f"[warn] naive sup={sup} exited with rc={res['rc']} (see {res['stderr']})")
        else:
            print(f"[skip] naive sup={sup} (log exists, use force=True to re-run)")

    # Collect -> CSV
    df = collect_results(dataset, sf, quant_mult, supports_quant_int, ds_dir)
    metrics_csv = ds_dir / f"metrics_SF{sf}.csv"
    df.to_csv(metrics_csv, index=False)
    print(f"[metrics] saved {metrics_csv}")

    # Plot (theory peak memory by default)
    plot_all(df, dataset, ds_dir / "figures")
    return df


In [3]:
# Fuzzy retail, SF=100: reduced three-point support sweep.
retail = "https://u-aizu.ac.jp/~udayrage/datasets/fuzzyDatabases/Fuzzy_retail.csv"
# Full sweep, kept for reference:
# retail_sup = [25_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000]
retail_sup = [
    25_000,
    70_000,
    100_000,
]

# force=True re-runs both miners even when logs from an earlier run exist.
run_pipeline(
    retail,
    sf=100,
    supports_quant_int=retail_sup,
    force=True,
)

[download] Using cached: /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail.csv
[prep] Using existing: Fuzzy_retail_SF100_floating.csv
[run]  /export/home1/ltarun/miniforge3/envs/rapids-25.08/bin/python -m src.algorithms.fuzzy.cuFFIMiner /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail_SF100_fixed.parquet 25000 10 -o /export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/patterns_cuffi_rmm_managed_off_nopin_sup25000.txt --allocator rmm_managed --gds off --managed-prefetch
[run] exit=0  log=/export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/logs/cuffi_sup25000.out  err=/export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/logs/cuffi_sup25000.err
[run]  /export/home1/ltarun/miniforge3/envs/rapids-25.08/bin/python -m src.algorithms.fuzzy.naiveFFIMiner /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail_SF100_floating.parquet 2500 -o /export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/patterns_n

Unnamed: 0,dataset,sf,algorithm,support_quant_int,quant_mult,exec_time,cpu_mem_mb,gpu_mem_mb,pool_used_mb,pool_total_mb,rmm_peak_mb,patterns_found,theory_static_mb,theory_peak_mb,theory_peak_bytes,gpu_mem_bytes
0,Fuzzy_retail,100,cuFFIMiner,25000,10,22.4061,931.05,14048.56,0.0,0.0,10961.17,26746.0,7530.605,7874.331,8256835000.0,14730980000.0
1,Fuzzy_retail,100,naiveFFIMiner,25000,10,39.342,15618.12,40976.56,0.0,0.0,6799.45,26746.0,6253.23,6596.956,6917410000.0,42967040000.0
2,Fuzzy_retail,100,cuFFIMiner,70000,10,6.5886,928.52,11766.56,0.0,0.0,9036.76,6023.0,5699.432,5743.922,6022939000.0,12338130000.0
3,Fuzzy_retail,100,naiveFFIMiner,70000,10,22.4037,15443.61,41000.56,0.0,0.0,4762.63,6023.0,4691.932,4736.422,4966498000.0,42992200000.0
4,Fuzzy_retail,100,cuFFIMiner,100000,10,4.8357,936.66,11046.56,0.0,0.0,8317.5,3530.0,5014.862,5033.807,5278329000.0,11583160000.0
5,Fuzzy_retail,100,naiveFFIMiner,100000,10,20.1205,14443.91,40980.56,0.0,0.0,4161.71,3530.0,4131.612,4150.557,4352174000.0,42971230000.0


In [2]:
# NOTE(review): this cell repeats the Fuzzy retail run verbatim, and the
# execution counts (In [3] appears before In [2]) show the notebook was run
# out of order. Keep one copy once the comparison is no longer needed.
retail = "https://u-aizu.ac.jp/~udayrage/datasets/fuzzyDatabases/Fuzzy_retail.csv"
# Full sweep, kept for reference:
# retail_sup = [25_000, 50_000, 60_000, 70_000, 80_000, 90_000, 100_000]
retail_sup = [
    25_000,
    70_000,
    100_000,
]

# force=True re-runs both miners even when logs from an earlier run exist.
run_pipeline(
    retail,
    sf=100,
    supports_quant_int=retail_sup,
    force=True,
)

[download] Using cached: /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail.csv
[prep] Using existing: Fuzzy_retail_SF100_floating.csv
[run]  /export/home1/ltarun/miniforge3/envs/rapids-25.08/bin/python -m src.algorithms.fuzzy.cuFFIMiner /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail_SF100_fixed.parquet 25000 10 -o /export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/patterns_cuffi_rmm_managed_off_nopin_sup25000.txt --allocator rmm_managed --gds off --managed-prefetch
[run] exit=0  log=/export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/logs/cuffi_sup25000.out  err=/export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/logs/cuffi_sup25000.err
[run]  /export/home1/ltarun/miniforge3/envs/rapids-25.08/bin/python -m src.algorithms.fuzzy.naiveFFIMiner /export/home1/ltarun/cuda_pami/data/fuzzy/Fuzzy_retail/Fuzzy_retail_SF100_floating.parquet 2500 -o /export/home1/ltarun/cuda_pami/results/fuzzy/Fuzzy_retail/SF100/patterns_n

Unnamed: 0,dataset,sf,algorithm,support_quant_int,quant_mult,exec_time,cpu_mem_mb,gpu_mem_mb,pool_used_mb,pool_total_mb,rmm_peak_mb,patterns_found,theory_static_mb,theory_peak_mb,theory_peak_bytes,gpu_mem_bytes
0,Fuzzy_retail,100,cuFFIMiner,25000,10,10.185,951.41,35170.56,0.0,0.0,10961.17,26746.0,7530.605,7874.331,8256835000.0,36879010000.0
1,Fuzzy_retail,100,naiveFFIMiner,25000,10,38.9993,15619.1,40996.56,0.0,0.0,6799.45,26746.0,6253.23,6596.956,6917410000.0,42988010000.0
2,Fuzzy_retail,100,cuFFIMiner,70000,10,5.8501,945.36,34590.56,0.0,0.0,9036.76,6023.0,5699.432,5743.922,6022939000.0,36270830000.0
3,Fuzzy_retail,100,naiveFFIMiner,70000,10,22.2785,15446.74,40984.56,0.0,0.0,4762.63,6023.0,4691.932,4736.422,4966498000.0,42975430000.0
4,Fuzzy_retail,100,cuFFIMiner,100000,10,4.7355,940.79,31878.56,0.0,0.0,8317.5,3530.0,5014.862,5033.807,5278329000.0,33427090000.0
5,Fuzzy_retail,100,naiveFFIMiner,100000,10,19.7776,14452.34,40976.56,0.0,0.0,4161.71,3530.0,4131.612,4150.557,4352174000.0,42967040000.0


In [None]:
# Fuzzy kosarak, SF=100: ten-point support sweep in steps of 50,000.
kosarak = "https://u-aizu.ac.jp/~udayrage/datasets/fuzzyDatabases/Fuzzy_kosarak.csv"
kosarak_sup = [
    250_000, 300_000, 350_000, 400_000, 450_000,
    500_000, 550_000, 600_000, 650_000, 700_000,
]

# force=True re-runs both miners even when logs from an earlier run exist.
run_pipeline(
    kosarak,
    sf=100,
    supports_quant_int=kosarak_sup,
    force=True,
)


In [None]:
# Fuzzy pumsb, SF=100: supports listed from highest to lowest.
pumsb = "https://u-aizu.ac.jp/~udayrage/datasets/fuzzyDatabases/Fuzzy_pumsb.csv"
pumsb_sup = [
    20_000_000,
    19_000_000,
    18_000_000,
    17_000_000,
    16_000_000,
]

# force=True re-runs both miners even when logs from an earlier run exist.
run_pipeline(
    pumsb,
    sf=100,
    supports_quant_int=pumsb_sup,
    force=True,
)

In [None]:
# Fuzzy T10I4D100K, SF=100: supports listed from highest to lowest.
# The variables are named after the dataset they hold; the copy-pasted
# `pumsb` / `pumsb_sup` names overwrote the pumsb cell's values.
t10i4d100k = "https://u-aizu.ac.jp/~udayrage/datasets/fuzzyDatabases/Fuzzy_T10I4D100K.csv"
t10i4d100k_sup = [550_000, 500_000, 450_000, 400_000, 350_000, 300_000, 200_000]

# force=True re-runs both miners even when logs from an earlier run exist.
run_pipeline(t10i4d100k, sf=100, supports_quant_int=t10i4d100k_sup, force=True)