In [8]:
from __future__ import annotations

import argparse
import json
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

from load_data import (
    PdMConfig, 
    load_pdm_raw, 
    build_hourly_panel, 
    infer_data_types, 
    scale_continuous_columns, 
    panel_to_machine_datasets
)
from pcmci_mixed import pcmciplus_mixed
from graph_io import (
    aggregate_graphs, 
    generalgraph_to_dict, 
    save_graph_json,
    aggregate_info_asdict,
)

from nbcb_w import NBCBw
from nbcb_e import NBCBe
from cbnb_w import CBNBw
from cbnb_e import CBNBe

from causallearn.graph.Edge import Edge
from causallearn.graph.Endpoint import Endpoint
from causallearn.graph.GeneralGraph import GeneralGraph
from causallearn.graph.GraphNode import GraphNode

from sklearn.linear_model import LassoCV

In [9]:
# Telemetry signal columns; the "telemetry" feature set keeps whichever of these
# are present in a machine's prepared dataset.
TELEMETRY_COLS = "volt rotate pressure vibration".split()
# The NBCB/CBNB modules are imported unconditionally in the imports cell, so this is
# always True as written; set it to False to skip the hybrid algorithms.
HYBRID_ALGORITHMS_AVAILABLE = True

In [None]:
def granger_lasso_graph(
    data: pd.DataFrame,
    *,
    tau_max: int = 5,
    sig_level: float = 0.05,
    cv: int = 5
) -> GeneralGraph:
    """
    Lasso-based Granger summary graph (GCMVL in the IT paper codebase).

    Re-implements algorithms.py::granger_lasso but without extra dependencies.
    For every target variable, a LassoCV regression on the stacked lags
    [X_{t-1}, ..., X_{t-tau_max}] of ALL variables is fitted. Source j is taken to
    cause target i if |coef| > sig_level at ANY lag.

    Important:
    - ``sig_level`` is a coefficient-magnitude threshold (as in the provided code),
      not a p-value.
    - All lags are collapsed into a single summary graph; self-lags produce no edge.
    - Mutual causation (i -> j and j -> i) is emitted once, as an i <-> j edge.

    Parameters
    ----------
    data : pd.DataFrame
        Time series data (T x N)
    tau_max : int
        Maximum lag to consider (must be >= 1)
    sig_level : float
        Coefficient threshold for edge inclusion
    cv : int
        Cross-validation folds for LassoCV

    Returns
    -------
    GeneralGraph
        Summary causal graph

    Raises
    ------
    ValueError
        If ``tau_max < 1`` or there are at most ``tau_max + 1`` samples.
    """
    if tau_max < 1:
        raise ValueError("tau_max must be >= 1 for granger_lasso_graph")

    # replace() returns a new frame, so the caller's data is never modified.
    data = data.replace([np.inf, -np.inf], np.nan).fillna(0.0)

    n_samples, dim = data.shape
    if n_samples <= tau_max + 1:
        raise ValueError(f"Not enough samples (n={n_samples}) for tau_max={tau_max}")

    values = data.values
    # Row r of Y is time t = tau_max + r; block k-1 of X holds X_{t-k} for k = 1..tau_max.
    Y = values[tau_max:]
    X = np.hstack([values[tau_max - k:n_samples - k] for k in range(1, tau_max + 1)])

    lasso_cv = LassoCV(cv=cv)
    coeff = np.zeros((dim, dim * tau_max))
    for target in range(dim):
        lasso_cv.fit(X, Y[:, target])
        coeff[target] = lasso_cv.coef_

    # coeff[i, lag * dim + j] is the weight of source j at lag (lag + 1) on target i.
    # View it as (target, lag, source) and collapse lags: an edge exists if ANY lag passes.
    strength = np.abs(coeff.reshape(dim, tau_max, dim)).max(axis=1)
    causes = strength > sig_level  # causes[i, j] is True  <=>  j -> i

    names = list(data.columns)
    nodes = [GraphNode(name) for name in names]
    g = GeneralGraph(nodes)

    for src in range(dim):
        for dst in range(dim):
            if src == dst or not causes[dst, src]:
                continue
            if causes[src, dst]:
                # Mutual influence: add the bidirected edge once per unordered pair.
                if src < dst:
                    g.add_edge(Edge(nodes[src], nodes[dst], Endpoint.ARROW, Endpoint.ARROW))
            else:
                g.add_edge(Edge(nodes[src], nodes[dst], Endpoint.TAIL, Endpoint.ARROW))

    return g

