# Paper replication

In [None]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np
import torch

from collections import Counter
from copy import deepcopy
from IPython.display import clear_output
from scipy.stats import entropy
from tqdm import tqdm

from ca_funcs import get_network_entropies, make_table_walk, make_ca, make_glider
from train_ca import initialize_model
from utils import all_combinations

from nni.compression.torch import *

%matplotlib inline
plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

In [None]:
M = 2
D = (3, 3)
ALL_INPUTS = all_combinations(M, D)
RANDOM_INPUTS = np.random.choice([0, 1], (500, 10, 10))

## Define CAs and training data

In [None]:
def sample_CAs(rng=None):   
    rng = rng or np.random.default_rng(0)
    inputs = ALL_INPUTS
    outputs = make_table_walk(len(ALL_INPUTS), rng=rng)
    for o in outputs:
        yield make_ca(inputs, o)

def generate_CA_train_data(ca, height=10, width=10, n_samples=500, rng=None, noise=0.0):
    rng = rng or np.random.default_rng(0)
    X_train = torch.from_numpy(rng.choice([0, 1], (n_samples, height, width), p=[.5, .5])).float()
    Y_train = ca(X_train).float()
    foo = Y_train.detach().clone()
    flat_Y_train = Y_train.view(-1)
    flat_Y_indices = rng.choice(range(Y_train.numel()), size=int(Y_train.numel() * noise), replace=False)
    flat_Y_train[flat_Y_indices] = 1 - flat_Y_train[flat_Y_indices]
    return X_train, Y_train

In [None]:
np.random.seed(0)

for i, ca in enumerate(sample_CAs()):
    X_test = torch.from_numpy(make_glider(10).reshape(1, 10, 10)).float()
    Y_test = ca(X_test).float()

    plt.figure(figsize=(12,4))
    plt.suptitle(i)

    plt.subplot(1,2,1)
    plt.imshow(X_test[0])
    plt.axis('off')
    plt.title("Input")

    plt.subplot(1,2,2)
    plt.imshow(Y_test[0])
    plt.axis('off')
    plt.title("Output")

    plt.show()
    plt.close()
    clear_output(wait=True)

## Find entropy of the training CA

In [None]:
def ca_entropy(ca):        
    inputs = torch.from_numpy(ALL_INPUTS)
    outputs = ca(inputs)
    output_counts = np.array(list(Counter(tuple(torch.reshape(o, [-1]).numpy()) for o in outputs).values()))
    output_ps = output_counts / len(inputs)
    return entropy(output_ps, base=2)

In [None]:
entropies = [ca_entropy(a) for a in tqdm(sample_CAs(rng=np.random.default_rng(0)))]
plt.plot(entropies)

## Training

In [None]:
def train(ca, rng=None, train_noise=0.0):
    rng = rng or np.random.default_rng(0)

    input_dims = [10, 10]
    layer_dims = [100] + [100] * 11  # neighborhood conv + mlpconv layers

    learning_rate = 1e-4

    loss = torch.nn.MSELoss()

    training_epochs = 100
    samples = 500
    batch_size = 10
    num_batches = samples // batch_size
    
    def make_model(seed=0):
        np.random.seed(seed)
        torch.random.manual_seed(seed)

        model = initialize_model(input_dims, layer_dims)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        display(model)

        if torch.cuda.is_available():
            model.cuda()

        return model, optimizer

    def learn_CA(model, optimizer):
        losses = []
        X_train, Y_train = generate_CA_train_data(ca, *input_dims, n_samples=samples, rng=rng, noise=train_noise)
        if torch.cuda.is_available():
            X_train = X_train.cuda()
            Y_train = Y_train.cuda()

        for _ in tqdm(range(training_epochs)):
            batch_losses = []
            for i in range(num_batches):
                X_batch = X_train[i * batch_size : (i + 1) * batch_size]
                Y_batch = Y_train[i * batch_size : (i + 1) * batch_size]

                optimizer.zero_grad()
                Y_pred = model(X_batch)
                l = loss(Y_batch, Y_pred)
                l.backward()
                optimizer.step()
                batch_losses.append(l.item())
            losses.append(np.mean(batch_losses))
        return losses
    
    model, optimizer = make_model()
    losses = learn_CA(model, optimizer)
    
    return model, optimizer, losses

In [None]:
rng = np.random.default_rng(0)
model, optimizer, losses = train(
    ca=list(sample_CAs(rng=rng))[24],
    rng=rng,
    train_noise=0.05
)

