TORCH INT8 API

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# PyTorch INT8 API -
# This notebook evaluates a pretrained ResNet model with static quantization on ImageNet test data

import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.datasets as datasets
import matplotlib.pyplot as plt
import numpy as np
import time
import random
import os
from torch.utils.data import Subset, DataLoader, Dataset
from PIL import Image

# Import quantization libraries
from torch.ao.quantization import get_default_qconfig_mapping, quantize_fx
from torch.ao.quantization.quantize_fx import prepare_fx, convert_fx

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

quantization_device = torch.device("cpu")

# This is the standard preprocessing for models pretrained on ImageNet
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

imagenet_path = "/content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/"
print(f"Loading from: {imagenet_path}")

# Get all jpg files from all folders
all_image_files = []
class_folders = [f for f in os.listdir(imagenet_path) if os.path.isdir(os.path.join(imagenet_path, f))]
print(f"Found {len(class_folders)} class folders (00000 to 00108)")

# Collect all jpg files
for folder in class_folders:
    folder_path = os.path.join(imagenet_path, folder)
    files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.jpg')]
    all_image_files.extend(files)

print(f"Found a total of {len(all_image_files)} images")

# Randomly sample 50 images
num_samples = 50
if len(all_image_files) > num_samples:
    random.shuffle(all_image_files)
    sampled_images = all_image_files[:num_samples]
else:
    sampled_images = all_image_files

print(f"Randomly sampled {len(sampled_images)} images for testing")

folder_to_idx = {folder: idx for idx, folder in enumerate(sorted(class_folders))}

# Custom dataset for the sampled images
class SampledImageNetDataset(Dataset):
    def __init__(self, image_files, transform=None):
        self.image_files = image_files
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = self.image_files[idx]

        # Get the class folder from the path
        folder_name = os.path.basename(os.path.dirname(img_path))
        class_idx = folder_to_idx[folder_name]

        # Load and transform the image
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        return image, class_idx

# Create the dataset and dataloader
sample_dataset = SampledImageNetDataset(sampled_images, transform=preprocess)
sample_loader = DataLoader(sample_dataset, batch_size=1, shuffle=False)

print(f"Created dataset with {len(sample_dataset)} images")
print(f"Number of classes represented: {len(set(folder_to_idx.values()))}")

idx_to_class = {v: k for k, v in folder_to_idx.items()}

# Print a few examples
print("\nSample images:")
for i in range(min(5, len(sampled_images))):
    img_path = sampled_images[i]
    folder = os.path.basename(os.path.dirname(img_path))
    print(f"{i+1}. {img_path} (Class: {folder})")



Using device: cpu
Loading from: /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/
Found 109 class folders (00000 to 00108)
Found a total of 5450 images
Randomly sampled 50 images for testing
Created dataset with 50 images
Number of classes represented: 109

Sample images:
1. /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/00097/9656554087649158.jpg (Class: 00097)
2. /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/00088/938808873819965.jpg (Class: 00088)
3. /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/00025/705343495580209.jpg (Class: 00025)
4. /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/00094/9459481074036453.jpg (Class: 00094)
5. /content/drive/My Drive/assignments_dl_cs7150/project/imagenet_data/dataset_dl/00045/811119649116805.jpg (Class: 00045)


In [3]:
# Function to load ResNet-18 model
def load_model(quantize=False):
    # Load the PyTorch model
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

    if not quantize:
        # Regular non-quantized model
        model = model.to(device)
        model.eval()
        return model

    # For static quantization:
    # 1. Set up a calibration data loader
    calib_dataset = SampledImageNetDataset(sampled_images[:20], transform=preprocess)
    calib_loader = DataLoader(calib_dataset, batch_size=1)

    # 2. Create a quantization configuration
    from torch.ao.quantization import get_default_qconfig

    # Get default qconfig
    qconfig = get_default_qconfig("fbgemm")
    qconfig_dict = {"": qconfig}

    # Work with the model on CPU for quantization
    model = model.to('cpu')
    model.eval()

    # Create example inputs for prepare_fx
    example_inputs = torch.randn(1, 3, 224, 224)

    # Prepare the model for quantization
    model_prepared = prepare_fx(model, qconfig_dict, example_inputs)

    # Calibrate the model with some data
    with torch.no_grad():
        for inputs, _ in calib_loader:
            model_prepared(inputs)

    # Convert the model to quantized version
    quantized_model = convert_fx(model_prepared)

    return quantized_model

