<a href="https://colab.research.google.com/github/pelinbalci/LLM_Notebooks/blob/main/Quantization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Quantize - DeQuantize

Original code for this part: https://github.com/hkproj/quantization-notes/blob/main/quantization_from_scratch.ipynb

In [83]:
import numpy as np

# Suppress scientific notation
np.set_printoptions(suppress=True)

# Seed NumPy's global RNG so the sampled parameters (and every number derived
# from them further down) are identical on each Restart & Run All
np.random.seed(42)

# Generate randomly distributed parameters
params = np.random.uniform(low=-50, high=150, size=20)

# Make sure important values are at the beginning for better debugging:
# index 0 holds the maximum, index 1 the minimum, index 2 an exact zero
params[0] = params.max() + 1
params[1] = params.min() - 1
params[2] = 0

# Round each number to the second decimal place
params = np.round(params, 2)

# Print the parameters
print(params)

[147.16 -46.83   0.    26.65 135.02  60.88   1.46 124.1  142.15  53.96
 -45.83  31.05  -7.78 146.16 131.59 -24.1   27.18 116.29 122.6  -44.58]


In [87]:
def clamp(params_q: np.ndarray, lower_bound: int, upper_bound: int) -> np.ndarray:
    """Limit every element of ``params_q`` to the range ``[lower_bound, upper_bound]``.

    Args:
        params_q: Array of (already rounded) quantized values.
        lower_bound: Smallest value allowed in the result.
        upper_bound: Largest value allowed in the result.

    Returns:
        A new array with out-of-range elements replaced by the nearest bound.
        The input array is not modified.
    """
    # np.clip allocates its result, so the caller's array is never mutated
    return np.clip(params_q, lower_bound, upper_bound)

def asymmetric_quantization(params: np.ndarray, bits: int) -> tuple[np.ndarray, float, int]:
    """Quantize ``params`` to unsigned ``bits``-bit integers with a zero point.

    The observed range ``[min(params), max(params)]`` is mapped onto
    ``[0, 2**bits - 1]`` via ``q = round(p / scale + zero)``.

    Args:
        params: Floating-point values to quantize (must be non-empty).
        bits: Number of bits of the target integer range.

    Returns:
        ``(quantized, scale, zero)`` where ``quantized`` is an int32 array,
        ``scale`` is the real-valued step between two integer codes and
        ``zero`` is the integer code that represents 0.0.
    """
    max_param = np.max(params)
    min_param = np.min(params)
    lower_bound, upper_bound = 0, 2**bits - 1

    # Calculate the scale and zero point
    scale = (max_param - min_param) / upper_bound
    if scale == 0:
        # All parameters are identical: avoid dividing by zero below
        scale = 1.0
    # Return a real Python int so the value matches the annotated return type
    zero = int(-np.round(min_param / scale))

    # Quantize the parameters, clipping any rounding overshoot to the valid range
    quantized = np.clip(np.round(params / scale + zero), lower_bound, upper_bound).astype(np.int32)
    return quantized, scale, zero

def asymmetric_dequantize(params_q: np.array, scale: float, zero: int) -> np.array:
    """Map asymmetric integer codes back to real values: ``(q - zero) * scale``."""
    # Remove the zero-point offset first, then rescale
    shifted = params_q - zero
    return shifted * scale

def symmetric_dequantize(params_q: np.array, scale: float) -> np.array:
    """Map symmetric integer codes back to real values by multiplying with ``scale``."""
    dequantized = params_q * scale
    return dequantized

def symmetric_quantization(params: np.ndarray, bits: int) -> tuple[np.ndarray, float]:
    """Quantize ``params`` to signed ``bits``-bit integers centred on zero.

    The largest absolute value is mapped to ``2**(bits-1) - 1`` and every value
    is quantized as ``q = round(p / scale)`` (no zero point).

    Args:
        params: Floating-point values to quantize (must be non-empty).
        bits: Number of bits of the target integer range.

    Returns:
        ``(quantized, scale)`` where ``quantized`` is an int32 array and
        ``scale`` is the real-valued step between two integer codes.
    """
    lower_bound = -2**(bits - 1)
    upper_bound = 2**(bits - 1) - 1

    # Calculate the scale
    max_param_abs = np.max(np.abs(params))
    scale = max_param_abs / upper_bound
    if scale == 0:
        # Every parameter is 0: avoid dividing by zero below
        scale = 1.0

    # Quantize the parameters, clipping any rounding overshoot to the valid range
    quantized = np.clip(np.round(params / scale), lower_bound, upper_bound).astype(np.int32)
    return quantized, scale

def quantization_error(params: np.array, params_q: np.array):
    """Return the mean squared error between ``params`` and ``params_q``."""
    residual = params - params_q
    return np.mean(np.square(residual))

# Quantize the same parameters to 8 bits with both schemes
asymmetric_q, asymmetric_scale, asymmetric_zero = asymmetric_quantization(params, 8)
symmetric_q, symmetric_scale = symmetric_quantization(params, 8)

# Show the original values, then each scheme's scale / zero point and integer codes,
# with one blank line between the three sections
print('Original:', np.round(params, 2), '', sep='\n')
print(f'Asymmetric scale: {asymmetric_scale}, zero: {asymmetric_zero}', asymmetric_q, '', sep='\n')
print(f'Symmetric scale: {symmetric_scale}', symmetric_q, sep='\n')


Original:
[147.16 -46.83   0.    26.65 135.02  60.88   1.46 124.1  142.15  53.96
 -45.83  31.05  -7.78 146.16 131.59 -24.1   27.18 116.29 122.6  -44.58]

Asymmetric scale: 0.7607450980392158, zero: 62.0
[255   0  62  97 239 142  64 225 249 133   2 103  52 254 235  30  98 215
 223   3]

Symmetric scale: 1.158740157480315
[127 -40   0  23 117  53   1 107 123  47 -40  27  -7 126 114 -21  23 100
 106 -38]


In [88]:
# Map both sets of integer codes back to floating-point values
params_deq_asymmetric = asymmetric_dequantize(asymmetric_q, asymmetric_scale, asymmetric_zero)
params_deq_symmetric = symmetric_dequantize(symmetric_q, symmetric_scale)

# Print the original next to both reconstructions (rounded to 2 decimals),
# separated by blank lines
sections = [
    ('Original:', params),
    ('Dequantize Asymmetric:', params_deq_asymmetric),
    ('Dequantize Symmetric:', params_deq_symmetric),
]
print('\n\n'.join(f'{title}\n{np.round(values, 2)}' for title, values in sections))

Original:
[147.16 -46.83   0.    26.65 135.02  60.88   1.46 124.1  142.15  53.96
 -45.83  31.05  -7.78 146.16 131.59 -24.1   27.18 116.29 122.6  -44.58]

Dequantize Asymmetric:
[146.82 -47.17   0.    26.63 134.65  60.86   1.52 124.   142.26  54.01
 -45.64  31.19  -7.61 146.06 131.61 -24.34  27.39 116.39 122.48 -44.88]

Dequantize Symmetric:
[147.16 -46.35   0.    26.65 135.57  61.41   1.16 123.99 142.53  54.46
 -46.35  31.29  -8.11 146.   132.1  -24.33  26.65 115.87 122.83 -44.03]


In [86]:
# Report the mean squared round-trip error of each scheme; labels are
# right-aligned to 20 columns so the numbers line up
for label, dequantized in (
    ("Asymmetric error: ", params_deq_asymmetric),
    ("Symmetric error: ", params_deq_symmetric),
):
    print(f'{label:>20}{np.round(quantization_error(params, dequantized), 2)}')

  Asymmetric error: 0.04
   Symmetric error: 0.14


# Quantization


## Quantization with Data Types or Downcasting - PyTorch

The model's parameters are saved in a more compact data type (bfloat16).
During inference, the model performs its calculations in this data type, and its activations are in this data type.


In [None]:
import torch
import torch.nn as nn

class DummyModel(nn.Module):
    """Tiny network for the dtype experiments.

    Token embedding, then two (linear -> layernorm -> relu) stages, then a
    linear head. Every layer has width 2.
    """

    def __init__(self):
        super().__init__()
        vocab_size = width = 2
        self.token_embedding = nn.Embedding(vocab_size, width)
        self.linear_1 = nn.Linear(width, width)
        self.layernorm_1 = nn.LayerNorm(width)
        self.linear_2 = nn.Linear(width, width)
        self.layernorm_2 = nn.LayerNorm(width)
        self.head = nn.Linear(width, width)

    def forward(self, x):
        """Embed the token ids in ``x`` and return the head's output."""
        hidden = self.token_embedding(x)
        stages = (
            (self.linear_1, self.layernorm_1),
            (self.linear_2, self.layernorm_2),
        )
        for linear, layernorm in stages:
            hidden = torch.relu(layernorm(linear(hidden)))
        return self.head(hidden)

# Testing the model
# This float32 instance is the baseline: later cells run inference on it and
# deep-copy it to build the bfloat16 variant.
model = DummyModel()
print(model)


DummyModel(
  (token_embedding): Embedding(2, 2)
  (linear_1): Linear(in_features=2, out_features=2, bias=True)
  (layernorm_1): LayerNorm((2,), eps=1e-05, elementwise_affine=True)
  (linear_2): Linear(in_features=2, out_features=2, bias=True)
  (layernorm_2): LayerNorm((2,), eps=1e-05, elementwise_affine=True)
  (head): Linear(in_features=2, out_features=2, bias=True)
)


In [None]:
def print_param_dtype(model):
    """Print one line per named parameter of ``model`` stating the dtype it is stored in."""
    for param_name, tensor in model.named_parameters():
        line = f"{param_name} is loaded in {tensor.dtype}"
        print(line)

In [None]:
print_param_dtype(model)

token_embedding.weight is loaded in torch.float32
linear_1.weight is loaded in torch.float32
linear_1.bias is loaded in torch.float32
layernorm_1.weight is loaded in torch.float32
layernorm_1.bias is loaded in torch.float32
linear_2.weight is loaded in torch.float32
linear_2.bias is loaded in torch.float32
layernorm_2.weight is loaded in torch.float32
layernorm_2.bias is loaded in torch.float32
head.weight is loaded in torch.float32
head.bias is loaded in torch.float32


In [None]:
# float 16
# .half() casts the parameters to torch.float16 (see the printed dtypes).
# Note this is a freshly initialised DummyModel, not a copy of `model`,
# so its weights differ from the float32 baseline.
model_fp16 = DummyModel().half()
print_param_dtype(model_fp16)

token_embedding.weight is loaded in torch.float16
linear_1.weight is loaded in torch.float16
linear_1.bias is loaded in torch.float16
layernorm_1.weight is loaded in torch.float16
layernorm_1.bias is loaded in torch.float16
linear_2.weight is loaded in torch.float16
linear_2.bias is loaded in torch.float16
layernorm_2.weight is loaded in torch.float16
layernorm_2.bias is loaded in torch.float16
head.weight is loaded in torch.float16
head.bias is loaded in torch.float16


In [None]:
# Two sequences of two token ids each (int64, as nn.Embedding expects integer indices)
dummy_input = torch.tensor([[1, 0], [0, 1]], dtype=torch.long)

# Reference forward pass with the float32 model
logits_fp32 = model(dummy_input)

logits_fp32

tensor([[[-0.4875, -0.6201],
         [-0.4875, -0.6201]],

        [[-0.4875, -0.6201],
         [-0.4875, -0.6201]]], grad_fn=<ViewBackward0>)

In [None]:
# inference using float16 model
# The failure is the point of this cell: in the recorded run the forward pass
# raised a RuntimeError because the LayerNorm kernel was not implemented for
# 'Half'. The error is caught and printed in red (ANSI escape codes) so the
# notebook keeps running.
try:
    logits_fp16 = model_fp16(dummy_input)
except Exception as error:
    print("\033[91m", type(error).__name__, ": ", error, "\033[0m")

[91m RuntimeError :  "LayerNormKernelImpl" not implemented for 'Half' [0m


In [None]:
from copy import deepcopy
# Clone the float32 baseline (same weights) and cast the clone to bfloat16,
# leaving `model` itself untouched
model_bf16 = deepcopy(model).to(torch.bfloat16)
print_param_dtype(model_bf16)

token_embedding.weight is loaded in torch.bfloat16
linear_1.weight is loaded in torch.bfloat16
linear_1.bias is loaded in torch.bfloat16
layernorm_1.weight is loaded in torch.bfloat16
layernorm_1.bias is loaded in torch.bfloat16
linear_2.weight is loaded in torch.bfloat16
linear_2.bias is loaded in torch.bfloat16
layernorm_2.weight is loaded in torch.bfloat16
layernorm_2.bias is loaded in torch.bfloat16
head.weight is loaded in torch.bfloat16
head.bias is loaded in torch.bfloat16


In [None]:
# inference using bfloat16 model
# Same guarded pattern as the float16 attempt; in the recorded run this forward
# pass succeeds and the bfloat16 logits are printed.
try:
    logits_bf16 = model_bf16(dummy_input)
    print(logits_bf16)
except Exception as error:
    print("\033[91m", type(error).__name__, ": ", error, "\033[0m")

tensor([[[-0.4902, -0.6250],
         [-0.4902, -0.6250]],

        [[-0.4902, -0.6250],
         [-0.4902, -0.6250]]], dtype=torch.bfloat16, grad_fn=<ViewBackward0>)


In [None]:
# Element-wise absolute gap between the bfloat16 and float32 logits
abs_diff = torch.abs(logits_bf16 - logits_fp32)
mean_diff = abs_diff.mean().item()
max_diff = abs_diff.max().item()

print(f"Mean diff: {mean_diff} | Max diff: {max_diff}")

Mean diff: 0.003837764263153076 | Max diff: 0.004937291145324707


## Reduced Float - TensorFlow

In [33]:
import os
import tensorflow as tf
from tensorflow import keras


# Define a simple sequential model
def create_model():
    """Build and compile a small dense classifier.

    Architecture: Dense(256, relu) on 784 input features, followed by a
    Dense(2) layer that outputs logits. Compiled with the Adam optimizer,
    sparse categorical cross-entropy on logits, and sparse categorical accuracy.

    Returns:
        The compiled ``tf.keras.Sequential`` model.
    """
    layers = [
        keras.layers.Dense(256, activation='relu', input_shape=(784,)),
        keras.layers.Dense(2),
    ]
    network = tf.keras.Sequential(layers)

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    network.compile(optimizer='adam', loss=loss_fn, metrics=[accuracy])

    return network

# Create a basic model instance
# NOTE: this rebinds `model`, which until now referred to the PyTorch DummyModel.
model = create_model()

# Display the model's architecture
model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_10 (Dense)            (None, 256)               200960    
                                                                 
 dense_11 (Dense)            (None, 2)                 514       
                                                                 
Total params: 201474 (787.01 KB)
Trainable params: 201474 (787.01 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [34]:
# Persist the Keras model twice: as a single HDF5 file ...
model.save('model.h5')

# ... and as a SavedModel directory (this path is what the TFLite converters read)
saved_model_path = 'saved_model/1'
tf.saved_model.save(model, saved_model_path)

# Report the HDF5 file size, converted from bytes to KB
h5_size_bytes = os.path.getsize('model.h5')
TF_model_size = h5_size_bytes / float(2**10)
print('TF_model_size in KB', TF_model_size)

  saving_api.save_model(


TF_model_size in KB 802.390625


In [48]:
# For every layer, print the length of its first weight array and that array's dtype
for layer in model.layers:
    kernel = layer.get_weights()[0]
    print(len(kernel))
    print(kernel.dtype)

784
float32
256
float32


In [49]:
# Convert the SavedModel to TFLite, allowing float16 as a storage type
converter_1 = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
converter_1.optimizations = [tf.lite.Optimize.DEFAULT]
converter_1.target_spec.supported_types = [tf.float16]
tflite_model = converter_1.convert()

# Write through a context manager so the file is flushed and closed before the
# interpreter in the next cell opens it
with open("converted_model.tflite", "wb") as tflite_file:
    bytes_written = tflite_file.write(tflite_model)

# Size of the converted model in KB (displayed as the cell output)
bytes_written / float(2**10)

394.67578125

In [50]:
# Open the float16-converted model with the TFLite interpreter
interpreter = tf.lite.Interpreter(model_path="converted_model.tflite")
interpreter.allocate_tensors()

# One metadata dict per tensor in the model
tensor_details = interpreter.get_tensor_details()

# Report name and dtype of every tensor except the one named 'constants'
for tensor in tensor_details:
    if tensor['name'] == 'constants':
        continue
    print('Name:', tensor['name'])
    print('Data type:', tensor['dtype'])

Name: serving_default_dense_10_input:0
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_10/MatMul
Data type: <class 'numpy.float16'>
Name: sequential_5/dense_11/MatMul
Data type: <class 'numpy.float16'>
Name: sequential_5/dense_10/MatMul2
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_11/MatMul1
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_10/MatMul;sequential_5/dense_10/Relu;sequential_5/dense_10/BiasAdd
Data type: <class 'numpy.float32'>
Name: StatefulPartitionedCall:0
Data type: <class 'numpy.float32'>


In [52]:
tensor_details

[{'name': 'serving_default_dense_10_input:0',
  'index': 0,
  'shape': array([  1, 784], dtype=int32),
  'shape_signature': array([ -1, 784], dtype=int32),
  'dtype': numpy.float32,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': array([], dtype=float32),
   'zero_points': array([], dtype=int32),
   'quantized_dimension': 0},
  'sparsity_parameters': {}},
 {'name': 'sequential_5/dense_10/MatMul',
  'index': 1,
  'shape': array([256, 784], dtype=int32),
  'shape_signature': array([256, 784], dtype=int32),
  'dtype': numpy.float16,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': array([], dtype=float32),
   'zero_points': array([], dtype=int32),
   'quantized_dimension': 0},
  'sparsity_parameters': {}},
 {'name': 'sequential_5/dense_11/MatMul',
  'index': 2,
  'shape': array([  2, 256], dtype=int32),
  'shape_signature': array([  2, 256], dtype=int32),
  'dtype': numpy.float16,
  'quantization': (0.0, 0),
  'quantization_parameters': {'scales': ar

In [51]:
# Print name and dtype of each model input, then of each model output
io_groups = (
    ("Input:", interpreter.get_input_details()),
    ("Output:", interpreter.get_output_details()),
)
for label, details in io_groups:
    for detail in details:
        print(label, detail["name"], detail["dtype"])

Input: serving_default_dense_10_input:0 <class 'numpy.float32'>
Output: StatefulPartitionedCall:0 <class 'numpy.float32'>


## Hybrid Quantization - TensorFlow

In [56]:
# Convert the SavedModel with the default optimizations only
# (no float16 target type and no representative dataset)
converter_2 = tf.lite.TFLiteConverter.from_saved_model(saved_model_path)
converter_2.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model_2 = converter_2.convert()

# Write through a context manager so the file is flushed and closed before the
# interpreter in the next cell opens it
with open("converted_hybrid_model.tflite", "wb") as tflite_file:
    bytes_written = tflite_file.write(tflite_model_2)

# Size of the converted model in KB (displayed as the cell output)
bytes_written / float(2**10)

199.3671875

In [57]:
# Open the hybrid-quantized model with the TFLite interpreter
interpreter = tf.lite.Interpreter(model_path="converted_hybrid_model.tflite")
interpreter.allocate_tensors()

# One metadata dict per tensor in the model
tensor_details = interpreter.get_tensor_details()

# Report name and dtype of every tensor except the one named 'constants'
for tensor in tensor_details:
    if tensor['name'] == 'constants':
        continue
    print('Name:', tensor['name'])
    print('Data type:', tensor['dtype'])

Name: serving_default_dense_10_input:0
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_11/MatMul
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_10/MatMul1
Data type: <class 'numpy.int8'>
Name: sequential_5/dense_10/MatMul;sequential_5/dense_10/Relu;sequential_5/dense_10/BiasAdd
Data type: <class 'numpy.float32'>
Name: StatefulPartitionedCall:0
Data type: <class 'numpy.float32'>
Name: 
Data type: <class 'numpy.int8'>
Name: 
Data type: <class 'numpy.float32'>
Name: 
Data type: <class 'numpy.int32'>
Name: 
Data type: <class 'numpy.int32'>
Name: 
Data type: <class 'numpy.int32'>


## Integer Quantization - TensorFlow

We need a sample dataset. I would like to try the Titanic dataset.

In [67]:
import pandas as pd
# Load dataset.
dftrain = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/train.csv')
dfeval = pd.read_csv('https://storage.googleapis.com/tf-datasets/titanic/eval.csv')

# Drop incomplete rows BEFORE splitting off the label. Popping 'survived' first
# and filtering the features afterwards would leave the labels with rows that
# the features no longer have, so features and labels would fall out of alignment.
dftrain = dftrain.dropna()
dfeval = dfeval.dropna()
y_train = dftrain.pop('survived')
y_eval = dfeval.pop('survived')

# Use only numeric values
numeric_columns = ["age", "n_siblings_spouses", "parch", "fare"]
dfeval_num = dfeval[numeric_columns]
dftrain_num = dftrain[numeric_columns]

train_data_ = np.array(dftrain_num)
train_labels = np.array(y_train)
test_data_ = np.array(dfeval_num)
test_labels = np.array(y_eval)

In [79]:
import numpy as np
def representative_data_gen():
  """Yield up to 100 Titanic evaluation rows as float32 calibration samples."""
  # NOTE(review): the model converted below was built with input_shape=(784,),
  # while each row here has only 4 features. This mismatch is a plausible reason
  # the conversion does not produce int8 tensors; TODO confirm.
  # Bound the loop by the data size so a short dataset cannot raise IndexError
  for i in range(min(100, len(test_data_))):
    yield [test_data_[i].astype(np.float32)]

# Convert model
converter_rep = tf.lite.TFLiteConverter.from_keras_model(model)
converter_rep.optimizations = [tf.lite.Optimize.DEFAULT]
converter_rep.representative_dataset = representative_data_gen
tflite_model_quant_rep = converter_rep.convert()

# Write through a context manager so the file is flushed and closed before the
# interpreter in the next cell opens it
with open("converted_rep_model.tflite", "wb") as tflite_file:
  bytes_written = tflite_file.write(tflite_model_quant_rep)

# Size of the converted model in KB (displayed as the cell output)
bytes_written / float(2**10)



787.28515625

Unfortunately, the size of the converted model is almost the same as that of the original model.

In [76]:
# Open the model converted with the representative dataset
interpreter = tf.lite.Interpreter(model_path="converted_rep_model.tflite")
interpreter.allocate_tensors()

# One metadata dict per tensor in the model
tensor_details = interpreter.get_tensor_details()

# Report name and dtype of every tensor except the one named 'constants'
for tensor in tensor_details:
    if tensor['name'] == 'constants':
        continue
    print('Name:', tensor['name'])
    print('Data type:', tensor['dtype'])

Name: serving_default_dense_10_input:0
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_11/MatMul
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_10/MatMul1
Data type: <class 'numpy.float32'>
Name: sequential_5/dense_10/MatMul;sequential_5/dense_10/Relu;sequential_5/dense_10/BiasAdd
Data type: <class 'numpy.float32'>
Name: StatefulPartitionedCall:0
Data type: <class 'numpy.float32'>


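This is not expected: all tensors are still float32. One possible cause worth checking is that the representative samples (4 Titanic features) do not match the model's 784-feature input. Let's use another model, with matching data, and convert it.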

In [81]:
# Load MNIST dataset
# NOTE: this rebinds train_labels / test_labels, which previously held the Titanic labels.
mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Normalize the input image so that each pixel value is between 0 to 1.
train_images = train_images.astype(np.float32) / 255.0
test_images = test_images.astype(np.float32) / 255.0

# Define the model architecture
# NOTE: this rebinds `model`. The model is neither compiled nor trained in this
# cell, so it is saved and converted with its initial weights.
model = tf.keras.Sequential([
  tf.keras.layers.InputLayer(input_shape=(28, 28)),
  tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
  tf.keras.layers.Conv2D(filters=12, kernel_size=(3, 3), activation='relu'),
  tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(10)
])


# save model - first way
# (same 'model.h5' path as the dense model saved earlier, which is overwritten)
model.save('model.h5')

# save model - second way
# (same 'saved_model/1' directory as used for the dense model earlier)
saved_model_path = 'saved_model/1'
tf.saved_model.save(model, saved_model_path)

# Get file size in bytes for a given model - first way
# (the byte count is divided by 2**10, so the printed value is in KB)
TF_model_size = os.path.getsize('model.h5') /float(2**10)
print('TF_model_size in KB', TF_model_size)


def representative_data_gen():
  """Yield the first 100 training images, one per batch, as calibration samples."""
  calibration_batches = tf.data.Dataset.from_tensor_slices(train_images).batch(1).take(100)
  for input_value in calibration_batches:
    # The model has a single input, so each sample is a one-element list
    yield [input_value]

# Convert the Keras model with default optimizations, using the representative
# dataset generator for calibration
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen

tflite_model_quant = converter.convert()

# Write through a context manager so the file is flushed and closed before the
# interpreter in the next cell opens it
with open("converted_rep_model.tflite", "wb") as tflite_file:
    bytes_written = tflite_file.write(tflite_model_quant)

# Size of the converted model in KB (displayed as the cell output)
bytes_written / float(2**10)

  saving_api.save_model(


TF_model_size in KB 96.4765625




23.890625

In [82]:
# Open the quantized MNIST model with the TFLite interpreter
interpreter = tf.lite.Interpreter(model_path="converted_rep_model.tflite")
interpreter.allocate_tensors()

# One metadata dict per tensor in the model
tensor_details = interpreter.get_tensor_details()

# Report name and dtype of every tensor except the one named 'constants'
for tensor in tensor_details:
    if tensor['name'] == 'constants':
        continue
    print('Name:', tensor['name'])
    print('Data type:', tensor['dtype'])

Name: serving_default_input_1:0
Data type: <class 'numpy.float32'>
Name: sequential_6/reshape/strided_slice/stack
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/strided_slice/stack_1
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/Reshape/shape/1
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/Reshape/shape/3
Data type: <class 'numpy.int32'>
Name: sequential_6/flatten/Const
Data type: <class 'numpy.int32'>
Name: sequential_6/dense_12/MatMul1
Data type: <class 'numpy.int8'>
Name: sequential_6/conv2d/BiasAdd/ReadVariableOp
Data type: <class 'numpy.int32'>
Name: sequential_6/conv2d/Conv2D
Data type: <class 'numpy.int8'>
Name: tfl.quantize
Data type: <class 'numpy.int8'>
Name: sequential_6/reshape/Shape
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/strided_slice
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/Reshape/shape
Data type: <class 'numpy.int32'>
Name: sequential_6/reshape/Reshape
Data type: <class 'numpy.int8'>
Name: 

## Linear Quantization - PyTorch

Linear quantization enables the quantized model to maintain performance much closer to that of the original model by converting from the compressed data type back to the original FP32 data type during inference.


### Use Torch
The original code: https://github.com/hkproj/quantization-notes/blob/main/post_training_quantization.ipynb

In [94]:
import torch
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torch.nn as nn
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path
import os


class DummyTorchModel(nn.Module):
    """Three-layer fully connected classifier for 28x28 inputs with 10 output logits.

    Args:
        hidden_size_1: Width of the first hidden layer.
        hidden_size_2: Width of the second hidden layer.
    """

    def __init__(self, hidden_size_1=100, hidden_size_2=100):
        # Zero-argument super(): the previous call named a class (`VerySimpleNet`)
        # that does not exist in this notebook, so instantiation raised NameError
        super().__init__()
        self.linear1 = nn.Linear(28*28, hidden_size_1)
        self.linear2 = nn.Linear(hidden_size_1, hidden_size_2)
        self.linear3 = nn.Linear(hidden_size_2, 10)
        self.relu = nn.ReLU()

    def forward(self, img):
        """Flatten ``img`` to rows of 784 features and return the 10 logits per row."""
        x = img.view(-1, 28*28)
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.linear3(x)
        return x

In [95]:
# Build the float32 reference network on the CPU
device = "cpu"
original_model = DummyTorchModel().to(device)

# Inspect the first layer's weights and their dtype before any quantization
first_layer_weight = original_model.linear1.weight
print('Weights before quantization')
print(first_layer_weight)
print(first_layer_weight.dtype)

# Serialize the state dict and report the on-disk size (bytes / 1e3)
print('Size of the model')
torch.save(original_model.state_dict(), "model.p")
size_kb = os.path.getsize("model.p") / 1e3
print('Size (KB):', size_kb)

Weights before quantization
Parameter containing:
tensor([[ 1.3020e-02,  3.1752e-02, -4.7959e-03,  ..., -3.6602e-03,
          2.1780e-02,  3.3144e-02],
        [ 9.8718e-05,  2.7122e-02, -1.9646e-02,  ...,  3.1045e-02,
         -2.1462e-02,  3.8348e-03],
        [-1.9780e-03,  2.4252e-02, -1.5464e-02,  ...,  1.0473e-03,
         -9.3692e-03, -2.6161e-02],
        ...,
        [ 2.6387e-02,  3.5439e-02, -2.9204e-02,  ..., -2.4509e-02,
         -1.6869e-02,  2.8564e-02],
        [ 2.7821e-02,  3.4160e-02,  2.7421e-02,  ...,  1.3018e-02,
          3.8771e-03,  2.5858e-02],
        [-2.7773e-02,  3.5401e-02,  2.2690e-03,  ..., -2.7014e-03,
         -2.8694e-02, -2.1597e-02]], requires_grad=True)
torch.float32
Size of the model before quantization
Size (KB): 360.998


In [97]:
original_model

VerySimpleNet(
  (linear1): Linear(in_features=784, out_features=100, bias=True)
  (linear2): Linear(in_features=100, out_features=100, bias=True)
  (linear3): Linear(in_features=100, out_features=10, bias=True)
  (relu): ReLU()
)

In [98]:
class QuantizedModel(nn.Module):
    """Same three-layer classifier as ``DummyTorchModel``, wrapped in quant/dequant stubs.

    The stubs add no parameters, so a state dict holding only
    ``linear1``/``linear2``/``linear3`` weights and biases loads into this class.

    Args:
        hidden_size_1: Width of the first hidden layer.
        hidden_size_2: Width of the second hidden layer.
    """

    def __init__(self, hidden_size_1=100, hidden_size_2=100):
        # Zero-argument super(): the previous call named a class
        # (`QuantizedVerySimpleNet`) that does not exist in this notebook,
        # so instantiation raised NameError
        super().__init__()
        # Define Quant: entry point of the quantized region
        self.quant = torch.ao.quantization.QuantStub()
        self.linear1 = nn.Linear(28*28, hidden_size_1)
        self.linear2 = nn.Linear(hidden_size_1, hidden_size_2)
        self.linear3 = nn.Linear(hidden_size_2, 10)
        self.relu = nn.ReLU()
        # Define DeQuant: exit point of the quantized region
        self.dequant = torch.ao.quantization.DeQuantStub()

    def forward(self, img):
        """Flatten ``img`` to rows of 784 features and return the 10 logits per row."""
        x = img.view(-1, 28*28)
        x = self.quant(x)
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.linear3(x)
        x = self.dequant(x)
        return x

In [99]:
# Define the device
device = "cpu"
quantized_model = QuantizedModel().to(device)

# Copy weights from unquantized model
# (the parameter names match: the quant/dequant stubs contribute no state-dict entries)
quantized_model.load_state_dict(original_model.state_dict())


# Attach the default quantization config, then let prepare() add observers.
# In the printed model every observer still shows min_val=inf / max_val=-inf,
# i.e. no data has been passed through the model yet.
quantized_model.qconfig = torch.ao.quantization.default_qconfig
quantized_model = torch.ao.quantization.prepare(quantized_model) # Insert observers
quantized_model

QuantizedVerySimpleNet(
  (quant): QuantStub(
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (linear1): Linear(
    in_features=784, out_features=100, bias=True
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (linear2): Linear(
    in_features=100, out_features=100, bias=True
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (linear3): Linear(
    in_features=100, out_features=10, bias=True
    (activation_post_process): MinMaxObserver(min_val=inf, max_val=-inf)
  )
  (relu): ReLU()
  (dequant): DeQuantStub()
)

In [100]:
# Replace the observed float modules with their quantized counterparts.
# NOTE(review): no data is passed through the prepared model between prepare()
# and this call, so the observers are still at min_val=inf / max_val=-inf and the
# converted activations end up with scale=1.0, zero_point=0 (see the output below).
# TODO: run a calibration forward pass on representative inputs before converting.
quantized_model = torch.ao.quantization.convert(quantized_model)
quantized_model



QuantizedVerySimpleNet(
  (quant): Quantize(scale=tensor([1.]), zero_point=tensor([0]), dtype=torch.quint8)
  (linear1): QuantizedLinear(in_features=784, out_features=100, scale=1.0, zero_point=0, qscheme=torch.per_tensor_affine)
  (linear2): QuantizedLinear(in_features=100, out_features=100, scale=1.0, zero_point=0, qscheme=torch.per_tensor_affine)
  (linear3): QuantizedLinear(in_features=100, out_features=10, scale=1.0, zero_point=0, qscheme=torch.per_tensor_affine)
  (relu): ReLU()
  (dequant): DeQuantize()
)

In [103]:
# Print the weights matrix of the model after quantization
# (the label previously said "before quantization", contradicting what is printed)
print('Weights after quantization')
print(quantized_model.linear1.weight())

# Serialize the quantized state dict and report the on-disk size (bytes / 1e3)
print('Size of the model')
torch.save(quantized_model.state_dict(), "quant_model.p")
print('Size (KB):', os.path.getsize("quant_model.p")/1e3)

Weights before quantization
tensor([[ 0.0129,  0.0317, -0.0048,  ..., -0.0036,  0.0218,  0.0331],
        [ 0.0000,  0.0272, -0.0196,  ...,  0.0311, -0.0216,  0.0039],
        [-0.0020,  0.0244, -0.0154,  ...,  0.0011, -0.0092, -0.0261],
        ...,
        [ 0.0263,  0.0356, -0.0291,  ..., -0.0244, -0.0168,  0.0286],
        [ 0.0277,  0.0342,  0.0275,  ...,  0.0129,  0.0039,  0.0258],
        [-0.0277,  0.0353,  0.0022,  ..., -0.0028, -0.0286, -0.0216]],
       size=(100, 784), dtype=torch.qint8,
       quantization_scheme=torch.per_tensor_affine, scale=0.00028011034009978175,
       zero_point=0)
Size of the model before quantization
Size (KB): 95.394


In [104]:
# Float32 weights of the first layer in the unquantized model
print('Original weights: ')
print(original_model.linear1.weight)
print('')

# The same layer's quantized weights, converted back to floating point for comparison
quantized_weight = quantized_model.linear1.weight()
print('Dequantized weights: ')
print(quantized_weight.dequantize())
print('')

Original weights: 
Parameter containing:
tensor([[ 1.3020e-02,  3.1752e-02, -4.7959e-03,  ..., -3.6602e-03,
          2.1780e-02,  3.3144e-02],
        [ 9.8718e-05,  2.7122e-02, -1.9646e-02,  ...,  3.1045e-02,
         -2.1462e-02,  3.8348e-03],
        [-1.9780e-03,  2.4252e-02, -1.5464e-02,  ...,  1.0473e-03,
         -9.3692e-03, -2.6161e-02],
        ...,
        [ 2.6387e-02,  3.5439e-02, -2.9204e-02,  ..., -2.4509e-02,
         -1.6869e-02,  2.8564e-02],
        [ 2.7821e-02,  3.4160e-02,  2.7421e-02,  ...,  1.3018e-02,
          3.8771e-03,  2.5858e-02],
        [-2.7773e-02,  3.5401e-02,  2.2690e-03,  ..., -2.7014e-03,
         -2.8694e-02, -2.1597e-02]], requires_grad=True)

Dequantized weights: 
tensor([[ 0.0129,  0.0317, -0.0048,  ..., -0.0036,  0.0218,  0.0331],
        [ 0.0000,  0.0272, -0.0196,  ...,  0.0311, -0.0216,  0.0039],
        [-0.0020,  0.0244, -0.0154,  ...,  0.0011, -0.0092, -0.0261],
        ...,
        [ 0.0263,  0.0356, -0.0291,  ..., -0.0244, -0.0168,  

### Use Quanto Library

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
# Load the tokenizer and the causal LM from the same checkpoint
model_name = "EleutherAI/pythia-70m"
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForCausalLM.from_pretrained(model_name)

# Use the GPU when at least one CUDA device is visible, otherwise the CPU
device_count = torch.cuda.device_count()
device = torch.device("cuda" if device_count > 0 else "cpu")

base_model.to(device)
print(device)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


cpu


In [None]:
input_text = "Hello, my name is "
# Move the encoded prompt to the model's device. Without this, generate() fails
# with a device mismatch whenever the model was placed on CUDA.
input_ids = tokenizer(input_text, return_tensors="pt").to(base_model.device)

# Generate 10 new tokens after the prompt
outputs = base_model.generate(**input_ids, max_new_tokens=10)
outputs

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


tensor([[12092,    13,   619,  1416,   310,   209,   187,    29,    81,    31,
           187,    29,    81,    31,   187,    29]])

In [None]:
# Decode the generated ids back to text. The keyword was misspelled
# (`skip_special_tkens`), so the intended option was never passed.
tokenizer.decode(outputs[0], skip_special_tokens=True)

'Hello, my name is \n<p>\n<p>\n<'

In [None]:
base_model.gpt_neox.layers[0].attention.dense.weight

Parameter containing:
tensor([[ 0.0309,  0.0214, -0.0100,  ...,  0.0015,  0.0079,  0.0244],
        [-0.0027,  0.0154, -0.0028,  ...,  0.0334,  0.0286, -0.0287],
        [ 0.0411,  0.0161, -0.0055,  ..., -0.0134, -0.0373, -0.0166],
        ...,
        [-0.0063,  0.0312, -0.0121,  ...,  0.0329, -0.0504,  0.0386],
        [-0.0065,  0.0047,  0.0069,  ...,  0.0031, -0.0621, -0.0452],
        [ 0.0078, -0.0311,  0.0021,  ...,  0.0286,  0.0178, -0.0465]],
       requires_grad=True)

In [None]:
!pip install quanto
import quanto
from quanto import quantize, freeze
import torch



In [None]:
# Request int8 quantization of the model's weights.
# activations=None presumably leaves activations unquantized; confirm against the quanto docs.
# The weight tensor printed in the next cell is still unchanged after this call.
quanto.quantize(base_model, weights=quanto.qint8, activations=None)

In [None]:
base_model.gpt_neox.layers[0].attention.dense.weight

Parameter containing:
tensor([[ 0.0309,  0.0214, -0.0100,  ...,  0.0015,  0.0079,  0.0244],
        [-0.0027,  0.0154, -0.0028,  ...,  0.0334,  0.0286, -0.0287],
        [ 0.0411,  0.0161, -0.0055,  ..., -0.0134, -0.0373, -0.0166],
        ...,
        [-0.0063,  0.0312, -0.0121,  ...,  0.0329, -0.0504,  0.0386],
        [-0.0065,  0.0047,  0.0069,  ...,  0.0031, -0.0621, -0.0452],
        [ 0.0078, -0.0311,  0.0021,  ...,  0.0286,  0.0178, -0.0465]],
       requires_grad=True)

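Nothing changed? The printed weights are still the same floating-point values. As the next cell shows, they only become int8 after `freeze` is called.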

In [None]:
# After freeze() the same weight is displayed as a QTensor holding int8 values
# together with a column of scale factors (see the output below).
freeze(base_model)
base_model.gpt_neox.layers[0].attention.dense.weight

QTensor(tensor([[  53,   37,  -17,  ...,    3,   14,   42],
        [  -4,   21,   -4,  ...,   46,   39,  -39],
        [  90,   35,  -12,  ...,  -29,  -81,  -36],
        ...,
        [ -12,   61,  -24,  ...,   64,  -98,   75],
        [ -11,    8,   11,  ...,    5, -102,  -74],
        [  15,  -61,    4,  ...,   56,   35,  -91]], dtype=torch.int8), scale=tensor([[0.0006],
        [0.0007],
        [0.0005],
        [0.0006],
        [0.0006],
        [0.0006],
        [0.0006],
        [0.0006],
        [0.0007],
        [0.0005],
        [0.0005],
        [0.0006],
        [0.0006],
        [0.0005],
        [0.0004],
        [0.0006],
        [0.0004],
        [0.0006],
        [0.0005],
        [0.0005],
        [0.0005],
        [0.0006],
        [0.0006],
        [0.0010],
        [0.0007],
        [0.0006],
        [0.0006],
        [0.0005],
        [0.0005],
        [0.0005],
        [0.0005],
        [0.0005],
        [0.0006],
        [0.0005],
        [0.0006],
        [0.

In [None]:
input_text = "Hello, my name is "
# Move the encoded prompt to the model's device. Without this, generate() fails
# with a device mismatch whenever the model was placed on CUDA.
input_ids = tokenizer(input_text, return_tensors="pt").to(base_model.device)

# Generate 10 new tokens with the quantized model, using the same prompt as before
outputs = base_model.generate(**input_ids, max_new_tokens=10)
outputs

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


tensor([[12092,    13,   619,  1416,   310,   209,   187,    29,    67,  5651,
            14,    77, 48013,    31, 14260,   187]])

In [None]:
# Decode the generated ids back to text. The keyword was misspelled
# (`skip_special_tkens`), so the intended option was never passed.
tokenizer.decode(outputs[0], skip_special_tokens=True)

'Hello, my name is \n<brian-laptop> hi\n'