## Simulation code for the project 'Testing and Identifying Substitution and Complementarity Patterns' 

It is implemneting four different estimators for the structural demand estimation model
- Estimator I: two-step estimator proposed in the paper
- Estimator II: mixed-effect parametric estimator
- Estimator III: fixed-effect logit estimator without bundles
- Estimator IV: nonparametric estimator without bundles


In [None]:
## import packages

import numpy as np
from scipy.stats import multivariate_normal, norm
from scipy.optimize import minimize

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import KFold
from sklearn.metrics import log_loss
from sklearn.ensemble import RandomForestClassifier


In [None]:
# Set seed for reproducibility

np.random.seed(1234)


### Initial configuration for simulation

In [None]:
# DGP parameters
B = 500                 # Number of simulations
N = 1000                # Sample size
T = 2                   # Two periods panel
dx = 2                  # Dimension of X
dz = 2                  # Dimension of Z

# True coefficients
beta0 = np.ones((dx, 1))    # True coefficient for a single good
gamma0 = np.ones((dz, 1))   # True coefficient for incremental utility

# Coefficients for fixed effects
eta1 = 0.5 * np.ones((dx, 1))
eta0 = 0

# Two-step estimator in the paper
beta_twostep = np.ones((dx - 1, B))  # Two-step estimator for beta
gamma_twostep = np.ones((dz - 1, B)) # Two-step estimator for gamma
err = np.ones(B)                     # Percentage of having wrong signs of substitution patterns

# Parametric estimator 
beta_par = np.ones((dx - 1, B))
gamma_par = np.ones((dz - 1, B))
err_par = np.ones(B)

# Two estimators assuming no bundles
beta_non = np.ones((dx - 1, B))  # Estimator under stationarity but assuming no bundles
beta_fl = np.ones((dx - 1, B))   # Fixed effect logit estimator


### Helper functions to help define moment functions


In [None]:
## sign function
def sg(y):
    return np.sign(y)

## differencing function for y
def D(x):
    return x[:, dx:] - x[:, :dx]

## differencing function for y
def dp(y):
    return y[:, 1] - y[:, 0]

## absolute function
def G(x):
    return np.abs(x)

## positive value function
def G1(x):
    return x * (x >= 0).astype(int)


### First-step estimator to predict consumers' choice 
- implement a single-layer neural network estimator and a random forest estimator
- use cross-validation to choose tuning parameters

#### First-step estimator I: neural network estimator


In [None]:
def compute_log_loss(y_true, y_pred):
    """Compute log loss for each binary target and return the average log loss."""
    return np.mean([log_loss(y_true[:, i], y_pred[:, i]) for i in range(y_true.shape[1])])

epoch_candidates = [50, 100, 200]               # number of iterations
learning_rate_candidates = [0.001, 0.01]   # learning rate #, 0.1

def neu(X, y):
    """
    Inputs:
    - X: input features which is an numpy array of shape (N, k)
    - y: N *dependent variable which is an numpy array of shape (N, s)
   
    Output:
    - predicted_probabilities: predicted probabilities for each y
    """
    kf = KFold(n_splits = 3, shuffle=True, random_state=42)   # kfold CV
    best_loss = float('inf')
    best_model = None
    best_epochs = 0
    best_learning_rate = 0.0
    
    for epochs in epoch_candidates:
        for lr in learning_rate_candidates:
            fold_losses = []
            
            for train_index, val_index in kf.split(X):
                X_train, X_val = X[train_index], X[val_index]
                y_train, y_val = y[train_index], y[val_index]
                
                # build the model
                model = Sequential([Input(shape=(X.shape[1],)),
                                    Dense(units=y.shape[1], activation='sigmoid')])
                model.compile(optimizer=Adam(learning_rate=lr), 
                              loss='binary_crossentropy', 
                              metrics=['accuracy'])
                # fit the model
                model.fit(X_train, y_train, epochs=epochs, batch_size=X_train.shape[0], verbose=0)
                
                # compute prediction and loss
                y_val_pred = model.predict(X_val)
                val_loss = compute_log_loss(y_val, y_val_pred)
                fold_losses.append(val_loss)
            
            # compute average loss and update the best model
            avg_loss = np.mean(fold_losses)
            if avg_loss < best_loss:
                best_loss = avg_loss
                best_epochs = epochs
                best_learning_rate = lr
                best_model = model
    
    # Train the best model on the entire dataset
    best_model.fit(X, y, epochs=best_epochs, batch_size=X.shape[0], verbose=0)
    
    # Predict probabilities
    predicted_probabilities = best_model.predict(X)
    
    return predicted_probabilities 