In [4]:
# Function to measure FPS
def measure_fps(model, input_tensor, is_quantized=False):
    # For quantized models, ensure we're on CPU
    if is_quantized:
        input_tensor = input_tensor.to('cpu')
        model = model.to('cpu')
    else:
        input_tensor = input_tensor.to(device)
        model = model.to(device)

    # Warm-up run
    with torch.no_grad():
        model(input_tensor)

    # Actual timing
    start_time = time.time()
    num_runs = 10
    with torch.no_grad():
        for _ in range(num_runs):
            model(input_tensor)

    end_time = time.time()
    time_per_image = (end_time - start_time) / num_runs
    fps = 1.0 / time_per_image

    return fps

# Function to evaluate model on the test set
def evaluate_model(model, data_loader, is_quantized=False):
    if is_quantized:
        model = model.to('cpu')
        eval_device = 'cpu'
    else:
        model = model.to(device)
        eval_device = device

    correct = 0
    total = 0

    # For top-1 and top-5 accuracy
    top1_correct = 0
    top5_correct = 0

    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(eval_device), labels.to(eval_device)
            outputs = model(images)

            # Top-1 accuracy
            _, predicted = torch.max(outputs, 1)
            top1_correct += (predicted == labels).sum().item()

            # Top-5 accuracy
            _, top5_preds = torch.topk(outputs, 5, dim=1)
            for i in range(labels.size(0)):
                if labels[i] in top5_preds[i]:
                    top5_correct += 1

            total += labels.size(0)

    top1_accuracy = 100 * top1_correct / total
    top5_accuracy = 100 * top5_correct / total

    return top1_accuracy, top5_accuracy

In [5]:
# See a few sample images with predictions
def visualize_predictions(model_obj, dataset, loader, num_samples=5, is_quantized=False):
    if is_quantized:
        model_obj = model_obj.to('cpu')
        vis_device = 'cpu'
    else:
        model_obj = model_obj.to(device)
        vis_device = device

    model_obj.eval()
    fig, axes = plt.subplots(1, num_samples, figsize=(20, 4))

    with torch.no_grad():
        for i, (images, labels) in enumerate(loader):
            if i >= num_samples:
                break

            images, labels = images.to(vis_device), labels.to(vis_device)
            outputs = model_obj(images)

            # Get predicted class
            _, predicted = torch.max(outputs, 1)

            # Get class names (folder names)
            true_class_idx = labels.item()
            pred_class_idx = predicted.item()

            true_class = idx_to_class[true_class_idx]
            pred_class = idx_to_class.get(pred_class_idx, f"Unknown ({pred_class_idx})")

            # Get image
            img = images[0].cpu().numpy().transpose((1, 2, 0))
            # Denormalize
            img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
            img = np.clip(img, 0, 1)

            # Plot
            axes[i].imshow(img)
            axes[i].set_title(f"True: {true_class}\nPred: {pred_class}")
            axes[i].axis('off')

    plt.tight_layout()
    plt.savefig('sample_predictions_quantized.png')
    plt.show()


# Function to calculate model size in MB
def calculate_model_size_mb(model):
    """Calculate model size in megabytes"""
    # Get model state_dict
    state_dict = model.state_dict()

    # Calculate size in bytes
    total_size = 0
    for param in state_dict.values():

        if isinstance(param, torch.Tensor):
            # Each parameter's size is: num_elements * element_size
            num_elements = param.numel()
            element_size = param.element_size()  # Size in bytes of each element
            total_size += num_elements * element_size

    # Convert to MB
    size_mb = total_size / (1024 * 1024)
    return size_mb


