In [279]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm.auto import tqdm
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset, Subset

## Model Training + Evaluation

In [310]:
class CustomLinear(nn.Module):
    def __init__(self, in_features, out_features):
        super(CustomLinear, self).__init__()
        self.weight = nn.Parameter(torch.randn(out_features, in_features))
        self.bias = nn.Parameter(torch.zeros(out_features))
        
        # Initialize weights and bias using Xavier/Glorot initialization
        nn.init.xavier_uniform_(self.weight)
        nn.init.constant_(self.bias, 0.0)

    def forward(self, x):
        return torch.matmul(x, self.weight.t()) + self.bias

In [311]:
# Define a simple model with a single fully connected layer for regression
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc = CustomLinear(10, 1)  

        # Assign names to linear layers
        for name, param in self.fc.named_parameters():
            self.register_buffer(f'fc_{name}', param)

        # Initialize weights and bias using Xavier/Glorot initialization
        nn.init.xavier_uniform_(self.fc.weight.data)
        nn.init.constant_(self.fc.bias.data, 0.0)
    
    def forward(self, x):
        return self.fc(x)

In [312]:
# Generate synthetic data
def generate_synthetic_data(num_samples, input_size):
    inputs = torch.randn(num_samples, input_size)
    targets = torch.randint(0, 5, (num_samples,))
    return inputs, targets

In [313]:
# Define the model
model = SimpleModel()
print("Original Model:")
print(model)

# Generate synthetic data
X, y = generate_synthetic_data(100000, 10)
dataset = TensorDataset(X, y)
data_loader = DataLoader(dataset, batch_size=10, shuffle=True)

Original Model:
SimpleModel(
  (fc): CustomLinear()
)


In [314]:
# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert the numpy arrays to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# Create TensorDataset objects for train and test data
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Create DataLoader objects for train and test datasets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

  X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
  y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
  X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
  y_test_tensor = torch.tensor(y_test, dtype=torch.float32)


In [315]:
# Train the model
def train_model(model, data_loader, criterion, optimizer, num_epochs):
    model.train()
    with torch.no_grad():
        for epoch in range(num_epochs):
            total_loss = 0.0
            for inputs, targets in data_loader:
                optimizer.zero_grad()  # Zero the gradients
    
                # Forward pass
                outputs = model(inputs)
                loss = criterion(outputs, targets)
    
                # Backward pass and optimization
                loss.requires_grad = True
                loss.backward()
                optimizer.step()
    
                total_loss += loss.item()
    
            avg_loss = total_loss / len(data_loader)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')

In [316]:
# Define loss function and optimizer for regression
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 50
train_model(model, train_loader, criterion, optimizer, num_epochs)

Epoch [1/50], Loss: 8.6553
Epoch [2/50], Loss: 8.6505
Epoch [3/50], Loss: 8.6477
Epoch [4/50], Loss: 8.6503
Epoch [5/50], Loss: 8.6474
Epoch [6/50], Loss: 8.6535
Epoch [7/50], Loss: 8.6494
Epoch [8/50], Loss: 8.6494
Epoch [9/50], Loss: 8.6536
Epoch [10/50], Loss: 8.6471
Epoch [11/50], Loss: 8.6462
Epoch [12/50], Loss: 8.6487
Epoch [13/50], Loss: 8.6475
Epoch [14/50], Loss: 8.6518
Epoch [15/50], Loss: 8.6502
Epoch [16/50], Loss: 8.6545
Epoch [17/50], Loss: 8.6555
Epoch [18/50], Loss: 8.6471
Epoch [19/50], Loss: 8.6533
Epoch [20/50], Loss: 8.6571
Epoch [21/50], Loss: 8.6499
Epoch [22/50], Loss: 8.6506
Epoch [23/50], Loss: 8.6481
Epoch [24/50], Loss: 8.6537
Epoch [25/50], Loss: 8.6519
Epoch [26/50], Loss: 8.6510
Epoch [27/50], Loss: 8.6491
Epoch [28/50], Loss: 8.6475
Epoch [29/50], Loss: 8.6483
Epoch [30/50], Loss: 8.6524
Epoch [31/50], Loss: 8.6499
Epoch [32/50], Loss: 8.6501
Epoch [33/50], Loss: 8.6530
Epoch [34/50], Loss: 8.6506
Epoch [35/50], Loss: 8.6454
Epoch [36/50], Loss: 8.6464
E

