In [1]:
import os

os.environ["CUDA_VISIBLE_DEVICES"]="1"

In [2]:
import torch
import torch.nn as nn
import timm
from torchvision import transforms as Transforms
import torch.nn.functional as F
from torch.utils.data import DataLoader
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import glob
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# 递归删除指定目录下的.ipynb_checkpoints文件夹
def remove_ipynb_checkpoints(root_folder):
    for root, dirs, files in os.walk(root_folder):
        for dir in dirs:
            if dir == ".ipynb_checkpoints":
                folder_path = os.path.join(root, dir)
                shutil.rmtree(folder_path)
                print(f"Deleted: {folder_path}")

class SwinTransformerTeacher(nn.Module):
    def __init__(self, num_features=512):
        super(SwinTransformerTeacher, self).__init__()
        self.model = timm.create_model('swin_base_patch4_window7_224')
        self.num_features = num_features
        self.feat = nn.Linear(1024, num_features) if num_features > 0 else None

    def extract_feat(self, x):
        # 创建一个空列表，用于保存各层的输出特征
        features = []
        
        patch_embed = self.model.patch_embed  # Patch Embedding 层
        pos_drop = self.model.pos_drop
        layers = self.model.layers  # 基本层（包含多个 SwinBlock）
        
        x = patch_embed(x)  # Patch Embedding
        x = pos_drop(x)
        for layer in layers:  # 逐个通过 BasicLayer
            # x = layer(x)
            # features.append(x)
            for block in layer.blocks:
                x = block(x)
            features.append(x)
            if layer.downsample is not None:
                x = layer.downsample(x)
        return tuple(features)

    def forward_specific_stage(self, x, stage, down_sample=True):
        BS, L, C = x.shape

        if stage == 2:
            if down_sample:
                x = self.model.layers[-4].downsample(x)

            for block in self.model.layers[-3].blocks:
                x = block(x)

        if stage == 3:
            if down_sample:
                x = self.model.layers[-3].downsample(x)

            for block in self.model.layers[-2].blocks:
                x = block(x)

        if stage == 4:
            if down_sample:
                x = self.model.layers[-2].downsample(x)

            for block in self.model.layers[-1].blocks:
                x = block(x)

            norm_layer = self.model.norm
            x = norm_layer(x)

        return x
        
    def forward_features(self, x):
        x = self.model.forward_features(x)
        return x

    def forward(self, x):
        x = self.model.forward_features(x)
        if not self.feat is None:
            x = self.feat(x)
        return x

'''
class ResNetStudent(nn.Module):
    def __init__(self, num_features=512):
        super(ResNetStudent, self).__init__()
        self.model = timm.create_model('resnet50', pretrained=True)  # 使用ResNet-50作为学生模型
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten_dim = 2048
        self.feat = nn.Linear(self.flatten_dim, num_features) if num_features > 0 else None
    
    def extract_feat(self, x):
        # 创建一个空列表，用于保存各层的输出特征
        features = []
        
        # 提取每个阶段的层
        conv1 = self.model.conv1  # 初始卷积层
        bn1 = self.model.bn1
        act1 = self.model.act1
        maxpool = self.model.maxpool
        layer1 = self.model.layer1  # 第一阶段（残差块1）
        layer2 = self.model.layer2  # 第二阶段（残差块2）
        layer3 = self.model.layer3  # 第三阶段（残差块3）
        layer4 = self.model.layer4  # 第四阶段（残差块4）
        
        x = conv1(x)
        x = bn1(x)
        x = act1(x)
        x = maxpool(x)
        stage1_out = layer1(x)  # 第一阶段的输出
        features.append(stage1_out)
        stage2_out = layer2(stage1_out)  # 第二阶段的输出
        features.append(stage2_out)
        stage3_out = layer3(stage2_out)  # 第三阶段的输出
        features.append(stage3_out)
        stage4_out = layer4(stage3_out)  # 第四阶段的输出
        features.append(stage4_out)
        return tuple(features)
        
    def forward_features(self, x):
        x = self.model.forward_features(x)
        return x

    def forward(self, x):
        x = self.model.forward_features(x)
        # 池化操作，[batch_size, 2048, 7, 7] -> [batch_size, 2048, 1, 1]
        x = self.gap(x)
        # 展平特征图，将其变为 [batch_size, 2048 * 1 * 1]
        x = x.view(x.size(0), -1)  # 展平
        if not self.feat is None:
            x = self.feat(x)
        return x
'''