#### First-step estimator II: random forest estimator


In [None]:
def random_forest(X, y):
    """
    Inputs:
    - X: input features which is an numpy array of shape (N, k)
    - y: N *dependent variable which is an numpy array of shape (N, s)
   
    Output:
    - predicted_probabilities: predicted probabilities for each y
    """
    n_estimators_candidates = [200, 300, 500]  # number of n_estimators
    max_depth_candidates = [10, 15, 20]        # number of depth
    
    kf = KFold(n_splits=3, shuffle=True, random_state=42)
    best_loss = float('inf')
    best_model = None
    best_params = None
    
    for n_estimators in n_estimators_candidates:
        for max_depth in max_depth_candidates:
            fold_losses = []
            
            for train_index, val_index in kf.split(X):
                X_train, X_val = X[train_index], X[val_index]
                y_train, y_val = y[train_index], y[val_index]
                
                # build a model
                model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
                model.fit(X_train, y_train)
                
                # compute prediction and loss
                y_val_pred = model.predict_proba(X_val)  
                y_val_pred_proba = np.array([y_val_pred[:, 1] for y_val_pred in y_val_pred]).T
                
                val_loss = compute_log_loss(y_val, y_val_pred_proba)
                fold_losses.append(val_loss)
            
            # compute average loss and update the best model
            avg_loss = np.mean(fold_losses)
            if avg_loss < best_loss:
                best_loss = avg_loss
                best_model = model
                best_params = {'n_estimators': n_estimators, 'max_depth': max_depth}
    
    # Train the best model on the entire dataset
    best_model.fit(X, y)
    
    # Predict probabilities on the entire dataset
    predicted_probabilities = np.array([probs[:, 1] for probs in best_model.predict_proba(X)]).T
    
    return predicted_probabilities


### Estimator I: two-step estimator in this paper


In [None]:
# define moment equality function

def mom(theta, XA, XB, Z, p):
    '''
    theta: model parameter
    XA(XB): covariate for good A(B) at all periods
    p: estimated choice probability for Y1, Y2, Y3, Y0
    '''
    beta = np.concatenate(([1], theta[:dx - 1]))
    gamma = np.concatenate(([1], theta[dx - 1:]))
    Gamma = Z @ gamma  # Incremental utility

    deltaA = D(XA) @ beta  # Variation in covariate index for good A
    deltaB = D(XB) @ beta  # Variation in covariate index for good B

    # Extracting probabilities
    p1 = p[:, :T]               # Estimated choice probability for choosing only good A
    p2 = p[:, T:2 * T]          # Estimated choice probability for choosing only good B
    p0 = p[:, 2 * T:3 * T]      # Estimated choice probability for outside option
    p3 = p[:, 3 * T:4 * T]      # Estimated choice probability for buying AB
    p13 = p1 + p3               # Demand for good A
    p23 = p2 + p3               # Demand for good B

    # Construct index for a single good
    I1 = 1 - ((sg(dp(p1)) * deltaA > 0) | (sg(dp(p1)) * deltaB < 0))
    I2 = 1 - ((sg(dp(p2)) * deltaA < 0) | (sg(dp(p2)) * deltaB > 0))
    I3 = 1 - ((sg(dp(p3)) * deltaA > 0) | (sg(dp(p3)) * deltaB > 0))
    I0 = 1 - ((sg(dp(p0)) * deltaA < 0) | (sg(dp(p0)) * deltaB < 0))

    # Objective function for single choice
    Q1 = np.mean(G1(dp(p1)) * I1 + G1(dp(p2)) * I2 + G1(dp(p3)) * I3 + G1(dp(p0)) * I0)

    # Construct index for demand of good A and good B
    I13 = 1 - ((sg(dp(p13)) * deltaA > 0) | (sg(dp(p13)) * (deltaA + sg(Gamma) * deltaB) > 0))
    I23 = 1 - ((sg(dp(p23)) * deltaB > 0) | (sg(dp(p23)) * (deltaB + sg(Gamma) * deltaA) > 0))

    # Objective function using conditional demand
    Q2 = np.mean(G1(dp(p13)) * I13 + G1(dp(p23)) * I23)

    # Functions for sum of two choice probabilities
    def Ib1(t1):
        return 1 - ((Gamma < np.minimum((-1) ** t1 * deltaA, (-1) ** (t1 - 1) * deltaB)) &
                ((-1) ** t1 * (deltaA - deltaB) > 0))

    def Ib2(t1):
        return 1 - ((Gamma > np.maximum((-1) ** (t1 - 1) * deltaA, (-1) ** (t1 - 1) * deltaB)) &
                ((-1) ** t1 * (deltaA + deltaB) > 0))

    def L1(t1, t2):
        return p1[:, t1-1] + p2[:, t2-1] - 1

    def L2(t1, t2):
        return p3[:, t1-1] + p0[:, t2-1] - 1

    Q3 = np.mean(G1(L1(2, 1)) * Ib1(2) + G1(L1(1, 2)) * Ib1(1) + G1(L2(2, 1)) * Ib2(2) + G1(L2(1, 2)) * Ib2(1))

    # Final objective function
    return Q1 + Q2 + Q3


