In [1]:
import cvxpy as cp
import numpy as np
import scipy as sp
import sys
import pandas as pd

In [2]:
def construct_T_matrix_simple(K, epsilon):
    # T_kl = P(Y_tilde = k | Y=l)
    T = (1-epsilon) * np.identity(K) + epsilon/K * np.ones((K,K))
    return T

def construct_T_matrix_block(K, epsilon):
    assert K%2 == 0
    K2 = int(K/2)
    J2 = np.ones((K2,K2))
    T = (1.0-epsilon) * np.identity(K) + epsilon/K2 * sp.linalg.block_diag(J2,J2)
    return T

def construct_T_matrix_block_RR(K, epsilon, nu):
    assert K%2 == 0
    K2 = int(K/2)
    B2 = ((1.0+nu)*epsilon/K) * np.ones((K2,K2))
    C2 = ((1.0-nu)*epsilon/K) * np.ones((K2,K2))
    T = (1.0-epsilon) * np.identity(K) + np.block([[B2,C2],[C2,B2]])
    return T

In [3]:
def construct_omega(W, beta_0, betas):
    # Ensure that W is a square matrix and betas has the correct length
    K = W.shape[0]
    assert W.shape[1] == K, "W must be a square matrix"
    assert len(betas) == K, "betas must have length k"

    # Identity matrix of size k
    I_K = np.eye(K)

    # Constructing Beta_matrix where each row is filled with corresponding beta_i
    Beta_matrix = np.array([betas[i] * np.ones(K) for i in range(K)]) / K

    # Constructing Omega matrix
    Omega = np.round(W - beta_0 * I_K - Beta_matrix,6)

    return Omega

In [4]:
def estimate_c_const(n, n_mc=1000):
    #c_n = np.sqrt(np.pi/6) / np.sqrt(n)
    # Monte Carlo simulation for estimating c(n)
    R = np.zeros((n_mc,))
    for b in range(n_mc):
        U = np.sort(np.random.uniform(0,1,size=(n,)))
        R[b] = np.max(np.arange(1,n+1)/n - U)
    c_n = np.mean(R)
    return c_n

def cvxpy_loss_function_a(beta_0, betas, W, n):
    K = W.shape[0]
    # Identity matrix of size K
    I_K = np.eye(K)

    # Constructing a matrix with the i-th row as beta_i
    Beta_matrix = cp.vstack([betas[i] * cp.Constant(np.ones(K)/K) for i in range(K)])

    # Constructing Omega matrix
    Omega = W - beta_0 * I_K - Beta_matrix

    # Constants for objective function
    const_a = np.sqrt(np.log(K*n+1))

    # Calculate c(n)
    c_n = estimate_c_const(n)

    # Loss functions
    loss_1 = c_n * (beta_0 + cp.sum(betas)/K)
    loss_2_a = cp.norm(Omega, 1) * const_a
    
    loss = loss_1 + 2/np.sqrt(n) * loss_2_a
    return loss
    
    
def cvxpy_loss_function_b(beta_0, betas, W, n):
    K = W.shape[0]

    # Identity matrix of size K
    I_K = np.eye(K)

    # Constructing a matrix with the i-th row as beta_i
    Beta_matrix = cp.vstack([betas[i] * cp.Constant(np.ones(K)/K) for i in range(K)])

    # Constructing Omega matrix
    Omega = W - beta_0 * I_K - Beta_matrix

    # Constants for objective function
    const_b = 24 * ((2 * np.log(K)+1)/(2 * np.log(K)-1)) * np.sqrt(2*K*np.log(K))

    # Calculate c(n)
    c_n = estimate_c_const(n)

    # Loss functions
    loss_1 = c_n * (beta_0 + cp.sum(betas)/K)
    loss_2_b = cp.norm(Omega, "inf") * const_b
    
    loss = loss_1 + 2/np.sqrt(n) * loss_2_b
    return loss

def numpy_loss_function(beta_0, betas, W, n):
    loss_a = cvxpy_loss_function_a(beta_0, betas, W, n).value
    loss_b = cvxpy_loss_function_b(beta_0, betas, W, n).value
    return np.minimum(loss_a, loss_b)


In [5]:
def solve_optim_problem_a(W, n):
    # Variables
    beta_0 = cp.Variable()
    betas = cp.Variable(K)
   
    # Solve problem A
    loss = cvxpy_loss_function_a(beta_0, betas, W, n)
    objective = cp.Minimize(loss)
    problem = cp.Problem(objective, [])
    problem.solve()

    # Return results
    optim_value = problem.value
    optim_beta_0 = beta_0.value
    optim_betas = betas.value
    return optim_value, optim_beta_0, optim_betas
    
def solve_optim_problem_b(W, n):
    # Variables
    beta_0 = cp.Variable()
    betas = cp.Variable(K)

    # Solve problem A
    loss = cvxpy_loss_function_b(beta_0, betas, W, n)
    objective = cp.Minimize(loss)
    problem = cp.Problem(objective, [])
    problem.solve()
    
    # Return results
    optim_value = problem.value
    optim_beta_0 = beta_0.value
    optim_betas = betas.value
    return optim_value, optim_beta_0, optim_betas
    
