In [None]:
# Execution provider (EP) name used when the ONNX Runtime session is configured.
ExecutionProvider = "NvTensorRTRTXExecutionProvider"

# OpenVINO runs load a different model file; every other EP uses the default one.
onnx_model_path = (
    "./model/ov_model_st_quant.onnx"
    if ExecutionProvider == "OpenVINOExecutionProvider"
    else "./model/model.onnx"
)

In [None]:
import numpy as np
import onnxruntime as ort
import time
import torch
import torchvision.transforms as transforms
from datasets import load_dataset
from transformers import ViTFeatureExtractor, ViTForImageClassification

In [None]:
# Number of validation examples pulled from the dataset stream; the same
# samples are then scored by both the PyTorch and the ONNX evaluation.
num_samples = 256

In [None]:
# Load datasets

# The feature extractor is consulted here only for its per-channel
# normalization statistics (image_mean / image_std).
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
preprocess = transforms.Compose(
    [
        # Convert to RGB mode before resizing.
        transforms.Lambda(lambda img: img.convert("RGB")),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=feature_extractor.image_mean,
            std=feature_extractor.image_std,
        ),
    ]
)

def imageTransform(example):
    """Run ``preprocess`` on ``example["image"]`` and store the result in place.

    Args:
        example: Mutable mapping with an ``"image"`` entry accepted by
            ``preprocess``.

    Returns:
        The same ``example`` object, with ``"image"`` replaced by the
        preprocessed value.
    """
    raw_image = example["image"]
    example["image"] = preprocess(raw_image)
    return example
# Stream the validation split so only the examples actually consumed below are
# downloaded.
# `trust_remote_code` is deliberately not passed: it opts in to executing code
# shipped with the dataset repository.
# NOTE(review): assumes timm/mini-imagenet needs no loading script - confirm.
datasetStream = load_dataset("timm/mini-imagenet", split="validation", streaming=True)
iterable_dataset = iter(datasetStream)

# Take the first `num_samples` examples, then preprocess each image in place.
selected_samples = [next(iterable_dataset) for _ in range(num_samples)]
selected_samples = list(map(imageTransform, selected_samples))

def get_imagenet_label_map():
    """Return a mapping from ImageNet class id string to integer class index.

    The class-index JSON is read from
    ``../../cache/data/imagenet_class_index.json`` (relative to the current
    working directory). On a cache miss it is downloaded from the torchvision
    repository and written to that path so later runs can skip the download.

    Returns:
        dict: ``{entry[0]: int(key)}`` for every ``key: entry`` pair of the
        JSON document, e.g. ``{"n01440764": 0}`` (presumably WordNet synset
        id -> ImageNet-1k index; verify against the downloaded file).

    Raises:
        requests.RequestException: If the download fails, times out, or
            returns a non-success HTTP status.
    """
    import json
    from pathlib import Path

    cache_file = Path("../../cache/data/imagenet_class_index.json")
    if cache_file.exists():
        with open(cache_file, encoding="utf-8") as f:
            content = json.load(f)
    else:
        # Imported lazily so `requests` is only required on a cache miss.
        import requests

        imagenet_class_index_url = (
            "https://raw.githubusercontent.com/pytorch/vision/main/gallery/assets/imagenet_class_index.json"
        )
        # Bound the request: without a timeout a stalled connection would
        # block the notebook indefinitely.
        response = requests.get(imagenet_class_index_url, timeout=30)
        response.raise_for_status()  # Ensure the request was successful
        content = response.json()
        cache_file.parent.resolve().mkdir(parents=True, exist_ok=True)
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(content, f)

    return {v[0]: int(k) for k, v in content.items()}

# Class id string -> integer class index, built from the ImageNet class-index JSON.
label_map = get_imagenet_label_map()
# Class names of the dataset's "label" feature; indexed by the dataset's integer label.
label_names = datasetStream.features["label"].names

def mini_to_imagenet_label(mini_label):
    """Translate a dataset label index into the class index stored in ``label_map``.

    Args:
        mini_label: Index into ``label_names``.

    Returns:
        The ``label_map`` value for the class name at ``label_names[mini_label]``.
    """
    return label_map[label_names[mini_label]]

In [None]:
# Original model metrics

def evaluate_torch(model, selected_samples, device):
    """Measure top-1 accuracy and mean per-image forward latency of a PyTorch model.

    Args:
        model: Module-like object supporting ``eval()``; ``model(batch)`` must
            return an object with a ``logits`` attribute.
        selected_samples: Iterable of mappings with an ``"image"`` tensor
            (no batch dimension) and a ``"label"`` accepted by
            ``mini_to_imagenet_label``.
        device: Torch device the input batch is moved to before inference.

    Returns:
        tuple: ``(accuracy, avg_latency)`` - the fraction of samples predicted
        correctly and the mean duration of one ``model(...)`` call in seconds.

    Raises:
        ValueError: If ``selected_samples`` yields no samples.
    """
    model.eval()
    correct = 0
    latencies = []
    with torch.no_grad():
        for example in selected_samples:
            image = example["image"].unsqueeze(0).to(device)
            # The label is only compared on the host, so it is mapped directly
            # instead of being wrapped in a tensor and moved to `device` first.
            label = mini_to_imagenet_label(example["label"])

            # perf_counter is the high-resolution clock intended for timing
            # short intervals; time.time() can be too coarse for that.
            # NOTE(review): on an asynchronous device (e.g. CUDA) this window
            # would also need a synchronize; the notebook evaluates on CPU.
            start_time = time.perf_counter()
            output = model(image)
            latencies.append(time.perf_counter() - start_time)

            pred = torch.argmax(output.logits, dim=1)
            correct += int(pred.item() == label)

    if not latencies:
        raise ValueError("selected_samples is empty; nothing to evaluate")

    accuracy = correct / len(latencies)
    avg_latency = np.mean(latencies)
    return accuracy, avg_latency