### Estimator II: Mixed-effect logit estimator
- assume a parametric model 
- use the simulated method of moments to calculate choice probabilities


#### 1. generate fixed effects and error terms according to mixed-effect logit model


In [None]:
# generate fixed effects and error terms according to mixed-effect logit model

S = 20                                       # Number of simulations
ep0_s = np.random.gumbel(0, 1, (N, T, S))    # error for outside option
epA_s = np.random.gumbel(0, 1, (N, T, S))    # error for good A
epB_s = np.random.gumbel(0, 1, (N, T, S))    # error for good B

# Simulated error terms in fixed effects for goods A and B
vA_s = np.random.randn(N, S)
vB_s = np.random.randn(N, S)

# Sum of two error terms: epA + vA and epB + vB
muA_s = np.zeros((N, T, S))
muB_s = np.zeros((N, T, S))

# Calculating the sum of error terms for each period and simulation
for t in range(T):  # Adjusting for Python's 0-based indexing
    muA_s[:, t, :] = vA_s + epA_s[:, t, :] - ep0_s[:, t, :]
    muB_s[:, t, :] = vB_s + epB_s[:, t, :] - ep0_s[:, t, :]


#### 2. define criterion function for Estimator II


In [None]:
## Criterion function for the parametric method
def par_sim(theta, XA, XB, Z, Y1, Y2, Y3):
    """
    - theta: true parameter
    - XA(XB): covariate for good A(B) at all periods
    - Z: covariate for substitution patterns
    - Y1, Y2, Y3: dependent variables
    """
    # Extract parameters from theta
    beta = np.concatenate(([1], theta[:dx-1]))
    gamma = np.concatenate(([1], theta[dx-1:dx+dz-2]))
    eta1 = theta[dx+dz-2:-1]
    eta0 = theta[-1]

    # Mean fixed effect for goods A and B
    alphaA = eta0 + np.dot((XA[:, :dx] + XA[:, dx:]) / T, eta1)
    alphaB = eta0 + np.dot((XB[:, :dx] + XB[:, dx:]) / T, eta1)

    # Mean utility for goods A and B
    delA = (np.column_stack((XA[:, :dx] @ beta, XA[:, dx:] @ beta)) + alphaA[:, None])[:, :, None]  # Mean utility for good A\\
    delB = (np.column_stack((XB[:, :dx] @ beta, XB[:, dx:] @ beta)) + alphaB[:, None])[:, :, None]  # Mean utility for good B

    # Incremental utility
    Gam = np.dot(Z, gamma)[:, None, None]

    # Simulated choice probabilities
    p1_s = np.mean((muA_s + delA >= np.maximum(muB_s + delB, 0)) & (muB_s <= -delB - Gam), axis = 2)
    p2_s = np.mean((muB_s + delB >= np.maximum(muA_s + delA, 0)) & (muA_s <= -delA - Gam), axis = 2)
    p3_s = np.mean((muA_s + delA + Gam >= np.maximum(-muB_s - delB, 0)) & (muB_s >= -delB - Gam), axis=2)

    # Concatenate covariates
    X = np.hstack([XA, XB, Z])

    # Moment conditions for the two periods
    Q_s1 = np.hstack([np.dot(X.T, (Y1 - p1_s)[:, 0]), np.dot(X.T, (Y2 - p2_s)[:, 0]), np.dot(X.T, (Y3 - p3_s)[:, 0])]) / N
    Q_s2 = np.hstack([np.dot(X.T, (Y1 - p1_s)[:, 1]), np.dot(X.T, (Y2 - p2_s)[:, 1]), np.dot(X.T, (Y3 - p3_s)[:, 1])]) / N
    
    # Sum of squares of the moment conditions
    Q = np.dot(Q_s1.T, Q_s1) + np.dot(Q_s2.T, Q_s2)
    
    return Q


