In [21]:
import numpy as np
import scipy
from scipy.integrate import quad
from scipy.stats import multivariate_normal, norm
from scipy.optimize import minimize
import joblib
import pandas as pd
# Equivalent to R's logit function
def logit(x):
    """Return the log-odds ``log(x / (1 - x))`` of ``x``.

    Works element-wise on numpy arrays as well as on scalars.
    """
    odds = x / (1 - x)
    return np.log(odds)

# Equivalent to R's expit function
def expit(x):
    """Return the logistic transform ``1 / (1 + exp(-x))`` of ``x``.

    Works element-wise on numpy arrays as well as on scalars.
    """
    denominator = 1 + np.exp(-x)
    return 1 / denominator
# Bivariate normal PDF
# Equivalent to R's biv_pdf function
def biv_pdf(x, y, mu, sigma, rho, log=False):
    """Density (or log-density) of a bivariate normal distribution at (x, y).

    Parameters
    ----------
    x, y : point at which the density is evaluated.
    mu : pair of means, indexed as mu[0] (for x) and mu[1] (for y).
    sigma : pair of standard deviations, indexed like ``mu``.
    rho : correlation coefficient.
    log : when truthy, return the log-density instead of the density.
    """
    # 1 - rho^2 appears in two denominators; a non-positive value is replaced
    # by a small positive constant so those divisions stay defined.
    one_minus_rho_sq = 1 - rho**2
    one_minus_rho_sq = 0.000001 if one_minus_rho_sq <= 0 else one_minus_rho_sq

    dx = x - mu[0]
    dy = y - mu[1]
    sd_x, sd_y = sigma[0], sigma[1]

    # Normalising constant of the density.
    norm_const = 2 * np.pi * sd_x * sd_y * np.sqrt(one_minus_rho_sq)

    # Quadratic form in the exponent: z_x^2 + z_y^2 - 2 * rho * z_x * z_y.
    quad_form = (dx / sd_x)**2 + (dy / sd_y)**2
    quad_form = quad_form - 2 * rho * dx * dy / (sd_x * sd_y)
    log_kernel = -quad_form / (2 * one_minus_rho_sq)

    if not log:
        return np.exp(log_kernel) / norm_const
    return log_kernel - np.log(norm_const)

# Equivalent to R's dmg_int_inner function
def dmg_int_inner(x, y, mu, sigma, rho, eta, alpha, l):
    """Integrand used by ``dmg_int``.

    Evaluates the bivariate normal density at ``x`` and at ``y`` shifted
    upwards by ``(mu[1] / mu[0]) * alpha * (l / eta - x)``. This is the same
    amount that ``dmg_model`` subtracts from the second coordinate.
    """
    slope = mu[1] / mu[0]
    shift = slope * alpha * (l / eta - x)
    return biv_pdf(x, y + shift, mu, sigma, rho)

# Equivalent to R's dmg_int function
def dmg_int(y, mu, sigma, rho, eta, alpha, l):
    """Integrate ``dmg_int_inner`` over x from ``l`` to ``l / eta``.

    Uses ``scipy.integrate.quad`` with an absolute tolerance of 1e-5 and
    returns only the integral estimate; quad's error estimate is discarded.
    """
    integrand_args = (y, mu, sigma, rho, eta, alpha, l)
    upper_limit = l / eta
    integral, _abs_err = quad(dmg_int_inner, l, upper_limit, args=integrand_args, epsabs=1e-5)
    return integral


def dmg_model(samples, eta, alpha, l, mu):
    """Apply the damage adjustment to the second column of ``samples``.

    ``samples`` is indexed as a 2-D array: column 0 is compared with the load
    and column 1 is the value that may be reduced. A row is adjusted when
    ``l > samples[:, 0] * eta``; in that case
    ``(mu[1] / mu[0]) * alpha * (l / eta - samples[:, 0])`` is subtracted from
    its second column. Other rows keep their second column unchanged.

    Returns a 1-D array with one value per row.
    """
    first_col = samples[:, 0]
    second_col = samples[:, 1]

    is_damaged = l > first_col * eta
    damaged_values = second_col - (mu[1] / mu[0]) * alpha * (l / eta - first_col)

    return np.where(is_damaged, damaged_values, second_col)
