In [1]:
import math
import torch
from torch import Tensor
from torch import nn
import torch.nn.functional as F
from typing import Optional, List
import torchvision.transforms as transforms
from PIL import Image
from torchvision.models import resnet50,ResNet50_Weights
from torch import Tensor
from matplotlib import cm
from torchvision.transforms.functional import to_pil_image
import os  

In [2]:
# Reminder: import the required model constructors and their weight enums at the top of the file
from torchvision.models import (  
    resnet50, resnet101, resnet18, resnet34,  
    densenet121, densenet169, densenet201,  
    vgg16, vgg19,  
    efficientnet_b0, efficientnet_b1, efficientnet_b2,  
    mobilenet_v2, mobilenet_v3_small, mobilenet_v3_large,  
    shufflenet_v2_x0_5, shufflenet_v2_x1_0  
)  
from torchvision.models import (  
    ResNet50_Weights, ResNet101_Weights, ResNet18_Weights, ResNet34_Weights,  
    DenseNet121_Weights, DenseNet169_Weights, DenseNet201_Weights,  
    VGG16_Weights, VGG19_Weights,  
    EfficientNet_B0_Weights, EfficientNet_B1_Weights, EfficientNet_B2_Weights,  
    MobileNet_V2_Weights, MobileNet_V3_Small_Weights, MobileNet_V3_Large_Weights,  
    ShuffleNet_V2_X0_5_Weights, ShuffleNet_V2_X1_0_Weights  
) 
def select_hook_layer(model, model_type):
    """
    Pick the module on which feature/gradient hooks should be registered.

    Args:
        model: Network object exposing ``layer4``, ``features`` or ``conv5``,
            depending on its family.
        model_type (str): Lower-case family name; only its prefix is inspected
            (e.g. ``'resnet'``, ``'vgg'``, ``'mobilenetv2'``).

    Returns:
        The last element of ``model.layer4`` for ResNets, the last element of
        ``model.features`` for DenseNet / VGG / EfficientNet / MobileNet, and
        ``model.conv5`` for ShuffleNet.

    Raises:
        ValueError: If ``model_type`` matches none of the known prefixes.
    """
    if model_type.startswith('resnet'):
        return model.layer4[-1]

    # These four families all expose their convolutional trunk as ``features``.
    if model_type.startswith(('densenet', 'vgg', 'efficientnet', 'mobilenet')):
        return model.features[-1]

    if model_type.startswith('shufflenet'):
        return model.conv5

    raise ValueError(f"Unsupported model type: {model_type}")
def _get_backbone(device):
    """Return the VGG19 backbone for ``device``, loading the weights only once.

    The model is cached per device on this function object so that processing
    many images does not re-read the pretrained weights for every call.
    """
    cache = _get_backbone.__dict__.setdefault('_cache', {})
    key = str(device)
    if key not in cache:
        cache[key] = vgg19(weights=VGG19_Weights.DEFAULT).eval().to(device)
    return cache[key]