### Estimator III: Chamberlain's fixed-effect logit model which does not allow bundles


In [None]:
# define moment function

def momlogit(beta, XA, XB, Y):
    """
    - beta: true parameter (excluding the constant term)
    - XA(XB): covariate for good A(B) at all periods
    - Y: dependent variables
    """
    
    b = np.concatenate(([1], beta))

    # Indicator function for choosing c1 at t=1 and c2 at t=2
    def dc(c1, c2):
        return ((Y[:, 0] == c1) & (Y[:, 1] == c2)).astype(int)

    # Choice probability calculations using the logistic function
    p01 = np.exp(D(XA) @ b) / (1 + np.exp(D(XA) @ b))
    p02 = np.exp(D(XB) @ b) / (1 + np.exp(D(XB) @ b))
    p12 = np.exp((D(XB) - D(XA)) @ b) / (1 + np.exp((D(XB) - D(XA)) @ b))

    # Calculating the log-likelihood for each pair of choices
    Q1 = np.mean(dc(0, 1) * np.log(p01) + dc(1, 0) * np.log(1 - p01))
    Q2 = np.mean(dc(0, 2) * np.log(p02) + dc(2, 0) * np.log(1 - p02))
    Q3 = np.mean(dc(1, 2) * np.log(p12) + dc(2, 1) * np.log(1 - p12))

    # Return the negative log-likelihood function
    return -(Q1 + Q2 + Q3)


### Estimator IV: Nonparametric estimator which does not allow bundles


In [None]:
# Define moment equality function for this estimator

def mom1(beta, XA, XB, p):
    """
    - beta: true parameter 
    - XA(XB): covariate for good A(B) at all periods
    - p: estimated choice probability for four choices
    """
    b = np.concatenate(([1], beta))  # Prepend 1 to beta to include intercept

    # Variation in covariate index for goods A and B
    deltaA = D(XA) @ b
    deltaB = D(XB) @ b

    # Estimated choice probabilities
    p1 = p[:, :T]                    # For choosing only good A
    p2 = p[:, T:2*T]                 # For choosing only good B
    p0 = p[:, 2*T:3*T]               # For choosing neither

    # Construct index for a single good
    I1 = 1 - ((sg(dp(p1)) * deltaA > 0) | (sg(dp(p1)) * (deltaA - deltaB) > 0))
    I2 = 1 - ((sg(dp(p2)) * deltaB > 0) | (sg(dp(p2)) * (deltaB - deltaA) > 0))
    I0 = 1 - ((sg(dp(p0)) * deltaA < 0) | (sg(dp(p0)) * deltaB < 0))

    # Construct objective function using a single choice
    Q = np.mean(G(dp(p1)) * I1 + G(dp(p2)) * I2 + G(dp(p0)) * I0)
    
    return Q


### Simulation starts here
- implement four different estimators


### Performance comparison of the four estimators


