In [1]:
# !pip install pyscipopt==4.3.0 --extra-index-url https://zenodo.org/api/files/kotdvw5srpqto3xslzj1

!pip install git+https://github.com/duarteguilherme/autobounds

Collecting git+https://github.com/duarteguilherme/autobounds
  Cloning https://github.com/duarteguilherme/autobounds to /tmp/pip-req-build-e_q0ml09
  Running command git clone --filter=blob:none --quiet https://github.com/duarteguilherme/autobounds /tmp/pip-req-build-e_q0ml09
  Resolved https://github.com/duarteguilherme/autobounds to commit 13b7520febdfe59d29c917a4501e69fde5f78c6d
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pyscipopt (from autobounds==0.0.2)
  Downloading pyscipopt-6.0.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (6.5 kB)
Downloading pyscipopt-6.0.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (17.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.2/17.2 MB[0m [31m72.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: autobounds
  Building wheel

## Autobound simulation

In [2]:
from autobounds import *
import pandas
import numpy as np

In [3]:
confounding_model = DAG("D -> Y, X -> D, X -> Y, U -> D, U -> Y", unob="U")
confounding_problem = causalProblem(confounding_model)
confounding_problem.set_ate(ind="D", dep="Y")

In [4]:
# -------------------------------------------------------------------
strata_list = [
    ("00", "0000", 0.074576212961429),
    ("01", "0001", 0.0301702777607751),
    ("01", "0101", 0.186607173274612),
    ("01", "1101", 0.00798467737923486),
    ("10", "0010", 0.348297597134471),
    ("10", "1001", 0.134412818663854),
    ("10", "1010", 0.0480346194425266),
    ("10", "1111", 0.0940165222917155),
    ("11", "1100", 0.0345830964246296),
    ("11", "1101", 0.041317004666752),
]

df = pandas.DataFrame(strata_list, columns=["D_type", "Y_type", "prob"])

N = 10000   # simulation size

# sample D and Y according to the strata list
stratum_idx = np.random.choice(df.index, size=N, p=df["prob"].values)
sim = df.loc[stratum_idx].reset_index(drop=True)

# sample X ~ Bernoulli(0.6)
sim["X"] = np.random.binomial(1, 0.6, size=N)

# compute factual D and Y based on principal strata definitions
def compute_D(D_type, X):
    # D_type = d0d1
    return int(D_type[X])

def compute_Y(Y_type, D, X):
    # Y_type = y00 y01 y10 y11  (4 bits)
    # index = 2*D + X
    idx = 2 * D + X
    return int(Y_type[idx])

# factual D
sim["D"] = [compute_D(dt, x) for dt, x in zip(sim["D_type"], sim["X"])]

# factual Y
sim["Y"] = [compute_Y(yt, d, x)
            for yt, d, x in zip(sim["Y_type"], sim["D"], sim["X"])]

# store factual observed data
confounding_data = sim[["X", "D", "Y"]]

In [5]:
# solve without monotone response constraints
confounding_problem.read_data(raw=confounding_data, inference=True)
confounding_problem.solve(ci=True, nsamples=10)

Solving for point estimate bounds...
Point estimates

Dual: [-0.1659, 0.8341]
Primal: [-0.1659, 0.8341]
Generating samples:

Estimating CI: 


100%|██████████| 10/10 [00:07<00:00,  1.43it/s]

95% Confidence intervals. Lower: -0.1719595772592103,  Upper: 0.8355688151499093





{'point lb dual': -0.1659,
 'point ub dual': 0.8341,
 'point lb primal': -0.1659,
 'point ub primal': 0.8341,
 '2.5% lb bounds': np.float64(-0.1719595772592103),
 '97.5% ub bounds': np.float64(0.8355688151499093),
 '1% lb bounds': np.float64(-0.1721159266440455),
 '99% ub bounds': np.float64(0.8356221828684943)}

In [6]:
confounding_problem.read_data(raw=confounding_data.iloc[:1000], inference=True)

# add monotone-response assumptions (E[Y(d)|X=1] >= E[Y(d)|X=0] for d=0,1)
with respect_to(confounding_problem):
    turnout_if_college_in_highSES = E("Y(D=1)", cond="X=1")
    turnout_if_college_in_lowSES  = E("Y(D=1)", cond="X=0")
    turnout_if_nocollege_in_highSES = E("Y(D=0)", cond="X=1")
    turnout_if_nocollege_in_lowSES  = E("Y(D=0)", cond="X=0")

    add_assumption(turnout_if_college_in_highSES, '>=', turnout_if_college_in_lowSES)
    add_assumption(turnout_if_nocollege_in_highSES, '>=', turnout_if_nocollege_in_lowSES)

# solve LP and request inferential sampling
confounding_problem.solve(ci=True, nsamples=10)


Solving for point estimate bounds...
Point estimates

Dual: [0.100920326932022, 0.828]
Primal: [0.100920326932022, 0.828]
Generating samples:

Estimating CI: 


100%|██████████| 10/10 [00:06<00:00,  1.47it/s]

95% Confidence intervals. Lower: 0.04870077558369033,  Upper: 0.8373640605896486





{'point lb dual': 0.100920326932022,
 'point ub dual': 0.828,
 'point lb primal': 0.100920326932022,
 'point ub primal': 0.828,
 '2.5% lb bounds': np.float64(0.04870077558369033),
 '97.5% ub bounds': np.float64(0.8373640605896486),
 '1% lb bounds': np.float64(0.04855540791709363),
 '99% ub bounds': np.float64(0.8376593582976131)}

## Optimize only via the primal

In [15]:
import numpy as np
import pandas as pd
import torch

class ATEPrimalSolver:
    """
    Solves the primal linear program with gradient descent.
    Decision variables: p[x,d,y0,y1] for x,d,y0,y1 in {0,1}
    """

    def __init__(self, X, D, Y):
        """
        X, D, Y are numpy arrays of observed data
        """
        self.X = X
        self.D = D
        self.Y = Y

        # empirical joint P_obs(X=x, D=d, Y=y)
        self.q_obs = self._compute_observed_joint()

        # empirical marginal
        self.px = np.array([np.mean(X == 0), np.mean(X == 1)])

        # create trainable probabilities (16 variables)
        self.p = torch.nn.Parameter(
            torch.ones(2,2,2,2, dtype=torch.double) / 16,
            requires_grad=True
        )

        self.optimizer = torch.optim.Adam([self.p], lr=5e-2)

    # -----------------------------------------------------------
    def _compute_observed_joint(self):
        q = np.zeros((2,2,2))
        N = len(self.X)
        for x,d,y in zip(self.X, self.D, self.Y):
            q[x,d,y] += 1
        return q / N

    # -----------------------------------------------------------
    def _objective(self, maximize=False):
        """
        ATE = sum (y1 - y0) * p
        """
        y0 = torch.arange(2, dtype=torch.double)
        y1 = torch.arange(2, dtype=torch.double)
        Y0, Y1 = torch.meshgrid(y0, y1, indexing='ij')

        ATE = torch.sum((Y1 - Y0)[None,None,:,:] * self.p)

        return -ATE if maximize else ATE

    # -----------------------------------------------------------
    def _constraints_loss(self):
        loss = 0.0

        # (C1) observed joint constraints: sum_{y0,y1: y_d=y} p = q_obs
        for x in [0,1]:
            for d in [0,1]:
                for y in [0,1]:
                    mask = torch.zeros((2,2))
                    for y0 in [0,1]:
                        for y1 in [0,1]:
                            yd = y1 if d==1 else y0
                            if yd == y:
                                mask[y0,y1] = 1
                    loss += (torch.sum(self.p[x,d] * mask) - self.q_obs[x,d,y])**2

        # (C2) normalization
        loss += (torch.sum(self.p) - 1.0)**2

        # (C3) monotone-response:
        # E[Y(d)|X=1] >= E[Y(d)|X=0]
        for d in [0,1]:
            # LHS = p0 * E[Y(d)|X=1] - p1 * E[Y(d)|X=0]
            E1 = 0.0
            E0 = 0.0
            for d2 in [0,1]:
                for y0 in [0,1]:
                    for y1 in [0,1]:
                        yd = y1 if d==1 else y0
                        E1 += yd * self.p[1,d2,y0,y1]
                        E0 += yd * self.p[0,d2,y0,y1]

            lhs = self.px[0] * E1 - self.px[1] * E0
            loss += torch.relu(-lhs)   # penalty only if violated

        # (C2b) nonnegativity
        loss += torch.sum(torch.relu(-self.p))

        return loss

    # -----------------------------------------------------------
    def solve(self, maximize=False, iters=6000, l_constrained = 10):
        for it in range(iters):
            self.optimizer.zero_grad()
            loss = self._objective(maximize=maximize) + l_constrained * self._constraints_loss()
            loss.backward()
            self.optimizer.step()

            # project to >=0 to keep stable
            with torch.no_grad():
                self.p.data.clamp_(min=1e-9)
                # renormalize to keep mass ~1
                self.p.data /= self.p.data.sum()

        ATE_est = -self._objective(maximize=maximize).item() if maximize else self._objective().item()
        return ATE_est, self.p.data.clone()

# return upper and lower bounds
def solve_ATE_bounds(confounding_data, iters=6000):
    X = confounding_data["X"].values
    D = confounding_data["D"].values
    Y = confounding_data["Y"].values

    solver = ATEPrimalSolver(X, D, Y)

    lb, p_min = solver.solve(maximize=False, iters=iters, l_constrained=100)
    ub, p_max = solver.solve(maximize=True,  iters=iters, l_constrained=100)

    return {
        "ATE_lower": lb,
        "ATE_upper": ub,
        "p_min_solution": p_min,
        "p_max_solution": p_max,
    }

In [16]:
result = solve_ATE_bounds(confounding_data, iters=8000)

print("ATE lower bound =", result["ATE_lower"])
print("ATE upper bound =", result["ATE_upper"])

ATE lower bound = 0.13000004948273106
ATE upper bound = 0.7259100321970758