In [6]:
# Function to run multiple evaluations and calculate averages
def run_multiple_evaluations(num_runs=5):
    # Store results for each run
    run_results = {
        'models': ['ResNet18 Original', 'ResNet18 INT8 Static'],
        'runs': [],
        'avg_top1_accuracy': [],
        'avg_top5_accuracy': [],
        'avg_fps': [],
        'avg_model_size_mb': []
    }

    # Create dummy input for FPS measurement
    dummy_input = torch.randn(1, 3, 224, 224)

    for run in range(num_runs):
        print(f"\n--- Starting Run {run+1}/{num_runs} ---")
        run_data = {
            'top1_accuracy': [],
            'top5_accuracy': [],
            'fps': [],
            'model_size_mb': []
        }

        # Evaluate original PyTorch ResNet-18
        print(f"\nRun {run+1}: Evaluating original PyTorch ResNet-18...")
        resnet18_original = load_model(quantize=False)
        model_size_original = calculate_model_size_mb(resnet18_original)
        fps_original = measure_fps(resnet18_original, dummy_input, is_quantized=False)
        top1_acc_original, top5_acc_original = evaluate_model(resnet18_original, sample_loader, is_quantized=False)

        print(f"Original model size: {model_size_original:.2f} MB")

        run_data['top1_accuracy'].append(top1_acc_original)
        run_data['top5_accuracy'].append(top5_acc_original)
        run_data['fps'].append(fps_original)
        run_data['model_size_mb'].append(model_size_original)

        # Evaluate INT8 static quantized model
        print(f"\nRun {run+1}: Evaluating ResNet-18 with INT8 Static Quantization...")
        resnet18_int8_static = load_model(quantize=True)
        model_size_static = calculate_model_size_mb(resnet18_int8_static)
        fps_int8_static = measure_fps(resnet18_int8_static, dummy_input, is_quantized=True)
        top1_acc_int8_static, top5_acc_int8_static = evaluate_model(resnet18_int8_static, sample_loader, is_quantized=True)

        print(f"Quantized model size: {model_size_static:.2f} MB")

        run_data['top1_accuracy'].append(top1_acc_int8_static)
        run_data['top5_accuracy'].append(top5_acc_int8_static)
        run_data['fps'].append(fps_int8_static)
        run_data['model_size_mb'].append(model_size_static)

        # Store this run's data
        run_results['runs'].append(run_data)

        # Print a summary of this run
        print(f"\n--- Run {run+1} Summary ---")
        print(f"{'Model':<30} {'Top-1 Acc (%)':<15} {'Top-5 Acc (%)':<15} {'FPS':<10} {'Size (MB)':<10}")
        print("-" * 85)

        for i, model_name in enumerate(run_results['models']):
            print(f"{model_name:<30} {run_data['top1_accuracy'][i]:<15.2f} {run_data['top5_accuracy'][i]:<15.2f} {run_data['fps'][i]:<10.2f} {run_data['model_size_mb'][i]:<10.2f}")

    # Calculate averages across all runs
    for metric in ['top1_accuracy', 'top5_accuracy', 'fps']:
        for model_idx in range(len(run_results['models'])):
            # Calculate average of all runs for each model
            values = [run_results['runs'][run_idx][metric][model_idx] for run_idx in range(num_runs)]
            run_results[f'avg_{metric}'].append(np.mean(values))

    # Calculate improvement/loss metrics
    original_model_idx = 0  # Index of the original model in results

    # Calculate FPS improvement for static quantized model compared to original
    static_model_idx = 1  # Index of the static quantized model
    fps_improvement = (run_results['avg_fps'][static_model_idx] / run_results['avg_fps'][original_model_idx] - 1) * 100

    # Calculate accuracy loss for static quantized model compared to original
    top1_loss = run_results['avg_top1_accuracy'][original_model_idx] - run_results['avg_top1_accuracy'][static_model_idx]
    top5_loss = run_results['avg_top5_accuracy'][original_model_idx] - run_results['avg_top5_accuracy'][static_model_idx]

    # Print final average results
    print("\n\n====== FINAL RESULTS (AVERAGED OVER 5 RUNS) ======")
    print(f"{'Model':<30} {'Top-1 Acc (%)':<15} {'Top-5 Acc (%)':<15} {'FPS':<10}")
    print("-" * 75)

    for i, model_name in enumerate(run_results['models']):
        print(f"{model_name:<30} {run_results['avg_top1_accuracy'][i]:<15.2f} {run_results['avg_top5_accuracy'][i]:<15.2f} {run_results['avg_fps'][i]:<10.2f}")

    # Print improvements/losses
    print("\n====== IMPROVEMENTS AND LOSSES ======")
    print(f"\n{run_results['models'][1]} compared to Original:")
    print(f"  FPS Improvement: {fps_improvement:+.2f}%")
    print(f"  Top-1 Accuracy Loss: {top1_loss:+.2f}%")
    print(f"  Top-5 Accuracy Loss: {top5_loss:+.2f}%")

    # Plot the results
    plot_comparison_results(run_results)

    return run_results

