In [1]:
# ----------------------------------------------------------------------
# Numenta Platform for Intelligent Computing (NuPIC)
# Copyright (C) 2019, Numenta, Inc.  Unless you have an agreement
# with Numenta, Inc., for a separate license for this software code, the
# following terms and conditions apply:
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Affero Public License for more details.
#
# You should have received a copy of the GNU Affero Public License
# along with this program.  If not, see http://www.gnu.org/licenses.
#
# http://numenta.org/licenses/
# ----------------------------------------------------------------------

In [None]:
# Uncomment the following line to install nupic.torch
!pip install -e git+https://github.com/numenta/nupic.torch.git#egg=nupic.torch

In [3]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

torch.manual_seed(18)
np.random.seed(18)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
class RandomNoise(object):
  """
  An image transform that adds noise to random pixels in the image.
  """
  def __init__(self, noiselevel=0.0, whiteValue=0.1307 + 2*0.3081):
    """
    :param noiselevel:
      From 0 to 1. For each pixel, set its value to whiteValue with this
      probability. Suggested whiteVolue is 'mean + 2*stdev'
    """
    self.noiseLevel = noiselevel
    self.whiteValue = whiteValue

  def __call__(self, image):
    a = image.view(-1)
    numNoiseBits = int(a.shape[0] * self.noiseLevel)
    noise = np.random.permutation(a.shape[0])[0:numNoiseBits]
    a[noise] = self.whiteValue
    return image