In [11]:
def varlingam_graph(
    data: pd.DataFrame,
    *,
    tau_max: int = 1,
    min_abs_effect: float = 0.0
) -> GeneralGraph:
    """
    VARLiNGAM summary graph.

    Notes:
    - Requires `lingam` (PyPI) to be installed.
    - All adjacency matrices exposed by the fitted model are collapsed. Convention:
      B[i, j] is the effect of source j on target i. Source j is taken to cause target i
      if |B[i, j]| > min_abs_effect in ANY of them.
    - Mutual causation (j -> i and i -> j) is emitted as a single i <-> j edge, the same
      convention as granger_lasso_graph. Adding two opposite directed edges instead
      would make GeneralGraph reject the second one, silently losing a direction.

    Parameters
    ----------
    data : pd.DataFrame
        Time series data (T x N)
    tau_max : int
        Number of lags for VAR model
    min_abs_effect : float
        Minimum absolute effect size for edge inclusion

    Returns
    -------
    GeneralGraph
        Summary causal graph

    Raises
    ------
    ImportError
        If `lingam` is not installed.
    RuntimeError
        If the adjacency matrices cannot be read from the fitted model or do not
        match the number of columns.
    """
    try:
        from lingam import VARLiNGAM
    except ImportError as e:
        raise ImportError(
            "VARLiNGAM requires the `lingam` package. Install with: pip install lingam"
        ) from e

    data = data.replace([np.inf, -np.inf], np.nan).fillna(0.0)

    model = VARLiNGAM(lags=tau_max, criterion="bic", prune=True)
    model.fit(data)

    # Prefer the public fitted attribute; fall back to the private one.
    mats = getattr(model, "adjacency_matrices_", None)
    if mats is None:
        mats = getattr(model, "_adjacency_matrices", None)
    if mats is None:
        raise RuntimeError("Could not access VARLiNGAM adjacency matrices (model._adjacency_matrices).")

    mats_arr = np.asarray(mats, dtype=float)
    if mats_arr.ndim == 2:
        # (dim, dim) single matrix
        mats_arr = mats_arr[None, :, :]

    names = list(data.columns)
    dim = len(names)
    if mats_arr.shape[1:] != (dim, dim):
        raise RuntimeError(
            f"VARLiNGAM adjacency matrices have shape {mats_arr.shape}, expected (*, {dim}, {dim})."
        )

    # causes[i, j] is True  <=>  j -> i in at least one adjacency matrix.
    causes = (np.abs(mats_arr) > min_abs_effect).any(axis=0)

    nodes = [GraphNode(name) for name in names]
    g = GeneralGraph(nodes)

    for src in range(dim):
        for dst in range(dim):
            if src == dst or not causes[dst, src]:
                continue
            if causes[src, dst]:
                # Feedback loop: one bidirected edge per unordered pair.
                if src < dst:
                    g.add_edge(Edge(nodes[src], nodes[dst], Endpoint.ARROW, Endpoint.ARROW))
            else:
                g.add_edge(Edge(nodes[src], nodes[dst], Endpoint.TAIL, Endpoint.ARROW))

    return g

In [None]:
def _select_machines_by_model(
    machines: pd.DataFrame, 
    *, 
    n: int, 
    seed: int
) -> Dict[str, List[int]]:
    """
    Select n machines per model type.
    
    Parameters
    ----------
    machines : pd.DataFrame
        PdM_machines.csv as a DataFrame with columns 'machineID' and 'model'
    n : int
        Number of machines to select per model
    seed : int
        Random seed for reproducibility
        
    Returns
    -------
    Dict[str, List[int]]
        Dictionary mapping model type to list of machine IDs
    """
    if "model" not in machines.columns or "machineID" not in machines.columns:
        raise ValueError("machines dataframe must include columns: 'model', 'machineID'")

    rng = np.random.default_rng(seed)
    out: Dict[str, List[int]] = {}
    for model in sorted(machines["model"].unique().tolist()):
        ids = machines.loc[machines["model"] == model, "machineID"].astype(int).sort_values().tolist()
        if len(ids) <= n:
            out[model] = ids
        else:
            out[model] = rng.choice(ids, size=n, replace=False).astype(int).tolist()
    return out