def int_function(y_star, mu, sigma, rho, l):
    """Normal density of ``y_star`` times an upper-tail probability at ``l``.

    ``a_l`` standardises ``l`` using the mean
    ``mu[0] + rho * (sigma[0] / sigma[1]) * (y_star - mu[1])`` and the standard
    deviation ``sigma[0] * sqrt(1 - rho**2)``. The result is
    ``pdf(y_star; mu[1], sigma[1]) * (1 - Phi(a_l))``.
    """
    regression_shift = rho * (sigma[0] / sigma[1]) * (y_star - mu[1])
    conditional_sd = sigma[0] * np.sqrt(1 - rho**2)
    a_l = (l - mu[0] - regression_shift) / conditional_sd

    marginal_density = norm.pdf(y_star, loc=mu[1], scale=sigma[1])
    upper_tail = 1 - norm.cdf(a_l)
    return marginal_density * upper_tail


def PFY_lik(mu, sigma, rho, eta, alpha, l, data):
    """Log-likelihood of one proof-loaded data set.

    ``data`` is indexed as a 2-D array whose column 2 is an indicator:
    rows with indicator 1 contribute the normal log-density of their column-0
    value (mean ``mu[0]``, sd ``sigma[0]``); rows with indicator 0 contribute
    ``log(dmg_int(y, ...) + int_function(y, ..., l / eta))`` for their
    column-1 value ``y``.

    Returns the sum of both contributions.
    """
    indicator = data[:, 2]
    failed_values = data[indicator == 1, 0]
    survivor_values = data[indicator == 0, 1]

    # Rows with indicator 1: plain normal log-density.
    loglik_failed = np.sum(norm.logpdf(failed_values, loc=mu[0], scale=sigma[0]))

    # Rows with indicator 0: integral over [l, l / eta] plus the tail term
    # evaluated at the threshold l / eta.
    damaged_part = np.array([dmg_int(y, mu, sigma, rho, eta, alpha, l) for y in survivor_values])
    tail_part = np.array([int_function(y, mu, sigma, rho, 1/eta * l) for y in survivor_values])
    loglik_survived = np.sum(np.log(damaged_part + tail_part))

    return loglik_failed + loglik_survived


def pl_gen(mu, sigma, rho, eta, alpha, l, N):
    """Simulate one proof-loaded data set of ``N`` rows.

    Draws ``N`` pairs from a bivariate normal distribution (numpy's global
    random state) and returns an ``(N, 3)`` array:

    * column 0 - first coordinate for rows whose first coordinate is below
      ``l`` (failed in proof loading), 0 otherwise;
    * column 1 - ``dmg_model`` output for rows whose first coordinate is at
      least ``l`` (survived), 0 otherwise;
    * column 2 - 1 for failed rows, 0 for survivors.
    """
    # Covariance matrix built from standard deviations and the correlation.
    off_diagonal = sigma[0] * sigma[1] * rho
    covariance = np.array([
        [sigma[0]**2, off_diagonal],
        [off_diagonal, sigma[1]**2]
    ])

    draws = np.random.multivariate_normal(mean=mu, cov=covariance, size=N)
    n_rows, n_cols = draws.shape
    out = np.zeros((n_rows, n_cols + 1))

    # Rows that fail in the proof loading keep only their first coordinate.
    failed = draws[:, 0] < l
    out[failed, 0] = draws[failed, 0]
    out[failed, 2] = 1

    # Surviving rows record their (possibly damage-adjusted) second coordinate.
    survived = draws[:, 0] >= l
    out[survived, 1] = dmg_model(draws[survived], eta, alpha, l, mu)
    out[survived, 2] = 0

    return out


In [11]:
# Simulation settings: parameter values used to generate data in the cells below.
# mu / sigma are the means and standard deviations of the two margins, rho is
# their correlation; eta and alpha are passed to pl_gen (and on to dmg_model).
mu = [45, 5.5]
sigma = [13, 1]
rho = .7
eta = .7
alpha = 0

# Number of bivariate draws per simulated proof-loaded data set.
N = 87

