In [1]:
import numpy as np
import statsmodels.api as sm
import linearmodels as lm
import pandas as pd

import scipy
import torch
import torchmin

np.random.seed(42)

In [2]:
def dgp(n = 100_000,
        beta = np.array([-0.5, 1.2]),
        rho = 0.7, pi = 0.5):
    ε = np.random.normal(0, 1, n)
    z = np.random.normal(0, 1, n)
    # Generate endogenous x, influenced by the instrument
    x = z * pi + ε * rho + np.random.normal(0, 1, n)
    X = np.c_[np.ones(n), x]
    # Outcome variable with true relationship
    y = X @ beta + ε + (X[:, 1] > 0) * np.random.normal(0, 1, n)
    return y, X, z

In [3]:
class GMMEstimator:
    def __init__(self, moment_cond, weighting_matrix="optimal", opt="scipy"):
        """Generalized Method of Moments Estimator

        Args:
            moment_cond (function): Moment condition. Returns L X n matrix of moments
            weighting_matrix (str, optional): What kind of weight matrix to use. Defaults to 'optimal'.
            opt (str, optional): Optimization method. Defaults to 'scipy', numerical via 'torch' works more easily for large problems.

        ref: Hansen (1982), Cameron and Trivedi (2005) Chapter 6
        """
        self.moment_cond = moment_cond
        self.weighting_matrix = weighting_matrix
        self.opt = opt

    def gmm_objective(self, beta):
        """
        Quadratic form to be minimized.
        """
        moments = self.moment_cond(self.z, self.y, self.x, beta)
        if self.weighting_matrix == "optimal":
            self.W = self.optimal_weighting_matrix(moments)
        else:
            if self.opt == "scipy":
                self.W = np.eye(moments.shape[1])
            elif self.opt == "torch":
                self.W = torch.eye(moments.shape[1])
        if self.opt == "scipy":
            return (1 / self.n) * moments.sum(axis=0).T @ self.W @ moments.sum(axis=0)
        elif self.opt == "torch":
            return (1 / self.n) * torch.matmul(
                moments.sum(axis=0).T, torch.matmul(self.W, moments.sum(axis=0))
            )

    def optimal_weighting_matrix(self, moments):
        """
        Optimal Weight matrix
        """
        if self.opt == "scipy":
            return np.linalg.inv((1 / self.n) * (moments.T @ moments))
        elif self.opt == "torch":
            return torch.inverse((1 / self.n) * torch.matmul(moments.T, moments))

    def fit(self, z, y, x, verbose=False):
        if self.opt == "scipy":
            self.z, self.y, self.x = z, y, x
            self.n, self.k = x.shape
            # minimize the objective function
            result = scipy.optimize.minimize(
                self.gmm_objective,
                x0=np.random.rand(self.k),
                method="L-BFGS-B",
                options={"disp": verbose},
            )
        elif self.opt == "torch":
            # minimize blackbox using pytorch
            self.z, self.y, self.x = (
                torch.tensor(z, dtype=torch.float64),
                torch.tensor(y, dtype=torch.float64),
                torch.tensor(x, dtype=torch.float64),
            )
            self.n, self.k = x.shape
            beta_init = torch.tensor(
                np.random.rand(self.k), dtype=torch.float64, requires_grad=True
            )
            result = torchmin.minimize(
                self.gmm_objective, beta_init, method="l-bfgs", tol=1e-5, disp=verbose
            )
            self.W = self.W.detach().numpy()
        # solution
        self.theta = result.x

        # Standard error calculation
        try:
            self.Gamma = self.jacobian_moment_cond()
            self.vθ = np.linalg.inv(self.Gamma.T @ self.W @ self.Gamma)
            self.std_errors = np.sqrt(self.n * np.diag(self.vθ))
        except:
            self.std_errors = None

    def jacobian_moment_cond(self):
        """
        Jacobian of the moment condition
        """
        if self.opt == "scipy":  # Analytic Jacobian for linear IV; else use torch
            self.jac_est = -self.z.T @ self.x
        elif self.opt == "torch":
            self.jac = torch.func.jacfwd(self.moment_cond, argnums=3)
            self.jac_est = (
                self.jac(self.z, self.y, self.x, self.theta)
                .sum(axis=0)
                .detach()
                .numpy()
            )
        return self.jac_est

    def summary(self):
        return pd.DataFrame({"coef": self.theta, "std err": self.std_errors})

