In [33]:
!pip install ftfy

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m23.3.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [34]:
import os
import time
import os.path as osp

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

from torchvision.datasets import CIFAR10
from torchvision import datasets
from torchvision import transforms
import torchvision

from PIL import Image, ImageFilter
import matplotlib.pyplot as plt
from PIL import Image
from clip import clip
from sklearn.metrics import f1_score

import scipy.io
import shutil

In [35]:
BATCH_SIZE = 128
VISUAL_BACKBONE = 'RN50'
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model, preprocess = clip.load(name=VISUAL_BACKBONE, device=device, download_root='/shareddata/clip/')
model.to(device)
def model_inference(model, image):
    image_embedding = model.encode_image(image)
    text_embedding = model.encode_text(text_inputs)
    image_embedding /= image_embedding.norm(dim=-1, keepdim=True)
    text_embedding /= text_embedding.norm(dim=-1, keepdim=True)
    logit_scale = model.logit_scale.exp()
    logits = logit_scale * image_embedding @ text_embedding.t()
    return logits

In [10]:
transform_MNIST_test = transforms.Compose([
    transforms.Resize(size=224),
    transforms.CenterCrop(size=(224, 224)),
    transforms.Grayscale(num_output_channels=3),  
    transforms.ToTensor(),
     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),  
])

test_set = torchvision.datasets.MNIST(root='/shareddata', train=False,
                                       download=True, transform=transform_MNIST_test)
test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE,
                                         shuffle=False, num_workers=2)

class_names = [str(i) for i in range(10)]
dataset_name = 'MNIST '
prompt = 'number' 
text_inputs = torch.cat([clip.tokenize(f"{prompt} {c}") for c in class_names]).to(device)
test_targets=[]
testing_acc=[]
test_preds =[]
with torch.no_grad():
    model.eval()
    for batch_idx, (images, labels) in enumerate(test_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        logits = model_inference(model, images)
        _, predicted = logits.max(1)
        test_preds.extend(predicted.cpu().tolist())
        correct = predicted.eq(labels).sum().item()
        acc = correct / labels.size(0)
        testing_acc.append(acc)
        test_targets.extend(labels.cpu().tolist())
    test_f1 = f1_score(test_targets, test_preds, average='weighted')
    avg_acc = sum(testing_acc) / len(testing_acc)
    print(f"the zero-shot performance on {dataset_name} is {avg_acc*100:.2f}%, visual encoder is {VISUAL_BACKBONE}.")
    print(f"the f1 {dataset_name} is {test_f1}, visual encoder is {VISUAL_BACKBONE}.")
    torch.cuda.empty_cache()

the zero-shot performance on MNIST  is 11.25%, visual encoder is RN50.
the f1 MNIST  is 0.05681565066516961, visual encoder is RN50.


In [26]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from torchcp.classification.scores import THR, APS, SAPS, RAPS
from torchcp.classification.predictors import SplitPredictor, ClusterPredictor, ClassWisePredictor
from sklearn.metrics import f1_score
from clip import load as clipload, tokenize

# 定义 CLIP 模型类
class CLIPModel(nn.Module):
    def __init__(self, clip_model, model_device) -> None:
        super().__init__()
        self.clip_model = clip_model
        self.model_device = model_device

    def forward(self, x_batch):
        image_features = self.clip_model.encode_image(x_batch.to(self.model_device))
        text_inputs = torch.cat([tokenize(f"a photo of a {c}").to(self.model_device) for c in class_names])
        text_features = self.clip_model.encode_text(text_inputs)
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)
        logit_scale = self.clip_model.logit_scale.exp()
        logits = logit_scale * image_features @ text_features.t()
        return logits

# 加载 CLIP 模型
model_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
clip, preprocess = clipload(name=VISUAL_BACKBONE, device=model_device, download_root='/shareddata/clip/')
model = CLIPModel(clip, model_device).to(model_device)
# 准备 MNIST 数据集
transform_MNIST_test = transforms.Compose([
    transforms.Resize(size=224),
    transforms.CenterCrop(size=(224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

test_set = datasets.MNIST(root='./data', train=False, download=True, transform=transform_MNIST_test)
test_dataloader = DataLoader(test_set, batch_size=32, shuffle=False, num_workers=2)


# 使用测试集进行预测和评估
test_targets = []
testing_acc = []
test_preds = []

with torch.no_grad():
    model.eval()
    for batch_idx, (images, labels) in enumerate(test_dataloader):
        images = images.to(model_device)
        labels = labels.to(model_device)

        # 使用 CLIP 模型进行预测
        logits = model(images)
        _, predicted = logits.max(1)

        test_preds.extend(predicted.cpu().tolist())
        test_targets.extend(labels.cpu().tolist())

        correct = predicted.eq(labels).sum().item()
        acc = correct / labels.size(0)
        testing_acc.append(acc)

test_f1 = f1_score(test_targets, test_preds, average='weighted')
avg_acc = sum(testing_acc) / len(testing_acc)
print(f"Zero-shot performance on MNIST: {avg_acc*100:.2f}%")
print(f"F1 Score on MNIST: {test_f1:.4f}")

weight_for_saps = 0.3
penalty=0.5
score_functions = [THR(), APS(), SAPS(weight=weight_for_saps), RAPS(penalty)]
predictors = [SplitPredictor, ClusterPredictor, ClassWisePredictor]  
# 用于存储不同组合的性能结果
performance_results = []
for score_function in score_functions:
    for Predictor in predictors:
        # 创建预测器实例
        predictor = Predictor(score_function=score_function, model=model)

        # 校准预测器
        predictor.calibrate(test_dataloader, alpha=0.1)

        # 使用测试集进行预测和评估
        evaluation_results = predictor.evaluate(test_dataloader)
        coverage_rate = evaluation_results["Coverage_rate"]
        average_size = evaluation_results["Average_size"]

        # 记录性能结果
        performance_results.append((score_function.__class__.__name__, Predictor.__name__, coverage_rate, average_size))
# 打印性能结果
for result in performance_results:
    print(f"Score Function: {result[0]}, Predictor: {result[1]}, Coverage Rate: {result[2]:.4f}, Average Set Size: {result[3]:.4f}")
    
# 清理 CUDA 缓存
torch.cuda.empty_cache()

Zero-shot performance on MNIST: 11.47%
F1 Score on MNIST: 0.0494
Score Function: THR, Predictor: SplitPredictor, Coverage Rate: 0.9000, Average Set Size: 8.7136
Score Function: THR, Predictor: ClusterPredictor, Coverage Rate: 0.9009, Average Set Size: 8.7182
Score Function: THR, Predictor: ClassWisePredictor, Coverage Rate: 0.9004, Average Set Size: 7.5346
Score Function: APS, Predictor: SplitPredictor, Coverage Rate: 0.9002, Average Set Size: 8.8408
Score Function: APS, Predictor: ClusterPredictor, Coverage Rate: 0.9011, Average Set Size: 8.8405
Score Function: APS, Predictor: ClassWisePredictor, Coverage Rate: 0.8970, Average Set Size: 8.4201
Score Function: SAPS, Predictor: SplitPredictor, Coverage Rate: 0.9032, Average Set Size: 8.8980
Score Function: SAPS, Predictor: ClusterPredictor, Coverage Rate: 0.8995, Average Set Size: 8.8528
Score Function: SAPS, Predictor: ClassWisePredictor, Coverage Rate: 0.8966, Average Set Size: 8.6131
Score Function: RAPS, Predictor: SplitPredictor, C