# 20%, 40% and 60% quantiles of each marginal normal distribution.
# NOTE(review): T_pf is not used anywhere in the visible cells - confirm it is still needed.
R_pf = norm.ppf([0.2, 0.4, 0.6], loc=mu[0], scale=sigma[0])
T_pf = norm.ppf([0.2, 0.4, 0.6], loc=mu[1], scale=sigma[1])
# Proof-load level: the 60% quantile of the first margin.
l = R_pf[2]
# No seed is set in this cell, so this draw depends on the current random state.
pf_data = pl_gen(mu, sigma, rho, eta, alpha, l, N)

In [12]:
# Starting vector for the optimisers: (mu[0], mu[1], sigma[0], sigma[1], logit(rho), logit(eta), alpha).
# rho and eta are stored on the logit scale here; single_alpha0 and alpha_checkR
# map them back with expit, whereas single_fitalpha reads theta[4] and theta[5] as-is.
theta0  = [mu[0],mu[1],
           sigma[0],sigma[1],logit(rho),logit(eta),alpha]


In [13]:
# Log-likelihood of the simulated data evaluated at the generating parameter values.
# NOTE(review): pf_data above is drawn without a seed, so the displayed value
# is presumably not reproducible on a fresh run - confirm.
PFY_lik(mu, sigma, rho, eta, alpha, l, pf_data)

-269.3381497915118

In [14]:
def alpha_checkR(theta, R_group, T_group, R_pl, T_pl, shoulder_group):
    """Negative joint log-likelihood of two proof-loaded groups plus shoulder data.

    Parameters
    ----------
    theta : sequence of 7 values
        ``(mu[0], mu[1], sigma[0], sigma[1], logit(rho), logit(eta), alpha)``;
        rho and eta are recovered with ``expit``.
    R_group, T_group : data arrays in the layout expected by ``PFY_lik``.
    R_pl, T_pl : proof-load levels passed to ``PFY_lik`` for each group.
    shoulder_group : pair of samples; element 0 is scored against the first
        margin (``mu[0]``, ``sigma[0]``), element 1 against the second.

    Returns
    -------
    float
        Minus the total log-likelihood, suitable for a minimiser. A
        non-finite log-likelihood (``inf``, ``-inf`` or ``NaN``) is replaced
        by the penalty value -10000 before negation.
    """
    mu = theta[0:2]
    sigma = theta[2:4]
    rho = expit(theta[4])
    eta = expit(theta[5])
    alpha = theta[6]

    # The R group is evaluated with eta = 1 and alpha = 0; the T group uses the
    # fitted eta / alpha with the roles of the two margins swapped.
    lik = (PFY_lik(mu, sigma, rho, 1, 0, R_pl, R_group) +
           PFY_lik([mu[1], mu[0]], [sigma[1], sigma[0]], rho, eta, alpha, T_pl, T_group) +
           np.sum(norm.logpdf(shoulder_group[0], loc=mu[0], scale=sigma[0])) +
           np.sum(norm.logpdf(shoulder_group[1], loc=mu[1], scale=sigma[1])))

    # np.isinf would let NaN through (e.g. from a negative scale in
    # norm.logpdf), handing the optimiser an unusable objective value.
    if not np.isfinite(lik):
        lik = -10000

    return -1 * lik

def single_fitalpha(theta, group, group_pl, group_name, shoulder_group):
    """Negative log-likelihood of one proof-loaded group under the full model.

    Parameters
    ----------
    theta : sequence of 7 values
        ``(mu[0], mu[1], sigma[0], sigma[1], rho, eta, alpha)``. Unlike
        ``single_alpha0`` and ``alpha_checkR``, rho and eta are used exactly
        as given (no ``expit`` transform).
    group : data array in the layout expected by ``PFY_lik``.
    group_pl : proof-load level passed to ``PFY_lik``.
    group_name : ``"R"`` to use the margins as given, ``"T"`` to swap them.
    shoulder_group : pair of samples; element 0 is scored against the first
        margin (``mu[0]``, ``sigma[0]``), element 1 against the second.

    Returns
    -------
    float
        Minus the total log-likelihood. A non-finite log-likelihood
        (``inf``, ``-inf`` or ``NaN``) is replaced by the penalty value
        -10000 before negation.

    Raises
    ------
    ValueError
        If ``group_name`` is neither ``"R"`` nor ``"T"``.
    """
    # Fail early with a clear message; previously an unknown name left `lik`
    # unassigned and surfaced as an UnboundLocalError further down.
    if group_name not in ("R", "T"):
        raise ValueError(f'group_name must be "R" or "T", got {group_name!r}')

    mu = theta[0:2]
    sigma = theta[2:4]
    rho = theta[4]
    eta = theta[5]
    alpha = theta[6]

    if group_name == "R":
        lik = PFY_lik(mu, sigma, rho, eta, alpha, group_pl, group)
    else:
        # "T": swap the roles of the two margins.
        lik = PFY_lik([mu[1], mu[0]], [sigma[1], sigma[0]], rho, eta, alpha, group_pl, group)

    lik += (np.sum(norm.logpdf(shoulder_group[0], loc=mu[0], scale=sigma[0])) +
            np.sum(norm.logpdf(shoulder_group[1], loc=mu[1], scale=sigma[1])))

    # np.isinf would let NaN through to the optimiser; treat every non-finite
    # value as an invalid parameter point.
    if not np.isfinite(lik):
        lik = -10000

    return -1 * lik