In [None]:
for b in range(B):

    # Generate covariates for good A
    XA1 = multivariate_normal.rvs(mean=np.zeros(dx), cov=np.eye(dx), size=N)
    XA2 = multivariate_normal.rvs(mean=np.zeros(dx), cov=np.eye(dx), size=N)
    XA = np.column_stack((XA1, XA2))
    
    # Generate covariates for good B
    XB1 = multivariate_normal.rvs(mean=np.zeros(dx), cov=np.eye(dx), size=N)
    XB2 = multivariate_normal.rvs(mean=np.zeros(dx), cov=np.eye(dx), size=N)
    XB = np.column_stack((XB1, XB2))

    # Generate substitution pattern covariates
    Z = np.column_stack((norm.rvs(loc=2, scale=2, size=N), norm.rvs(loc=0, scale=1, size=N)))

    # all covariates
    X = np.column_stack((XA, XB, Z))

    # Error terms and fixed effects setup
    rho = -0.7
    cov = [[1, rho], [rho, 1]]

    # Generate random samples
    ep1 = multivariate_normal.rvs([2, -2], cov, N)
    ep2 = multivariate_normal.rvs([2, -2], cov, N)
    epA = np.column_stack((ep1[:, 0], ep2[:, 0]))
    epB = np.column_stack((ep1[:, 1], ep2[:, 1]))
    ep0 = np.random.randn(N, T)
    alphaA = ((XA1 + XA2 - 2 * (XB1 + XB2)) @ beta0 / 4) * (1 + np.random.randn(N, 1))
    alphaB = ((XB1 + XB2 - 2 * (XA1 + XA2)) @ beta0 / 4) * (1 + np.random.randn(N, 1))

    # Latent utilities and choices
    uA = np.hstack((XA[:, :dx] @ beta0, XA[:, dx:] @ beta0)) + alphaA + epA - ep0
    uB = np.hstack((XB[:, :dx] @ beta0, XB[:, dx:] @ beta0)) + alphaB + epB - ep0
    u0 = np.zeros((N, T))
    uAB = uA + uB + Z @ gamma0

    um = np.maximum.reduce([uA, uB, uAB, np.zeros((N, T))])  # maximum utility
    
    # generate dependent variable
    Y = np.where(um == uA, 1, 0) + np.where(um == uB, 2, 0) + np.where(um == uAB, 3, 0)
    Yall = np.column_stack(((Y == 1).astype(int), (Y == 2).astype(int), (Y == 0).astype(int), (Y == 3).astype(int)))
    
    # First step estimator - Assuming function definition for 'neu'
    phat = neu(X, Yall)  # You need to define 'neu' to accept these inputs or adjust the call accordingly

    # Two-step estimator
    def objective_two_step(theta):
        return mom(theta, XA, XB, Z, phat)  # Define 'mom' as per your model

    initial_two_step = np.concatenate([np.ones(dx - 1), 0.5 * np.ones(dz - 1)])
    result_two_step = minimize(objective_two_step, initial_two_step, method = 'Powell')
    beta_twostep[:, b] = result_two_step.x[:dx - 1]
    gamma_twostep[:, b] = result_two_step.x[dx - 1:dx + dz - 2]
    err[b] = np.mean(np.abs((Z @ gamma0 >= 0).astype(int) - (Z @ np.concatenate([[1], gamma_twostep[:, b]]) >= 0).astype(int)))

    # Mixed effect parametric estimator
    def objective_par(theta):
        return par_sim(theta, XA, XB, Z, Y == 1, Y == 2, Y == 3)  # Define 'par_sim' as per your model

    initial_par = np.concatenate([np.ones(dx + dz), 0.5 * np.ones(dx - 1)])
    result_par = minimize(objective_par, initial_par, method = 'Nelder-Mead')
    beta_par[:, b] = result_par.x[:dx - 1]
    gamma_par[:, b] = result_par.x[dx - 1:dx + dz - 2]
    err_par[b] = np.mean(np.abs((Z @ gamma0 >= 0).astype(int) - (Z @ np.concatenate([[1], gamma_par[:, b]]) >= 0).astype(int)))

    # Logit estimator without bundles
    def objective_logit(beta):
        return momlogit(beta, XA, XB, Y)  # Define 'momlogit' as per your model

    result_logit = minimize(objective_logit, np.array([0]), method = 'Powell', bounds=[(-5, 5)])
    beta_fl[:, b] = result_logit.x[0]

    # Nonparametric estimator without bundles
    phat1 = phat[:, :3 * T]
    def objective_non(beta):
        return mom1(beta, XA, XB, phat1)  # Define 'mom1' as per your model

    result_non = minimize(objective_non, np.array([0]), method = 'Powell', bounds=[(-5, 5)])
    beta_non[:, b] = result_non.x[0]


In [None]:

## Evaluation function of estimators
def M(theta):
    """
    Evaluate the performance of estimators
    - theta: array of estimator values
    Returns: [Bias, SD, RMSE, MAD]
    """
    mean_theta = np.mean(theta)
    std_theta = np.std(theta)
    rmse = np.sqrt(np.var(theta) + (mean_theta - 1)**2)
    mad = np.median(np.abs(theta - 1))
    return [round(mean_theta - 1, 3), round(std_theta, 3), round(rmse, 3), round(mad, 3)]

# Performance of estimators
rMSEnon = M(beta_twostep)
rMSEpar = M(beta_par)
rMSEnon1 = M(beta_non)
rMSEfl = M(beta_fl)
rMSEgnon = M(gamma_twostep)
rMSEgpar = M(gamma_par)

# Printing the results
print("Performance of beta_twostep:", rMSEnon)
print("Performance of beta_par:", rMSEpar)
print("Performance of beta_non:", rMSEnon1)
print("Performance of beta_fl:", rMSEfl)
print("Performance of gamma_twostep:", rMSEgnon)
print("Performance of gamma_par:", rMSEgpar)
