In [4]:
import sys
sys.path.append('..')

from bayesian_hte.base import BayesianMetaLearner

import numpy as np
import pandas as pd
import pymc as pm

In [21]:
np.random.seed(0)

n = 100
p = 4

X = np.random.normal(size=(n, p))
treatment_prob_beta = np.random.normal(size=p)
treatment_probability = 1 / (1 + np.exp(-X @ treatment_prob_beta))

treatment = np.random.binomial(1, treatment_probability)
effect_beta = np.random.normal(size=p)
effect = X @ np.abs(effect_beta)

exog_beta = np.random.normal(size=p)
exog = X @ exog_beta

y = exog + treatment * effect + 0.5 * np.random.normal(size=n)

print("Treatment proportion", treatment.mean())
print("Average treatment effect", effect.mean())

y = pd.Series(y)
treatment = pd.Series(treatment)
X = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(p)])

Treatment proportion 0.58
Average treatment effect -0.18644847389182095


In [48]:
class LinearHierarchicalTLearner(BayesianMetaLearner):
    def _fit(self, X, y, treated):
        self.models["model"] = pm.Model()
        with self.models["model"]:
            X_ = pm.Data("X", X)
            y_ = pm.Data("y", y)
            treated_ = pm.Data("treated", treated.astype("float"))

            beta = pm.Normal("beta", mu=0, sigma=1, shape=X.shape[1])
            sigma = pm.HalfNormal("sigma", sigma=1)

            beta_treated = pm.Normal("beta_treated", mu=beta, sigma=1)
            beta_control = pm.Normal("beta_control", mu=beta, sigma=1)

            mu = pm.math.switch(
                treated_, pm.math.dot(X_, beta_treated), pm.math.dot(X_, beta_control)
            )

            cate_beta = pm.Deterministic("cate_beta", beta_treated - beta_control)
            pm.Deterministic("cate", pm.math.dot(X_, cate_beta))
            pm.Normal("y_hat", mu=mu, sigma=sigma, observed=y_)

            self.idata = pm.sample()

    def predict_cate(self, X):
        with self.models["model"]:
            pm.set_data({"X": X, "treated": np.ones(X.shape[0])})
            post_pred = pm.sample_posterior_predictive(
                self.idata, var_names=["cate"],
            )

        return post_pred

In [49]:
lht = LinearHierarchicalTLearner()
lht.fit(X, y, treatment)

Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [beta, sigma, beta_treated, beta_control]
  self.pid = os.fork()


Output()

  self.pid = os.fork()


Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 28 seconds.
Sampling: []


Output()

<__main__.LinearHierarchicalTLearner at 0x387022960>

In [50]:
cate = lht.predict_cate(X)

Sampling: []


Output()

In [None]:
np.abs(effect).mean()

1.7819801342793442

In [56]:
np.abs((cate["posterior_predictive"]["cate"].mean(dim=["chain", "draw"]) - effect)).mean()