In [None]:
# =========================================================
# DML (mediated) with RandomForest-based Ensemble IV 
# =========================================================

# ---- Limit BLAS/OpenMP threads BEFORE importing heavy libs ----
# Native thread pools typically read these variables when numpy/torch/sklearn
# first load their backends, so they are set before those imports. setdefault
# keeps any value already exported in the environment.
import os
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")

# ---- Standard libs ----
import sys
import time
import platform
from pathlib import Path

# ---- Third-party ----
import numpy as np
from threadpoolctl import threadpool_limits
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier

# Keep native libraries (BLAS/OpenMP) to 1 thread. This is applied at runtime to
# the native libraries already loaded, complementing the env vars above.
threadpool_limits(1)

# ---- Local repo imports (adjust path if needed) ----
# Resolve to an absolute path and append it only once; otherwise every re-run of
# this cell would push another duplicate entry onto sys.path.
_SIMULATIONS_DIR = str((Path.cwd() / "../../simulations").resolve())
if _SIMULATIONS_DIR not in sys.path:
    sys.path.append(_SIMULATIONS_DIR)
import dgps_mediated as dgps

from nnpiv.ensemble import EnsembleIV, Ensemble2IV
from nnpiv.semiparametrics import DML_mediated

# PyTorch presence for reproducibility reporting. The catch is deliberately
# broad: a broken install may fail with errors other than ImportError.
try:
    import torch
    TORCH_OK = True
except Exception:
    TORCH_OK = False


# -----------------------
# Reproducibility helpers
# -----------------------
def seed_everything(seed: int = 123) -> None:
    """Seed every RNG this notebook relies on.

    Always seeds Python's ``random`` module and NumPy's global RNG. When PyTorch
    is importable, it also seeds torch (and all CUDA devices when CUDA is
    available). On a best-effort basis it pins torch's intra-op and inter-op
    thread pools to one thread each.
    """
    import random as py_random

    py_random.seed(seed)
    np.random.seed(seed)

    if not TORCH_OK:
        return

    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # Thread pinning is best-effort: any failure (e.g. when this function is
    # called a second time) is deliberately ignored.
    try:
        torch.set_num_threads(1)
        torch.set_num_interop_threads(1)
    except Exception:
        pass


seed_everything(123)


# -----------------------
# Resource print utility
# -----------------------
def print_resources():
    """Print the compute environment.

    Covers the Python and NumPy versions, the PyTorch version (if installed),
    the CPU core count, and the CUDA device (if any). It also prints the
    thread-cap environment variables and the platform string.
    """
    thread_vars = (
        "OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS",
    )

    cuda_line = "CUDA: not available"
    if not TORCH_OK:
        torch_line = "not installed"
    else:
        torch_line = torch.__version__
        if torch.cuda.is_available():
            try:
                device = torch.cuda.get_device_name(0)
            except Exception:
                device = "Unknown GPU"
            cuda_line = f"CUDA: available — {device}"

    report = [
        "=== Compute resources ===",
        f"Python: {sys.version.split()[0]}",
        f"NumPy: {np.__version__}",
        f"PyTorch: {torch_line}",
        f"CPU cores: {os.cpu_count()}",
        cuda_line,
        "Thread caps (env):",
        *(f"  {var}={os.environ.get(var, 'unset')}" for var in thread_vars),
        f"Platform: {platform.platform()}",
        "=========================\n",
    ]
    for line in report:
        print(line)