# Equivalent to R's single_alpha0 function
def single_alpha0(theta, group, group_pl, group_name, shoulder_group):
    """Negative log-likelihood of one proof-loaded group with eta = 1, alpha = 0.

    Parameters
    ----------
    theta : sequence of at least 5 values
        ``(mu[0], mu[1], sigma[0], sigma[1], logit(rho))``; rho is recovered
        with ``expit``.
    group : data array in the layout expected by ``PFY_lik``.
    group_pl : proof-load level passed to ``PFY_lik``.
    group_name : ``"R"`` to use the margins as given, ``"T"`` to swap them.
    shoulder_group : pair of samples; element 0 is scored against the first
        margin (``mu[0]``, ``sigma[0]``), element 1 against the second.

    Returns
    -------
    float
        Minus the total log-likelihood. A non-finite log-likelihood
        (``inf``, ``-inf`` or ``NaN``) is replaced by the penalty value
        -10000 before negation.

    Raises
    ------
    ValueError
        If ``group_name`` is neither ``"R"`` nor ``"T"``.
    """
    # Fail early with a clear message; previously an unknown name left `lik`
    # unassigned and surfaced as an UnboundLocalError further down.
    if group_name not in ("R", "T"):
        raise ValueError(f'group_name must be "R" or "T", got {group_name!r}')

    mu = theta[0:2]
    sigma = theta[2:4]
    rho = expit(theta[4])

    if group_name == "R":
        lik = PFY_lik(mu, sigma, rho, 1, 0, group_pl, group)
    else:
        # "T": swap the roles of the two margins.
        lik = PFY_lik([mu[1], mu[0]], [sigma[1], sigma[0]], rho, 1, 0, group_pl, group)

    lik += (np.sum(norm.logpdf(shoulder_group[0], loc=mu[0], scale=sigma[0])) +
            np.sum(norm.logpdf(shoulder_group[1], loc=mu[1], scale=sigma[1])))

    # The optimiser calling this is unconstrained, so sigma can become
    # non-positive and norm.logpdf then yields NaN, which np.isinf misses.
    if not np.isfinite(lik):
        lik = -10000

    return -1 * lik



def ecdf(x):
    """Build the empirical cumulative distribution function of a sample.

    Parameters
    ----------
    x : one-dimensional array-like of observations (must be non-empty).

    Returns
    -------
    callable
        ``F(v)`` giving the fraction of observations ``<= v``. ``v`` may be a
        scalar or an array; the result always lies in ``[0, 1]``.

    Raises
    ------
    ValueError
        If ``x`` contains no observations.
    """
    sorted_x = np.sort(x)
    n = len(sorted_x)
    if n == 0:
        raise ValueError("ecdf requires at least one observation")

    def _ecdf(v):
        # side='right' counts the observations <= v, i.e. n * Pr(x <= v).
        # No "+ 1" here: it would push the ECDF above 1 and make
        # 1 - ecdf(...)(v) negative when used as a p-value.
        return np.searchsorted(sorted_x, v, side='right') / n
    return _ecdf