In [7]:
# Function to plot comparison results
def plot_comparison_results(results):
    # Create a figure with subplots
    fig, axs = plt.subplots(1, 3, figsize=(18, 6))

    # Get the data
    models = results['models']
    avg_top1 = results['avg_top1_accuracy']
    avg_top5 = results['avg_top5_accuracy']
    avg_fps = results['avg_fps']
    avg_model_size = [results['runs'][0]['model_size_mb'][i] for i in range(len(models))]

    # Accuracy plot (combining top-1 and top-5)
    bar_width = 0.35
    x = np.arange(len(models))

    axs[0].bar(x - bar_width/2, avg_top1, bar_width, color='skyblue', label='Top-1')
    axs[0].bar(x + bar_width/2, avg_top5, bar_width, color='lightgreen', label='Top-5')
    axs[0].set_xlabel('Model')
    axs[0].set_ylabel('Accuracy (%)')
    axs[0].set_title('Average Test Accuracy')
    axs[0].set_xticks(x)
    axs[0].set_xticklabels(models)
    axs[0].set_ylim([0, 100])
    axs[0].legend()
    for i, v in enumerate(avg_top1):
        axs[0].text(x[i] - bar_width/2, v + 2, f"{v:.2f}%", ha='center')
    for i, v in enumerate(avg_top5):
        axs[0].text(x[i] + bar_width/2, v + 2, f"{v:.2f}%", ha='center')

    # Model size plot
    axs[1].bar(models, avg_model_size, color='salmon')
    axs[1].set_xlabel('Model')
    axs[1].set_ylabel('Model Size (MB)')
    axs[1].set_title('Model Size')
    for i, v in enumerate(avg_model_size):
        axs[1].text(i, v + 0.5, f"{v:.2f} MB", ha='center')

    # FPS plot
    axs[2].bar(models, avg_fps, color='gold')
    axs[2].set_xlabel('Model')
    axs[2].set_ylabel('FPS')
    axs[2].set_title('Average Frames Per Second')
    for i, v in enumerate(avg_fps):
        axs[2].text(i, v + 0.5, f"{v:.2f}", ha='center')

    plt.tight_layout()
    plt.savefig('resnet18_pytorch_int8_comparison_avg.png')
    plt.show()

