
# Imports


In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from copy import deepcopy

# Set a seed for reproducibility
torch.manual_seed(1)

<torch._C.Generator at 0x7e51e8ecba10>

In [2]:
! pip install onnx
import onnx



# Define the neural network, load data, train it

In [3]:
# Define a simple neural network
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(784, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = x.view(-1, 784)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Set the device to use for computation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set up the network and optimizer
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Load the training data
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('./data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                #       transforms.Normalize((0.1307,), (0.3081,))
                   ])
    ),
    batch_size=64, shuffle=True)

# Train the model
model.train()
for epoch in range(10):  # 10 epochs
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.CrossEntropyLoss()(output, target)
        loss.backward()
        optimizer.step()

        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))




# Create the test loader and test the model

In [4]:
# Load the test data
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('./data', train=False,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                   #    transforms.Normalize((0.1307,), (0.3081,))
                   ])
    ),
    batch_size=1000, shuffle=True)


In [5]:

def test(model, device, test_loader):
    model.eval()  # set the model to evaluation mode
    test_loss = 0
    correct = 0
    with torch.no_grad():  # disable gradient computation
        for data, target in test_loader:
            data = torch.round(data)
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += nn.CrossEntropyLoss()(output, target).item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            break

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

test(model, device, test_loader)



Test set: Average loss: 0.0000, Accuracy: 969/10000 (10%)



# Construct and train an Observer Model.

This network records the maximum and minimum output value of each layer in the training set when loaded with the learned weights from the previous step.

In [6]:
class ObserveNet(nn.Module):
    def __init__(self):
        super(ObserveNet, self).__init__()
        self.fc1 = nn.Linear(784, 500)
        self.fc2 = nn.Linear(500, 10)
        self.so_1_max = torch.tensor(-float('inf'))
        self.so_2_max = torch.tensor(-float('inf'))
        self.so_1_min = torch.tensor(float('inf'))
        self.so_2_min = torch.tensor(float('inf'))

    def forward(self, x):

        x = x.view(-1, 784)
        x = torch.relu(self.fc1(x))

        temp_max = torch.max(x)
        temp_min = torch.min(x)
        self.so_1_max = temp_max if temp_max > self.so_1_max else self.so_1_max
        self.so_1_min = temp_min if temp_min < self.so_1_min else self.so_1_min

        x = self.fc2(x)
        temp_max = torch.max(x)
        temp_min = torch.min(x)
        self.so_2_max = temp_max if temp_max > self.so_2_max else self.so_2_max
        self.so_2_min = temp_min if temp_min < self.so_2_min else self.so_2_min

        return x

q_model_dict = deepcopy(model.state_dict())

o_net = ObserveNet()
o_net.load_state_dict(q_model_dict)

test(o_net, device, train_loader)



Test set: Average loss: 0.0000, Accuracy: 63/60000 (0%)



# Calculate scale constants of output for layer1 and layer2

We calculate the output scale for layer1 and layer2 using the maximum and minimum output values for each layer gathered by the Observer network.

In [7]:
so_1 = max(o_net.so_1_max, torch.abs(o_net.so_1_min)).float().item() / 127
so_2 = max(o_net.so_2_max, torch.abs(o_net.so_2_min)).float().item() / 127
print(so_1)
print(so_2)

0.04499745181226355
0.14810363889679196


# Quantize the layer weights

In [8]:
from copy import deepcopy
q_model_dict = deepcopy(model.state_dict())

# Returns the maximum value of a tensor devided by `m` which is the maximum n-bit int value
# in quantization range.
def max_scale(x: torch.tensor, m: int):
  return torch.max(torch.abs(x)).item()/m

# Scale of fc1 and fc2 determined by maximum value of int8 (127) and maximum weight value.
def quantize_fc(x: torch.tensor, m: int):
  return max_scale(x,m), torch.round(x / max_scale(x, m)).to(dtype=torch.int32)

# Scale of bias determined by scale of the output of fc layer
# Which is the scale of input multiplied by scale of the fc layer.
def quantize_bias(x: torch.tensor, s: float):
  # saturate
  return torch.clip(torch.round(x/s), min=-127, max=127)

s_fc1, q_model_dict['fc1.weight'] = quantize_fc(q_model_dict['fc1.weight'], 127)
q_model_dict['fc1.bias'] = quantize_bias(q_model_dict['fc1.bias'], s_fc1 * 1 / 127)
s_fc2, q_model_dict['fc2.weight'] = quantize_fc(q_model_dict['fc2.weight'], 127)
q_model_dict['fc2.bias'] = quantize_bias(q_model_dict['fc2.bias'], s_fc2*so_1)

