In [1]:
import numpy as np
from sklearn.datasets import make_sparse_spd_matrix
from scipy import linalg as LA
import random

from infoband.band_info import InfoCorrBand
from wlpy.covariance import Covariance
from utils.adpt_correlation_threshold import AdptCorrThreshold

In [2]:
def cov2cor(S: np.ndarray):
    D = np.diag(np.sqrt(np.diag(S)))
    D_inv = np.linalg.inv(D)
    return D_inv @ S @ D_inv

In [3]:
def gen_S_AR1(rho = 0.8,N = 500) -> np.ndarray:
    # generate the covariance matrix of AR(1) process
    S_block = np.zeros(shape=[N, N])
    for j in range(0, N):
        S_block = S_block + np.diag(np.ones(N-j)*(rho**j), -j) + \
        np.diag(np.ones(N-j)*(rho**j), j)
    S = S_block - np.eye(N)
    return S

In [4]:
rng = np.random.RandomState(100)
N = 5
T = 80
alpha = 0.3

In [5]:
# S = gen_S_AR1(N = N)
S = make_sparse_spd_matrix(N, alpha = alpha, random_state = 100)
# print(S[:5, :5])
X = rng.multivariate_normal(mean = np.zeros(N), cov = S, size = T)
S[:5, :5]

array([[ 1.33993988,  0.10734891,  0.32566981, -0.58304363,  0.        ],
       [ 0.10734891,  1.19837415, -0.30406303, -0.18411815, -0.40555476],
       [ 0.32566981, -0.30406303,  3.12458156, -0.81232925, -0.88473669],
       [-0.58304363, -0.18411815, -0.81232925,  1.        ,  0.        ],
       [ 0.        , -0.40555476, -0.88473669,  0.        ,  1.        ]])

In [6]:
c = InfoCorrBand(X)
# c.sample_cov()[:3, :3]
# c.sample_corr()[:3, :3]



In [7]:
R = cov2cor(S)
L = abs(R)
c.feed_info(L)
# print(L[:5, :5])

In [8]:
# c.find_biggest_k_for_pd()
# c.plot_k_pd(range(N-50, N+1))

In [9]:
k = c.k_by_cv()
print(k)

4


In [10]:
R_est = c.fit_info_corr_band(k)
S_est = c.fit_info_cov_band(k)

In [11]:
def show_rs(S: np.ndarray, 
            c: InfoCorrBand, m: Covariance, 
            ord = 'fro'):
    # ord: norm type
    R = cov2cor(S)
    print('Correlation itself', LA.norm(R, ord))
    print('Error:')
    print('Sample', LA.norm(c.sample_corr() - R, ord))
    print('Linear Shrinkage', LA.norm(cov2cor(m.lw_lin_shrink()) - R, ord))
    print('Nonlinear Shrinkage', LA.norm(cov2cor(m.nonlin_shrink()) - R, ord))
    print()
    print('Covariance itself', LA.norm(S, ord))
    print('Error:')
    print('Sample', LA.norm(c.sample_cov() - S, ord))
    print('Linear Shrinkage', LA.norm(m.lw_lin_shrink() - S, ord))
    print('Nonlinear Shrinkage', LA.norm(m.nonlin_shrink() - S, ord))
    return

In [12]:
m = Covariance(X)

In [13]:
show_rs(S, c, m, 'fro')
print(LA.norm(R - R_est))
print(LA.norm(S - S_est))

Correlation itself 2.6222662577785822
Error:
Sample 0.32751543886522594
Linear Shrinkage 0.3802180838363591
Nonlinear Shrinkage 0.3506618624587565

Covariance itself 4.401878119040805
Error:
Sample 0.5895426668681583
Linear Shrinkage 0.7078858003552759
Nonlinear Shrinkage 0.6714543961928682
0.34269183856481744
0.6910584474904503


In [14]:
show_rs(S, c, m, 2)
print(LA.norm(R - R_est, 2))
print(LA.norm(S - S_est, 2))

Correlation itself 1.8974573235651087
Error:
Sample 0.24026463097272413
Linear Shrinkage 0.3054772769972123
Nonlinear Shrinkage 0.25534481925739366

Covariance itself 3.758470152622895
Error:
Sample 0.5130198573926674
Linear Shrinkage 0.45018745396773413
Nonlinear Shrinkage 0.5113428968345741
0.2684523169006721
0.5034816294812844


In [15]:
show_rs(S, c, m, 1)
print(LA.norm(R - R_est))
print(LA.norm(S - S_est))

Correlation itself 2.276366037990635
Error:
Sample 0.34922317222021954
Linear Shrinkage 0.4530580793230566
Nonlinear Shrinkage 0.3900350621700508

Covariance itself 5.451380330856558
Error:
Sample 0.6114668880730916
Linear Shrinkage 0.7868423475374023
Nonlinear Shrinkage 0.7114307028041474
0.34269183856481744
0.6910584474904503


