In [1]:
import numpy as np
import numpy.linalg as la
import matplotlib.pyplot as plt
import scipy as sp
import scipy.stats as ss
import math
import random
from collections import namedtuple
%precision 4
%matplotlib inline

# Code optimization

In order to optimize the code import *numba*

In [2]:
import numba

# Cholesky Update

*Input*: the lower triangular Cholesky factor $L$ of the covariance matrix $\pmb{\Sigma} \in \mathbb{R}^{n \times n}$,
the vector $\pmb{x} \in \mathbb{R}^{n}$, and their weights $w_1$ and $w_2$.

*Output*: the lower triangular Cholesky factor $L^\prime$ of $\pmb{\Sigma}^\prime = w_1 \pmb{\Sigma} + w_2  \pmb{x}^\top \pmb{x}$

def cholupdate(L, x, sign='+'):
    d = len(x)
    for i in range(d):
        if sign == '+':
            r = np.sqrt(L[i,i]**2 + x[i]**2)
        elif sign == '-':
            r = np.sqrt(L[i,i]**2 - x[i]**2)
        c = r/L[i,i]
        s = x[i]/L[i,i]
        L[i,i] = r
        if sign == '+':
            L[i+1:d,i] = (L[i+1:d,i] + s*x[i+1:d]) / c
        elif sign == '-':
            L[i+1:d,i] = (L[i+1:d,i] - s*x[i+1:d]) / c
        x[i+1:d] = c*x[i+1:d] - s*L[i+1:d,i]
    return L

@numba.jit
def rank_1_update_opt(L, u, alpha, beta):
    #assert alpha > 0, 'Argument alpha should be positive'
    #assert beta > 0, 'Argument beta should be positive'
    v = np.copy(u)  #Added
    d = len(u)
    L = np.sqrt(alpha)*L  #Added
    nL = np.zeros_like(L)
    b = 1
    for i in np.arange(d):
        nL[i,i] = np.sqrt(L[i,i]**2 + (beta/b)*(v[i]**2))
        gamma = b*L[i,i]**2 + beta*v[i]**2
        v[i+1:d] = v[i+1:d] - (v[i]/L[i,i])*L[i+1:d,i]
        nL[i+1:d,i] = (nL[i,i]/L[i,i])*L[i+1:d,i] + (nL[i,i]*beta*v[i]/gamma)*v[i+1:d]
        b = b + beta*(v[i]**2/L[i,i]**2)
    return nL

In [3]:
@numba.jit(nopython=True) 
def rank_1_update(L, u, alpha, beta):
    #assert alpha > 0, 'Argument alpha should be positive'
    #assert beta > 0, 'Argument beta should be positive'
    d = len(u)
    L = np.sqrt(alpha)*L  #Added
    b = 1
    nL = np.zeros_like(L)
    v = np.copy(u)  #Added
    for j in np.arange(d):
        nL[j,j] = np.sqrt(L[j,j]**2 + (beta/b)*(v[j]**2))
        gamma = b*L[j,j]**2 + beta*v[j]**2
        for k in range(j+1, d):
            v[k] = v[k] - (v[j]/L[j,j])*L[k,j]
            nL[k,j] = (nL[j,j]/L[j,j])*L[k,j] + (nL[j,j]*beta*v[j]/gamma)*v[k]
        b = b + beta*(v[j]**2/L[j,j]**2)
    return nL

## Test rank 1 update

In [4]:
def random_Gaussian(d):
    mean = np.random.random(size=d)
    X = np.random.random(size=(10*d, d))
    cov = X.T@X
    return mean, cov

def samples_random_Gaussian(d, N):
    m, C = random_Gaussian(d=d)
    samples = ss.multivariate_normal.rvs(mean=m, cov=C, size=N)
    return samples, m, C

In [5]:
def check_rank_1_update(d, alpha, beta):
    # Create a random covariance matrix, i.e. C is symmetric and positive definite.
    m, C = random_Gaussian(d)
    L = la.cholesky(C)
    
    # Create a random vector
    v = np.random.random(size=d)
    
    # Update the covariance matrix
    uC = alpha*C + beta*np.outer(v,v)
    
    # Calculate its Cholesky factor
    uL = la.cholesky(uC)
    
    # Update the Cholesky factor of the initial covariance
    nL = rank_1_update(L=L, u=v, alpha=alpha, beta=beta)
    
    # The two return statements should be equivalent.
    return np.allclose(uC, nL@nL.T) and np.allclose(uL, nL)