# -----------------------
# Result formatter
# -----------------------
def summarize_dml_result(name: str, result, elapsed: float, n_obs: "int | None" = None):
    """
    Print θ, its standard error and the CI from a ``.dml()`` result.

    Compatible with returns like ``(theta, var, ci)`` or ``(theta, var, ci, cov)``.
    Any other value is printed verbatim next to the elapsed time.

    Parameters
    ----------
    name : str
        Label printed in front of the summary.
    result : tuple or any
        Value returned by ``.dml()``.
    elapsed : float
        Wall-clock seconds spent in ``.dml()``.
    n_obs : int, optional
        Sample size behind the estimate. ``var`` is treated as the variance of
        the per-observation score, so the standard error of θ is
        ``sqrt(var / n_obs)``.
        NOTE(review): this is inferred from the logged runs, where
        ``theta ± 1.96 * sqrt(var / 2000)`` reproduces the printed CI. Confirm
        against the nnpiv implementation.
        When ``n_obs`` is omitted, ``sqrt(var)`` is printed under that label
        instead of being mislabelled as the SE.

    Raises
    ------
    ValueError
        If ``n_obs`` is given and is not positive.
    """
    if n_obs is not None and n_obs <= 0:
        raise ValueError(f"n_obs must be positive, got {n_obs}")

    if not (isinstance(result, tuple) and len(result) in (3, 4)):
        print(f"[{name}] time={elapsed:.2f}s — result={result}")
        return
    theta, var, ci = result[:3]
    cov = result[3] if len(result) == 4 else None

    theta = np.atleast_1d(theta).astype(float)
    var = np.atleast_1d(var).astype(float)
    ci = np.array(ci, dtype=float) if ci is not None else None

    def fmt_arr(a):
        return f"{float(a[0]):.4f}" if a.size == 1 else np.array2string(a, precision=4)

    print(f"[{name}] time={elapsed:.2f}s")
    print(f"  theta: {fmt_arr(theta)}")
    if n_obs is not None:
        print(f"  SE   : {fmt_arr(np.sqrt(var / n_obs))}")
    else:
        print(f"  sqrt(var): {fmt_arr(np.sqrt(var))}  (pass n_obs for SE = sqrt(var / n_obs))")
    if ci is not None:
        if ci.ndim == 1 and ci.size == 2:
            print(f"  95% CI: [{ci[0]:.4f}, {ci[1]:.4f}]")
        else:
            print(f"  95% CI: {np.array2string(ci, precision=4)}")
    # np.shape also works when cov is a plain (nested) list, not only an ndarray.
    if cov is not None:
        print(f"  (cov shape: {np.shape(cov)})")
    print("")

In [2]:
# -----------------------
# Print resources
# -----------------------
# Record interpreter/library versions, core count, GPU and thread caps so the
# timings reported further down can be read against the hardware they ran on.
print_resources()

=== Compute resources ===
Python: 3.10.18
NumPy: 2.2.6
PyTorch: 2.5.0
CPU cores: 112
CUDA: not available
Thread caps (env):
  OMP_NUM_THREADS=1
  OPENBLAS_NUM_THREADS=1
  MKL_NUM_THREADS=1
  VECLIB_MAXIMUM_THREADS=1
  NUMEXPR_NUM_THREADS=1
Platform: Linux-4.18.0-553.44.1.el8_10.x86_64-x86_64-with-glibc2.28



In [3]:
# =========================================================
# Data generation
# =========================================================
# Function dictionary (for reference):
# {'abs': 0, '2dpoly': 1, 'sigmoid': 2, 'sin': 3, 'frequent_sin': 4, 'abs_sqrt': 5,
#  'step': 6, '3dpoly': 7, 'linear': 8, 'rand_pw': 9, 'abspos': 10, 'sqrpos': 11,
#  'band': 12, 'invband': 13, 'steplinear': 14, 'pwlinear': 15, 'exponential': 16}

N_OBS = 2000     # sample size drawn from the DGP
DGP_SEED = 123   # same seed the setup cell passes to seed_everything

# Re-seed so that re-running this cell on its own regenerates the same sample,
# instead of drawing from the already-advanced global RNG. In a fresh Run-All
# nothing consumes randomness between the setup cell and here, so this should
# reproduce the logged data.
seed_everything(DGP_SEED)

fn_number = 0
tau_fn = dgps.get_tau_fn(fn_number)
tauinv_fn = dgps.get_tauinv_fn(fn_number)  # unused below; kept for parity with the original script
W, Z, X, M, D, Y, tau_fn = dgps.get_data(N_OBS, tau_fn)

# Hard-coded reference value for this DGP (fn_number = 0); update it if the DGP changes.
TRUE_PARAM = 4.05
print(f"=== Ground truth (for log reference) ===\nTrue parameter for E[Y(1,M(0))] ≈ {TRUE_PARAM:.2f}\n")