In [15]:
# Draw one reference data set from the generating parameters with a fixed seed.
np.random.seed(42)  # Set seed for reproducibility
pf_data = pl_gen(mu, sigma, rho, eta, alpha, l, N)
# Two normal samples of size 2 * N, one per margin; together they form the
# "shoulder" data scored by the norm.logpdf terms of the likelihood functions.
R100_data = np.random.normal(loc=mu[0], scale=sigma[0], size=2 * N)
T100_data = np.random.normal(loc=mu[1], scale=sigma[1], size=2 * N)
shoulder_group = [R100_data, T100_data]


In [16]:
def lrt_fit(jj,theta):
    """Simulate one data set from the module-level settings and compute its LRT statistic.

    Parameters
    ----------
    jj : int
        Seed for numpy's global random state; also returned as the first
        element of the result.
    theta : unused; kept so the signature matches ``lrt_sim``.

    Returns
    -------
    list
        ``[jj, llr_stat, *theta_est]`` where ``llr_stat`` is twice the
        difference between the minimised negative log-likelihoods of the
        restricted model (``single_alpha0``) and the full model
        (``single_fitalpha``), and ``theta_est`` is the restricted-model
        estimate ``(mu[0], mu[1], sigma[0], sigma[1], logit(rho))``.

    Notes
    -----
    Reads the module-level ``mu``, ``sigma``, ``rho``, ``eta``, ``alpha``,
    ``l``, ``N`` and ``theta0``.
    """
    np.random.seed(jj)  # Set seed for reproducibility
    pf_data = pl_gen(mu, sigma, rho, eta, alpha, l, N)
    R100_data = np.random.normal(loc=mu[0], scale=sigma[0], size=2 * N)
    T100_data = np.random.normal(loc=mu[1], scale=sigma[1], size=2 * N)
    shoulder_group = [R100_data, T100_data]

    # Restricted model (eta = 1, alpha = 0): rho is optimised on the logit scale.
    optim_checkR = minimize(single_alpha0, theta0[0:5], args=(pf_data, l, "R", shoulder_group),
                           method = "Nelder-Mead")
    llr_checkR = 2 * optim_checkR.fun
    theta_est = optim_checkR.x

    # Full model: box constraints in the order
    # (mu[0], mu[1], sigma[0], sigma[1], rho, eta, alpha).
    bounds = [(30, 70), (0.1, 40), (0.1, 30), (0.1, 10), (0.01, 0.99), (0.01, 0.99), (-2, 10)]
    # theta0 holds rho and eta on the logit scale, but single_fitalpha (and the
    # bounds above) treat positions 4 and 5 on the natural scale. Convert them
    # so the search starts at the intended values.
    full_start = [*theta0[0:4], expit(theta0[4]), expit(theta0[5]), theta0[6]]
    optimout = minimize(single_fitalpha, full_start,
                        args=(pf_data,l, "R", shoulder_group),
                        method="L-BFGS-B", bounds=bounds)
    llr_full = 2 * optimout.fun
    llr_stat = llr_checkR - llr_full
    return [jj,llr_stat,*theta_est.tolist()]