class QuantNet(nn.Module):
    def __init__(self, s_fc1, s_fc2, so_1, so_2):
        super(QuantNet, self).__init__()
        self.fc1 = nn.Linear(784, 500)
        self.fc2 = nn.Linear(500, 10)
        self.s_fc1 = s_fc1
        self.s_fc2 = s_fc2
        self.so_1 = so_1
        self.so_2 = so_2
        self.s_x = 1 / 127

    def forward(self, x):
        x = x.view(-1, 784)

        # Scale input
        x = torch.round(x / self.s_x)

        # First layer
        x = torch.relu(self.fc1(x))

        # Requantize and saturation cast
        x = torch.clip(torch.round(x * ((self.s_fc1 * self.s_x) / self.so_1)), -127, 127)

        # Second layer
        x = self.fc2(x)

        # Requantize and saturation cast
        x = torch.clip(torch.round(x * ((self.s_fc2 * self.so_1) / self.so_2)), -127, 127)

        return x * self.so_2

q_net = QuantNet(s_fc1=s_fc1, s_fc2=s_fc2, so_1=so_1, so_2=so_2)
q_net.load_state_dict(q_model_dict)

<All keys matched successfully>

# Print the scaling values so they can be used inside of Urbit

In [9]:
print(so_1)
print(so_2)
print(s_fc1)
print(s_fc2)

0.04499745181226355
0.14810363889679196
0.0019561668315271692
0.0059992356563177635


# Test the QuantNet on the test set

In [10]:
# Run the test function
test(q_net, device, test_loader)


Test set: Average loss: 0.0000, Accuracy: 973/10000 (10%)



In [11]:
x = list(test_loader)[0][0][0]

# Run cells below to export model to onnx




In [12]:
torch.onnx.export(q_net,               # model being run
                  x[0],                         # model input (or a tuple for multiple inputs)
                  "net-quant.onnx",   # where to save the model (can be a file or file-like object)
                  export_params=True,        # store the trained parameter weights inside the model file
                  opset_version=11,          # the ONNX version to export the model to
                  do_constant_folding=True,  # whether to execute constant folding for optimization
                  input_names = ['input1'],   # the model's input names
                  output_names = ['output'], # the model's output names
                  dynamic_axes={'input1' : {0 : 'input_size'},    # variable length axes
                                'output' : {0 : 'input_size'}})

verbose: False, log level: Level.ERROR



In [13]:
model = onnx.load('net-quant.onnx')
weights = model.graph.initializer
onnx.numpy_helper.to_array(weights[2]).shape

(10, 500)

# Write the QuantNet weights to disk as int32

In [14]:
for name, param in q_net.named_parameters():
    print(name)
    print(param.detach().int().numpy())
    def to_byte_array(array, name):
        # Ensure the array is int32
        array = array.astype(np.int32)

        # Flatten the array in column-major order
        flattened = array.flatten(order='C')

        # Convert to byte array
        byte_array = flattened.tobytes()

        # Write byte array to a file
        with open(name, 'wb') as f:
          f.write(byte_array)


        return byte_array

    # Test the function
    to_byte_array(param.detach().int().numpy(), f'{name}.mnist')

fc1.weight
[[  9  -8  -4 ...   3  -2   1]
 [-14 -11 -16 ...  -1   1  -9]
 [ 14  -5 -17 ...  -7   3 -17]
 ...
 [ 16 -10 -16 ...   7 -14   9]
 [  8 -11 -14 ...  -9 -17  13]
 [ 13 -17 -12 ...  18   4  11]]
fc1.bias
[ 127 -127  127 -127  127  127  127  127  127 -127 -127  127  127  127
 -127  127  127 -127  127  127  127  127  127 -127  127  127 -127 -127
  127 -127  127 -127 -127 -127 -127  127 -127  127  127  127  127 -127
 -127  127 -127 -127  127 -127 -104  127  127  127 -127  127 -127  127
 -127  127 -127 -127  127  127  127  127 -127  127 -127  127  127  127
  127  127 -127  127  127  127 -127  127  127  127 -127  127  127 -127
  127  127  127  127 -127 -127 -127 -127 -127  127   56 -127  127  127
  127  127  127 -127  127  127 -127  127  127 -127  127  127  127 -127
  127 -127  127 -127  127  127  127 -127 -127  127 -127  127  -42  127
  127  127 -127  127  127  127 -127  127  127  127  127  127  127  127
  127  127  127  127  127  127  127 -127 -127  127  127  127 -127 -127
  127  