In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.quantization import QuantStub, DeQuantStub

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.quant = QuantStub()
        self.fc1 = nn.Linear(784, 10)  # Input layer to Hidden layer
        self.fc2 = nn.Linear(10, 10)   # Hidden layer to Output layer
        self.dequant = DeQuantStub()

    def forward(self, x):
        x = self.quant(x)
        x = F.relu(self.fc1(x.view(-1, 784)))
        x = self.fc2(x)
        x = F.log_softmax(x, dim=1)
        x = self.dequant(x)
        return x

In [2]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1000, shuffle=False)

In [3]:
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Net().to(device)
model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
torch.quantization.prepare_qat(model, inplace=True)



Net(
  (quant): QuantStub(
    (activation_post_process): FusedMovingAvgObsFakeQuantize(
      fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.quint8, quant_min=0, quant_max=127, qscheme=torch.per_tensor_affine, reduce_range=True
      (activation_post_process): MovingAverageMinMaxObserver(min_val=inf, max_val=-inf)
    )
  )
  (fc1): Linear(
    in_features=784, out_features=10, bias=True
    (weight_fake_quant): FusedMovingAvgObsFakeQuantize(
      fake_quant_enabled=tensor([1]), observer_enabled=tensor([1]), scale=tensor([1.]), zero_point=tensor([0], dtype=torch.int32), dtype=torch.qint8, quant_min=-128, quant_max=127, qscheme=torch.per_channel_symmetric, reduce_range=False
      (activation_post_process): MovingAveragePerChannelMinMaxObserver(min_val=tensor([]), max_val=tensor([]))
    )
    (activation_post_process): FusedMovingAvgObsFakeQuantize(
      fake_quant_enabled=tensor([1]), observer

In [5]:
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
model.to(device)

for epoch in range(1, 10):  # Run for 10 epochs for demonstration; adjust as needed
    train(model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)
    if epoch > 3:  # Freeze quantizer parameters after the first few epochs
        model.apply(torch.quantization.disable_observer)
    if epoch > 4:  # Freeze batch norm mean and variance estimates
        model.apply(torch.nn.intrinsic.qat.freeze_bn_stats)

# Convert to quantized version for inference
model.to('cpu')
torch.quantization.convert(model, inplace=True)


Test set: Average loss: 0.4747, Accuracy: 8734/10000 (87%)


Test set: Average loss: 0.3640, Accuracy: 8956/10000 (90%)


Test set: Average loss: 0.3202, Accuracy: 9077/10000 (91%)


Test set: Average loss: 0.2946, Accuracy: 9147/10000 (91%)


Test set: Average loss: 0.2858, Accuracy: 9195/10000 (92%)


Test set: Average loss: 0.2839, Accuracy: 9194/10000 (92%)


Test set: Average loss: 0.2806, Accuracy: 9227/10000 (92%)


Test set: Average loss: 0.2778, Accuracy: 9185/10000 (92%)


Test set: Average loss: 0.2789, Accuracy: 9197/10000 (92%)



Net(
  (quant): Quantize(scale=tensor([0.0256]), zero_point=tensor([17]), dtype=torch.quint8)
  (fc1): QuantizedLinear(in_features=784, out_features=10, scale=0.22472526133060455, zero_point=41, qscheme=torch.per_channel_affine)
  (fc2): QuantizedLinear(in_features=10, out_features=10, scale=0.23055149614810944, zero_point=73, qscheme=torch.per_channel_affine)
  (dequant): DeQuantize()
)

In [6]:
test(model, device, test_loader)

NotImplementedError: Could not run 'aten::_log_softmax.out' with arguments from the 'QuantizedCPU' backend. This could be because the operator doesn't exist for this backend, or was omitted during the selective/custom build process (if using custom build). If you are a Facebook employee using PyTorch on mobile, please visit https://fburl.com/ptmfixes for possible resolutions. 'aten::_log_softmax.out' is only available for these backends: [CPU, CUDA, Meta, BackendSelect, Python, FuncTorchDynamicLayerBackMode, Functionalize, Named, Conjugate, Negative, ZeroTensor, ADInplaceOrView, AutogradOther, AutogradCPU, AutogradCUDA, AutogradHIP, AutogradXLA, AutogradMPS, AutogradIPU, AutogradXPU, AutogradHPU, AutogradVE, AutogradLazy, AutogradMeta, AutogradMTIA, AutogradPrivateUse1, AutogradPrivateUse2, AutogradPrivateUse3, AutogradNestedTensor, Tracer, AutocastCPU, AutocastCUDA, FuncTorchBatched, FuncTorchVmapMode, Batched, VmapMode, FuncTorchGradWrapper, PythonTLSSnapshot, FuncTorchDynamicLayerFrontMode, PythonDispatcher].

CPU: registered at aten/src/ATen/RegisterCPU.cpp:31034 [kernel]
CUDA: registered at aten/src/ATen/RegisterCUDA.cpp:43986 [kernel]
Meta: registered at /dev/null:241 [kernel]
BackendSelect: fallthrough registered at ../aten/src/ATen/core/BackendSelectFallbackKernel.cpp:3 [backend fallback]
Python: registered at ../aten/src/ATen/core/PythonFallbackKernel.cpp:144 [backend fallback]
FuncTorchDynamicLayerBackMode: registered at ../aten/src/ATen/functorch/DynamicLayer.cpp:491 [backend fallback]
Functionalize: registered at aten/src/ATen/RegisterFunctionalization_3.cpp:22788 [kernel]
Named: registered at ../aten/src/ATen/core/NamedRegistrations.cpp:7 [backend fallback]
Conjugate: registered at ../aten/src/ATen/ConjugateFallback.cpp:17 [backend fallback]
Negative: registered at ../aten/src/ATen/native/NegateFallback.cpp:19 [backend fallback]
ZeroTensor: registered at ../aten/src/ATen/ZeroTensorFallback.cpp:86 [backend fallback]
ADInplaceOrView: registered at ../torch/csrc/autograd/generated/ADInplaceOrViewType_1.cpp:5017 [kernel]
AutogradOther: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradCPU: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradCUDA: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradHIP: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradXLA: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradMPS: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradIPU: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradXPU: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradHPU: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradVE: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradLazy: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradMeta: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradMTIA: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradPrivateUse1: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradPrivateUse2: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradPrivateUse3: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
AutogradNestedTensor: registered at ../torch/csrc/autograd/generated/VariableType_0.cpp:15232 [autograd kernel]
Tracer: registered at ../torch/csrc/autograd/generated/TraceType_0.cpp:16728 [kernel]
AutocastCPU: fallthrough registered at ../aten/src/ATen/autocast_mode.cpp:487 [backend fallback]
AutocastCUDA: fallthrough registered at ../aten/src/ATen/autocast_mode.cpp:354 [backend fallback]
FuncTorchBatched: registered at ../aten/src/ATen/functorch/LegacyBatchingRegistrations.cpp:815 [backend fallback]
FuncTorchVmapMode: fallthrough registered at ../aten/src/ATen/functorch/VmapModeRegistrations.cpp:28 [backend fallback]
Batched: registered at ../aten/src/ATen/LegacyBatchingRegistrations.cpp:1073 [backend fallback]
VmapMode: fallthrough registered at ../aten/src/ATen/VmapModeRegistrations.cpp:33 [backend fallback]
FuncTorchGradWrapper: registered at ../aten/src/ATen/functorch/TensorWrapper.cpp:210 [backend fallback]
PythonTLSSnapshot: registered at ../aten/src/ATen/core/PythonFallbackKernel.cpp:152 [backend fallback]
FuncTorchDynamicLayerFrontMode: registered at ../aten/src/ATen/functorch/DynamicLayer.cpp:487 [backend fallback]
PythonDispatcher: registered at ../aten/src/ATen/core/PythonFallbackKernel.cpp:148 [backend fallback]


In [None]:
import numpy as np

# Ensure the model is in evaluation mode
model.eval()

for name, module in model.named_children():
    if isinstance(module, torch.nn.quantized.Linear):
        # Handle integer weights
        weight_int_repr = module.weight().int_repr().numpy()
        np.save(f"{name}_weight_int.npy", weight_int_repr)

        # Handle scale and zero-point for weights
        if hasattr(module.weight(), 'q_per_channel_scales'):  # Per-channel quantization
            weight_scale = module.weight().q_per_channel_scales().numpy()
            weight_zero_point = module.weight().q_per_channel_zero_points().numpy()
        else:  # Per-tensor quantization
            weight_scale = np.array([module.weight().q_scale()])
            weight_zero_point = np.array([module.weight().q_zero_point()])
        
        np.save(f"{name}_weight_scale.npy", weight_scale)
        np.save(f"{name}_weight_zero_point.npy", weight_zero_point)
        
        

#         if hasattr(module, 'bias'):
#             # Check if the bias is indeed a tensor and not None
#             bias_attr = getattr(module, 'bias', None)
#             if bias_attr is not None and isinstance(bias_attr, torch.Tensor):
#                 # Ensure the tensor is detached and moved to CPU for conversion to NumPy
#                 bias_tensor = bias_attr.detach().cpu().numpy()
#                 np.save(f"{name}_bias.npy", bias_tensor)
#             else:
#                 print(f"Module {name} has no bias or it's not accessible as expected.")

In [None]:
for layer_name in ['fc1', 'fc2']:
    layer = getattr(model, layer_name, None)
    if layer is not None:
        print(f"Inspecting {layer_name}:")
        if hasattr(layer, 'bias'):
            print(f"  Bias attribute exists for {layer_name}.")
            if layer.bias is not None:
                print(f"  Bias for {layer_name} is not None.")
                # Attempt to print out the size of the bias tensor
                try:
                    print(f"  Bias size for {layer_name}: {layer.bias.size()}")
                except AttributeError as e:
                    print(f"  Encountered an AttributeError when accessing bias size for {layer_name}: {e}")
            else:
                print(f"  Bias for {layer_name} is None.")
        else:
            print(f"  No bias attribute found for {layer_name}.")
    else:
        print(f"Layer {layer_name} not found in the model.")

In [None]:
print(model)


In [9]:
import numpy as np

for name, module in model.named_children():
    print(name)
    if name == 'fc1' or name == 'fc2':
        weight_int_repr = module.weight().int_repr().numpy()
        np.save(f"{name}_weight_int.npy", weight_int_repr)
        weight_int_repr.tofile(f"{name}_weight_int.bin")
        print(weight_int_repr.shape)
        
        
        # Check the quantization scheme and save scale and zero_point accordingly
        if module.weight().qscheme() == torch.per_tensor_affine:
            # Saving scale and zero_point for per-tensor quantization
            weight_scale = np.array([module.weight().q_scale()], dtype=np.float32)
            weight_zero_point = np.array([module.weight().q_zero_point()], dtype=np.int32)
        elif module.weight().qscheme() == torch.per_channel_affine:
            # Saving scale and zero_point for per-channel quantization
            weight_scale = module.weight().q_per_channel_scales().numpy()
            weight_zero_point = module.weight().q_per_channel_zero_points().numpy()
        else:
            raise NotImplementedError("Unsupported quantization scheme")
        
        weight_scale.tofile(f"{name}_weight_scale.bin")
        np.save(f"{name}_weight_scale.npy", weight_scale)
        weight_zero_point.tofile(f"{name}_weight_zero_point.bin")
        np.save(f"{name}_weight_zero_point.npy", weight_zero_point)

        # Saving biases (already in float32)
        if module.bias is not None:
            bias = module.bias().detach().numpy()
            bias.tofile(f"{name}_bias.bin")
            np.save(f"{name}_bias.npy", bias)

quant
fc1
(10, 784)
fc2
(10, 10)
dequant


In [28]:
mnist_train = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())

# Extract the first image and its label
image, label = mnist_train[0]

# Convert the image tensor to a PIL image (Note: 'image' is a 1x28x28 tensor)
#image = transforms.ToPILImage()(image)


# image_rgb = image.convert("RGB")

# # Save the image as BMP
# image_rgb.save('mnist_first_image.bmp', 'BMP')

# Convert the image tensor to a PIL image (Note: 'image' is a 1x28x28 tensor)
image_pil = transforms.ToPILImage()(image.squeeze())  # Use .squeeze() to remove the channel dimension for grayscale

# Save the image as BMP
image_pil.save('mnist_first_image_grayscale.bmp', 'BMP')

In [29]:
from PIL import Image
image = Image.open('mnist_first_image_grayscale.bmp')

# Convert the image to a NumPy array
image_array = np.array(image)

print(image_array.shape)

(28, 28)


In [23]:
# Collapse the (28, 28, 3) array into a (784, 3) array
collapsed_array = image_array.reshape(-1, 3)

# Take the average of all elements in the 3rd dimension to make it a (784,) array
average_array = np.mean(collapsed_array, axis=1)
print(average_array)

[  0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   3.  18.
  18.  18. 126. 136. 175.  26. 166. 255. 247. 127.   0.   0.   0.   0.
   0.   0.   0.   0.   0.   0.   0.   0.  30.  36.  94. 154. 170. 253.
 253. 253. 253. 253. 225. 172. 253. 242. 195.  64.   0.   0.   0.   0.
   0. 

In [10]:
print(weight_int_repr)

[[  86   63   30  -58  127 -128  127 -128 -128 -128]
 [-128 -128  127  127 -128  127 -128  127  -86 -128]
 [ -59  127  127 -114   94 -107 -128   52   19  -43]
 [-125  -90  127  127 -128 -128   70  -26  127   14]
 [  34   84 -128  -69 -128  -11  -82   12  -85  127]
 [  41   29 -128  127  127   23 -128 -128   70   15]
 [ -35  127 -128   54 -128  -69   29  127 -128 -128]
 [-128 -128  127 -128  127 -120  127  127 -128  127]
 [  39   62  -62 -128 -128  127   86  -21  127 -128]
 [ 127 -128  -38  -33 -128  -40   45  127    0   -2]]


In [13]:
image_array = np.array(image_rgb)

print(image_array)

[[[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 ...

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]

 [[0 0 0]
  [0 0 0]
  [0 0 0]
  ...
  [0 0 0]
  [0 0 0]
  [0 0 0]]]


In [24]:
for name, module in model.named_children():
    print(name)
    if name == 'fc1':
        weight_int_repr = module.weight().int_repr().numpy()
#         np.save(f"{name}_weight_int.npy", weight_int_repr)
#         weight_int_repr.tofile(f"{name}_weight_int.bin")
#         print(weight_int_repr.shape)

quant
fc1
fc2
dequant


In [44]:
image_array = image_array.astype(np.float64)
print(image_array.shape)
flattened_array = image_array.flatten()

(28, 28)


In [41]:
weight_int_repr = weight_int_repr.astype(np.float64)
print(weight_int_repr.shape)

(10, 784)


In [48]:
f1_out = np.dot(weight_int_repr[:10,:10],flattened_array[:10])
print(f1_out)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [38]:
file_path = 'fc1_weight_int.bin'

# Read the binary file and convert it into a numpy array
data = np.fromfile(file_path, dtype=int)
print(data.shape)

(980,)