Single endogeneous variable and single instrument DGP with varying instrument strength ($\pi$) and degree of endogeneity($\rho$).


# No Endogeneity 

OLS and IV with X as its own instrument should produce the same estimates.

In [4]:
y, X, z = dgp(n = 1_000, pi = 0, rho = 0)
print(sm.OLS(y, X).fit(cov_type = "HC2").summary().tables[1])

                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const         -0.4714      0.038    -12.245      0.000      -0.547      -0.396
x1             1.2065      0.038     31.774      0.000       1.132       1.281


### GMM using Scipy Minimization

In [5]:
%%time
ψ = lambda z, y, x, beta: z * (y - x @ beta)[:, np.newaxis]
gmm_scipy = GMMEstimator(ψ)
gmm_scipy.fit(np.c_[np.ones(z.shape[0]), X[:, 1]], y, X)
gmm_scipy.summary()

CPU times: user 6.37 ms, sys: 5.86 ms, total: 12.2 ms
Wall time: 6.05 ms


Unnamed: 0,coef,std err
0,-0.471433,0.038463
1,1.206489,0.037892


### GMM using Torch Minimization

In [6]:
%%time
def moment_cond(z, y, x, beta):
    residuals = (y - x @ beta).unsqueeze(-1)
    return z * residuals

gmm = GMMEstimator(moment_cond, opt = "torch")
gmm.fit(np.c_[np.ones(z.shape[0]), X[:, 1]], y, X)
gmm.summary()

CPU times: user 330 ms, sys: 68.1 ms, total: 398 ms
Wall time: 85.1 ms


  moments.sum(axis=0).T, torch.matmul(self.W, moments.sum(axis=0))


Unnamed: 0,coef,std err
0,-0.471433,0.038463
1,1.206489,0.037892


Identical estimates and standard errors.

# With Endogeneity 

OLS is inconsistent. Also confirm `GMMEstimator` returns the same answer as IV2SLS.

In [7]:
y, X, z = dgp(n = 1_000, beta = np.array([0, 1.5]), rho = 0.8)
print(sm.OLS(y, X).fit().summary().tables[1])

                 coef    std err          t      P>|t|      [0.025      0.975]
------------------------------------------------------------------------------
const          0.0086      0.035      0.245      0.807      -0.061       0.078
x1             1.9329      0.025     76.192      0.000       1.883       1.983


In [8]:
%%time
ψ = lambda z, y, x, beta: z * (y - x @ beta)[:, np.newaxis]
gmm = GMMEstimator(ψ)
gmm.fit(np.c_[np.ones(z.shape[0]), z], y, X)
gmm.summary()

CPU times: user 5.19 ms, sys: 5.3 ms, total: 10.5 ms
Wall time: 5.01 ms


Unnamed: 0,coef,std err
0,-0.036383,0.043122
1,1.435081,0.086377


In [9]:
%%time
def moment_cond(z, y, x, beta):
    residuals = (y - x @ beta).unsqueeze(-1)
    return z * residuals

gmm = GMMEstimator(moment_cond, opt = "torch")
gmm.fit(np.c_[np.ones(z.shape[0]), z], y, X)
gmm.summary()

CPU times: user 705 ms, sys: 42.8 ms, total: 748 ms
Wall time: 67.9 ms


Unnamed: 0,coef,std err
0,-0.036384,0.043123
1,1.43508,0.086377


In [10]:
lm.iv.model.IV2SLS(y, None, X, np.c_[np.ones(z.shape[0]), z]).fit().summary.tables[1]

  return vecs @ np.diag(1 / np.sqrt(vals)) @ vecs.T


0,1,2,3,4,5,6
,Parameter,Std. Err.,T-stat,P-value,Lower CI,Upper CI
endog.0,-0.0364,0.0431,-0.8437,0.3988,-0.1209,0.0481
endog.1,1.4351,0.0864,16.614,0.0000,1.2658,1.6044


Identical estimates and standard errors.