In [10]:
%matplotlib tk
# --------------------------------------------------
#
# Training HMMs at different levels of a Laplacian Pyramid
#
# --------------------------------------------------

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from hmmlearn import hmm
import warnings
# warnings.filterwarnings("ignore")
import hmmlearn
# print('hmmlearn', hmmlearn.__version__)

# params
DATASET = 'pills-s0'
BLUR = 5
SUBSAMPLE = 2
TIERS = 4
N = 5
M = 2
COVAR = 'diag'
ITERS = 100
# tqdm params
TS = 5
# derived tqdm constants
STEPS = 3 + (TS*2)
TOTAL = STEPS * TIERS
# other derived constants
params = [N, M, COVAR, ITERS]
event_labels = {i:i for i in range(N)}


def set_colrow_labels(axes):
    cols = ['original'] + ['x1/{}'.format(SUBSAMPLE ** col) for col in range(1, TIERS)]
    rows = ['original', 'difference']
    for ax, col in zip(axes[0], cols):
        ax.set_title(col)
    for ax, row in zip(axes[:,0], rows):
        ax.set_ylabel(row, rotation=0, size='large')

def boundaries(v):
    bounds = [0] + list(np.where(v[:-1] != v[1:])[0] + 1) + [len(v)-1]
    return [(bounds[i], bounds[i+1], v[bounds[i]]) for i in range(len(bounds)-1)]

def to_range(l, cmap):
    total = cmap.N
    idx = int((l[-1] / N) * total)
    return cmap(idx)

def train_model(x):
    model = hmm.GMMHMM(N,M,covariance_type=COVAR, n_iter=ITERS)
    model.fit(x)
    z = model.predict(x)
    return z

def blur(x, n):
    if n % 2 == 0:
        raise ValueError('n must be odd, given: {}'.format(n))
    p = int((n-1) / 2)
    dims = x.shape[1]
    _x = np.pad(x,((p,p),(0,0)), mode='edge')
    _x = [np.convolve(_x[:,i], np.ones((n,))/n, mode='valid') for i in range(dims)]
    return np.stack(_x, axis=-1)



def plot_bounds(ax, x, z):
    Zb = boundaries(z)
    ax.plot(x[:,:-3], alpha=.7)
    ax.set_ylim(-5000, 5000)
    # plot labels
    _cmap = plt.get_cmap('nipy_spectral')
    plots = {}
    for l in Zb:
        ev = event_labels[l[-1]]
        i = ax.axvspan(l[0], l[1], color=to_range(l, _cmap), alpha=0.5, label=ev)
        plots[ev] = i
    # legend
    legs = [plots[event_labels[i]] for i in sorted(event_labels)]
    nms = [event_labels[i] for i in sorted(event_labels)]
    ax.legend(legs, nms)




# file and data
FILE = 'data/' + DATASET + '.npy'
X0 = np.load(FILE)


# Create Pyramids/Train Models
X, D = [X0], []
Zx, Zd = [], []
with tqdm(total=TOTAL) as pbar:
    for i in range(TIERS):
        xi = X[i]
        # blur image
        x_blur = blur(xi,BLUR)
        pbar.update(1)
        # take difference
        diff = xi - x_blur
        D.append(diff)
        pbar.update(1)
        # down-sample
        x_next = x_blur[::SUBSAMPLE]
        X.append(x_next)
        pbar.update(1)
        # train x model
        zx = train_model(xi)
        Zx.append(zx)
        pbar.update(TS)
        # train diff model
        zd = train_model(diff)
        Zd.append(zd)
        pbar.update(TS)



fig, axes = plt.subplots(nrows=2, ncols=TIERS, sharey='col', squeeze=False)
print('axes:', axes.shape)
# Plot
for i, ax in enumerate(axes.T):
    # original
    plot_bounds(ax[0], X[i], Zx[i])
    # difference
    plot_bounds(ax[1], D[i], Zd[i])

# draw titles for each column/row
set_colrow_labels(axes)    


# fig.tight_layout()
plt.show()

100%|██████████| 52/52 [00:06<00:00, 10.10it/s]


axes: (2, 4)