In [6]:
check_rank_1_update(d=100, alpha=100, beta=100)

True

In [7]:
def test_rank_1_update(dimensions, alphas, betas, nbtrials=10):
    for d in dimensions:
        for alpha in alphas:
            for beta in betas:
                for n in range(nbtrials):
                    if not check_rank_1_update(d=d, alpha=alpha, beta=beta):
                        print('d: ', d, 'n: ', n)
    else: 
        print('Pass')

In [8]:
base = np.array([2, 10, 100, 1000, 10000, 100000, 1000000, 1000000, 10000000])
Alphas = np.array([(n-1)/n for n in base])
Betas =  np.array([n/(n+1) for n in base])

In [9]:
test_rank_1_update(dimensions=[200], 
                   alphas=Alphas,
                   betas=Betas)

Pass


# Incrementally update the first two moments

In [10]:
def update_moments(mean, M2, sample, n):
    w = 1/(n+1)
    new_mean = mean + w*(sample - mean)
    delta_bf, delta_af = sample - mean, sample - new_mean
    new_M2 = M2 + np.outer(delta_bf, delta_af)
    return new_mean, new_M2

In [11]:
def calculate_moments(samples):
    N, d = samples.shape
    mean, M2 = np.zeros(d), np.zeros(shape=(d, d))
    for n in range(N):
        z_sample = samples[n]
        mean, M2 = update_moments(mean=mean, M2=M2, sample=z_sample, n=n)
    return mean, M2/(N-1)

## Test incrementally update

The algorithms above have been tested up to dimension $d=200$ of the statespace and $N= 1,000,000$ samples
and each time they gave the same result, i.e. np.allclose returned True.

In [12]:
def test_moments(d, N):
    z_samples = ss.multivariate_normal.rvs(mean=np.zeros(d), cov=np.eye(d,d), size=N)
    moments_mean, moments_cov = calculate_moments(z_samples)
    emp_mean, emp_cov = np.mean(z_samples, axis=0), np.cov(z_samples, rowvar=False)
    return np.allclose(moments_mean, emp_mean) and np.allclose(moments_cov, emp_cov)

In [13]:
test_moments(d=200, N=100)

True

# Compare incremental update of $C$ and $L$

In case of the Cholesky update we 
- First, calculate the empirical covariance $C$ of the first $2 d$ generated samples
- Next, we calculate its Cholesky factor $L$
- From then on, $L$ is updated.

In [14]:
@numba.jit
def update_mean(samples):
    N, d = samples.shape
    initial_period = 2*d
    initial_cov = np.cov(samples[:initial_period], rowvar=False)
    initial_mean = np.mean(samples[:initial_period], axis=0)
    mean = initial_mean
    for n in range(initial_period, len(samples)):
        sample = samples[n]
        w = 1/(n+1)
        mean = (1-w)*mean + w*sample
    return mean

In [15]:
@numba.jit
def update_L(samples):
    N, d = samples.shape
    initial_period = 2*d
    initial_cov = np.cov(samples[:initial_period], rowvar=False)
    initial_mean = np.mean(samples[:initial_period], axis=0)
    C = initial_cov
    L = la.cholesky(initial_cov) 
    mean = initial_mean
    for n in range(initial_period, len(samples)):
        sample = samples[n]
        w = 1/(n+1)
        L = rank_1_update(L, sample-mean, alpha=(n-1)/n, beta=w)
        mean = (1-w)*mean + w*sample
    return L@L.T

# Time to Test

BM has succesfully done the test up to dimension 200 and 1 million samples 

In [16]:
def test_Cholesky_update(d, N):
    # Generate samples of a Gaussian with random mean and covariance
    samples, mean, cov = samples_random_Gaussian(d=d, N=N)
    
    # Use numpy functions to calculate the sample mean and variance
    sample_mean, sample_cov = np.mean(samples, axis=0), np.cov(samples, rowvar=False)
    
    # Use the update functions 'updated_mean' and 'updated_cov' to calculate again the sample mean and variance
    updated_mean, updated_cov = update_mean(samples=samples), update_L(samples=samples)
    
    # Check whether they give the same results
    return np.allclose(sample_mean, updated_mean) and np.allclose(sample_cov, updated_cov)

In [17]:
test_Cholesky_update(d=100, N=1000)

True