def _prep_df(df: pd.DataFrame, *, max_rows: Optional[int] = None) -> pd.DataFrame:
    """Prepare dataframe: handle inf/nan, optionally truncate, drop constant columns."""
    df = df.sort_index().copy()
    df = df.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    if max_rows is not None and len(df) > max_rows:
        df = df.iloc[:max_rows].copy()
        
    # Drop constant columns (some algorithms don't like them)
    nunique = df.nunique(dropna=False)
    keep = nunique[nunique > 1].index.tolist()
    return df[keep]

In [None]:
def _run_per_machine(
    algo_name: str,
    algo_fn,
    datasets: Dict[int, pd.DataFrame],
    *,
    min_support: int = 1,
) -> Tuple[Dict[int, Any], Any, Dict[str, Any]]:
    """
    Run a (DataFrame -> GeneralGraph) algorithm on each machine, then aggregate by edge-support.
    
    Returns
    -------
    Tuple[Dict[int, Any], Any, Dict[str, Any]]
        (per_machine_graphdict, aggregate_graphdict, aggregate_meta)
    """
    per_machine: Dict[int, Any] = {}
    graphs = []
    errors: Dict[int, str] = {}
    
    for mid, df in datasets.items():
        t0 = time.time()
        try:
            g = algo_fn(df)
            per_machine[mid] = generalgraph_to_dict(g, metadata={"runtime_s": time.time() - t0})
            graphs.append(g)
        except Exception as e:
            errors[mid] = f"{type(e).__name__}: {e}"
            per_machine[mid] = None

    agg_graph = None
    agg_meta: Dict[str, Any] = {"min_support": min_support, "errors": errors}
    
    if len(graphs) > 0:
        g_agg, info = aggregate_graphs(graphs, min_support=min_support)
        agg_meta["n_graphs_ok"] = info.n_graphs
        agg_meta["edge_support"] = {str(k): int(v) for k, v in info.edge_support.items()}
        agg_graph = generalgraph_to_dict(g_agg, metadata={"aggregation": aggregate_info_asdict(info)})
    else:
        agg_meta["n_graphs_ok"] = 0

    return per_machine, agg_graph, agg_meta