In [17]:
def lrt_sim(jj,theta):
    """Simulate one data set under the restricted model and compute its LRT statistic.

    Parameters
    ----------
    jj : int
        Seed for numpy's global random state; also returned as the first
        element of the result.
    theta : sequence of at least 5 values
        ``(mu[0], mu[1], sigma[0], sigma[1], logit(rho))`` used to generate
        the data with ``eta = 1`` and ``alpha = 0``.

    Returns
    -------
    list
        ``[jj, llr_stat, *theta_est]`` where ``llr_stat`` is twice the
        difference between the minimised negative log-likelihoods of the
        restricted model (``single_alpha0``) and the full model
        (``single_fitalpha``), and ``theta_est`` is the restricted-model
        estimate.

    Notes
    -----
    Reads the module-level ``l``, ``N`` and ``theta0``.
    """
    np.random.seed(jj)  # Set seed for reproducibility
    mu = theta[0:2]
    sigma = theta[2:4]
    rho = expit(theta[4])
    # eta = 1 and alpha = 0: data are generated under the restricted model.
    pf_data = pl_gen(mu, sigma, rho, 1,0, l, N)
    R100_data = np.random.normal(loc=mu[0], scale=sigma[0], size=2 * N)
    T100_data = np.random.normal(loc=mu[1], scale=sigma[1], size=2 * N)
    shoulder_group = [R100_data, T100_data]

    # Restricted model: rho is optimised on the logit scale.
    optim_checkR = minimize(single_alpha0, theta0[0:5], args=(pf_data, l, "R", shoulder_group),
                           method = "Nelder-Mead")
    llr_checkR = 2 * optim_checkR.fun
    theta_est = optim_checkR.x

    # Full model: box constraints in the order
    # (mu[0], mu[1], sigma[0], sigma[1], rho, eta, alpha).
    # NOTE(review): the rho / eta bounds here are (0.03, 0.97) whereas lrt_fit
    # uses (0.01, 0.99) - confirm the difference is intentional.
    bounds = [(30, 70), (0.1, 40), (0.1, 30), (0.1, 10), (0.03, 0.97), (0.03, 0.97), (-2, 10)]
    # theta0 holds rho and eta on the logit scale, but single_fitalpha (and the
    # bounds above) treat positions 4 and 5 on the natural scale. Convert them
    # so the search starts at the intended values.
    full_start = [*theta0[0:4], expit(theta0[4]), expit(theta0[5]), theta0[6]]
    optimout = minimize(single_fitalpha, full_start,
                        args=(pf_data,l, "R", shoulder_group),
                        method="L-BFGS-B", bounds=bounds)
    llr_full = 2 * optimout.fun
    llr_stat = llr_checkR - llr_full
    return [jj,llr_stat,*theta_est.tolist()]


In [18]:
# Bootstrap likelihood-ratio test repeated over 200 simulated data sets.
# Each row of `res` is [replicate index, observed LRT statistic, bootstrap p-value].
res = np.zeros((200,3))

# Settings that do not change between replicates.
N_sim = 1000  # simulated null data sets per replicate
numbers = range(N_sim)
# Box constraints for the full model, in the order
# (mu[0], mu[1], sigma[0], sigma[1], rho, eta, alpha).
bounds = [(30, 70), (0.1, 40), (0.1, 30), (0.1, 10), (0.01, 0.99), (0.01, 0.99), (-2, 10)]
# theta0 holds rho and eta on the logit scale, but single_fitalpha (and the
# bounds above) treat positions 4 and 5 on the natural scale. Convert them so
# the full-model search starts at the intended values.
full_start = [*theta0[0:4], expit(theta0[4]), expit(theta0[5]), theta0[6]]

for ii in range(200):
    np.random.seed(ii)  # Set seed for reproducibility
    pf_data = pl_gen(mu, sigma, rho, eta, alpha, l, N)
    R100_data = np.random.normal(loc=mu[0], scale=sigma[0], size=2 * N)
    T100_data = np.random.normal(loc=mu[1], scale=sigma[1], size=2 * N)
    shoulder_group = [R100_data, T100_data]

    # Restricted model (eta = 1, alpha = 0); its estimate drives the null simulation below.
    optim_checkR = minimize(single_alpha0, theta0[0:5], args=(pf_data, l, "R", shoulder_group),
                           method = "Nelder-Mead")
    llr_checkR = 2 * optim_checkR.fun
    theta_est = optim_checkR.x

    # Full model with rho, eta and alpha free.
    optimout = minimize(single_fitalpha, full_start,
                        args=(pf_data,l, "R", shoulder_group),
                        method="L-BFGS-B", bounds=bounds)
    llr_full = 2 * optimout.fun
    llr_stat = llr_checkR - llr_full

    # Null distribution of the statistic: N_sim data sets generated by lrt_sim
    # from the restricted-model estimate.
    # NOTE(review): n_jobs=150 requests 150 workers regardless of the machine - confirm.
    results = joblib.Parallel(n_jobs=150)(joblib.delayed(lrt_sim)(num,theta_est) for num in numbers)
    # p-value = 1 - ECDF of the simulated statistics at the observed one.
    p_val = 1-ecdf(np.array(results)[:,1])(llr_stat )
    res[ii,:] = [ii,llr_stat,p_val]




In [27]:
# Write the results table to CSV without the index column; no column names are
# set, so pandas writes its default integer labels as the header.
# NOTE(review): the file name says "R40" while `l` is set to the 0.6 quantile
# (R_pf[2]) in the settings cell - confirm the name matches the run.
(pd.DataFrame(res)).to_csv("R40_lrt.csv",index = False)