In [65]:
def gen_eta_sequence(N, eta = 0.5, draw_type = 'random', is_random = False, rand_seed = 100) -> np.ndarray:
    '''
    Generate a sequence b, which is a permutation of {1, ..., N}. 
    b satisfies the property: for any 0 < k < N+1, b[0]~b[k-1] include {1, ..., ceil(eta*k)}.  
    
    draw_type : {'random', 'near'}
        Algorithms about how to draw ( {b[0], ..., b[k-1]} - {1, ..., int(eta*k)} ). Here '-' is a subtraction between two sets.
    is_random : bool
        If False, we use random_seed as random seed, for repeat running results.
    random_seed : int
    '''
    if is_random:
        rng = random
    else:
        rng = np.random.RandomState(rand_seed)
        
    b = [1] # Default to keep the diagonal element in covariance estimation.
    b_complement = [i for i in range(2, N + 1)] # b's complement set
    
    for k in range(2, N + 1):
        # consider k-th element
        th = int(np.ceil(eta * k))
        # S^L_k include S^d_{th}
        cnt = sum([1 if num <= th else 0 for num in b])
        if cnt < th:
            for next_id in range(1, th + 1):
                if next_id not in b:
                    b.append(next_id)
                    b_complement.remove(next_id)
                    break
        else:
            # len(b_complement) == N + 1 - k
            j = rng.randint(0, N - k) if N - k > 0 else 0
            next_id = b_complement[j] 
            b.append(next_id)
            b_complement.remove(next_id)
    return np.array(b)

In [66]:
def gen_L(S, eta, verbose = False, draw_type = 'random', is_random = False, rand_seed = 100):
    N = S.shape[0]
    new_rowSort = np.zeros((N, N))
    
    R = cov2cor(S)
    L = abs(R)
    rowSort = InfoCorrBand(X = np.eye(N), L = L).rowSort # You can ignore the 'X = np.eye(N)' parameter. I create this temporary object solely to get 'rowSort' matrix.
    
    for i in range(N):
        row = rowSort[i]
        argst = row.argsort()
        b = gen_eta_sequence(N, eta, draw_type, is_random, rand_seed = 100)
        for j in range(N):
            new_rowSort[i][argst[j]] = b[j]
    
    L_eta = 1 / new_rowSort
    res = (L_eta, new_rowSort, rowSort)
    return res if verbose else L_eta

In [69]:
res = gen_L(S, 0.5, verbose = 1, is_random = 1)
print(res[2])
print(res[1]) 

[[1. 4. 3. 2. 5.]
 [5. 1. 4. 3. 2.]
 [4. 5. 1. 3. 2.]
 [2. 4. 3. 1. 5.]
 [5. 3. 2. 4. 1.]]
[[1. 3. 4. 2. 5.]
 [5. 1. 4. 2. 3.]
 [4. 3. 1. 2. 5.]
 [5. 4. 2. 1. 3.]
 [3. 2. 4. 5. 1.]]


In [None]:
gen_L(S, 0.5, verbose = 1)

(array([[1.        , 0.33333333, 0.25      , 0.5       , 0.2       ],
        [0.2       , 1.        , 0.33333333, 0.25      , 0.5       ],
        [0.33333333, 0.2       , 1.        , 0.25      , 0.5       ],
        [0.5       , 0.33333333, 0.25      , 1.        , 0.2       ],
        [0.2       , 0.25      , 0.5       , 0.33333333, 1.        ]]),
 array([[1., 3., 4., 2., 5.],
        [5., 1., 3., 4., 2.],
        [3., 5., 1., 4., 2.],
        [2., 3., 4., 1., 5.],
        [5., 4., 2., 3., 1.]]))

In [None]:
1 / gen_L(S, 0.5)

array([[1.        , 0.33333333, 0.25      , 0.5       , 0.2       ],
       [0.2       , 1.        , 0.33333333, 0.25      , 0.5       ],
       [0.33333333, 0.2       , 1.        , 0.25      , 0.5       ],
       [0.5       , 0.33333333, 0.25      , 1.        , 0.2       ],
       [0.2       , 0.25      , 0.5       , 0.33333333, 1.        ]])

In [None]:
gen_eta_sequence()

In [None]:
R

array([[ 1.        ,  0.0847147 ,  0.15916182, -0.50368428,  0.        ],
       [ 0.0847147 ,  1.        , -0.15713447, -0.16819008, -0.3704702 ],
       [ 0.15916182, -0.15713447,  1.        , -0.45955359, -0.50051616],
       [-0.50368428, -0.16819008, -0.45955359,  1.        ,  0.        ],
       [ 0.        , -0.3704702 , -0.50051616,  0.        ,  1.        ]])

In [None]:
S

array([[ 1.33993988,  0.10734891,  0.32566981, -0.58304363,  0.        ],
       [ 0.10734891,  1.19837415, -0.30406303, -0.18411815, -0.40555476],
       [ 0.32566981, -0.30406303,  3.12458156, -0.81232925, -0.88473669],
       [-0.58304363, -0.18411815, -0.81232925,  1.        ,  0.        ],
       [ 0.        , -0.40555476, -0.88473669,  0.        ,  1.        ]])

In [None]:
L_eta = np.zeros((N, N))
for i in range(N):
    row = rowSort[i]
    argst = row.argsort()
    b = gen_eta_sequence(N, rand_seed = 100)
    for j in range(N):
        L_eta[i][argst[j]] = b[j]

In [None]:
rowSort[0]

array([1., 4., 3., 2., 5.])

In [None]:
L_eta[0]

array([1., 3., 4., 2., 5.])

In [None]:
row = rowSort[0]
# np.array([1, 3, 4, 2]).argsort()
argst = row.argsort()
new_row = np.zeros(N)
for i in range(N):
    new_row[argst[i]] = b[i]
new_row

array([1., 3., 4., 2., 5.])

In [None]:
np.zeros((2, 2))

array([[0., 0.],
       [0., 0.]])

In [None]:
rng.randint(0, 1)

0

In [None]:
1 if False else 2

2

In [None]:
2 <= 4 * 0.5

True

In [None]:
int(3.5)

3

In [None]:
rng.randint(1, 2)

1