# Baseline: run the original PyTorch checkpoint on CPU.
device = torch.device("cpu")
model = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224")
model = model.to(device)
accuracy, avg_latency = evaluate_torch(model, selected_samples, device)

# Accuracy is reported as a percentage and latency in milliseconds.
print(
    f"Original Model Accuracy: {accuracy * 100:.2f}%",
    f"Original Model Average Latency Per Image: {avg_latency * 1000:.2f} ms",
    sep="\n",
)

In [None]:
# Quantized model metrics

def evaluate_onnx(session, selected_samples):
    """Measure top-1 accuracy and mean per-image latency of an ONNX Runtime session.

    Only the first model input and the first model output are used.

    Args:
        session: Object exposing ``get_inputs()``, ``get_outputs()`` and
            ``run(output_names, feeds)`` in the style of
            ``onnxruntime.InferenceSession``.
        selected_samples: Iterable of mappings with an array-like ``"image"``
            (no batch dimension) and a ``"label"`` accepted by
            ``mini_to_imagenet_label``.

    Returns:
        tuple: ``(accuracy, avg_latency)`` - the fraction of samples predicted
        correctly and the mean duration of one ``session.run`` call in seconds.

    Raises:
        ValueError: If ``selected_samples`` yields no samples.
    """
    input_meta = session.get_inputs()[0]
    input_name = input_meta.name
    output_name = session.get_outputs()[0].name

    # Feed the dtype the model declares for its input. Unknown or missing type
    # strings fall back to float16, which was the previously hard-coded dtype.
    input_dtype = {
        "tensor(float16)": np.float16,
        "tensor(float)": np.float32,
        "tensor(double)": np.float64,
    }.get(getattr(input_meta, "type", None), np.float16)

    correct = 0
    latencies = []
    for example in selected_samples:
        # Add the batch dimension and cast before starting the clock so the
        # measured window covers inference only, as in evaluate_torch.
        image = np.expand_dims(example["image"], axis=0).astype(input_dtype)
        label = mini_to_imagenet_label(example["label"])

        # perf_counter is the high-resolution clock intended for timing short
        # intervals; time.time() can be too coarse for that.
        start_time = time.perf_counter()
        output = session.run([output_name], {input_name: image})[0]
        latencies.append(time.perf_counter() - start_time)

        pred = np.argmax(output, axis=1)[0]
        correct += int(pred == label)

    if not latencies:
        raise ValueError("selected_samples is empty; nothing to evaluate")

    accuracy = correct / len(latencies)
    avg_latency = np.mean(latencies)
    return accuracy, avg_latency

def add_ep_for_device(session_options, ep_name, device_type, ep_options=None):
    """Register execution provider ``ep_name`` for every matching device.

    Each entry returned by ``ort.get_ep_devices()`` whose ``ep_name`` equals
    ``ep_name`` and whose ``device.type`` equals ``device_type`` is added to
    ``session_options`` via ``add_provider_for_devices``.

    Args:
        session_options: Session options object to register the provider on.
        ep_name: Execution provider name to look for.
        device_type: Hardware device type the provider must be bound to.
        ep_options: Optional provider options; ``None`` means no options.

    Returns:
        None. A message is printed for every device added, and a warning is
        printed when nothing matched.
    """
    provider_options = {} if ep_options is None else ep_options
    matched = False
    # NOTE(review): every matching device triggers its own
    # add_provider_for_devices call; confirm ONNX Runtime accepts repeated
    # registration of the same provider when several devices match.
    for ep_device in ort.get_ep_devices():
        if ep_device.ep_name == ep_name and ep_device.device.type == device_type:
            print(f"Adding {ep_name} for {device_type}")
            session_options.add_provider_for_devices([ep_device], provider_options)
            matched = True

    if not matched:
        # Make the miss visible: otherwise the caller gets session options
        # without the requested provider and no indication of it.
        print(
            f"Warning: no {device_type} device available for {ep_name}; "
            "no execution provider was added"
        )


session_options = ort.SessionOptions()

# Pick the hardware type that matches the configured provider.
# NvTensorRTRTXExecutionProvider targets NVIDIA GPUs, so filtering on NPU would
# never match and the session would be created without the requested provider.
# Every other provider keeps the NPU filter this notebook used before.
target_device_type = (
    ort.OrtHardwareDeviceType.GPU
    if ExecutionProvider == "NvTensorRTRTXExecutionProvider"
    else ort.OrtHardwareDeviceType.NPU
)
add_ep_for_device(session_options, ExecutionProvider, target_device_type)

session = ort.InferenceSession(
    onnx_model_path,  # model file selected for the configured execution provider
    sess_options=session_options,
)

accuracy, avg_latency = evaluate_onnx(session, selected_samples)

print(f"Quantized Model Accuracy: {accuracy * 100:.2f}%")
print(f"Quantized Model Average Latency Per Image: {avg_latency * 1000:.2f} ms")