In [317]:
def evaluate_model(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0

    with torch.no_grad():
        for data, targets in data_loader:
            outputs = model(data)
            
            # Compute the loss
            loss = criterion(outputs, targets)
            total_loss += loss.item()

    avg_loss = total_loss / len(data_loader)
    print(f"Test Loss (MSE): {avg_loss:.4f}")
    
    return avg_loss

In [318]:
# Example usage:
test_loss_fc_noquant = evaluate_model(model, test_loader, criterion)

Test Loss (MSE): 8.6669


## Add PTSQ

In [319]:
# Define a subset of indices to use for calibration
def get_calibration_subset_indices(dataset, subset_size=1000):
    """Select a subset of indices for calibration."""
    # Randomly sample indices
    indices = np.random.choice(len(dataset), size=subset_size, replace=False)
    return indices

In [320]:
# Get calibration subset indices
calibration_indices = get_calibration_subset_indices(train_dataset, subset_size=int(len(train_dataset) * 0.1))

# Create a subset of the dataset
calibration_subset = Subset(train_dataset, calibration_indices)

# Create a DataLoader for the calibration subset
calibration_loader = DataLoader(calibration_subset, batch_size=32, shuffle=False)

In [321]:
def collect_activations(model, dataloader, device):
    activations = {}

    def forward_hook(module, input, output):
        if module is model.fc:
            module_name = "fc_output"
            if isinstance(output, tuple):
                for i, out in enumerate(output):
                    if isinstance(out, torch.Tensor):
                        activations.setdefault(f"{module_name}_{i}", []).append(out.cpu().numpy())
            elif isinstance(output, torch.Tensor):
                activations.setdefault(module_name, []).append(output.cpu().numpy())

    # Register hooks only for the fully connected layer
    hook = model.fc.register_forward_hook(forward_hook)

    model.eval()

    with torch.no_grad():
        for x_batch, _ in dataloader:
            x_batch = x_batch.to(device)
            model(x_batch)

    # Remove the hook
    hook.remove()

    return activations

In [322]:
def pad_arrays(arrays, target_shape):
    padded_arrays = []
    for arr in arrays:
        pad_width = [(0, max(0, target - dim)) for dim, target in zip(arr.shape, target_shape)]
        padded_array = np.pad(arr, pad_width, mode='constant', constant_values=0)
        padded_arrays.append(padded_array)
    return padded_arrays

def compute_histogram(activations, num_bins=2048):
    histograms = {}
    for layer_type, outputs in activations.items():
        target_shape = tuple(max(s) for s in zip(*[output.shape for output in outputs]))
        padded_outputs = pad_arrays(outputs, target_shape)
        all_outputs = np.concatenate(padded_outputs, axis=0)
        histograms[layer_type] = np.histogram(all_outputs, bins=num_bins, range=(all_outputs.min(), all_outputs.max()))
    return histograms

def compute_scale_from_range(min_value, max_value, num_levels=256):
    scale = (max_value - min_value) / (num_levels - 1)
    return scale

def compute_optimal_scale(histogram, num_levels=256):
    counts, bin_edges = histogram
    
    # Compute scale based on min and max bin edges
    min_value = bin_edges[0]
    max_value = bin_edges[-1]
    
    scale = (max_value - min_value) / (num_levels - 1)
    
    return scale

# Modify the histogram processing to calculate scales
def compute_scales_from_histograms(histograms, num_levels=256):
    scales = {}
    
    for layer, histogram in histograms.items():
        scale = compute_optimal_scale(histogram, num_levels)
        scales[layer] = scale
        
        # Optional: Compute zero point if needed
        min_value = histogram[1][0]  # Minimum bin edge
        zero_point = round(-min_value / scale)
        scales[f'{layer}_zero_point'] = zero_point
    
    return scales

In [323]:
# Set up device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

activations = collect_activations(model, calibration_loader, device)
histograms = compute_histogram(activations)

# Debugging output for histograms
print(f'Histograms: {histograms}')

scales = {layer: compute_optimal_scale(histogram) for layer, histogram in histograms.items()}
print(f'Scales: {scales}')
# activations

Histograms: {'fc_output': (array([1, 0, 0, ..., 0, 0, 1]), array([-5.823876 , -5.818256 , -5.812636 , ...,  5.6748137,  5.6804338,
        5.6860538], dtype=float32))}
Scales: {'fc_output': 0.04513697904698989}


In [324]:
def quantize_tensor(tensor, scale, zero_point=0, symmetric=False):
    if symmetric:
        return (tensor / scale).round().clamp(-128, 127).to(torch.int8)
    else:
        return ((tensor / scale) + zero_point).round().clamp(0, 255).to(torch.uint8)

In [325]:
def int32_computation(weights, biases, inputs):
    """Perform INT32 computations for recurrent/linear cells."""
    return torch.matmul(inputs, weights) + biases

In [326]:
def requantize_tensor(int32_tensor, output_scale):
    """Requantize INT32 results back to INT8."""
    return (int32_tensor.float() / output_scale).round().clamp(-128, 127).to(torch.int8)

In [327]:
def dequantize_tensor(tensor, scale, zero_point=0, symmetric=False):
    if symmetric:
        return tensor.float() * scale
    else:
        return (tensor.to(torch.float32) - zero_point) * scale

In [328]:
def compute_mse(original, dequantized):
    return np.mean((original - dequantized) ** 2)

def print_intermediate_results(x, x_quantized, x_dequantized):
    print("Original tensor:", x.mean().item(), x.std().item())
    print("Quantized tensor:", x_quantized.cpu().numpy().mean().item(), x_quantized.cpu().numpy().std().item())
    print("Dequantized tensor:", x_dequantized.cpu().numpy().mean().item(), x_dequantized.cpu().numpy().std().item())

In [329]:
def quantized_forward(model, x, scales, zero_point=0):
    x_quantized = quantize_tensor(x, scales['fc_output'], zero_point)
    fc_weight_quantized = quantize_tensor(model.fc.weight.data, scales['fc_output'], zero_point)
    fc_biases_quantized = quantize_tensor(model.fc.bias.data, scales['fc_output'], zero_point)
    dense_out_int32 = int32_computation(
        fc_weight_quantized.to(torch.int32).T,
        fc_biases_quantized.to(torch.int32),
        x_quantized.to(torch.int32)
    )
    dense_out = requantize_tensor(dense_out_int32, scales['fc_output'])
    return dense_out

In [330]:
def evaluate_model_with_quantization(model, test_loader, scales, zero_point=0):
    """Evaluate model with quantization."""
    test_loss_quant = 0.0
    test_loss_dequant = 0.0
    total = 0 
    
    criterion = nn.MSELoss()  # Adjust this criterion if needed
    
    with torch.no_grad():
        for X_batch, y_batch in tqdm(test_loader, position=0, leave=True):
            # Move to device
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            
            # Forward pass with quantization
            y_pred_quant = quantized_forward(model, X_batch, scales, zero_point)
            
            # Dequantize the predictions
            y_pred_dequant = dequantize_tensor(y_pred_quant, scales['fc_output'])
            
            # Calculate MSE between original and dequantized predictions
            y_pred_quant_np = y_pred_quant.cpu().numpy()
            y_pred_dequant_np = y_pred_dequant.cpu().numpy()
            # mse_error = compute_mse(y_pred_quant_np, y_pred_dequant_np)
            # print(f'Mean Squared Error between quantized and dequantized tensor: {mse_error}')
            
            # Loss for quantized predictions
            y_pred_quant_dequant = dequantize_tensor(y_pred_quant, scales['fc_output'])
            loss_quant = criterion(y_pred_quant_dequant, y_batch)
            test_loss_quant += loss_quant.item()
            
            # Loss for dequantized predictions
            loss_dequant = criterion(y_pred_dequant, y_batch)
            test_loss_dequant += loss_dequant.item()
            
            total += y_batch.size(0)
    
    avg_test_loss_quant = test_loss_quant / len(test_loader)
    avg_test_loss_dequant = test_loss_dequant / len(test_loader)

    print('Avg Test Loss Quantized:', avg_test_loss_quant)
    print('Avg Test Loss Dequantized:', avg_test_loss_dequant)
    print('% diff:', 100 * abs(avg_test_loss_dequant - avg_test_loss_quant) / avg_test_loss_dequant, '\n')
    
    return avg_test_loss_quant, avg_test_loss_dequant

# Evaluate the model
avg_test_loss_quant, avg_test_loss_noquant = evaluate_model_with_quantization(model, data_loader, scales)
percent_diff_in_test_loss_fc = 100 * abs(avg_test_loss_quant - avg_test_loss_noquant) / avg_test_loss_noquant
print(f'{percent_diff_in_test_loss_fc}% difference between quantized & non-quantized loss')

  0%|          | 0/10000 [00:00<?, ?it/s]

Avg Test Loss Quantized: 15.825647464823723
Avg Test Loss Dequantized: 15.825647464823723
% diff: 0.0 

0.0% difference between quantized & non-quantized loss
