In [746]:
import numpy as np
import matplotlib.pyplot as plt
import numpy.linalg as la
import numpy.random as rand
from scipy.stats import gaussian_kde as kde
from mpl_toolkits.mplot3d import Axes3D
from EDMtoolkit import *
from modelSystems import *

%matplotlib notebook

In [739]:
# takes a time series X with shape n,m and returns a 1 x n series of integers where
# each integer uniquely identifies the binned square where that entry of the time
# series came from.
def bin_series(X, n_bins):
    X1 = np.floor((X - np.min(X, axis=0)) / np.ptp(X, axis=0) * n_bins).astype(int)
    X1 = X1 - (X1 == n_bins).astype(int)
    return X1

def compute_keys(X, n_bins):
    num_system = np.vstack(n_bins ** np.arange(0, X.shape[1]))
    return X @ num_system

def get_prob(D, i):
    return np.mean(np.all(D == i, axis=1).astype(int))      

def entropy(D):
    entropy = 0
    for m in np.unique(D, axis=0):
        p = get_prob(D, m)
        if p > 0:
            entropy -= p * np.log2(p)
    return entropy

# conditional entropy H(X|Y) where X = D[:,:i] and Y=[:,i:]
def entropy_cond(X, Y):
    return entropy(np.hstack([X,Y])) - entropy(Y)

"""
def entropy_cond(D, i, j):
    entropy = 0
    for row in np.unique(D, axis=0):
        joint = get_prob(D, row)
        individual = get_prob(D[:,i:j,None], row[i:j,None])
        entropy -= joint * np.log2(joint / individual)
    return entropy
"""

def cond_mut_inf(X, Y, Z):
    return entropy_cond(X, Z) - entropy_cond(X, np.hstack([Y, Z]))
    # return entropy(np.hstack([X, Z])) + entropy(np.hstack([Y, Z])) - entropy(np.hstack([X, Y, Z])) - entropy(Z)
    
def create_grid(dim, res):
    n_rows = res ** dim
    grid = np.zeros((n_rows, dim))
    for i in range(n_rows):
        for j in range(dim):
            grid[i,j] = int(i / (res ** j)) % res / res
            
    return grid

In [751]:
## Drivers ##

settlingTime = 2 ** 9
tlen = 2 ** 9
end = 2 ** 12 # 2**5 # (3.498901098901099 / (12*reduction)) * tlen # 2**3
reduction = 2 ** 0

tr = np.linspace(0, end, num=tlen)
t = tr / end

In [752]:
## Drivers ##

r = lambda t: 4 # - t / tlen
# b1 = lambda t: 3 + 5 * t / end
# rho = lambda t : 28 + 50 * t / end
# sigma = lambda t : 10 + 10 * t / end
# beta = lambda t : 5.0 / 3.0 # + 3*t/end

# F = lambda t : 5 + 15 * t / end
# N = 50

In [756]:
## Time Series Generation ##

x0 = rand.rand(1)
Xr = generateTimeSeriesDiscrete("LogisticP", x0, tlen=tlen, nsargs=(r,), settlingTime=settlingTime)

# x0 = rand.random(N)
# x0 = np.array([1,0.5,7]) - rand.random(3) * 0.1

# Xr = generateTimeSeriesContinuous("Lorenz96P", x0, nsargs=(F, N), tlen=tlen, end=end, settlingTime=settlingTime, reduction= reduction)[:,N-1,None]
# Xr = generateTimeSeriesContinuous('HastingsPowellP', x0, nsargs=(b1,), end=end, tlen = tlen, reduction = reduction, settlingTime=settlingTime)[:,0,None]
# Xr = generateTimeSeriesContinuous('LorenzP', np.array([1,5,17]), nsargs=(rho, sigma, beta), tlen=tlen, end=end, settlingTime=settlingTime, reduction= reduction)[:,0,None]

## Add Noise ##
noise_magnitude = 0.0
Xr += rand.random((Xr.shape[0],1)) * np.ptp(Xr) * noise_magnitude