class ResNetStudent(nn.Module):
    def __init__(self, num_features=512):
        super(ResNetStudent, self).__init__()
        self.model = timm.create_model('resnet50', pretrained=True)  # 使用ResNet-50作为学生模型
        # 修改 layer1 和 layer2，向每个残差块中的 ReLU 前加上 InstanceNorm2d
        self._modify_layer(self.model.layer1)
        self._modify_layer(self.model.layer2)
        self._modify_layer_stride(self.model.layer4[0].conv2, self.model.layer4[0].downsample[0])
        # 输出设置
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten_dim = 2048
        self.feat = nn.Linear(self.flatten_dim, num_features) if num_features > 0 else None
    
    def extract_feat(self, x):
        # 创建一个空列表，用于保存各层的输出特征
        features = []
        
        # 提取每个阶段的层
        conv1 = self.model.conv1  # 初始卷积层
        bn1 = self.model.bn1
        act1 = self.model.act1
        maxpool = self.model.maxpool
        layer1 = self.model.layer1  # 第一阶段（残差块1）
        layer2 = self.model.layer2  # 第二阶段（残差块2）
        layer3 = self.model.layer3  # 第三阶段（残差块3）
        layer4 = self.model.layer4  # 第四阶段（残差块4）
        
        x = conv1(x)
        x = bn1(x)
        x = act1(x)
        x = maxpool(x)
        stage1_out = layer1(x)  # 第一阶段的输出
        features.append(stage1_out)
        stage2_out = layer2(stage1_out)  # 第二阶段的输出
        features.append(stage2_out)
        stage3_out = layer3(stage2_out)  # 第三阶段的输出
        features.append(stage3_out)
        stage4_out = layer4(stage3_out)  # 第四阶段的输出
        features.append(stage4_out)
        return tuple(features)

    def _modify_layer(self, layer):
        """
        在每个残差块中的 ReLU 前加上 InstanceNorm2d 操作。
        """
        for block in layer:
            # 修改 conv1 和 conv2 之后的 ReLU，将 InstanceNorm2d 放在 ReLU 前面
            # 对于每个残差块，将 InstanceNorm2d 加入到 ReLU 之前
            block.act3 = nn.Sequential(
                nn.InstanceNorm2d(block.conv3.out_channels, affine=True),
                nn.ReLU(inplace=True)
            )
            
    def _modify_layer_stride(self, last_layer, last_layer_downsample):
        # 在最后一层将stride改为1
        last_layer.stride = (1, 1)
        last_layer_downsample.stride = (1, 1)
        
    def forward_features(self, x):
        x = self.model.forward_features(x)
        return x

    def forward(self, x):
        x = self.model.forward_features(x)
        # 池化操作，[batch_size, 2048, 7, 7] -> [batch_size, 2048, 1, 1]
        x = self.gap(x)
        # 展平特征图，将其变为 [batch_size, 2048 * 1 * 1]
        x = x.view(x.size(0), -1)  # 展平
        if not self.feat is None:
            x = self.feat(x)
        return x