# Function to run multiple evaluations and calculate averages
def run_multiple_evaluations(num_runs=5):
    # Store results for each run
    run_results = {
        'models': ['ResNet18 Original', 'ResNet18 INT8 Static'],
        'runs': [],
        'avg_top1_accuracy': [],
        'avg_top5_accuracy': [],
        'avg_fps': [],
        'avg_model_size_mb': []
    }

    # Create dummy input for FPS measurement
    dummy_input = torch.randn(1, 3, 224, 224)

    for run in range(num_runs):
        print(f"\n--- Starting Run {run+1}/{num_runs} ---")
        run_data = {
            'top1_accuracy': [],
            'top5_accuracy': [],
            'fps': [],
            'model_size_mb': []
        }

        # Evaluate original PyTorch ResNet-18
        print(f"\nRun {run+1}: Evaluating original PyTorch ResNet-18...")
        resnet18_original = load_model(quantize=False)
        model_size_original = calculate_model_size_mb(resnet18_original)
        fps_original = measure_fps(resnet18_original, dummy_input, is_quantized=False)
        top1_acc_original, top5_acc_original = evaluate_model(resnet18_original, sample_loader, is_quantized=False)

        print(f"Original model size: {model_size_original:.2f} MB")

        run_data['top1_accuracy'].append(top1_acc_original)
        run_data['top5_accuracy'].append(top5_acc_original)
        run_data['fps'].append(fps_original)
        run_data['model_size_mb'].append(model_size_original)

        # Evaluate INT8 static quantized model
        print(f"\nRun {run+1}: Evaluating ResNet-18 with INT8 Static Quantization...")
        resnet18_int8_static = load_model(quantize=True)
        model_size_static = calculate_model_size_mb(resnet18_int8_static)
        fps_int8_static = measure_fps(resnet18_int8_static, dummy_input, is_quantized=True)
        top1_acc_int8_static, top5_acc_int8_static = evaluate_model(resnet18_int8_static, sample_loader, is_quantized=True)

        print(f"Quantized model size: {model_size_static:.2f} MB")

        run_data['top1_accuracy'].append(top1_acc_int8_static)
        run_data['top5_accuracy'].append(top5_acc_int8_static)
        run_data['fps'].append(fps_int8_static)
        run_data['model_size_mb'].append(model_size_static)

        # Store this run's data
        run_results['runs'].append(run_data)

        # Print a summary of this run
        print(f"\n--- Run {run+1} Summary ---")
        print(f"{'Model':<30} {'Top-1 Acc (%)':<15} {'Top-5 Acc (%)':<15} {'FPS':<10} {'Size (MB)':<10}")
        print("-" * 85)

        for i, model_name in enumerate(run_results['models']):
            print(f"{model_name:<30} {run_data['top1_accuracy'][i]:<15.2f} {run_data['top5_accuracy'][i]:<15.2f} {run_data['fps'][i]:<10.2f} {run_data['model_size_mb'][i]:<10.2f}")

    # Calculate averages across all runs
    for metric in ['top1_accuracy', 'top5_accuracy', 'fps', 'model_size_mb']:
        for model_idx in range(len(run_results['models'])):
            # Calculate average of all runs for each model
            values = [run_results['runs'][run_idx][metric][model_idx] for run_idx in range(num_runs)]
            run_results[f'avg_{metric}'].append(np.mean(values))

    # Calculate improvement/loss metrics
    original_model_idx = 0  # Index of the original model in results

    # Calculate FPS improvement for static quantized model compared to original
    static_model_idx = 1  # Index of the static quantized model
    fps_improvement = (run_results['avg_fps'][static_model_idx] / run_results['avg_fps'][original_model_idx] - 1) * 100

    # Calculate model size reduction
    size_reduction = (1 - run_results['avg_model_size_mb'][static_model_idx] / run_results['avg_model_size_mb'][original_model_idx]) * 100

    # Calculate accuracy loss for static quantized model compared to original
    top1_loss = run_results['avg_top1_accuracy'][original_model_idx] - run_results['avg_top1_accuracy'][static_model_idx]
    top5_loss = run_results['avg_top5_accuracy'][original_model_idx] - run_results['avg_top5_accuracy'][static_model_idx]

    # Print final average results
    print("\n\n====== FINAL RESULTS (AVERAGED OVER 5 RUNS) ======")
    print(f"{'Model':<30} {'Top-1 Acc (%)':<15} {'Top-5 Acc (%)':<15} {'FPS':<10} {'Size (MB)':<10}")
    print("-" * 85)

    for i, model_name in enumerate(run_results['models']):
        print(f"{model_name:<30} {run_results['avg_top1_accuracy'][i]:<15.2f} {run_results['avg_top5_accuracy'][i]:<15.2f} {run_results['avg_fps'][i]:<10.2f} {run_results['avg_model_size_mb'][i]:<10.2f}")

    # Print improvements/losses
    print("\n====== IMPROVEMENTS AND LOSSES ======")
    print(f"\n{run_results['models'][1]} compared to Original:")
    print(f"  FPS Improvement: {fps_improvement:+.2f}%")
    print(f"  Model Size Reduction: {size_reduction:+.2f}%")
    print(f"  Top-1 Accuracy Loss: {top1_loss:+.2f}%")
    print(f"  Top-5 Accuracy Loss: {top5_loss:+.2f}%")

    # Plot the results
    plot_comparison_results(run_results)

    return run_results

In [8]:
# For the final visualization, show predictions using the models from the last run
def visualize_final_models():
    print("\nVisualizing sample predictions using models from final run:")

    # Load models for visualization
    resnet18_original = load_model(quantize=False)
    resnet18_int8_static = load_model(quantize=True)

    print("\nVisualizing sample predictions using original ResNet-18 for comparison:")
    visualize_predictions(resnet18_original, sample_dataset, sample_loader)

    print("\nVisualizing sample predictions using INT8 Static Quantized ResNet-18:")
    visualize_predictions(resnet18_int8_static, sample_dataset, sample_loader, is_quantized=True)

# Run multiple evaluations and get the average metrics
print("Starting multiple evaluation runs...")
all_results = run_multiple_evaluations(num_runs=5)
print("\nEvaluation complete!")
print(f"Results are saved as 'resnet18_pytorch_int8_comparison_avg.png'")

# Visualize predictions using models from the final run
visualize_final_models()

Output hidden; open in https://colab.research.google.com to view.