Wild Bootstrap Testing
1. Calculate the test statistic $V_{n}$.
2. Obtain wild bootstrap samples $\{B_{n}\}_{i=1}^{D}$ and estimate the 1 − α empirical quantile of these samples.
3. If $V_{n}$ exceeds the quantile, reject.

In [47]:
from scipy.spatial.distance import squareform, pdist
import numpy as np

scaling=2.0

In [48]:
def grad_log_correleted(x):
    sigmaInv = np.linalg.inv(sigma)
    return - np.dot(sigmaInv.T + sigmaInv, x) / 2.0

In [49]:
def Agrad_multiple(X):
    return np.array([grad_log_correleted(x) for x in X])

In [50]:
def kernel_matrix(X):

    assert X.shape[0] > X.shape[1]
    sq_dists = squareform(pdist(X, 'sqeuclidean'))
    K = np.exp(-sq_dists/ scaling)
    return K

In [51]:
def gradient_k_wrt_x( X, K, dim):

    X_dim = X[:, dim]
    assert X_dim.ndim == 1

    differences = X_dim.reshape(len(X_dim), 1) - X_dim.reshape(1, len(X_dim))

    return -2.0 / scaling * K * differences

In [52]:
def gradient_k_wrt_y( X, K, dim):
    return -gradient_k_wrt_x(X, K, dim)

In [53]:
def second_derivative_k(X, K, dim):
    X_dim = X[:, dim]
    assert X_dim.ndim == 1

    differences = X_dim.reshape(len(X_dim), 1) - X_dim.reshape(1, len(X_dim))

    sq_differences = differences ** 2

    return 2.0 * K * (scaling - 2 * sq_differences) / scaling ** 2

In [54]:
def get_statistic_multiple_dim(samples, dim):
    num_samples = len(samples)

    log_pdf_gradients = Agrad_multiple(samples)
    log_pdf_gradients = log_pdf_gradients[:, dim]
    K = kernel_matrix(samples)
    gradient_k_x = gradient_k_wrt_x(samples, K, dim)
    gradient_k_y = gradient_k_wrt_y(samples, K, dim)
    second_derivative = second_derivative_k(samples, K, dim)

    # use broadcasting to mimic the element wise looped call
    pairwise_log_gradients = log_pdf_gradients.reshape(num_samples, 1) \
                             * log_pdf_gradients.reshape(1, num_samples)
    A = pairwise_log_gradients * K

    B = gradient_k_x * log_pdf_gradients
    C = (gradient_k_y.T * log_pdf_gradients).T
    D = second_derivative

    V_statistic = A + B + C + D

    stat = num_samples * np.mean(V_statistic)
    return V_statistic, stat

In [55]:
def get_V_statistic(samples):
    num_samples = samples.shape[0]
    dims = samples.shape[1]
    U = np.zeros((num_samples, num_samples))
    for dim in range(dims):
        U2, _ = get_statistic_multiple_dim(samples, dim)
        U += U2
    return U

In [56]:
def simulatepm(N, p_change):
    '''

    :param N:
    :param p_change:
    :return:
    '''
    X = np.zeros(N) - 1
    change_sign = np.random.rand(N) < p_change
    for i in range(N):
        if change_sign[i]:
            X[i] = -X[i - 1]
        else:
            X[i] = X[i - 1]
    return X

In [57]:
temp_bootsraped_stats = np.zeros(600)

def compute_pvalues_for_processes( U_matrix, chane_prob, num_bootstrapped_stats=100):
    N = U_matrix.shape[0]
    bootsraped_stats = np.zeros(num_bootstrapped_stats)

    for proc in range(num_bootstrapped_stats):
            # W = np.sign(orsetinW[:,proc])
        W = simulatepm(N, chane_prob)
        WW = np.outer(W, W)

        st = np.mean(U_matrix * WW)

        bootsraped_stats[proc] = N * st
            #print(N * st)

    stat = N * np.mean(U_matrix)

    print(bootsraped_stats.shape)
    temp_bootsraped_stats = np.sort(bootsraped_stats)
    return float(np.sum(bootsraped_stats > stat)) / num_bootstrapped_stats,temp_bootsraped_stats

In [58]:
def is_from_null( alpha, samples, chane_prob):
    dims = samples.shape[1]
    boots = 10 * int(dims / alpha)
    num_samples = samples.shape[0]
    U = np.zeros((num_samples, num_samples))
    for dim in range(dims):
        U2, _ = get_statistic_multiple_dim(samples, dim)
        U += U2

    p,bootsraped_stats_sorted = compute_pvalues_for_processes(U, chane_prob, boots)
    return p,bootsraped_stats_sorted

In [59]:
sigma = np.array([[1, 0.2, 0.1], [0.2, 1, 0.4], [0.1, 0.4, 1]])

X = np.random.multivariate_normal(mean=[0, 0, 0], cov=sigma, size=200)
V_statistic = get_V_statistic(X)

p_value,bootsraped_stats_sorted = is_from_null(0.05, X, 0.1)
print("p_val",p_value)

threshold = bootsraped_stats_sorted[int(p_value*600)]
print(p_value,int(p_value*600))
print('阈值为',threshold)

(600,)
p_val 0.24
0.24 144
阈值为 5.311366764451844


In [60]:
temp_bootsraped_stats = np.zeros(600)

def compute_pvalues_for_processes_1( U_matrix, chane_prob, threshold,num_bootstrapped_stats=100):
    N = U_matrix.shape[0]
    bootsraped_stats = np.zeros(num_bootstrapped_stats)

    for proc in range(num_bootstrapped_stats):
            # W = np.sign(orsetinW[:,proc])
        W = simulatepm(N, chane_prob)
        WW = np.outer(W, W)

        st = np.mean(U_matrix * WW)

        bootsraped_stats[proc] = N * st
            #print(N * st)

    stat = N * np.mean(U_matrix)


    temp_bootsraped_stats = np.sort(bootsraped_stats)

    return np.sum(bootsraped_stats > threshold)

In [61]:
def test( alpha, samples, chane_prob,threshold):
    dims = samples.shape[1]
    boots = 10 * int(dims / alpha)
    num_samples = samples.shape[0]
    U = np.zeros((num_samples, num_samples))
    for dim in range(dims):
        U2, _ = get_statistic_multiple_dim(samples, dim)
        U += U2

    ans1 = compute_pvalues_for_processes_1(U, chane_prob,threshold,boots)
    return ans1

In [62]:
ans = 0

Y = np.random.multivariate_normal([0, 0, 0], sigma, 200)

ans1 = test(0.05, Y, 0.1,threshold)
print(ans1/600)

0.76
