## Tianyi's Model


1. Need to update the "Weights=None" to "pretrain = False" to align with Shuang's code

In [1]:
%load_ext autoreload
%autoreload 2

import random
import torch
import numpy as np
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.decomposition import PCA

%matplotlib inline



In [6]:
'''
Need to set the Code as root, and add the folder name in front of every model Tianyi used
'''
from Tianyi_Model.predict import load_trained_model, predict_image_class
from Tianyi_Model.data_preparation import get_cifar10_datasets, get_dataloader
from Tianyi_Model.train import train_model

In [7]:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [9]:
model = load_trained_model('./Tianyi_Model/output/resnet18_4cls_64dim_1tm_model.pth', 'resnet18', 64, 4, device)

NVIDIA GeForce RTX 3070 Laptop GPU with CUDA capability sm_86 is not compatible with the current PyTorch installation.
The current PyTorch install supports CUDA capabilities sm_37 sm_50 sm_60 sm_61 sm_70 sm_75 compute_37.
If you want to use the NVIDIA GeForce RTX 3070 Laptop GPU GPU with PyTorch, please check the instructions at https://pytorch.org/get-started/locally/



Model loaded successfully from ./Tianyi_Model/output/resnet18_4cls_64dim_1tm_model.pth


In [10]:
train_dataset, test_dataset = get_cifar10_datasets(data_dir='./Tianyi_Model/data')
train_loader, test_loader = get_dataloader(train_dataset, test_dataset, batch_size=128, num_workers=8)

Files already downloaded and verified
Files already downloaded and verified
Filtering and remapping base training dataset...
Dataset filtered. New number of samples: 20000
Filtering and remapping base testing dataset...
Dataset filtered. New number of samples: 4000
Triplet Dataset created with 20000 samples (based on base dataset size).
Classes found in dataset: [0, 1, 2, 3]
Filtered CIFAR-10 Triplet train loader and standard test loader created with 8 workers.


In [23]:
'''
test data and its label
'''
NEW_CLASS_MAPPING = {
    3: 0, # cat -> 0
    5: 1, # dog -> 1
    8: 2, # ship -> 2
    9: 3  # truck -> 3
}

print(test_dataset.data.shape)
print(len(test_dataset.targets))



(4000, 32, 32, 3)
4000


In [32]:
'''
Split data into ab_images/cd_images, and ab_labels/cd_labels
'''
ab_images, cd_images = [], []
ab_labels, cd_labels = [], []

for image, label in test_dataset:
    pred_label, prob, _= predict_image_class(model, image, device, True)

    if pred_label in ["cat","dog"]:
        ab_images.append(image)
        ab_labels.append(label)
    elif pred_label in ["ship","truck"]:
        cd_images.append(image)
        cd_labels.append(label)

In [33]:
cd_labels[0:5]

[2, 2, 3, 3, 2]

In [34]:
len(ab_labels)

2015

## Shuang's Model


In [36]:
import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import logging
import numpy as np

# Configure logging
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s -   %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger()

# Constants (must match training)
CIFAR10_CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
SELECTED_CLASSES = ['cat', 'dog']
NUM_CLASSES = len(SELECTED_CLASSES)
BATCH_SIZE = 64
NORMALIZATION_VARIABLES = {
    "mean": (0.4914, 0.4822, 0.4465),
    "std": (0.2470, 0.2435, 0.2616)
}

class WideResNet(nn.Module):
    def __init__(self, num_classes=10, depth=28, widen_factor=2, dropout_rate=0.0):
        super().__init__()
        n_channels = [16, 16 * widen_factor, 32 * widen_factor, 64 * widen_factor]
        assert (depth - 4) % 6 == 0
        n = (depth - 4) // 6
        self.conv1 = nn.Conv2d(3, n_channels[0], kernel_size=3, stride=1, padding=1, bias=True)
        self.block1 = self._make_block(n, n_channels[0], n_channels[1], 1, dropout_rate, True)
        self.block2 = self._make_block(n, n_channels[1], n_channels[2], 2, dropout_rate)
        self.block3 = self._make_block(n, n_channels[2], n_channels[3], 2, dropout_rate)
        self.bn1 = nn.BatchNorm2d(n_channels[3], momentum=0.001)
        self.relu = nn.LeakyReLU(negative_slope=0.1, inplace=True)
        self.avgpool = nn.AvgPool2d(8)
        self.fc = nn.Linear(n_channels[3], num_classes)

    def _make_block(self, n, in_planes, out_planes, stride, dropout_rate=0.0, activate_before_residual=False):
        layers = []
        for i in range(int(n)):
            layers.append(BasicBlock(i == 0 and in_planes or out_planes,
                                     out_planes,
                                     i == 0 and stride or 1,
                                     dropout_rate,
                                     activate_before_residual))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.block1(out)
        out = self.block2(out)
        out = self.block3(out)
        out = self.relu(self.bn1(out))
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        return self.fc(out)