def train(model, loader, optimizer, criterion):
    """
    Train the model using given dataset loader. 
    Called on every epoch.
    :param model: pytorch model to be trained
    :type model: torch.nn.Module
    :param loader: dataloader configured for the epoch.
    :type loader: :class:`torch.utils.data.DataLoader`
    :param optimizer: Optimizer object used to train the model.
    :type optimizer: :class:`torch.optim.Optimizer`
    :param criterion: loss function to use
    :type criterion: function
    """
    model.train()
    for batch_idx, (data, target) in enumerate(loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()


def test(model, loader, criterion):
    """
    Evaluate pre-trained model using given dataset loader.
    Called on every epoch.
    :param model: Pretrained pytorch model
    :type model: torch.nn.Module
    :param loader: dataloader configured for the epoch.
    :type loader: :class:`torch.utils.data.DataLoader`
    :param criterion: loss function to use
    :type criterion: function
    :return: Dict with "accuracy", "loss" and "total_correct"
    """
    model.eval()
    loss = 0
    total_correct = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss += criterion(output, target, reduction='sum').item() # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
            total_correct += pred.eq(target.view_as(pred)).sum().item()
    
    return {"accuracy": total_correct / len(loader.dataset), 
            "loss": loss / len(loader.dataset), 
            "total_correct": total_correct}

### Parameters

In [5]:
# CNN layer configuration
IN_CHANNELS = 1
OUT_CHANNELS = 30
KERNEL_SIZE = 5
WIDTH = 28
CNN_OUTPUT_LEN = OUT_CHANNELS * ((WIDTH - KERNEL_SIZE + 1) // 2) ** 2 

# Linear layer configuration
HIDDEN_SIZE = 150
OUTPUT_SIZE = 10    

# Sparsity parameters
WEIGHT_SPARSITY = 0.3

# K-Winners parameters
K = 50
CNN_K = 400
K_INFERENCE_FACTOR = 1.5
BOOST_STRENGTH = 1.4
BOOST_STRENGTH_FACTOR = 0.85

# Training parameters
LEARNING_RATE = 0.01
MOMENTUM = 0.5
EPOCHS = 10
FIRST_EPOCH_BATCH_SIZE = 4
TRAIN_BATCH_SIZE = 64
TEST_BATCH_SIZE = 1000

# Noise test parameters
NOISE_LEVELS = [0.05, 0.10, 0.15, 0.20, 0.25]

###  Sparse CNN Model
Create a sparse CNN network composed of one convolution layer followed by a sparse linear layer with using k-winner activation between the layers

In [6]:
from nupic.torch.modules import (
    KWinners2d, KWinners, SparseWeights, Flatten, rezeroWeights, updateBoostStrength)

sparseCNN = nn.Sequential(
    # Sparse CNN layer
    nn.Conv2d(in_channels=IN_CHANNELS, out_channels=OUT_CHANNELS, kernel_size=KERNEL_SIZE),
    KWinners2d(n=CNN_OUTPUT_LEN, channels=OUT_CHANNELS,
               k=CNN_K, kInferenceFactor=K_INFERENCE_FACTOR,
               boostStrength=BOOST_STRENGTH, boostStrengthFactor=BOOST_STRENGTH_FACTOR),
    nn.MaxPool2d(kernel_size=2),

    # Flatten max pool output before passing to linear layer
    Flatten(),

    # Sparse Linear layer
    SparseWeights(nn.Linear(CNN_OUTPUT_LEN, HIDDEN_SIZE), WEIGHT_SPARSITY),
    KWinners(n=HIDDEN_SIZE, k=K, kInferenceFactor=K_INFERENCE_FACTOR,
             boostStrength=BOOST_STRENGTH, boostStrengthFactor=BOOST_STRENGTH_FACTOR),

    # Output layer
    nn.Linear(HIDDEN_SIZE, OUTPUT_SIZE),
    nn.LogSoftmax(dim=1)
).to(device)

### Load MNIST Dataset

In [7]:
normalize = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST('data', train=True, download=True, transform=normalize)
test_dataset = datasets.MNIST('data', train=False, transform=normalize)

# Configure data loaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=True)
first_loader = torch.utils.data.DataLoader(train_dataset, batch_size=FIRST_EPOCH_BATCH_SIZE, shuffle=True)

### Train
On the first epoch we use smaller batch size to calculate the duty cycles used by the k-winner function. Once the duty cycles stabilize we can use larger batch sizes.

In [8]:
sgd = optim.SGD(sparseCNN.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)
train(model=sparseCNN, loader=first_loader, optimizer=sgd, criterion=F.nll_loss)

After each epoch we rezero the weights to keep the initial sparsity constant during training. We also apply the boost strength factor after each epoch

In [9]:
%%capture
sparseCNN.apply(rezeroWeights)
sparseCNN.apply(updateBoostStrength)

Test and print results

In [10]:
test(model=sparseCNN, loader=test_loader, criterion=F.nll_loss)

{'accuracy': 0.9782, 'loss': 0.06787856979370117, 'total_correct': 9782}

At this point the duty cycles should be stable and we can train on larger batch sizes

In [11]:
for epoch in range(1, EPOCHS):
    train(model=sparseCNN, loader=train_loader, optimizer=sgd, criterion=F.nll_loss)
    sparseCNN.apply(rezeroWeights)
    sparseCNN.apply(updateBoostStrength)
    results = test(model=sparseCNN, loader=test_loader, criterion=F.nll_loss)
    print(results)

{'accuracy': 0.9862, 'loss': 0.0412615779876709, 'total_correct': 9862}
{'accuracy': 0.9868, 'loss': 0.04029187545776367, 'total_correct': 9868}
{'accuracy': 0.9867, 'loss': 0.03934368209838867, 'total_correct': 9867}
{'accuracy': 0.9876, 'loss': 0.03759277114868164, 'total_correct': 9876}
{'accuracy': 0.9877, 'loss': 0.03777754402160644, 'total_correct': 9877}
{'accuracy': 0.9872, 'loss': 0.03784116630554199, 'total_correct': 9872}
{'accuracy': 0.9872, 'loss': 0.03829168930053711, 'total_correct': 9872}
{'accuracy': 0.9876, 'loss': 0.03837165260314941, 'total_correct': 9876}
{'accuracy': 0.9871, 'loss': 0.03919747161865234, 'total_correct': 9871}


### Noise
Add noise to the input and check the test accuracy

In [12]:
for noise in NOISE_LEVELS:
    noise_transform = transforms.Compose([transforms.ToTensor(), RandomNoise(noise), 
                                      transforms.Normalize((0.1307,), (0.3081,))])
    noise_dataset = datasets.MNIST('data', train=False, transform=noise_transform)
    noise_loader = torch.utils.data.DataLoader(noise_dataset, batch_size=TEST_BATCH_SIZE, shuffle=True)

    results = test(model=sparseCNN, loader=noise_loader, criterion=F.nll_loss)
    print(noise, ":", results)

0.05 : {'accuracy': 0.9841, 'loss': 0.05029125137329102, 'total_correct': 9841}
0.1 : {'accuracy': 0.9777, 'loss': 0.06853677139282227, 'total_correct': 9777}
0.15 : {'accuracy': 0.9697, 'loss': 0.09935387115478515, 'total_correct': 9697}
0.2 : {'accuracy': 0.9509, 'loss': 0.1598887924194336, 'total_correct': 9509}
0.25 : {'accuracy': 0.9225, 'loss': 0.2412475631713867, 'total_correct': 9225}
