In [1]:
# basic libraries
import pandas as pd
import numpy as np
import random

# sklearn functions
from sklearn.linear_model import ElasticNetCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.datasets import make_spd_matrix

In [66]:
class Metrics():
    
    def __init__(self):
        pass
    
    
    '''
    Method for creating synthetic data for use in OLS problems.
    
    User specifies:
       
       N - number of observations
       K - number of features
    seed - a seed for np.random.seed()
    '''
    def synthOLS(self, N = 500, K = 10, seed = None):
        
        # set the randomization seed
        np.random.seed(seed)

        # define coefficients
        b = (np.sin([1/K for K in range(1,(K + 1))]) + 0.01).reshape(-1, 1)
        
        # define the std deviates for the features
        sigma = make_spd_matrix(K, 1)
        
        # create the feature matrices
        X = np.random.multivariate_normal(np.ones(K), sigma, size = [N,])
        
        # define error terms
        e = np.random.standard_normal(size=[N,]).reshape(-1, 1)
        
        # create the variables
        y = X@b + e

        return y, X
    
    
    '''
    Method for creating synthetic data for use in instrumental variables
    problems.
    
    User specifies:
        
        N - number of observations
        K - number of features
     inst - number of instrumental variables
    theta - the true effect of the treatment variable on the outcome
     seed - a seed for np.random.seed()
    '''
    def synthIV(self, N = 500, K = 10, inst = 1, theta = 0.5, seed = None):
        
        # set the randomization seed
        np.random.seed(seed)

        # define coefficients
        b1 = (
            np.sin([1/K for K in range(1,(K + 1))]) + 0.01
        ).reshape(-1, 1)
        b2 = np.flip(b1)
        bz = (
            np.cos([1/inst for inst in range(1,(inst + 1))]) + 0.01
        ).reshape(-1, 1)

        # define the std deviates for the features
        sigma1 = make_spd_matrix(K, 1)
        sigma2 = make_spd_matrix(inst, 1)

        # create the feature matrices
        X = np.random.multivariate_normal(np.ones(K), sigma1, size = [N,])
        z = np.random.multivariate_normal(np.ones(inst), sigma2, size = [N,])

        # define error terms
        e1 = np.random.standard_normal(size=[N,]).reshape(-1, 1)
        e2 = np.random.standard_normal(size=[N,]).reshape(-1, 1)

        # create the variables
        d = z@bz + X@b1 + e1
        y = np.dot(theta,d) + X@b2 + e2

        return y, d, z, X
    
    
    '''
    Method for creating synthetic data for use in double machine
    learning (DML) problems. It is important to note that there are
    IV methods for DML and this function does not create data for those.
    This function makes synthetic data for the OLS equivalent of the
    problem.
    
    User specifies:
        
        N - number of observations
        K - number of features
    theta - the true effect of the treatment variable on the outcome
     seed - a seed for np.random.seed()
    '''
    def synthDML(self, N = 500, K = 10, theta = 0.5, seed = None):
    
        # set the randomization seed
        np.random.seed(seed)

        # define 
        b = np.sin([1/K for K in range(1,(K + 1))]) + 0.01
        sigma = make_spd_matrix(K, 1)
        X = np.random.multivariate_normal(np.ones(K), sigma, size = [N,])

        # if no functions are supplied for 
        def g(x):
            return np.power(np.sin(x),2)
        def m(x,nu=0.,gamma=1.):
            return 0.5/np.pi*(np.sinh(gamma))/(np.cosh(gamma)-np.cos(x-nu))

        # define error terms
        e1 = np.random.standard_normal(size=[N,])
        e2 = np.random.standard_normal(size=[N,])

        # compute the variables
        G = g(np.dot(X,b))
        M = m(np.dot(X,b))
        d = M + e1
        y = np.dot(theta,d) + G + e2

        return y, d, X
    
    
    '''
    Method for taking a list of numbers and returning n *mostly* equal
    partitions of randomly shuffeled elements of the list. Primary purpose
    is to act as a helper function for the dml method.
    
    User specifies:
    
    list_in - the list to be shuffeled and partitioned
          n - the number of partitions to split the list into
    '''
    # function to create splits for cross-fitting
    def partition (self, list_in, n):
        random.shuffle(list_in)
        return [list_in[i::n] for i in range(n)]

    # function to use splits for orthogonalization
    def orthog(self, ind, dep, indices, mod):

        # fit the model
        modfit = mod.fit(
            np.delete(ind, indices, 0),
            np.delete(dep, indices, 0).ravel()
        )

        # predict
        dephat = modfit.predict(
            ind[indices]
        ).reshape(-1, 1)

        # residualize
        depres = dep[indices] - dephat

        return depres
    
    
    '''
    Method for implementing the double machine learning (DML) algorithm
    as specified in Chernozhukov et al., 2016. Users interested in the
    detailed workings of the algorithm should consult that paper or
    Chernozhukov's github (https://github.com/VC2015/DMLonGitHub) where
    similar code and examples may be found.
    
    User specifies:
    
         X - a numpy array of exogenous covariates
         y - a numpy array of a single outcome variable
         d - a numpy array of a single treatment variable
      ymod - a scikit-learn ML model to residualize the outcome
      dmod - a scikit-learn ML model to residualize the treatment
    splits - number of splits to produce for cross fitting
    '''
    def dml(self, y, d, X, ymod, dmod = None, splits = 2):

        # reshape the data if necessary
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)
        if len(d.shape) == 1:
            d = d.reshape(-1, 1)

        # double model logic
        if dmod is None:
            dmod = ymod

        # split indices
        I = self.partition(list(range(len(y))), splits)

        # initialize empty lists
        dlist  = np.empty((0,1), float)
        ylist  = np.empty((0,1), float)
        thetas = np.empty((0,1), float)

        # perform cross-fitting
        for i in I:

            # get orthogonalized treatment
            dorth = self.orthog(X, d, i, dmod)
            dlist = np.append(dlist, dorth, 0)

            # get orthogonalized response
            yorth = self.orthog(X, y, i, ymod)
            ylist = np.append(ylist, yorth, 0)

            # calculate intermediate thetas
            thetas = np.append(thetas,
                               np.mean(dorth*yorth)/np.mean(dorth**2))

        # prep post-orthogonalization regressors
        D = np.hstack( (np.ones((len(dlist), 1)) , dlist) )

        # fit the DML2 model
        coefs = np.linalg.lstsq(D, ylist, rcond = None)[0]

        # get var-cov matrix for DML2
        res = ylist - (coefs[0]*D[0] + coefs[1]*D[1])
        vcv = np.true_divide(1, len(y) - 2
        )*np.dot(np.dot(res.T,res), np.linalg.inv(np.dot(D.T, D)))

        # get DML1 and DML2 coefficient
        theta1 = np.mean(thetas)
        theta2 = coefs[1]

        # calculate the dml1 standard error
        se1 = np.sqrt(np.mean( (ylist - theta1*dlist)**2*dlist**2
                ) / (np.mean(dlist**2)**2)
            ) / np.sqrt(len(dlist) - 1)

        # calculate the dml2 standard error
        se2 = np.sqrt(np.diagonal(vcv))[1]

        # present the output
        return {
            'dml1':{
                'coef_se':np.hstack((theta1, se1))
            },
            'dml2':{
                'coef_se':np.hstack((theta2, se2))
            },
            'orth_data':np.hstack((ylist, dlist)),
            'indices':I
        }
    
    
    '''
    Method for implementing the instrumental variables double machine
    learning (DML) algorithm as specified in Chernozhukov et al., 2016.
    Users interested in the detailed workings of the algorithm should
    consult that paper or Chernozhukov's github
    (https://github.com/VC2015/DMLonGitHub) where similar code and
    examples may be found.
    
    User specifies:
    
         X - a numpy array of exogenous covariates
         y - a numpy array of a single outcome variable
         d - a numpy array of a single treatment variable
         z - a numpy array of instrumental variables
      ymod - a scikit-learn ML model to residualize the outcome
      dmod - a scikit-learn ML model to residualize the treatment
    splits - number of splits to produce for cross fitting
    '''
    def dmlIV(self, y, d, z, X, ymod, dmod = None, zmod = None, splits = 2):

        # reshape the data if necessary
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)
        if len(d.shape) == 1:
            d = d.reshape(-1, 1)
        if len(z.shape) == 1:
            z = z.reshape(-1, 1)

        # double model logic
        if dmod is None:
            dmod = ymod
        if zmod is None:
            zmod = ymod

        # split indices
        I = self.partition(list(range(len(y))), splits)

        # initialize empty lists
        zlist  = np.empty((0,1), float)
        dlist  = np.empty((0,1), float)
        ylist  = np.empty((0,1), float)

        # perform cross-fitting
        for i in I:

            # get orthogonalized instrument
            zorth = self.orthog(X, z, i, zmod)
            zlist = np.append(zlist, zorth, 0)

            # get orthogonalized treatment
            dorth = self.orthog(X, d, i, dmod)
            dlist = np.append(dlist, dorth, 0)

            # get orthogonalized response
            yorth = self.orthog(X, y, i, ymod)
            ylist = np.append(ylist, yorth, 0)

        return tsls(ylist, dlist, zlist)
    
    
    
    def tsls(self, y, d, z, X = None, robust_se = False):
        
        # reshape arrays if necessary
        if X is not None:
            if len(X.shape) == 1:
                X = X.reshape(-1, 1)
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)
        if len(d.shape) == 1:
            d = d.reshape(-1, 1)
        if len(z.shape) == 1:
            z = z.reshape(-1, 1)

        # define obs-column parameters
        n = d.shape[0]

        if X is not None:
            k = X.shape[1] + d.shape[1]
        else:
            k = d.shape[1]

        # restack arrays for matrix multiplication
        if X is not None:
            X = np.hstack((d, X))
            Z = np.hstack((z, X))
        else:
            X = d
            Z = z

        # 2SLS weighting matrix and X projection
        P = Z@np.linalg.inv(Z.T@Z)@Z.T
        X_h = P@X

        # get (X'PX)^(-1) in var-covar formula
        X_inv = np.linalg.inv(X_h.T@X_h)

        # 2SLS estimation
        b = X_inv@X_h.T@y

        # calculate error variance
        e = y - np.dot(X_h, b)
        SSE = e.T@e
        var_e = SSE/(n - k)

        # get either robust or standard middle term
        if robust_se:
            omega = np.diag(np.diag(e@e.T))
        else:
            omega = var_e*np.identity(n)

        # calculate var-covar matrix
        vcv = X_inv@X_h.T@omega@X_h@X_inv

        # get se by sqroot of diag
        se = np.sqrt(np.diagonal(vcv))

        return np.hstack((b, se.reshape(-1, 1)))
    
    
    
    def ols(self, y, X, robust_se = False):

        # reshape arrays if necessary
        if len(X.shape) == 1:
            X = X.reshape(-1, 1)
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)

        n = X.shape[0]
        k = X.shape[1]

        # get (X'X)^(-1) in var-covar and coeff formula
        X_inv = np.linalg.inv(X.T@X)

        # coef estimation
        b = X_inv@X.T@y

        # calculate the error variance
        e = y - np.dot(X, b)
        SSE = e.T@e
        var_e = SSE/(n - k)

        # get either robust or standard middle term
        if robust_se:
            omega = (n/(n - k))*np.diag(np.diag(e@e.T))
        else:
            omega = var_e*np.identity(n)

        # calculate the var-covar matrix
        vcv = X_inv@X.T@omega@X@X_inv

        # get se by sqroot of diag
        se = np.sqrt(np.diagonal(vcv))

        return np.hstack((b, se.reshape(-1, 1)))

In [68]:
#X, y = make_regression(n_samples = 1000, n_features = 10, n_targets = 1)
y, d, z, X = Metrics().synthIV(N = 10000)

In [73]:
Metrics().dmlIV(y, d, z, X, ymod = ElasticNetCV(), splits = 2)

array([[0.51041291, 0.01546852]])

In [58]:
tsls(y, d, z)

array([[0.50758  , 0.0154988]])

In [59]:
tsls(y, d, z, robust_se = True)

array([[0.50758   , 0.01546897]])