In [None]:
plt.figure(figsize=(10, 5))
plt.plot(losses)
plt.loglog();

In [None]:
x = np.random.default_rng().choice([0, 1], size=100)
# x = make_glider(10)
X_test = torch.from_numpy(x.reshape(1, 10, 10)).float()
Y_test = ca(X_test).float()

if torch.cuda.is_available():
    X_test = X_test.cuda()
Y_pred = model(X_test)


if torch.cuda.is_available():
    X_test = X_test.cpu()
    Y_pred = Y_pred.cpu()

X_test = X_test.detach().numpy()
Y_test = Y_test.detach().numpy()
Y_pred = Y_pred.detach().numpy()

plt.figure(figsize=(12, 4))

plt.subplot(141)
plt.imshow(X_test[0])
plt.axis('off')
plt.title("Input")

plt.subplot(142)
plt.imshow(Y_test[0])
plt.axis('off')
plt.title("Expected Output")

plt.subplot(143)
plt.imshow(Y_pred[0], vmin=0, vmax=1)
plt.axis('off')
plt.title("Observed Output")

plt.subplot(144)
plt.imshow((Y_pred[0] - Y_test[0]) ** 2)
plt.axis('off')
plt.title("Normalised Diff")

print('max loss:', ((Y_pred[0] - Y_test[0]) ** 2).max())

## Find model entropies

In [None]:
def calculate_entropies(model):
    def get_activations(x_input):
        activations = []
        for m in model.children():
            x_input = m(x_input)
            activations.append(x_input)
        return activations[1:-3:2]

    X_test = np.pad(all_combinations(2, (3, 3)), [(0, 0), (3, 4), (3, 4)], 'wrap')
    X_test = torch.from_numpy(X_test).float()
        
    if torch.cuda.is_available():
        X_test = X_test.cuda()

    res = [activation.cpu().detach().numpy() for activation in get_activations(X_test)]
    layer_activations = np.array(res)
    # Layer activations are floats, but to calculate entropy,
    # we want to map activations to binary values,
    # 1 if a given activation is >0, and 0 otherwise.
    binary_activations = np.digitize(layer_activations, [0], right=True)
    binary_activations = binary_activations.transpose(0, 1, -2, -1, 2) \
        .reshape(len(layer_dims), np.product(X_test.shape), layer_dims[0])
    return get_network_entropies(binary_activations)

In [None]:
ca_entropy(ca)

In [None]:
calculate_entropies(model)

## Compression

In [None]:
def prune_model_and_test(model, Pruner, config):
    model_copy = deepcopy(model)
    pruner = Pruner(model_copy, config, optimizer=optimizer)
    pruner.compress()
    return model_copy

def display_test(model):
    x = make_glider(10)
    X_test = torch.from_numpy(x.reshape(1, 10, 10)).float()
    Y_test = ca(X_test).float()

    if torch.cuda.is_available():
        X_test = X_test.cuda()
    Y_pred = model(X_test)

    if torch.cuda.is_available():
        X_test = X_test.cpu()
        Y_pred = Y_pred.cpu()

    X_test = X_test.detach().numpy()
    Y_test = Y_test.detach().numpy()
    Y_pred = Y_pred.detach().numpy()

    plt.figure(figsize=(12, 4))

    plt.subplot(141)
    plt.imshow(X_test[0])
    plt.axis('off')
    plt.title("Input")

    plt.subplot(142)
    plt.imshow(Y_test[0])
    plt.axis('off')
    plt.title("Expected Output")

    plt.subplot(143)
    plt.imshow(Y_pred[0])
    plt.axis('off')
    plt.title("Observed Output")

    plt.subplot(144)
    plt.imshow((Y_pred[0] - Y_test[0]) ** 2)
    plt.axis('off')
    plt.title("Normalised Diff")

    print('max loss:', ((Y_pred[0] - Y_test[0]) ** 2).max())

### Level Pruner

In [None]:
config_list = [{ 'sparsity': 0.1, 'op_types': ['default'] }]
m = prune_model_and_test(model, LevelPruner, config_list)
display_test(m)

### FPGM Pruner

In [None]:
config_list = [{ 'sparsity': 0.1, 'op_types': ['Conv2d'] }]
m = prune_model_and_test(model, FPGMPruner, config_list)
display_test(m)

### L1 Pruner

In [None]:
config_list = [{ 'sparsity': 0.1, 'op_types': ['Conv2d'] }]
m = prune_model_and_test(model, L1FilterPruner, config_list)
display_test(m)