def solve_optim_problem(W, n):
    value_a, beta_0_a, betas_a = solve_optim_problem_a(W, n)
    value_b, beta_0_b, betas_b = solve_optim_problem_b(W, n)
    
    if value_a <= value_b:
        value = value_a
        beta_0 = beta_0_a
        betas = betas_a
    else:
        value = value_b
        beta_0 = beta_0_b
        betas = betas_b
        
    return value, beta_0, betas


#print("Optimal value: {:.6f}".format(value))
#print(f"Optimal beta_0: {beta_0}")
#print(f"Optimal betas: {betas}")

## Experiments on the finite sample correction term

### Experiment 1 - Block randomized response model

#### Plot 1
Fix K and let n vary.

In [14]:
# Specification of model and model parameters
exp_num = 1
model_name = "B"
epsilon = 0.1
nu = "NA"
seed = 1

# Add important parameters to table of results
header = pd.DataFrame({'model_name':[model_name,model_name], 'epsilon':[epsilon,epsilon], 'nu':[nu,nu],
                       'seed':[seed,seed],'plot':["Klab","Klab"]})
# Output file
outfile_prefix = "exp"+str(exp_num) + "/simplified_methods/" + model_name
outfile_prefix += "_eps" + str(epsilon) + "_seed" + str(seed) + "_Klab"
print("Output file: {:s}.".format("results/"+outfile_prefix), end="\n")
sys.stdout.flush()

# Vector of sample sizes
K_values = [4, 8, 16]
n_cal_vals = [500, 1000, 2000, 5000, 10000, 20000, 50000]

results = pd.DataFrame({})

for K in K_values:
    
    for n_cal in n_cal_vals:
        # Construct the transition matrix T
        T = construct_T_matrix_block(K, epsilon)

        # Evaluate the inverse of T
        W = np.linalg.inv(T)

        # Define the basic solution of the Randomized Response model for K
        beta_0_rr = 1/(1-epsilon)
        betas_rr = np.ones((K,)) * (-epsilon/(1-epsilon))

        # Loss in correspondence of the basic solution
        value_rr = numpy_loss_function(beta_0_rr, betas_rr, W, n_cal)

        # Find the optimal beta vector by solving the minimization problem
        value_cvx, beta_0, betas = solve_optim_problem(W, n_cal)
        #value_cvx = numpy_loss_function(beta_0, betas, W, n_cal)

        ress = pd.DataFrame({'n_cal':[n_cal,n_cal], 'K':[K,K], 'values':[value_rr, value_cvx], 'Method':["RR", "CVX"]})
        res = pd.concat([header, ress], axis=1)

        # Save results
        results = pd.concat([results, res])
    
    print(K)

outfile = "experiments/results/" + outfile_prefix + ".txt"
results.to_csv(outfile, index=False, float_format="%.5f")

Output file: results/exp1/simplified_methods/B_eps0.1_seed1_Klab.
50000
50000
50000


#### Plot 2
Fix n and let K vary.

In [15]:
# Specification of model and model parameters
exp_num = 1
model_name = "B"
epsilon = 0.1
nu = "NA"
seed = 1

# Add important parameters to table of results
header = pd.DataFrame({'model_name':[model_name,model_name], 'epsilon':[epsilon,epsilon], 'nu':[nu,nu],
                       'seed':[seed,seed],'plot':["nlab","nlab"]})
# Output file
outfile_prefix = "exp"+str(exp_num) + "/simplified_methods/" + model_name
outfile_prefix += "_eps" + str(epsilon) + "_seed" + str(seed) + "_nlab"
print("Output file: {:s}.".format("results/"+outfile_prefix), end="\n")
sys.stdout.flush()

# Vector of sample sizes
n_cal_vals = [50, 100, 500]
K_values = np.arange(2,22,2)

results = pd.DataFrame({})

for n_cal in n_cal_vals:
    
    for K in K_values:
        # Construct the transition matrix T
        T = construct_T_matrix_block(K, epsilon)

        # Evaluate the inverse of T
        W = np.linalg.inv(T)

        # Define the basic solution of the Randomized Response model for K
        beta_0_rr = 1/(1-epsilon)
        betas_rr = np.ones((K,)) * (-epsilon/(1-epsilon))

        # Loss in correspondence of the basic solution
        value_rr = numpy_loss_function(beta_0_rr, betas_rr, W, n_cal)

        # Find the optimal beta vector by solving the minimization problem
        value_cvx, beta_0, betas = solve_optim_problem(W, n_cal)
        #value_cvx = numpy_loss_function(beta_0, betas, W, n_cal)

        ress = pd.DataFrame({'n_cal':[n_cal,n_cal], 'K':[K,K], 'values':[value_rr, value_cvx], 'Method':["RR", "CVX"]})
        res = pd.concat([header, ress], axis=1)

        # Save results
        results = pd.concat([results, res])
    
    print(n_cal)