def process_image_withhooks(img_path, model_weights=ResNet50_Weights.DEFAULT, use_grad_weights=False):
    """
    Compute a per-channel descriptor of one image from the hooked layer.

    The image is run through a VGG19 backbone; the activation of the layer
    chosen by ``select_hook_layer`` is captured with a forward hook and
    averaged over its spatial dimensions.

    Args:
        img_path (str): Path of the input image. Files whose extension is not
            one of .png/.jpg/.jpeg/.bmp/.gif are skipped.
        model_weights: Currently ignored -- the backbone is hard-coded to
            VGG19 with ``VGG19_Weights.DEFAULT``. Kept so existing callers
            keep working.
        use_grad_weights (bool): When False (default) the result is the
            spatial mean of the captured feature map and no backward pass is
            run. When True, the score of the predicted class is
            back-propagated and each channel mean is multiplied by the
            spatial mean of the gradient flowing into the hooked layer's
            output.

    Returns:
        torch.Tensor or None: CPU tensor of shape (C,), with C the channel
        count of the hooked layer's output; None when the file is skipped or
        when the forward/backward computation fails.
    """
    # Reject non-image files before paying for any model or tensor work.
    if not img_path.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
        print(f"跳过非图片文件：{img_path}")
        return None

    # Preprocessing: resize, convert to tensor, normalise with the
    # mean/std constants below.
    test_transform = transforms.Compose([
        transforms.Resize(512),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = _get_backbone(device)

    # Force three channels so grayscale / RGBA / palette images match the
    # 3-channel Normalize constants; the context manager closes the file.
    with Image.open(img_path) as img_file:
        img_pil = img_file.convert('RGB')

    input_tensor = test_transform(img_pil).unsqueeze(0).to(device)

    # Containers filled by the hooks.
    feature_map = []
    grad = []

    def forward_hook(module, inp, outp):
        feature_map.append(outp)

    def backward_hook(module, grad_inp, grad_outp):
        grad.append(grad_outp)

    hook_layer = select_hook_layer(model, type(model).__name__.lower())
    handles = [hook_layer.register_forward_hook(forward_hook)]
    if use_grad_weights:
        handles.append(hook_layer.register_full_backward_hook(backward_hook))

    try:
        if use_grad_weights:
            out = model(input_tensor)
            cls_idx = torch.argmax(out).item()
            # Score of the predicted class drives the backward pass.
            score = out[:, cls_idx].sum()
            model.zero_grad()
            score.backward(retain_graph=False)
        else:
            # Only the activation is needed, so skip autograd bookkeeping.
            with torch.no_grad():
                model(input_tensor)

        # Spatial mean of the captured activation: (1, C, H, W) -> (C,)
        mean_values = feature_map[0].squeeze(0).mean(dim=(1, 2))

        if use_grad_weights:
            # Spatial mean of the gradient w.r.t. the hooked layer's output.
            weights = grad[0][0].squeeze(0).mean(dim=(1, 2))
            donations_values = mean_values * weights
        else:
            donations_values = mean_values

        return donations_values.detach().cpu()

    except Exception as e:
        print(f"处理过程中出现错误：{e}")
        return None

    finally:
        # Always detach the hooks from the cached model and drop references.
        for handle in handles:
            handle.remove()
        feature_map.clear()
        grad.clear()
        # The model outlives this call; do not keep parameter gradients around.
        model.zero_grad(set_to_none=True)
        if device.type == 'cuda':
            torch.cuda.empty_cache()

In [3]:
# img_path="dataset/波斯猫/1.jpg"
# donations_values=process_image_withhooks(img_path, model_weights=ResNet50_Weights.DEFAULT)
# print(donations_values)
# donations_values.size()

In [4]:
import numpy as np  

# Configure PyTorch's CUDA memory allocator (expandable segments) to reduce memory fragmentation
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'  

def process_batch_images(class_path, batch_size=8, device=None):
    """
    Run ``process_image_withhooks`` on every image of one class directory.

    Files are visited in sorted filename order so the returned list is
    reproducible across runs and file systems. Images are still processed one
    at a time; ``batch_size`` only controls how the path list is chunked.

    Args:
        class_path (str): Directory containing the images of one class.
        batch_size (int): Number of paths per chunk; must be >= 1. Default 8.
        device (str or torch.device, optional): Only reported in the log
            output -- it is not forwarded to ``process_image_withhooks``.
            Auto-detected ("cuda" if available, else "cpu") when None.

    Returns:
        list: One entry per successfully processed image (the non-None values
        returned by ``process_image_withhooks``), in sorted filename order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print()
    print("using device:"+str(device))
    print("analyse class:"+str(class_path))

    # Collect image paths; sorting removes the arbitrary os.listdir order.
    image_exts = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')
    img_paths = [os.path.join(class_path, f) for f in sorted(os.listdir(class_path))
                 if f.lower().endswith(image_exts)]

    donations_values_list = []
    succeed_img = 0

    # Walk the path list chunk by chunk; each image is handled individually
    # to keep peak memory low.
    for start in range(0, len(img_paths), batch_size):
        for img_path in img_paths[start:start + batch_size]:
            try:
                donations_value = process_image_withhooks(img_path)
            except Exception as e:
                # Best effort: report the failure and move on to the next file.
                print(f"处理 {img_path} 时发生错误：{str(e)}")
                continue

            if donations_value is not None:
                succeed_img += 1
                donations_values_list.append(donations_value)

    print(f"all_imgs : {succeed_img}")
    return donations_values_list


In [5]:

# class_path3="dataset/n01910747/" #水母
# donations_values_list1 = process_batch_images(class_path1)
# donations_values_list2 = process_batch_images(class_path2)
# donations_values_list3 = process_batch_images(class_path3)
# class_path1="dataset/n02105505/" #匈牙利牧羊犬
# class_path2="dataset/n02101006/" #戈登雪达犬

In [6]:
from sklearn.metrics.pairwise import cosine_similarity  
import numpy as np  

def calculate_intra_class_similarity(donations_values_list):
    """
    Average pairwise cosine similarity between the samples of one class.

    Args:
        donations_values_list (list): Per-image tensors of a single class;
            each element must provide ``.numpy()``.

    Returns:
        float: Mean cosine similarity over all unordered pairs of distinct
        samples, or 0.0 when fewer than two samples are given.
    """
    sample_count = len(donations_values_list)
    if sample_count < 2:
        # A single sample (or none) has no pair to compare.
        return 0.0

    # Stack the per-image vectors into one 2-D array (one row per image).
    vectors = np.stack([t.numpy() for t in donations_values_list])

    pairwise = cosine_similarity(vectors)

    # Keep each unordered pair once: zero the diagonal and the lower triangle.
    strict_upper = np.triu(pairwise, k=1)

    pair_count = sample_count * (sample_count - 1) // 2
    return np.sum(strict_upper) / pair_count

In [7]:
def calculate_inter_class_similarity(class1_values, class2_values):
    """
    Average cosine similarity between every sample of one class and every
    sample of another.

    Args:
        class1_values (list): Per-image tensors of the first class.
        class2_values (list): Per-image tensors of the second class.

    Returns:
        float: Mean of the full cross-class cosine-similarity matrix, or 0.0
        when either list is empty.
    """
    if not len(class1_values) or not len(class2_values):
        # Nothing to compare against.
        return 0.0

    # One 2-D array per class, one row per image.
    first_class, second_class = (
        np.stack([t.numpy() for t in group])
        for group in (class1_values, class2_values)
    )

    cross_similarity = cosine_similarity(first_class, second_class)
    return np.mean(cross_similarity)

In [8]:
import os    

def batch_process_and_analyze(dataset_path, class1, class2):
    """
    Process two class folders and report their intra- and inter-class similarity.

    Args:
        dataset_path (str): Root directory of the dataset.
        class1 (str): Sub-folder of ``dataset_path`` holding the first class.
        class2 (str): Sub-folder of ``dataset_path`` holding the second class.

    Returns:
        tuple: (intra_class_similarity_class1, intra_class_similarity_class2,
        inter_class_similarity)
    """
    # Resolve both folders up front, then extract the per-image values.
    first_dir, second_dir = (os.path.join(dataset_path, name) for name in (class1, class2))
    first_values = process_batch_images(first_dir)
    second_values = process_batch_images(second_dir)

    # Within-class similarity for each class, then the cross-class similarity.
    within_first = calculate_intra_class_similarity(first_values)
    within_second = calculate_intra_class_similarity(second_values)
    between = calculate_inter_class_similarity(first_values, second_values)

    return within_first, within_second, between

In [9]:
# class_path1="dataset/n02105505/" #匈牙利牧羊犬
# class_path2="dataset/n02101006/" #戈登雪达犬
# Compare two class folders under dataset/ and report their similarities.
dataset_path = "dataset/"  
intra_class1, intra_class2, inter_class = batch_process_and_analyze(dataset_path,"n01532829/","n01558993/")  

# Blank line between the progress log and the summary
# (类内相似度 = intra-class similarity, 类间相似度 = inter-class similarity).
print()
print(f"Class1 类内相似度: {intra_class1}")  
print(f"Class2 类内相似度: {intra_class2}")  
print(f"Class1 和 Class2 类间相似度: {inter_class}") 


using device:cuda
analyse class:dataset/n01532829/
all_imgs : 100

using device:cuda
analyse class:dataset/n01558993/
all_imgs : 100

Class1 类内相似度: 0.6820870817550505
Class2 类内相似度: 0.685785886205808
Class1 和 Class2 类间相似度: 0.6185536980628967


In [10]:
# Same analysis as the previous cell, run on a different pair of class folders;
# results are stored under separate names so the earlier values are kept.
dataset_path = "dataset/"  
# intra_class1, intra_class2, inter_class = batch_process_and_analyze(dataset_path,"n01532829/","n01558993/")  
intra_class3, intra_class4, inter_class0 = batch_process_and_analyze(dataset_path,"n02105505/","n02108089/")
# Blank line between the progress log and the summary
# (类内相似度 = intra-class similarity, 类间相似度 = inter-class similarity).
print()
print(f"Class1 类内相似度: {intra_class3}")  
print(f"Class2 类内相似度: {intra_class4}")  
print(f"Class1 和 Class2 类间相似度: {inter_class0}") 


using device:cuda
analyse class:dataset/n02105505/
all_imgs : 100

using device:cuda
analyse class:dataset/n02108089/
all_imgs : 100

Class1 类内相似度: 0.6975414792455809
Class2 类内相似度: 0.669719114977904
Class1 和 Class2 类间相似度: 0.45617949962615967