'''
class ResNetStudent(nn.Module):
    def __init__(self, num_features=512):
        super(ResNetStudent, self).__init__()
        self.model = timm.create_model('resnet50', pretrained=True)  # 使用ResNet-50作为学生模型
        # 修改 layer1 和 layer2，向每个残差块中的 ReLU 前加上 InstanceNorm2d
        self._modify_layer(self.model.layer1)
        self._modify_layer(self.model.layer2)
        self._modify_layer_stride(self.model.layer4[0].conv2, self.model.layer4[0].downsample[0])
        # 进行特征映射
        self.projector_1 = AttentionProjector(student_dims=256, teacher_dims=128, hw_dims=(56, 56), pos_dims=128, window_shapes=(1, 1), self_query=True, 
                                 softmax_scale=5.0, num_heads=4)
        self.projector_2 = AttentionProjector(student_dims=512, teacher_dims=256, hw_dims=(28, 28), pos_dims=256, window_shapes=(1, 1), self_query=True, 
                                         softmax_scale=5.0, num_heads=8)
        self.projector_3 = AttentionProjector(student_dims=1024, teacher_dims=512, hw_dims=(14, 14), pos_dims=512, window_shapes=(1, 1), self_query=True, 
                                         softmax_scale=5.0, num_heads=16)
        self.projector_4 = AttentionProjector(student_dims=2048, teacher_dims=1024, hw_dims=(7, 7), pos_dims=1024, window_shapes=(1, 1), self_query=True, 
                                 softmax_scale=5.0, num_heads=32)
        # 输出设置
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten_dim = 2048
        self.feat = nn.Linear(self.flatten_dim, num_features) if num_features > 0 else None
    
    def extract_feat(self, x):
        # 创建一个空列表，用于保存各层的输出特征
        features = []
        
        # 提取每个阶段的层
        conv1 = self.model.conv1  # 初始卷积层
        bn1 = self.model.bn1
        act1 = self.model.act1
        maxpool = self.model.maxpool
        layer1 = self.model.layer1  # 第一阶段（残差块1）
        layer2 = self.model.layer2  # 第二阶段（残差块2）
        layer3 = self.model.layer3  # 第三阶段（残差块3）
        layer4 = self.model.layer4  # 第四阶段（残差块4）
        
        x = conv1(x)
        x = bn1(x)
        x = act1(x)
        x = maxpool(x)
        stage1_out = layer1(x)  # 第一阶段的输出
        # features.append(stage1_out)
        stage2_out = layer2(stage1_out)  # 第二阶段的输出
        # features.append(stage2_out)
        stage3_out = layer3(stage2_out)  # 第三阶段的输出
        student_feature_proj3 = self.projector_3(stage3_out)
        features.append(student_feature_proj3)
        stage4_out = layer4(stage3_out)  # 第四阶段的输出
        student_feature_proj4 = self.projector_4(stage4_out)
        features.append(student_feature_proj4)
        return tuple(features)
        
    def extract_feat_proj(self, x):
        features = []
        student_features = self.extract_feat(x)
        # student_feature_proj1 = self.projector_1(student_features[0])
        # features.append(student_feature_proj1)
        # student_feature_proj2 = self.projector_2(student_features[1])
        # features.append(student_feature_proj2)
        student_feature_proj3 = self.projector_3(student_features[2])
        features.append(student_feature_proj3)
        student_feature_proj4 = self.projector_4(student_features[3])
        features.append(student_feature_proj4)
        return tuple(features)
        
    def _modify_layer(self, layer):
        """
        在每个残差块中的 ReLU 前加上 InstanceNorm2d 操作。
        """
        for block in layer:
            # 修改 conv1 和 conv2 之后的 ReLU，将 InstanceNorm2d 放在 ReLU 前面
            # 对于每个残差块，将 InstanceNorm2d 加入到 ReLU 之前
            block.act3 = nn.Sequential(
                nn.InstanceNorm2d(block.conv3.out_channels, affine=True),
                nn.ReLU(inplace=True)
            )
            
    def _modify_layer_stride(self, last_layer, last_layer_downsample):
        # 在最后一层将stride改为1
        last_layer.stride = (1, 1)
        last_layer_downsample.stride = (1, 1)
        
    def forward_features(self, x):
        x = self.model.forward_features(x)
        return x

    def forward(self, x):
        x = self.model.forward_features(x)
        # 池化操作，[batch_size, 2048, 7, 7] -> [batch_size, 2048, 1, 1]
        x = self.gap(x)
        # 展平特征图，将其变为 [batch_size, 2048 * 1 * 1]
        x = x.view(x.size(0), -1)  # 展平
        if not self.feat is None:
            x = self.feat(x)
        return x
'''
class Data_Processor(object):
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.transformer = Transforms.Compose([
            Transforms.Resize((self.height, self.width)),
            Transforms.ToTensor(),
            Transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __call__(self, img):
        return self.transformer(img).unsqueeze(0)

data_processor = Data_Processor(height=224, width=224)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# swin_model = SwinTransformerTeacher(num_features=512).cuda()
# swin_model.eval()

# resnet_model = ResNetStudent(num_features=512).cuda()
# resnet_model.eval()

In [4]:
swin_model = SwinTransformerTeacher(num_features=512).cuda()
swin_model.eval()

resnet_model = ResNetStudent(num_features=512).cuda()
resnet_model.eval()

# swin_transformer
weight_path = '/homec/xiaolei/projects/ISR/weights/swin_base_patch4_window7_224.pth'
weight = torch.load(weight_path)
swin_model.load_state_dict(weight['state_dict'], strict=True)

# 残差网络
weight_path = 'weights/student_model_base5_strong_reid_mmd_mse_loss/best_student_model.pth'
weight = torch.load(weight_path)
resnet_model.load_state_dict(weight, strict=True)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


<All keys matched successfully>

In [5]:
def get_model_size(model):
    param_size = 0
    for param in model.parameters():
        param_size += param.nelement() * param.element_size()  # 参数数量 * 每个参数的字节数
    buffer_size = 0
    for buffer in model.buffers():
        buffer_size += buffer.nelement() * buffer.element_size()
    size_all_mb = (param_size + buffer_size) / 1024**2  # 转换为MB
    return size_all_mb

# 比较两个模型的大小
print(f"swin_model size (MB): {get_model_size(swin_model)}")
print(f"resnet_model size (MB): {get_model_size(resnet_model)}")

swin_model size (MB): 338.31324768066406
resnet_model size (MB): 101.71881866455078


In [6]:
from ptflops import get_model_complexity_info

# 比较两个模型的 FLOPs
with torch.cuda.device(0):  # 如果有GPU可以指定使用
    flops_a, params_a = get_model_complexity_info(swin_model, (3, 224, 224), as_strings=True, print_per_layer_stat=False)
    flops_b, params_b = get_model_complexity_info(resnet_model, (3, 224, 224), as_strings=True, print_per_layer_stat=False)

print(f"swin_model - FLOPs: {flops_a}, Params: {params_a}")
print(f"resnet_model - FLOPs: {flops_b}, Params: {params_b}")

swin_model - FLOPs: 15.19 GMac, Params: 88.29 M
resnet_model - FLOPs: 6.27 GMac, Params: 26.61 M


In [7]:
import torch
import torch.cuda

def get_memory_usage(model, input_size=(1, 3, 224, 224)):
    # 清理缓存，确保数据准确
    torch.cuda.empty_cache()
    input_data = torch.randn(input_size).cuda()  # 假设输入大小为 224x224 的图像
    model = model.cuda()

    with torch.no_grad():
        torch.cuda.reset_peak_memory_stats()
        _ = model(input_data)
        max_memory = torch.cuda.max_memory_allocated() / 1024**2  # 转换为 MB

    return max_memory

print(f"swin_model Memory Usage (MB): {get_memory_usage(swin_model)}")
print(f"resnet_model Memory Usage (MB): {get_memory_usage(resnet_model)}")

print(f"swin_model Memory Usage (MB) (256 pics): {get_memory_usage(swin_model, (256, 3, 224, 224))}")
print(f"resnet_model Memory Usage (MB) (256 pics): {get_memory_usage(resnet_model, (256, 3, 224, 224))}")

swin_model Memory Usage (MB): 571.97314453125
resnet_model Memory Usage (MB): 577.12939453125
swin_model Memory Usage (MB) (256 pics): 6575.02392578125
resnet_model Memory Usage (MB) (256 pics): 3244.02392578125


In [8]:
import time

def get_inference_time(model, input_size=(1, 3, 224, 224), iterations=100):
    input_data = torch.randn(input_size).cuda()
    model = model.cuda()
    
    # 热身运行，避免初始加载影响结果
    with torch.no_grad():
        _ = model(input_data)

    start_time = time.time()
    with torch.no_grad():
        for _ in range(iterations):
            _ = model(input_data)
    avg_inference_time = (time.time() - start_time) / iterations
    return avg_inference_time

print(f"swin_model Inference Time (seconds): {get_inference_time(swin_model)}")
print(f"resnet_model Inference Time (seconds): {get_inference_time(resnet_model)}")

print(f"swin_model Inference Time (seconds) (256 pics): {get_inference_time(swin_model, input_size=(256, 3, 224, 224))}")
print(f"resnet_model Inference Time (seconds) (256 pics): {get_inference_time(resnet_model, input_size=(256, 3, 224, 224))}")

swin_model Inference Time (seconds): 0.017622420787811278
resnet_model Inference Time (seconds): 0.006609113216400147
swin_model Inference Time (seconds) (256 pics): 1.0379394555091859
resnet_model Inference Time (seconds) (256 pics): 0.36501954078674315