def _write_result(path: Path, result: Dict[str, Any]) -> None:
    """Write result dictionary to JSON file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(result, indent=2))

In [None]:
def run_one_model(
    *,
    data_dir: Path,
    out_dir: Path,
    model: str,
    machine_ids: List[int],
    tau_max: int,
    sig_level: float,
    max_rows_per_machine: Optional[int],
    min_support: int,
) -> str:
    """
    Run all causal-discovery algorithms for one machine model type and write JSON results.

    This is the worker for one model type. It reloads the raw data itself, so it can
    run in a separate process. Files written under ``out_dir / f"model={model}"``:

    - ``meta.json``: selection, parameters, variable counts and inferred column types.
    - ``pcmci_mixed.json``: PCMCI+ with mixed-type tests on the full feature set.
    - ``{algo}_{feature_set}.json``:
      - algo is varlingam or gcmvl, plus nbcb_w / nbcb_e / cbnb_w / cbnb_e when
        ``HYBRID_ALGORITHMS_AVAILABLE`` is true;
      - feature_set is "mixed" (all prepared columns) or "telemetry" (the
        TELEMETRY_COLS that are present).

    Every algorithm runs per machine. Per-machine failures are recorded in the result
    file instead of being raised. The per-machine graphs are then combined with
    ``aggregate_graphs(..., min_support=min_support)``.

    Parameters
    ----------
    data_dir : Path
        Folder passed to ``load_pdm_raw``.
    out_dir : Path
        Root output folder.
    model : str
        Machine model type; used for the output folder name and metadata.
    machine_ids : List[int]
        Machines of this model type to analyse.
    tau_max : int
        Maximum lag passed to every algorithm.
    sig_level : float
        Significance level / coefficient threshold passed to the algorithms.
    max_rows_per_machine : Optional[int]
        If given, truncate each machine's series to this many rows.
    min_support : int
        Edge-support threshold used for aggregation.

    Returns
    -------
    str
        ``model`` (for logging by the caller).
    """
    raw = load_pdm_raw(data_dir, PdMConfig())

    panel = build_hourly_panel(
        raw,
        machine_ids=machine_ids,
        include_model=True,
        include_age=True,
        one_hot_model=True,
        drop_constant_cols=True,
    )

    col_types = infer_data_types(panel)
    panel_scaled, _scaler = scale_continuous_columns(panel, col_types)

    # Split into per-machine datasets ONCE; every algorithm below reuses these.
    datasets_all = {
        mid: _prep_df(df, max_rows=max_rows_per_machine)
        for mid, df in panel_to_machine_datasets(panel_scaled, machine_ids).items()
    }
    datasets_tel = {
        mid: _prep_df(df[[c for c in TELEMETRY_COLS if c in df.columns]], max_rows=max_rows_per_machine)
        for mid, df in datasets_all.items()
    }
    feature_sets = [("mixed", datasets_all), ("telemetry", datasets_tel)]

    model_dir = out_dir / f"model={model}"
    model_dir.mkdir(parents=True, exist_ok=True)

    def _columns_union(datasets: Dict[Any, pd.DataFrame]) -> set:
        # Union of column labels across machines (empty set when there are no machines).
        return set().union(*(set(df.columns) for df in datasets.values()))

    tel_cols_used = _columns_union(datasets_tel)

    # Small metadata file for reproducibility.
    meta = {
        "model": model,
        "machine_ids": machine_ids,
        "tau_max": tau_max,
        "sig_level": sig_level,
        "max_rows_per_machine": max_rows_per_machine,
        "n_vars_mixed": len(_columns_union(datasets_all)),
        "n_vars_telemetry": len(tel_cols_used),
        # Only the telemetry columns that survived preprocessing for at least one machine.
        "telemetry_cols_present": [c for c in TELEMETRY_COLS if c in tel_cols_used],
        "col_types": {k: int(v) for k, v in col_types.items()},
    }
    (model_dir / "meta.json").write_text(json.dumps(meta, indent=2))

    # ---- PCMCI+ with mixed-type tests (full feature set only; needs per-column types)
    pcmci_per: Dict[int, Any] = {}
    pcmci_graphs = []
    pcmci_errors: Dict[int, str] = {}

    for mid, df in datasets_all.items():
        key = int(mid)
        if df.shape[1] < 2:
            pcmci_per[key] = None
            pcmci_errors[key] = 'Too few variables after preprocessing'
            continue
        try:
            # Inside the try so a column missing from col_types fails only this machine.
            ct = col_types.loc[df.columns]
            t0 = time.perf_counter()
            g = pcmciplus_mixed(df, tau_max=tau_max, sig_level=sig_level, col_types=ct, verbosity=0)
            pcmci_per[key] = generalgraph_to_dict(g, metadata={'runtime_s': time.perf_counter() - t0})
            pcmci_graphs.append(g)
        except Exception as e:
            pcmci_per[key] = None
            pcmci_errors[key] = f"{type(e).__name__}: {e}"

    pcmci_agg = None
    pcmci_agg_meta: Dict[str, Any] = {'min_support': min_support, 'errors': pcmci_errors}

    if pcmci_graphs:
        g_agg, info = aggregate_graphs(pcmci_graphs, min_support=min_support)
        pcmci_agg = generalgraph_to_dict(g_agg, metadata={'aggregation': aggregate_info_asdict(info)})
        pcmci_agg_meta['n_graphs_ok'] = info.n_graphs
        # Same edge-support record that _run_per_machine stores for the other algorithms.
        pcmci_agg_meta['edge_support'] = {str(k): int(v) for k, v in info.edge_support.items()}
    else:
        pcmci_agg_meta['n_graphs_ok'] = 0

    _write_result(
        model_dir / 'pcmci_mixed.json',
        {
            'algorithm': 'pcmci_plus_mixed',
            'feature_set': 'mixed',
            'model': model,
            'machine_ids': machine_ids,
            'params': {'tau_max': tau_max, 'pc_alpha': sig_level},
            'per_machine': pcmci_per,
            'aggregate': pcmci_agg,
            'aggregate_meta': pcmci_agg_meta,
        },
    )

    # ---- Per-machine algorithms run on both feature sets
    def _hybrid(cls):
        """Adapt an NBCB/CBNB class (linear tests) to a DataFrame -> graph function."""
        def _fit(df: pd.DataFrame):
            m = cls(df, tau_max, sig_level, linear=True, model="linear", indtest="linear", cond_indtest="linear")
            m.run()
            return m.causal_graph
        return _fit

    # (result name, DataFrame -> graph function, params recorded in the result file)
    algorithms = [
        (
            "varlingam",
            lambda df: varlingam_graph(df, tau_max=tau_max, min_abs_effect=0.0),
            {"tau_max": tau_max},
        ),
        (
            "gcmvl",
            lambda df: granger_lasso_graph(df, tau_max=tau_max, sig_level=sig_level, cv=5),
            {"tau_max": tau_max, "sig_level": sig_level, "cv": 5},
        ),
    ]
    if HYBRID_ALGORITHMS_AVAILABLE:
        hybrid_params = {"tau_max": tau_max, "sig_level": sig_level}
        algorithms += [
            ("nbcb_w", _hybrid(NBCBw), hybrid_params),
            ("nbcb_e", _hybrid(NBCBe), hybrid_params),
            ("cbnb_w", _hybrid(CBNBw), hybrid_params),
            ("cbnb_e", _hybrid(CBNBe), hybrid_params),
        ]
    else:
        print("[WARN] Skipping NBCB/CBNB algorithms - modules not found")

    for algo_name, algo_fn, params in algorithms:
        for feature_name, datasets in feature_sets:
            per_machine, agg_graph, agg_meta = _run_per_machine(
                algo_name,
                algo_fn,
                datasets,
                min_support=min_support,
            )
            _write_result(
                model_dir / f"{algo_name}_{feature_name}.json",
                {
                    "algorithm": algo_name,
                    "feature_set": feature_name,
                    "model": model,
                    "machine_ids": machine_ids,
                    "params": params,
                    "per_machine": per_machine,
                    "aggregate": agg_graph,
                    "aggregate_meta": agg_meta,
                },
            )

    return model

In [None]:
# ---- Run configuration
data_dir = Path("./data")   # Folder containing PdM_*.csv files
out_dir = Path("./outputs") # Output folder
seed = 1337                 # Seed for the per-model machine sampling
n_machines = 20             # Machines per model type
models = ""                 # Comma-separated subset of model types (e.g., 'model1,model2'). Empty=all.
tau_max = 12                # Max time lag
sig_level = 0.05            # Significance level / coefficient threshold
max_rows_per_machine = 0    # If >0 truncate each machine time series to this many rows
min_support = 8             # Aggregation threshold for per-machine runs
n_jobs = 4                  # Parallel workers over model types (<=1 runs serially in this kernel)

out_dir.mkdir(parents=True, exist_ok=True)

# The selection is computed once here, so every worker uses the same machines.
raw = load_pdm_raw(data_dir, PdMConfig())
selection = _select_machines_by_model(raw["machines"], n=n_machines, seed=seed)

# Optionally restrict to a subset of model types; report names that do not exist.
if models.strip():
    requested = [m.strip() for m in models.split(",") if m.strip()]
    unknown = [m for m in requested if m not in selection]
    if unknown:
        print(f"[WARN] Unknown model types ignored: {unknown}. Available: {sorted(selection)}")
    selection = {m: selection[m] for m in requested if m in selection}

(out_dir / "machine_selection.json").write_text(json.dumps(selection, indent=2))

max_rows = max_rows_per_machine if max_rows_per_machine > 0 else None
jobs = list(selection.items())

# Arguments shared by every model job.
common_kwargs = dict(
    data_dir=data_dir,
    out_dir=out_dir,
    tau_max=tau_max,
    sig_level=sig_level,
    max_rows_per_machine=max_rows,
    min_support=min_support,
)

if n_jobs <= 1:
    for m, ids in jobs:
        print(f"[RUN] model={m} machines={len(ids)}")
        run_one_model(model=m, machine_ids=ids, **common_kwargs)
else:
    # ProcessPoolExecutor parallelism over model types.
    # NOTE(review): with the "spawn" start method (default on macOS and Windows), worker
    # processes must be able to import run_one_model. Functions defined in a notebook's
    # __main__ may fail to unpickle there. If every job errors with AttributeError, set
    # n_jobs = 1 or move the worker into a .py module.
    from concurrent.futures import ProcessPoolExecutor, as_completed

    with ProcessPoolExecutor(max_workers=n_jobs) as ex:
        future_to_model = {
            ex.submit(run_one_model, model=m, machine_ids=ids, **common_kwargs): m
            for m, ids in jobs
        }
        for fut in as_completed(future_to_model):
            m = future_to_model[fut]
            try:
                fut.result()
                print(f"[DONE] model={m}")
            except Exception as e:
                # Name the failing model; other models keep running.
                print(f"[ERROR] model={m} {type(e).__name__}: {e}")

print(f"All done. Outputs in: {out_dir.resolve()}")

In [19]:
from pathlib import Path
import json

from graphs import dict_to_generalgraph
from graph_utils import generalgraph_to_dot

def render_all_graphs(outputs_dir: "str | Path", *, include_per_machine: bool = False, fmt: str = "svg"):
    """
    Convert every result/graph JSON under ``outputs_dir`` into DOT files (and optionally images).

    Output goes to ``outputs_dir / "viz"``. Names come from each JSON path *relative to*
    ``outputs_dir``, with directory separators replaced by ``"__"``. This keeps results
    for different model types apart: ``model=model1/gcmvl_mixed.json`` and
    ``model=model2/gcmvl_mixed.json`` no longer overwrite each other.

    - Result files (with "aggregate"/"per_machine") produce ``<name>_aggregate``.
      With ``include_per_machine``, they also produce ``<name>_machine_<id>`` for each
      non-empty machine graph.
    - Plain graph files (top-level "nodes" and "edges") produce ``<name>_graph``.
    - ``meta.json``, ``machine_selection.json`` and JSON inside a ``viz`` folder are skipped.

    A ``.dot`` file is always written. When the ``graphviz`` Python package is importable,
    each graph is also rendered to ``fmt``. If the Graphviz ``dot`` executable is missing,
    rendering is turned off for the rest of the run with a single warning, and DOT
    output continues.

    Parameters
    ----------
    outputs_dir : str or Path
        Root output folder written by the pipeline.
    include_per_machine : bool
        Also render each machine's graph, not just the aggregates.
    fmt : str
        Image format passed to graphviz (e.g. "svg", "png", "pdf").
    """
    outputs_dir = Path(outputs_dir)
    viz_dir = outputs_dir / "viz"
    viz_dir.mkdir(parents=True, exist_ok=True)

    # Optional: image rendering through python-graphviz.
    try:
        import graphviz
    except ImportError:
        graphviz = None
    render_enabled = graphviz is not None

    def render_one(gdict, out_stem: Path, title: str) -> None:
        nonlocal render_enabled
        g = dict_to_generalgraph(gdict)
        dot = generalgraph_to_dot(g, title=title)

        # Always save DOT (portable). The name is built explicitly because
        # Path.with_suffix would clobber anything after a '.' in a model name.
        out_stem.parent.joinpath(out_stem.name + ".dot").write_text(dot, encoding="utf-8")

        if render_enabled:
            try:
                graphviz.Source(dot).render(str(out_stem), format=fmt, cleanup=True)
            except graphviz.ExecutableNotFound as exc:
                render_enabled = False
                print(f"[WARN] Graphviz executable not found ({exc}); writing DOT files only.")

    skip_names = {"meta.json", "machine_selection.json"}

    for p in sorted(outputs_dir.rglob("*.json")):
        if p.name in skip_names or p.parent.name == "viz":
            continue

        d = json.loads(p.read_text(encoding="utf-8"))
        if not isinstance(d, dict):
            continue

        rel_path = p.relative_to(outputs_dir)
        rel = rel_path.as_posix()
        # e.g. "model=model1/gcmvl_mixed.json" -> "model=model1__gcmvl_mixed"
        flat_stem = "__".join(rel_path.parent.parts + (p.stem,))

        # Case 1: result file (has aggregate/per_machine)
        if "aggregate" in d or "per_machine" in d:
            if d.get("aggregate"):
                render_one(d["aggregate"], viz_dir / f"{flat_stem}_aggregate", title=f"{rel} [aggregate]")

            per_machine = d.get("per_machine")
            if include_per_machine and isinstance(per_machine, dict):
                for mid, gdict in per_machine.items():
                    if gdict:
                        render_one(gdict, viz_dir / f"{flat_stem}_machine_{mid}", title=f"{rel} [machine {mid}]")

        # Case 2: "pure graph" json (nodes/edges at top)
        elif "nodes" in d and "edges" in d:
            render_one(d, viz_dir / f"{flat_stem}_graph", title=f"{rel} [graph]")

    print(f"Done. Wrote DOT files (and {fmt.upper()} if graphviz available) to: {viz_dir}")


In [20]:
# Render the aggregate graphs plus every per-machine graph found under the output folder.
render_all_graphs(outputs_dir=out_dir, include_per_machine=True)

Done. Wrote DOT files (and SVG if graphviz available) to: outputs/viz