### L2 Pruner

In [None]:
config_list = [{ 'sparsity': 0.1, 'op_types': ['Conv2d'] }]
m = prune_model_and_test(model, L2FilterPruner, config_list)
display_test(m)

### LotteryTicket Pruner

In [None]:
config_list = [{
    'prune_iterations': 50,
    'sparsity': 0.1,
    'op_types': ['default']
}]
m = prune_model_and_test(model, LotteryTicketPruner, config_list)
display_test(m)

## LMC Complexity

In [None]:
def lmc_complexity(P, N):
    P = np.array(P)
    P = P / P.sum()
    H = entropy(P, base=2)
    
    if N <= np.finfo(P.dtype).max:
        uniform_ps = np.full(len(P), 1.0 / N)
        D = np.sum((P - uniform_ps) ** 2) + (N - len(P)) * (1.0 / N)**2
    else:
        # assuming N >> len(P) >= 1 >= P, so that len(P)/N, 1/N and P/N are negligible
        D = np.sum(P ** 2)
        
    return H * D

def ca_lmc(ca, m=M, d=D):  
    inputs = torch.from_numpy(ALL_INPUTS)
    outputs = ca(inputs)
    output_counts = np.array(list(Counter(tuple(torch.reshape(o, [-1]).numpy()) for o in outputs).values()))
    output_ps = output_counts / len(inputs)
    return lmc_complexity(output_ps, m ** np.product(d))

In [None]:
lmcs = [ca_lmc(a) for a in tqdm(sample_CAs(np.random.default_rng(0)))]
plt.plot(lmcs)

In [None]:
def get_network_lmcs(layers_samples_neurons):
    neuron_lmcs_by_layer = []
    layer_lmcs = []
    
    layer_count = layers_samples_neurons.shape[0]
    neuron_count = layers_samples_neurons.shape[2]
    
    for l in layers_samples_neurons:
        neuron_ps = l.mean(axis=0)
        neuron_lmcs_by_layer.append(np.array([lmc_complexity([p, 1-p], 2) for p in neuron_ps]))
        
        layer_patterns = (tuple(sample) for sample in l)
        layer_pattern_counts = list(Counter(layer_patterns).values())
        layer_lmcs.append(lmc_complexity(layer_pattern_counts, 2 ** neuron_count))
        
    network_patterns = (tuple(sample.ravel()) for sample in layers_samples_neurons.swapaxes(0, 1))
    network_pattern_counts = list(Counter(network_patterns).values())
    network_lmc = lmc_complexity(network_pattern_counts, 2 ** (layer_count * neuron_count))
    
    return network_lmc, layer_lmcs, neuron_lmcs_by_layer

def model_lmc(model):
    def get_activations(x_input):
        activations = []
        for m in model.children():
            x_input = m(x_input)
            activations.append(x_input)
        return activations[1:-3:2]

    X_test = np.pad(all_combinations(2, (3, 3)), [(0, 0), (3, 4), (3, 4)], 'wrap')
    X_test = torch.from_numpy(X_test).float()
        
    if torch.cuda.is_available():
        X_test = X_test.cuda()

    res = [activation.cpu().detach().numpy() for activation in get_activations(X_test)]
    layer_activations = np.array(res)
    # Layer activations are floats, but to calculate lmc complexity,
    # we want to map activations to binary values,
    # 1 if a given activation is >0, and 0 otherwise.
    binary_activations = np.digitize(layer_activations, [0], right=True)
    binary_activations = binary_activations.transpose(0, 1, -2, -1, 2) \
        .reshape(len(layer_dims), np.product(X_test.shape), layer_dims[0])
    return get_network_lmcs(binary_activations)

In [None]:
ca_lmc(ca)

In [None]:
model_lmc(model)

In [None]:
cas = np.array([ca for ca in sample_CAs(rng=np.random.default_rng(0))])
ca_entropies = np.array([ca_entropy(ca) for ca in cas])
ca_lmcs = np.array([ca_lmc(ca) for ca in cas])

In [None]:
i_test = np.r_[0:8:2, 8:128:16, 128:256:32]
i_test = np.sort(np.r_[i_test, 511 - i_test, 255])

In [None]:
plt.subplot(211)
plt.plot(ca_entropies)
plt.plot(i_test, ca_entropies[i_test], 'r.')
plt.subplot(212)
plt.plot(ca_lmcs)
plt.plot(i_test, ca_lmcs[i_test], 'r.')
plt.show()