=== Ground truth (for log reference) ===
True parameter for E[Y(1,M(0))] ≈ 4.05



In [4]:
# =========================================================
# Model builders (fresh instances to avoid state leakage)
# =========================================================
def build_ensemble_iv():
    """Return a new, unfitted EnsembleIV (n_iter=200, max_abs_value=2).

    Used for every single-stage nuisance model in this notebook: both stages of
    the sequential estimator and the q-models of both estimators.
    """
    ensemble_settings = {"n_iter": 200, "max_abs_value": 2}
    return EnsembleIV(**ensemble_settings)

def build_ensemble2_iv():
    """Return a new, unfitted simultaneous Ensemble2IV built from random forests.

    The adversary is a bootstrapped RandomForestRegressor. The g and h learners
    are non-bootstrapped RandomForestClassifiers (50 and 40 trees respectively).
    All three forests share the same depth, leaf-size and impurity settings.
    """
    # Regularisation shared by every forest in the nested model.
    shared_rf = dict(max_depth=None, min_samples_leaf=40, min_impurity_decrease=0.001)
    learner_kwargs = dict(shared_rf, criterion="gini", bootstrap=False)

    return Ensemble2IV(
        n_iter=500,
        max_abs_value=2,
        adversary=RandomForestRegressor(n_estimators=50, bootstrap=True, **shared_rf),
        learnerg=RandomForestClassifier(n_estimators=50, **learner_kwargs),
        learnerh=RandomForestClassifier(n_estimators=40, **learner_kwargs),
        n_burn_in=100,
    )


# =========================================================
# Shared estimation settings and a timed-run helper
# =========================================================
ESTIMAND = "E[Y(1,M(0))]"


def common_dml_kwargs():
    """Keyword arguments shared by both DML_mediated runs.

    A fresh dict (and a fresh, unfitted LogisticRegression propensity model) is
    built on every call, so the two estimators never share model state.
    """
    return dict(
        estimator="MR",
        estimand=ESTIMAND,
        prop_score=LogisticRegression(max_iter=2000),
        n_folds=5,
        n_rep=1,
    )


def run_timed_dml(label, dml_obj):
    """Run ``dml_obj.dml()``, print a timed summary under ``label`` and return the raw result."""
    start = time.perf_counter()
    result = dml_obj.dml()
    elapsed = time.perf_counter() - start
    summarize_dml_result(label, result, elapsed)
    return result


# =========================================================
# 1) Sequential estimator (MR) with EnsembleIV
# =========================================================
dml_seq = DML_mediated(
    Y, D, M, W, Z, X,
    # fresh EnsembleIV instances for both stages and both q-models
    model1=[build_ensemble_iv(), build_ensemble_iv()],
    modelq1=[build_ensemble_iv(), build_ensemble_iv()],
    nn_1=[False, False],
    nn_q1=[False, False],
    fitargs1=[None, None],
    fitargsq1=[None, None],
    **common_dml_kwargs(),
)
res_seq = run_timed_dml("Sequential (MR) with EnsembleIV", dml_seq)


# =========================================================
# 2) Simultaneous estimator (MR) with Ensemble2IV
# =========================================================
dml_sim = DML_mediated(
    Y, D, M, W, Z, X,
    model1=build_ensemble2_iv(),  # simultaneous nested model
    nn_1=False,
    # q-models still use (fresh) single-stage EnsembleIV instances
    modelq1=[build_ensemble_iv(), build_ensemble_iv()],
    nn_q1=[False, False],
    fitargsq1=[None, None],
    **common_dml_kwargs(),
)
res_sim = run_timed_dml("Simultaneous (MR) with Ensemble2IV", dml_sim)

Rep: 1


100%|██████████| 5/5 [02:26<00:00, 29.20s/it] 


[Sequential (MR) with EnsembleIV] time=146.01s
  theta: 3.8579
  SE   : 4.9284
  95% CI: [3.6419, 4.0739]

Rep: 1


100%|██████████| 5/5 [09:38<00:00, 115.76s/it]

[Simultaneous (MR) with Ensemble2IV] time=578.79s
  theta: 3.7609
  SE   : 4.9921
  95% CI: [3.5422, 3.9797]




