In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.nn.modules.utils import _pair
from torch.nn import init

import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

import sys
import math
import numpy as np
import time

import utils as utils
import ste as ste

# Natural logarithm of 2, computed once at import time.
log2 = math.log(2.0)

In [None]:
class LinearShift(nn.Module):
    """Fully connected layer whose weight matrix is assembled from a shift
    exponent tensor (P) and a sign tensor (S).

    Two trainable tensors of shape ``(out_features, in_features)`` are kept:
    ``shift`` and ``sign``. On every forward pass a temporary weight matrix is
    built from ``2 ** round(shift)`` and ``sign(round(sign))`` through helpers
    of the project-local ``ste`` module, and handed to a regular linear op.

    Args:
        in_features: size of each input sample.
        out_features: size of each output sample.
        bias: when True a trainable bias of shape ``(out_features,)`` is added.
        freeze_sign: when True the sign tensor does not receive gradients.
        use_cuda: stored on the instance; not read anywhere in this class.
        rounding: rounding mode forwarded to ``ste.round``.
        weight_bits: bit budget; fixes the lowest allowed shift exponent at
            ``-(2 ** (weight_bits - 1) - 2)``.
    """

    def __init__(self, in_features, out_features,
                 bias=True, freeze_sign=False, use_cuda=True,
                 rounding='deterministic', weight_bits=5):
        super().__init__()

        self.in_features, self.out_features = in_features, out_features
        self.use_cuda = use_cuda
        self.rounding = rounding

        # we use ternary weights to represent sign
        lowest_shift = -(2 ** (weight_bits - 1) - 2)
        self.shift_range = (lowest_shift, 0)

        # Assigning an nn.Parameter as an attribute registers it with the
        # module, so it shows up in .parameters() and follows .cuda()/.to().
        # Parameters require gradients unless told otherwise.
        weight_shape = (out_features, in_features)
        sign_is_trainable = (freeze_sign == False)

        # shift (P) and sign (S), registered in this order
        self.shift = nn.Parameter(torch.Tensor(*weight_shape))
        self.sign = nn.Parameter(torch.Tensor(*weight_shape),
                                 requires_grad=sign_is_trainable)

        if not bias:
            # Optional parameters are still registered, just as None.
            self.register_parameter('bias', None)
        else:
            self.bias = nn.Parameter(torch.Tensor(out_features))

        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw ``shift``, ``sign`` and (if present) ``bias`` in place."""
        lowest, highest = self.shift_range
        self.shift.data.uniform_(lowest, highest)
        self.sign.data.uniform_(-1, 1)

        if self.bias is None:
            return

        # Bias bound is 1 / sqrt(fan_in), fan_in taken from the shift tensor.
        fan_in = init._calculate_fan_in_and_fan_out(self.shift)[0]
        limit = 1 / math.sqrt(fan_in)
        init.uniform_(self.bias, -limit, limit)

    def forward(self, input):
        """Build the temporary weight matrix and apply it to ``input``."""
        # force the P into our predefined range
        lowest, highest = self.shift_range
        self.shift.data = ste.clamp(self.shift.data, lowest, highest)

        # Temporary rounded copy of P. The stored parameter itself is NOT
        # overwritten with its rounded value.
        exponent = ste.round(self.shift, rounding=self.rounding)

        # Temporary rounded-then-signed copy of S.
        sign_term = ste.sign(ste.round(self.sign, rounding=self.rounding))

        # Temporary weight matrix built from P and S.
        weight_ps = ste.unsym_grad_mul(2 ** exponent, sign_term)

        # From here on this is a vanilla PyTorch linear layer.
        return F.linear(input, weight_ps, self.bias)

In [None]:
class Conv2dShift(nn.Module):
    """2-D convolution whose kernel is assembled from a shift exponent tensor
    (P) and a sign tensor (S).

    Two trainable tensors of shape
    ``(out_channels, in_channels // groups, *kernel_size)`` are kept: ``shift``
    and ``sign``. On every forward pass a temporary kernel is built from
    ``2 ** round(shift)`` and ``sign(round(sign))`` through helpers of the
    project-local ``ste`` module and passed to ``F.conv2d``.

    Args:
        in_channels: number of input channels; must be divisible by ``groups``.
        out_channels: number of output channels; must be divisible by ``groups``.
        kernel_size: int or pair, expanded with ``_pair``.
        stride: int or pair, expanded with ``_pair``.
        padding: int or pair, expanded with ``_pair``.
        dilation: int or pair, expanded with ``_pair``.
        groups: number of channel groups.
        bias: when True a trainable bias of shape ``(out_channels,)`` is added.
        padding_mode: ``'circular'`` pads the input explicitly before the
            convolution; any other value lets ``F.conv2d`` apply ``padding``.
        rounding: rounding mode forwarded to ``ste.round``.
        weight_bits: bit budget; fixes the lowest allowed shift exponent at
            ``-(2 ** (weight_bits - 1) - 2)``.

    Raises:
        ValueError: if ``in_channels`` or ``out_channels`` is not divisible by
            ``groups``.
    """

    def __init__(self, in_channels, out_channels, kernel_size,
                 stride=1, padding=0, dilation=1,# output_padding,
                 groups=1, bias=True, padding_mode='zeros',
                 rounding='deterministic', weight_bits=5):

        super(Conv2dShift, self).__init__()

        if in_channels % groups != 0:
            raise ValueError('in_channels must be divisible by groups')
        if out_channels % groups != 0:
            raise ValueError('out_channels must be divisible by groups')

        # Normalise scalar arguments to (height, width) pairs.
        kernel_size = _pair(kernel_size)
        stride = _pair(stride)
        padding = _pair(padding)
        dilation = _pair(dilation)

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation
        self.groups = groups
        self.padding_mode = padding_mode
        self.rounding = rounding
        self.shift_range = (-1 * (2**(weight_bits - 1) - 2), 0) # we use ternary weights to represent sign

        # shift (P) and sign (S) are registered in this order, followed by bias.
        self.shift = nn.Parameter(torch.Tensor(
            out_channels, in_channels // groups, *kernel_size))
        self.sign = nn.Parameter(torch.Tensor(
            out_channels, in_channels // groups, *kernel_size),
            requires_grad = True)

        if bias:
            self.bias = nn.Parameter(torch.Tensor(out_channels))
        else:
            # Optional parameters are still registered, just as None.
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw ``shift``, ``sign`` and (if present) ``bias`` in place."""
        # Draw the exponents from the range this layer can represent. A fixed
        # (-10, -1) interval ignores weight_bits: for small bit widths nearly
        # every exponent would start outside shift_range and be clamped to its
        # lower bound on the first forward pass.
        self.shift.data.uniform_(*self.shift_range)
        self.sign.data.uniform_(-1, 1)

        if self.bias is not None:
            # Bias bound is 1 / sqrt(fan_in), fan_in taken from the shift tensor.
            fan_in, _ = init._calculate_fan_in_and_fan_out(self.shift)
            bound = 1 / math.sqrt(fan_in)
            init.uniform_(self.bias, -bound, bound)

    def forward(self, input):
        """Build the temporary kernel and convolve ``input`` with it."""
        # Keep the stored exponents inside the representable range.
        self.shift.data = ste.clamp(self.shift.data, *self.shift_range)

        # Temporary rounded copies; the stored parameters are not overwritten.
        shift_rounded = ste.round(self.shift, self.rounding)
        sign_rounded_signed = ste.sign(ste.round(self.sign, self.rounding))
        weight_ps = ste.unsym_grad_mul(2**shift_rounded, sign_rounded_signed)

        # Input and bias go through ste.round_fixed_point before the
        # convolution (presumably a fixed-point quantisation - see `ste`).
        input_fixed_point = ste.round_fixed_point(input)

        if self.bias is not None:
            bias_fixed_point = ste.round_fixed_point(self.bias)
        else:
            bias_fixed_point = None

        if self.padding_mode == 'circular':
            # NOTE(review): each padding amount is split across the two sides
            # here, whereas the other branch hands `self.padding` to F.conv2d
            # unchanged. The two modes may therefore yield different output
            # sizes for the same `padding` - confirm this is intended.
            expanded_padding = ((self.padding[1] + 1) // 2, self.padding[1] // 2,
                                (self.padding[0] + 1) // 2, self.padding[0] // 2)

            input_padded = F.pad(input_fixed_point, expanded_padding, mode='circular')
            padding = _pair(0)
        else:
            input_padded = input_fixed_point
            padding = self.padding

        return F.conv2d(input_padded, weight_ps, bias_fixed_point,
                        self.stride, padding, self.dilation, self.groups)

In [None]:
class Net(nn.Module):
    """Small image classifier built from shift layers.

    Two ``Conv2dShift`` layers (1 -> 32 -> 64 channels, 3x3 kernels, stride 1)
    are followed by 2x2 max pooling, then two ``LinearShift`` layers
    (9216 -> 128 -> 10). ``forward`` returns log-probabilities over 10 classes.
    The 9216 input width of ``fc1`` equals 64 * 12 * 12, i.e. it matches
    single-channel 28x28 inputs.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = Conv2dShift(1, 32, 3, 1)
        self.conv2 = Conv2dShift(32, 64, 3, 1)
        # Channel-wise dropout: applied to the 4-D feature map after pooling.
        self.dropout1 = nn.Dropout2d(0.25)
        # Element-wise dropout: this one sees the 2-D (N, 128) output of fc1.
        # Dropout2d is meant for 3-D/4-D inputs and PyTorch deprecates feeding
        # it 2-D tensors, recommending plain dropout to keep the behaviour.
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = LinearShift(9216, 128)
        self.fc2 = LinearShift(128, 10)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of images ``x``."""
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        # Collapse everything except the batch dimension for the linear layers.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)

        output = F.log_softmax(x, dim=1)
        return output


def train(model, device, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader``, updating ``model`` with ``optimizer``.

    Each batch is moved to ``device``, the negative log-likelihood loss of the
    model output is back-propagated and one optimizer step is taken. After
    every batch a single-line progress readout is written to stdout, followed
    by a carriage return so the next readout overwrites it.

    Args:
        model: module returning log-probabilities; switched to training mode.
        device: device the batches are moved to.
        train_loader: iterable of ``(data, target)`` batches exposing
            ``len()`` and a ``dataset`` attribute.
        optimizer: optimizer bound to the model parameters.
        epoch: epoch number, used only in the readout.
    """
    model.train()

    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        loss = F.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()

        # Progress figures are based on the index of the current batch.
        samples_seen = step * len(inputs)
        dataset_size = len(train_loader.dataset)
        percent_done = 100. * step / len(train_loader)

        readout = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, samples_seen, dataset_size, percent_done, loss.item())

        stream = sys.stdout
        stream.write(readout)
        stream.flush()
        stream.write('\r')
        
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
        

In [None]:
# Execution target: set use_cuda to True to select the "cuda" device.
use_cuda = False
device = torch.device("cuda" if use_cuda else "cpu")

# Training schedule and optimiser settings.
n_epochs, lr, gamma = 5, 1.0, 0.7

# Batch sizes for the training and evaluation loaders.
batch_size, test_batch_size = 64, 1000

# Extra keyword arguments forwarded to both DataLoader constructors.
kwargs = dict()

# Training split; downloaded into ../data when it is not already there.
train_loader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root='../data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
        ]),
    ),
    batch_size=batch_size,
    shuffle=True,
    **kwargs
)

# Evaluation split; same preprocessing, no download request of its own.
test_loader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root='../data',
        train=False,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
        ]),
    ),
    batch_size=test_batch_size,
    shuffle=True,
    **kwargs
)


In [None]:
# Build the network on the selected device and optimise it with Adadelta.
model = Net().to(device)
optimizer = optim.Adadelta(params=model.parameters(), lr=lr)

# The learning rate is multiplied by `gamma` after every epoch (step_size=1).
scheduler = StepLR(optimizer=optimizer, step_size=1, gamma=gamma)

# Epochs are numbered from 1 so the progress readout starts at "Epoch: 1".
for epoch in range(1, n_epochs + 1):
    train(model=model, device=device, train_loader=train_loader,
          optimizer=optimizer, epoch=epoch)
    test(model=model, device=device, test_loader=test_loader)
    scheduler.step()