In [757]:
plotTS(Xr)

<IPython.core.display.Javascript object>

In [762]:
n_bins = 5
E = 1

X, Y, tf = delayEmbed(Xr, 1, E, 1, t=t)
# Xt = np.hstack([X,tf[:,None]])
Xt = X
symbols = bin_series(Xt, n_bins)

tot = 0
for i in np.unique(symbols, axis=0):
    tot += get_prob(symbols, i)
tot

1.0

In [722]:
symbols.shape[0] / np.unique(symbols, axis=0).shape[0]

21.25

In [723]:
binned_series = bin_series(Xr,n_bins)
fig, ax = plt.subplots(n_bins, figsize=(4,10))
for i in range(n_bins):
    chunk = binned_series[int(i*tlen/n_bins):int((i+1)*tlen/n_bins),0]
    ax[i].hist(chunk, label = f"Chunk {i}", alpha = 0.5, bins=n_bins)
    ax[i].legend()

<IPython.core.display.Javascript object>

In [724]:
n_trials = 50

results = np.zeros((n_trials, 2))
for i in range(n_trials):
    
    rate = rand.random(1)
    
    b1 = lambda t: 3 + 5 * t / end
    x0 = np.array([1,0.5,7]) - rand.random(3) * 0.1
    Xr = generateTimeSeriesContinuous('HastingsPowellP', x0, nsargs=(b1,), end=end, tlen = tlen, reduction = reduction, settlingTime=settlingTime)[:,0,None]

    X, Y, Z = delayEmbed(Xr, 1, E, 1, t=t)
    A = bin_series(X, n_bins)
    B = bin_series(Y, n_bins)
    C = bin_series(Z[:,None], n_bins)
    
    results[i,0] = rate
    results[i,1] = cond_mut_inf(A, B, C)

KeyboardInterrupt: 

In [713]:
fig = plt.figure()
plt.scatter(results[:,0], results[:,1])
plt.title(f"Hastings Powell (noise=0.0, {tlen} datapoints long, bins={n_bins}, E={E})")
plt.ylabel("Conditional Mutual Information")
plt.xlabel("Nonstationarity Rate")

<IPython.core.display.Javascript object>

Text(0.5, 0, 'Nonstationarity Rate')

In [725]:
n_bins = 3
E_max = 20

fig, ax = plt.subplots(1)

for E in range(E_max):
    X, Y, tf = delayEmbed(Xr, 1, E, 1, t=t)
    A = bin_series(X, n_bins)
    B = bin_series(Y, n_bins)
    C = tf[:,None]
    
    ax.scatter(entropy_cond(A,B), logLikelihood(X, Y, tf, 0, 0))

ax.set_xlabel("conditional entropy")
ax.set_ylabel("log likelihood")
plt.show()

<IPython.core.display.Javascript object>

In [726]:
def testing():
    S1=np.array([[0],
             [0],
             [1],
             [1]])
    S2=np.array([[0,0],
             [0,1],
             [1,0],
             [1,1]])
    
    assert entropy(S1) == 1
    assert entropy(S2) == 2
    assert entropy_cond(S2, S1) == 1
    assert entropy_cond(S1, S2) == 0
    
testing()

In [742]:
Xt.shape

(510, 2)

In [770]:
test_kde = kde(Xt.T, bw_method=0.1)

In [771]:
Xt

array([[0.02345811, 0.00589933],
       [0.0916313 , 0.02345811],
       [0.33294001, 0.0916313 ],
       ...,
       [0.08261512, 0.97890105],
       [0.30315945, 0.08261512],
       [0.84501519, 0.30315945]])

In [774]:
res = 20
dim = Xt.shape[1]
grid = create_grid(dim, res)

fig = plt.figure()
ax = plt.gca(projection="3d")
ax.plot_surface(u1, u2, test_kde(grid.T))
ax.set_xlabel("x_t")
ax.set_ylabel("x_t+1")
ax.set_zlabel("p")

<IPython.core.display.Javascript object>

ValueError: Argument Z must be 2-dimensional.