# Simple PixelRNN on digits dataset

In this task we will try to train [PixelRNN](https://arxiv.org/pdf/1601.06759.pdf) on sklearn's digits dataset (sklearn.datasets.load_digits). 

Your task is to implement `Linear`, `ReLU`, `LogSoftMax`, `CrossEntropyLoss` and `SimpleReluRNN` modules (forward pass and gradient computations using numpy) and sampling code to sample from the network output distribution and for the reconstruction example at the end of this notebook. Read all the comments and look for instructions in TODOs. Please delegate as much logic to numpy as you can in forward and backward pass computations to speed up training on CPU. It is possible to implement all modules except SimpleReluRNN without for-loops and SimpleReluRNN with a for-loop over time.

There are tests included to test your implementation against PyTorch.

After filling out this notebook, please send it to s2t \[at\] tinkoff.ru **with your name and \[Test task\] tag in the subject line**.

### Modules

In [None]:
import numpy as np

In [None]:
class Module:
    """All neural network modules inherit from this class."""
    def __init__(self):
        """Initialize parameters and gradients here."""
        pass

    def forward(self, module_input):
        """Forward pass: calculate module output based on its input
        
        This method may share any intermediate state with backward pass via `self`
        Args:
            module_input: numpy array, input to this module
        """
        raise NotImplementedError("forward not implemented")

    def backward(self, module_input, grad_output):
        """Backward pass: calculate gradients w.r.t. module parameters and its input.
        
        No need to override this method, you can override backward_grad_input and (if module has params)
        backward_grad_params separatly.
        Args:
            module_input: numpy array, input to this module, the same array that was passed to forward
            grad_output: numpy array of shape self.forward(module_input).shape, gradients w.r.t. module output
        """
        grad_input = self.backward_grad_input(module_input, grad_output)
        self.backward_grad_params(module_input, grad_output)
        return grad_input
    
    def backward_grad_input(self, module_input, grad_output):
        """Calculate gradients w.r.t. input and return them. The returned array must be of the same shape as
        module_input. This method may share any intermediate state with backward_grad_params via `self`."""
        raise NotImplementedError("backward_grad_input not implemented")
        
    def backward_grad_params(self, module_input, grad_output):
        """Calculate gradients w.r.t. module parameters and save them for later retrieval via self.grad_params."""
        pass
        
    def zero_grad_params(self): 
        """Zero out accumulated gradients."""
        pass
        
    @property
    def params(self):
        """Return list of module paramteres. Each parameter is a numpy array of arbitrary shape."""
        return []
        
    @property
    def grad_params(self):
        """Return list of accumulated gradients w.r.t. parameters.
        
        Returned list should be of the same length as self.params. Each element in list must be of the same shape
        as corresponding element in self.params"""
        return []


class Sequential(Module):
    """Example implementation of Sequential module.
    
    Sequential module acts like [torch.nn.Sequential](https://pytorch.org/docs/stable/nn.html#sequential).
    """
    def __init__ (self, *modules):
        self._modules = modules
        self._module_outputs = []

    def forward(self, module_input):
        self._module_outputs.clear()
        for module in self._modules:
            module_input = module.forward(module_input)
            self._module_outputs.append(module_input)
        return module_input

    def backward(self, module_input, grad_output):
        modules_reversed = list(reversed(self._modules))
        for module, prev_module_output in zip(modules_reversed, self._module_outputs[:-1][::-1]):
            grad_output = module.backward(prev_module_output, grad_output)
        grad_input = self._modules[0].backward(module_input, grad_output)
        return grad_input
      
    def zero_grad_params(self): 
        for module in self._modules:
            module.zero_grad_params()
    
    @property
    def params(self):
        return [module.params for module in self._modules]
    
    @property
    def grad_params(self):
        return [module.grad_params for module in self._modules]

In [None]:
class Linear(Module):
    """Linear module is the simple module that does the following computation:
    
    y = xW^T + b,
    
    where 
        * x is module input of shape (num_samples, n_in),
        * y is module output of shape (num_samples, n_out),
        * W is parameter matrix of shape (n_out, n_in) and
        * b is bias vector of shape (n_out,)
    """
    def __init__(self, n_in, n_out):
        # This is a nice initialization
        stdv = 1.0 / np.sqrt(n_in)
        self.W = np.random.uniform(-stdv, stdv, size=(n_out, n_in))
        self.b = np.random.uniform(-stdv, stdv, size=n_out)
        self.grad_W = np.zeros_like(self.W)
        self.grad_b = np.zeros_like(self.b)
        
    def forward(self, module_input):
        # TODO: calculate module output using the formula above
        # TODO: add your implementation here
        pass
    
    def backward_grad_input(self, module_input, grad_output):
        # TODO: calculate gradients w.r.t. x (dL/dx, where L is some loss function) based
        # on x and gradients w.r.t. y (dL/dy). Derive neccessary formulas. 
        # TODO: add your implementation here
        pass
    
    def backward_grad_params(self, module_input, grad_output):
        # TODO: Calculate gradients w.r.t. W and b (dL/dW, dL/db) based on x and dL/dy
        # TODO: add your implementation here
        pass
    
    def zero_grad_params(self):
        self.grad_W.fill(0)
        self.grad_b.fill(0)
    
    @property
    def params(self):
        return [self.W, self.b]
    
    @property
    def grad_params(self):
        return [self.grad_W, self.grad_b]


In [None]:
class ReLU(Module):
    """ReLU is a module that implements the following function:
    
    y = max(x, 0)
    """
    def forward(self, module_input):
        # TODO: calculate module output based on the formula above
        # TODO: add your implementation here
        pass
    
    def backward_grad_input(self, module_input, grad_output):
        # TODO: calculate gradients w.r.t. x based on gradients w.r.t. y
        # TODO: add your implementation here
        pass

In [None]:
class LogSoftMax(Module):
    """LogSoftMax is a module that implements natural logarithm of softmax function
    (https://en.wikipedia.org/wiki/Softmax_function, https://pytorch.org/docs/stable/nn.html#torch.nn.LogSoftmax)
    
    y_ij = log_softmax(x)_ij = log(exp(x_ij) / \sum_k(exp(x_ik)))
    
    where x is the module input of shape (num_samples, num_classes) and y is the module output of the same shape
    
    LogSoftMax is invariant to scalar shifts:
    
    log_softmax(x + \alpha)_ij = log(exp(x_ij + \alpha) / \sum_k(exp(x_ik + \alpha))) = 
    log(exp(x_ij) / \sum_k(exp(x_ik))) = log_softmax(x)_ij
    
    For numerical stability, it is important to normalize the input before calculations
    (set \alpha_ij = max_j(x_ij) in the scalar shift invariance formula). 
    """
    def forward(self, module_input):
        # TODO: Start with normalization for numerical stability, then implement logsoftmax based on formula above.
        # You may want to save module output for later reuse in backward pass computations. Scalar shift is invisible
        # for gradient calculations, so you don't need to keep track of normalization constant for later use in
        # backward pass.
        # TODO: add your implementation here
        pass
    
    def backward_grad_input(self, module_input, grad_output):
        # TODO: implement gradients w.r.t. x based on gradients w.r.t. y
        # TODO: add your implementation here
        pass


In [None]:
class CrossEntropyLoss(Module):
    """CrossEntropyLoss is a module that implements cross entropy function between some distribution and
    a target distribution (typically one-hot encoded). See https://en.wikipedia.org/wiki/Cross_entropy
    
    y = - \sum_ij(target_ij * logprob_ij) / logprob.shape[0]
    
    where:
        * target is a numpy array of shape (num_samples, num_classes), target distribution, typically one-hot encoded
        * logprob is a numpy array of shape (num_samples, num_classes), logarithm of some probability distribution, 
            typically output of LogSoftMax layer
        * y is a scalar, the final cross entropy between logprob and target
    """
    def forward(self, logprob, target):
        # TODO: calculate cross entropy using the formula above.
        # TODO: add your implementation here
        pass

    def backward_grad_input(self, logprob, target):
        # TODO: calculate gradients w.r.t. logprob: (dy/dlogprob)
        # TODO: add your implementation here
        pass

In [None]:
class SimpleReluRNN(Module):
    """SimpleReluRNN is a module that implements vanilla rnn with relu activation
    (https://pytorch.org/docs/stable/nn.html#torch.nn.RNN)
    
    y_t = h_t = max(Wx_t + Uh_{t-1}, 0)
    
    where:
        X = {x_1, x_2, ...} is a module input of shape (num_samples, input_size)
        H = {h_1, h_2, ...} is a module hidden states of shape (num_samples, hidden_size)
        Y = {y_1, y_2, ...} is a module output of shape (num_samples, hidden_size)
        
    h_0 is typically initialized with zeros (np.zeros(hidden_size))
    """
    def __init__(self, input_size, hidden_size):
        stdv_W = 1 / np.sqrt(input_size)
        self.W = np.random.uniform(-stdv_W, stdv_W, size=(hidden_size, input_size))
        stdv_U = 1 / np.sqrt(hidden_size)
        self.U = np.random.uniform(-stdv_U, stdv_U, size=(hidden_size, hidden_size))
        self.hidden_size = hidden_size
        
        self.grad_W = np.zeros_like(self.W)
        self.grad_U = np.zeros_like(self.U)
        
    def step(self, input_slice, hidden):
        # TODO: implement rnn step: h_t = relu(Wx_t + Uh_{t-1})
        # TODO: add your code here
        pass

    def forward(self, module_input):
        # TODO: implement forward pass: calculate {h_1, h_2, ... h_t} based on module_input using self.step
        # and return np.stack([h_1, h_2, ..., h_t])
        # TODO: you may want to save module output for later use in backward calculations
        # TODO: add your code here
        pass

    def backward_grad_input(self, module_input, grad_output):
        # TODO: calculate gradients w.r.t. module input (dL/dx) based on gradients w.r.t. module output (dL/dY)
        # TODO: note that each element in grad_output is not the full derivative of L w.r.t. h_t, because
        # h_t is 
        #   1) returned from the module (y_t = h_t)
        #   2) used in computations at later timestamps
        # you need to aggregate both of this cases to obtain the full derivative (dL/dH) in a similar manner 
        # as in forward pass (for-loop over time in reverse direction)
        # You may want to save dL/dH in self for later use in backward_grad_params
        # TODO: add your code here
        pass

    def backward_grad_params(self, module_input, grad_output):
        # TODO: calculate gradients w.r.t. module params (dL/dW, dL/dU). Because W and U is used at all timestamps,
        # gradients need to be aggregated from each timestamp
        # TODO: add your code here
        pass

    def zero_grad_params(self): 
        self.grad_W.fill(0)
        self.grad_U.fill(0)

    @property
    def params(self):
        return [self.W, self.U]

    @property
    def grad_params(self):
        return [self.grad_W, self.grad_U]


### Tests

In [None]:
import torch
print("Testing your implementation")

def test_linear():
    for i in range(10):
        layer = Linear(16, 32)
        torch_layer = torch.nn.Linear(16, 32)
        torch_layer.weight.data = torch.tensor(layer.W)
        torch_layer.bias.data = torch.tensor(layer.b)
        
        inp = np.random.normal(size=(10, 16))
        inp_torch = torch.tensor(inp, requires_grad=True)
        
        result = layer.forward(inp)
        result_torch = torch_layer.forward(inp_torch)
        assert(np.allclose(result, result_torch.data.numpy()))
        
        grad_out = np.random.normal(size=(10, 32))
        grad_in = layer.backward(inp, grad_out)
        
        (result_torch * torch.tensor(grad_out)).sum().backward()
        grad_in_torch = inp_torch.grad
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        assert(np.allclose(layer.grad_W, torch_layer.weight.grad.data.numpy()))
        assert(np.allclose(layer.grad_b, torch_layer.bias.grad.data.numpy()))
        
def test_relu():
    for i in range(10):
        layer = ReLU()
        torch_layer = torch.nn.ReLU()
        
        inp = np.random.normal(size=(10, 16))
        inp_torch = torch.tensor(inp, requires_grad=True)
        
        result = layer.forward(inp)
        result_torch = torch_layer.forward(inp_torch)
        assert(np.allclose(result, result_torch.data.numpy()))

        grad_out = np.random.normal(size=(10, 16))
        grad_in = layer.backward(inp, grad_out)
        
        (result_torch * torch.tensor(grad_out)).sum().backward()
        grad_in_torch = inp_torch.grad
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        
        
def test_log_softmax():
    for i in range(10):
        layer = LogSoftMax()
        torch_layer = torch.nn.LogSoftmax(dim=1)
        
        inp = np.random.normal(size=(10, 16))
        inp_torch = torch.tensor(inp, requires_grad=True)
        
        result = layer.forward(inp)
        result_torch = torch_layer.forward(inp_torch)
        assert(np.allclose(result, result_torch.data.numpy()))
        
        grad_out = np.random.normal(size=(10, 16))
        grad_in = layer.backward(inp, grad_out)
        
        (result_torch * torch.tensor(grad_out)).sum().backward()
        grad_in_torch = inp_torch.grad
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        
def test_cross_entropy_loss():
    for i in range(10):
        layer = CrossEntropyLoss()
        torch_layer = torch.nn.NLLLoss()
        
        inp = np.random.normal(size=(10, 16))
        inp_torch = torch.tensor(inp, requires_grad=True)
        
        target_indices = np.random.choice(16, 10)
        target_ohe = np.zeros((10, 16), dtype=np.int64)
        target_ohe[np.arange(10), target_indices] = 1
        
        result = layer.forward(inp, target_ohe)
        result_torch = torch_layer.forward(inp_torch, torch.tensor(target_indices))
        
        assert(np.allclose(result, result_torch.data.numpy()))
        
        grad_in = layer.backward(inp, target_ohe)
        
        result_torch.backward()
        grad_in_torch = inp_torch.grad
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        
        
def test_rnn_relu():
    for i in range(10):
        layer = SimpleReluRNN(64, 128)
        torch_layer = torch.nn.RNN(64, 128, nonlinearity='relu', bias=False)
        torch_layer.weight_ih_l0.data = torch.tensor(layer.W)
        torch_layer.weight_hh_l0.data = torch.tensor(layer.U)
        
        inp = np.random.normal(size=(10, 64))
        inp_torch = torch.tensor(inp[:, None, :], requires_grad=True)
        
        result = layer.forward(inp)
        result_torch = torch_layer.forward(inp_torch)[0][:, 0, :]
        assert(np.allclose(result, result_torch.data.numpy()))
        
        grad_out = np.random.normal(size=(10, 128))
        grad_in = layer.backward(inp, grad_out)
        
        (result_torch * torch.tensor(grad_out)).sum().backward()
        grad_in_torch = inp_torch.grad[:, 0, :]
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        assert(np.allclose(layer.grad_W, torch_layer.weight_ih_l0.grad.data.numpy()))
        assert(np.allclose(layer.grad_U, torch_layer.weight_hh_l0.grad.data.numpy()))
        
def test_sequential():
    for i in range(10):
        linear_1 = Linear(8, 16)
        linear_2 = Linear(16, 32)
        model = Sequential(
            linear_1,
            LogSoftMax(),
            linear_2,
            LogSoftMax()
        )
        linear_1_torch = torch.nn.Linear(8, 16)
        linear_1_torch.weight.data = torch.tensor(linear_1.W)
        linear_1_torch.bias.data = torch.tensor(linear_1.b)
        linear_2_torch = torch.nn.Linear(16, 32)
        linear_2_torch.weight.data = torch.tensor(linear_2.W)
        linear_2_torch.bias.data = torch.tensor(linear_2.b)
        model_torch = torch.nn.Sequential(
            linear_1_torch,
            torch.nn.LogSoftmax(dim=-1),
            linear_2_torch,
            torch.nn.LogSoftmax(dim=-1)
        )
        
        inp = np.random.normal(size=(10, 8))
        inp_torch = torch.tensor(inp, requires_grad=True)
        
        result = model.forward(inp)
        result_torch = model_torch.forward(inp_torch)
        
        assert(np.allclose(result, result_torch.data.numpy()))
        
        grad_out = np.random.normal(size=(10, 32))
        grad_in = model.backward(inp, grad_out)
        
        (result_torch * torch.tensor(grad_out)).sum().backward()
        grad_in_torch = inp_torch.grad
        assert(np.allclose(grad_in, grad_in_torch.data.numpy()))
        assert(np.allclose(linear_1.grad_W, linear_1_torch.weight.grad.data.numpy()))
        assert(np.allclose(linear_1.grad_b, linear_1_torch.bias.grad.data.numpy()))
        assert(np.allclose(linear_2.grad_W, linear_2_torch.weight.grad.data.numpy()))
        assert(np.allclose(linear_2.grad_b, linear_2_torch.bias.grad.data.numpy()))
        
        
test_linear()
test_relu()
test_log_softmax()
test_cross_entropy_loss()
test_rnn_relu()
test_sequential()
print("All tests passed!")

### PixelRNN

In [None]:
class SGDOptimizer:
    """Simple optimizer that does stochastic gradient descent with momentum update rule."""
    def __init__(self, model, lr, momentum=0.9):
        self._lr = lr
        self._momentum = momentum
        self._model = model
        self._grad_running_mean = {}
        for layer in self._model.params:
            for param in layer:
                self._grad_running_mean[id(param)] = np.zeros_like(param)
    
    def step(self):
        for layer, layer_grads in zip(self._model.params, self._model.grad_params):
            for param, param_grad in zip(layer, layer_grads):
                self._grad_running_mean[id(param)] = ((1 - self._momentum) * param_grad +
                                                      self._momentum * self._grad_running_mean[id(param)])
                param -= self._lr * self._grad_running_mean[id(param)]
        self._model.zero_grad_params()

In [None]:
class Network(Module):
    """Here is the most simple PixelRNN implementation."""
    def __init__(self, input_size, hidden_size):
        self._rnn = SimpleReluRNN(input_size, hidden_size)
        self._prediction_net = Sequential(
            Linear(hidden_size, hidden_size),
            ReLU(),
            Linear(hidden_size, input_size),
            LogSoftMax()
        )
        
    def forward(self, inp):
        self._rnn_out = self._rnn.forward(inp)
        self._output = self._prediction_net.forward(self._rnn_out)
        return self._output
    
    def backward(self, inp, grad_out):
        grad_rnn_out = self._prediction_net.backward(self._rnn_out, grad_out)
        grad_in = self._rnn.backward(inp, grad_rnn_out)
        return grad_in
    
    def zero_grad_params(self):
        self._rnn.zero_grad_params()
        self._prediction_net.zero_grad_params()
        
    def step(self, inp, hidden):
        hidden = self._rnn.step(inp, hidden)
        loglikes = self._prediction_net.forward(hidden[None, :])[0]
        return loglikes, hidden
    
    @property
    def params(self):
        return [self._rnn.params, *self._prediction_net.params]
    
    @property
    def grad_params(self):
        return [self._rnn.grad_params, *self._prediction_net.grad_params]

In [None]:
# TODO: try changing hidden state size and optimizer parameters (learning rate, momentum) to obtain the
# best train set cross entropy loss
network = Network(17, 128)
opt = SGDOptimizer(network, lr=1e-2, momentum=0.9)
loss = CrossEntropyLoss()

In [None]:
from sklearn.datasets import load_digits
import matplotlib.pyplot as plt
print("Loading data")

X = load_digits()['data'].astype(np.int)
np.random.shuffle(X)

_, axes = plt.subplots(1, 10, figsize=(10, 1))
for idx, img in enumerate(X[:10]):
    axes[idx].imshow(img.reshape(8, 8), cmap="gray")
plt.show()

In [None]:
# For reference: you should be able to obtain final loss close to 1.2-1.4 if you implemented CrossEntropyLoss
# with normalization over samples. If your loss is higher, try tweaking learning_rate and hidden_size parameters
print("Training for 20 epochs")

for epoch in range(20):
    loss_values = []
    for idx, sequence in enumerate(X):
        X_ohe = np.zeros((len(sequence), 17))
        X_ohe[np.arange(len(sequence)), sequence] = 1
        inp = X_ohe[:-1]
        target = X_ohe[1:]
        loglikes = network.forward(inp)
        loss_value = loss.forward(loglikes, target)
        loss_values.append(loss_value)
        grad_loglikes = loss.backward(loglikes, target)
        network.backward(inp, grad_loglikes)
        opt.step()
    print(epoch, np.mean(loss_values))

In [None]:
def sample(network, num_steps, starting_samples=[0]):
    hidden = np.zeros(network._rnn.hidden_size)
    samples = [sample for sample in starting_samples]
    # TODO: first feed all samples but the last one to the network to obtain correct hidden state,
    # then sample your network num_steps times starting with the last element of samples as the first input
    # to the network, each time calling np.random.choice(np.arange(17), p=probs) with the correct
    # probabilities (np.exp(loglikes))
    # TODO: add your code here
    pass

In [None]:
print("Sampling from network")
plt.figure(figsize=(1, 8))
plt.imshow(sample(network, 63).reshape(8, 8), cmap="gray")

In [None]:
# Now we try reconstruction example from the paper (https://arxiv.org/pdf/1601.06759.pdf)
print("Reconstruction example")
_, axes = plt.subplots(1, 5)
img = X[0]
axes[0].imshow(img.reshape(8, 8), cmap="gray")
axes[0].set_title("Original")
upper_part = img[:32]
axes[1].imshow(np.concatenate((upper_part.reshape(4, 8),
                               np.zeros(shape=(4, 8)))), cmap="gray")
axes[1].set_title("Occluded")
result = sample(network, 32, starting_samples=upper_part)
axes[2].imshow(result.reshape(8, 8), cmap="gray")
axes[2].set_title("Sample #1")

result = sample(network, 32, starting_samples=upper_part)
axes[3].imshow(result.reshape(8, 8), cmap="gray")
axes[3].set_title("Sample #2")

result = sample(network, 32, starting_samples=upper_part)
axes[4].imshow(result.reshape(8, 8), cmap="gray")
axes[4].set_title("Sample #3")
plt.show()