outfile = "experiments/results/" + outfile_prefix + ".txt"
results.to_csv(outfile, index=False, float_format="%.5f")

Output file: results/exp1/simplified_methods/B_eps0.1_seed1_nlab.
50
100
500


### Experiment 2 - Two-level randomized response model

#### Plot 1
Fix K and let n vary.

In [None]:
# Specification of model and model parameters
exp_num = 1
model_name = "BRR"
epsilon = 0.1
nu = 0.8
seed = 1

# Add important parameters to table of results
header = pd.DataFrame({'model_name':[model_name,model_name], 'epsilon':[epsilon,epsilon], 'nu':[nu,nu],
                       'seed':[seed,seed],'plot':["Klab","Klab"]})
# Output file
outfile_prefix = "exp"+str(exp_num) + "/simplified_methods/" + model_name
outfile_prefix += "_eps" + str(epsilon) + "_nu" + str(nu) + "_seed" + str(seed) + "_Klab"
print("Output file: {:s}.".format("results/"+outfile_prefix), end="\n")
sys.stdout.flush()

# Vector of sample sizes
K_values = [4, 8, 16]
n_cal_vals = [500, 1000, 2000, 5000, 10000, 20000, 50000]

results = pd.DataFrame({})

for K in K_values:
    
    for n_cal in n_cal_vals:
        # Construct the transition matrix T
        T = construct_T_matrix_block_RR(K, epsilon, nu)

        # Evaluate the inverse of T
        W = np.linalg.inv(T)

        # Define the basic solution of the Randomized Response model for K
        beta_0_rr = 1/(1-epsilon)
        betas_rr = np.ones((K,)) * (-epsilon/(1-epsilon))

        # Loss in correspondence of the basic solution
        value_rr = numpy_loss_function(beta_0_rr, betas_rr, W, n_cal)

        # Find the optimal beta vector by solving the minimization problem
        value_cvx, beta_0, betas = solve_optim_problem(W, n_cal)
        #value_cvx = numpy_loss_function(beta_0, betas, W, n_cal)

        ress = pd.DataFrame({'n_cal':[n_cal,n_cal], 'K':[K,K], 'values':[value_rr, value_cvx], 'Method':["RR", "CVX"]})
        res = pd.concat([header, ress], axis=1)

        # Save results
        results = pd.concat([results, res])
    
    print(K)

outfile = "experiments/results/" + outfile_prefix + ".txt"
results.to_csv(outfile, index=False, float_format="%.5f")

Output file: results/exp1/simplified_methods/BRR_eps0.1_nu0.8_seed1_Klab.
50000
50000
50000


#### Plot 2
Fix n and let K vary.

In [None]:
# Specification of model and model parameters
exp_num = 1
model_name = "BRR"
epsilon = 0.1
nu = 0.8
seed = 1

# Add important parameters to table of results
header = pd.DataFrame({'model_name':[model_name,model_name], 'epsilon':[epsilon,epsilon], 'nu':[nu,nu],
                       'seed':[seed,seed],'plot':["nlab","nlab"]})
# Output file
outfile_prefix = "exp"+str(exp_num) + "/simplified_methods/" + model_name
outfile_prefix += "_eps" + str(epsilon) + "_nu" + str(nu) + "_seed" + str(seed) + "_nlab"
print("Output file: {:s}.".format("results/"+outfile_prefix), end="\n")
sys.stdout.flush()

# Vector of sample sizes
n_cal_vals = [50, 100, 500]
K_values = np.arange(2,22,2)

results = pd.DataFrame({})

for n_cal in n_cal_vals:
    
    for K in K_values:
        # Construct the transition matrix T
        T = construct_T_matrix_block_RR(K, epsilon, nu)

        # Evaluate the inverse of T
        W = np.linalg.inv(T)

        # Define the basic solution of the Randomized Response model for K
        beta_0_rr = 1/(1-epsilon)
        betas_rr = np.ones((K,)) * (-epsilon/(1-epsilon))

        # Loss in correspondence of the basic solution
        value_rr = numpy_loss_function(beta_0_rr, betas_rr, W, n_cal)

        # Find the optimal beta vector by solving the minimization problem
        value_cvx, beta_0, betas = solve_optim_problem(W, n_cal)
        #value_cvx = numpy_loss_function(beta_0, betas, W, n_cal)

        ress = pd.DataFrame({'n_cal':[n_cal,n_cal], 'K':[K,K], 'values':[value_rr, value_cvx], 'Method':["RR", "CVX"]})
        res = pd.concat([header, ress], axis=1)

        # Save results
        results = pd.concat([results, res])
    
    print(n_cal)

outfile = "experiments/results/" + outfile_prefix + ".txt"
results.to_csv(outfile, index=False, float_format="%.5f")

Output file: results/exp1/simplified_methods/BRR_eps0.1_nu0.8_seed1_nlab.
50
100
500