class BasicBlock(nn.Module):
    def __init__(self, in_planes, out_planes, stride, dropout_rate=0.0, activate_before_residual=False):
        super().__init__()
        self.bn1 = nn.BatchNorm2d(in_planes, momentum=0.001)
        self.relu1 = nn.LeakyReLU(negative_slope=0.1, inplace=True)
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=True)
        self.bn2 = nn.BatchNorm2d(out_planes, momentum=0.001)
        self.relu2 = nn.LeakyReLU(negative_slope=0.1, inplace=True)
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=1, padding=1, bias=True)
        self.dropout_rate = dropout_rate
        self.equalInOut = (in_planes == out_planes)
        self.convShortcut = (not self.equalInOut) and nn.Conv2d(
            in_planes, out_planes, kernel_size=1, stride=stride, padding=0, bias=True) or None
        self.activate_before_residual = activate_before_residual

    def forward(self, x):
        if not self.equalInOut and self.activate_before_residual:
            out = self.relu1(self.bn1(x))
        else:
            out = self.bn1(x)
            out = self.relu1(out)
        out = self.conv1(out if self.equalInOut else x)
        out = self.bn2(out)
        out = self.relu2(out)
        if self.dropout_rate > 0:
            out = F.dropout(out, p=self.dropout_rate, training=self.training)
        out = self.conv2(out)
        shortcut = x if self.equalInOut else self.convShortcut(x)
        return torch.add(out, shortcut)


def get_normalizer():
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(**NORMALIZATION_VARIABLES)
    ])

def evaluate(model, eval_loader, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for inputs, targets in eval_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = F.cross_entropy(outputs, targets)
            total_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()
            total += targets.size(0)
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())

    per_class_accuracy = []
    for i, class_name in enumerate(SELECTED_CLASSES):
        correct_class = sum((np.array(all_targets) == i) & (np.array(all_preds) == i))
        total_class = sum(np.array(all_targets) == i)
        accuracy = 100. * correct_class / total_class if total_class > 0 else 0.0
        per_class_accuracy.append(accuracy)

    return total_loss / total, 100. * correct / total, per_class_accuracy


In [37]:
from torch.utils.data import Dataset
from PIL import Image

class NumpyDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images  # list of np.array (H, W, C)
        self.labels = labels  # list of integers
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx]
        label = self.labels[idx]

        # 转成 PIL.Image 然后应用 transform
        img = Image.fromarray(img)
        if self.transform:
            img = self.transform(img)

        return img, label


In [38]:
def get_normalizer():
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(**NORMALIZATION_VARIABLES)
    ])

NORMALIZATION_VARIABLES = {
    "mean": (0.4914, 0.4822, 0.4465),
    "std": (0.2470, 0.2435, 0.2616)
}
transform = get_normalizer()

ab_dataset = NumpyDataset(ab_images, ab_labels, transform=transform)
ab_loader = DataLoader(ab_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

cd_dataset = NumpyDataset(cd_images, cd_labels, transform=transform)
cd_loader = DataLoader(cd_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

In [40]:
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

ab_model = WideResNet(num_classes=2).to(device)
ab_model.load_state_dict(torch.load('best_model_ema.pth', map_location=device))

cd_model = WideResNet(num_classes=2).to(device)
cd_model.load_state_dict(torch.load('best_model_ema_shipTruckFL.pth', map_location=device))

<All keys matched successfully>

In [None]:
# 假设 ab_loader 和 cd_loader 的长度分别是 AB 数据集和 CD 数据集的样本数量
ab_size = len(ab_loader.dataset)

# 获取两个模型的准确率
ab_loss, ab_acc, ab_per_class_acc = evaluate(ab_model, ab_loader, device)


print("整体准确率 (加权平均): {:.2f}%".format(ab_acc))

In [None]:
# 假设 ab_loader 和 cd_loader 的长度分别是 AB 数据集和 CD 数据集的样本数量
ab_size = len(ab_loader.dataset)
cd_size = len(cd_loader.dataset)

# 获取两个模型的准确率
ab_loss, ab_acc, ab_per_class_acc = evaluate(ab_model, ab_loader, device)
cd_loss, cd_acc, cd_per_class_acc = evaluate(cd_model, cd_loader, device)

# 计算加权平均准确率
total_samples = ab_size + cd_size
overall_acc = (ab_acc * ab_size + cd_acc * cd_size) / total_samples

print("整体准确率 (加权平均): {:.2f}%".format(overall_acc))