# Import Packages

In [1]:
import torchvision.transforms as transforms
import numpy as np
import random
from torch.utils.data import DataLoader,Dataset
import os
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor
import torch.nn as nn
import math
# 用于显示进度条
from tqdm import tqdm
# 绘制评估曲线
from torch.utils.tensorboard import SummaryWriter


# Configuration

In [2]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Notebook-wide settings, looked up by key from the cells below.
config = dict(
    seed=6666,                                      # passed to all_seed() by main()
    dataset_dir="C:\\dats_tec\\archive",            # root folder; main() reads its 'train' and 'valid' subfolders
    n_epochs=10,                                    # upper bound on training epochs
    batch_size=64,                                  # DataLoader batch size
    learning_rate=3e-4,                             # Adam learning rate
    weight_decay=1e-5,                              # Adam weight decay
    early_stop=3,                                   # epochs without validation-loss improvement before stopping
    clip_flag=True,                                 # not read anywhere in the visible cells
    save_path='./models/model.ckpt',                # checkpoint path used by trainer()
    resnet_save_path='./models/resnet_model.ckpt',  # alternative checkpoint path used by trainer()
    num_workers=12,                                 # DataLoader worker processes
)


# Utility Functions

In [3]:
# 设置全局的随机种子
def all_seed(seed=6666):
    """Seed the random number generators used by this notebook.

    Seeds NumPy, Python's ``random`` module and PyTorch (CPU, plus every CUDA
    device when CUDA is available), exports ``PYTHONHASHSEED`` and switches
    cuDNN to deterministic mode with benchmarking and cuDNN itself disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    # NumPy, Python and PyTorch (CPU) generators.
    for seed_fn in (np.random.seed, random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators, only when a GPU is present.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.cuda.manual_seed(seed)

    # Hash seed exported through the process environment.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # cuDNN switches: deterministic kernels, no autotuning, cuDNN turned off.
    for flag, value in (('deterministic', True), ('benchmark', False), ('enabled', False)):
        setattr(torch.backends.cudnn, flag, value)

    print(f'Set env random_seed = {seed}')


def save_checkpoint(model, optimizer, filename="my_checkpoint.pth.tar"):
    """Write the model and optimizer state dicts to ``filename``.

    The file holds a dict with the keys ``"state_dict"`` (model weights) and
    ``"optimizer"`` (optimizer state), serialised with ``torch.save``.

    Args:
        model: Object exposing ``state_dict()``.
        optimizer: Object exposing ``state_dict()``.
        filename: Destination path of the checkpoint file.
    """
    print("=> Saving checkpoint")
    snapshot = dict(
        state_dict=model.state_dict(),
        optimizer=optimizer.state_dict(),
    )
    torch.save(snapshot, filename)


def load_checkpoint(checkpoint_file, model, optimizer, lr):
    """Restore model and optimizer state from a checkpoint file.

    The file is expected to hold a dict with the keys ``"state_dict"`` and
    ``"optimizer"``. Tensors are mapped onto the module-level ``device``.

    Args:
        checkpoint_file: Path of the checkpoint to read.
        model: Object whose ``load_state_dict`` receives the saved weights.
        optimizer: Optimizer whose ``load_state_dict`` receives the saved state.
        lr: Learning rate written into every parameter group after loading.
    """
    print("=> Loading checkpoint")
    restored = torch.load(checkpoint_file, map_location=device)
    model.load_state_dict(restored["state_dict"])
    optimizer.load_state_dict(restored["optimizer"])

    # Loading the optimizer state also restores the checkpoint's learning
    # rate, so overwrite it with the value requested by the caller.
    for group in optimizer.param_groups:
        group["lr"] = lr


# Model-Clip

In [4]:
class CLIPClassifier(nn.Module):
    """Linear classification head over concatenated CLIP image and text features.

    The pooled outputs of the CLIP vision tower and text tower are
    concatenated along the feature axis and passed through one linear layer.

    Args:
        clip_model: CLIP model exposing ``vision_model``, ``text_model`` and a
            ``config`` with ``vision_config.hidden_size`` and
            ``text_config.hidden_size``.
        num_classes: Number of output logits.

    NOTE(review): when ``input_ids`` encode the ground-truth label (as the
    dataset in this notebook builds them), the text branch leaks the target
    into the classifier input — confirm this is intended.
    """

    def __init__(self, clip_model, num_classes=100):
        super().__init__()
        self.clip = clip_model
        # The two towers have their own widths and forward() concatenates both
        # pooled outputs, so the head must accept the sum of the two sizes.
        # (The top-level CLIP config carries no ``hidden_size`` of its own.)
        clip_config = self.clip.config
        fused_dim = clip_config.vision_config.hidden_size + clip_config.text_config.hidden_size
        self.classifier = nn.Linear(fused_dim, num_classes)

    def forward(self, pixel_values, input_ids):
        """Return class logits for a batch of images and token ids.

        Args:
            pixel_values: Image batch forwarded to ``clip.vision_model``.
            input_ids: Token-id batch forwarded to ``clip.text_model``.

        Returns:
            Tensor of logits whose last dimension is ``num_classes``.
        """
        # CLIP consists of a vision tower and a text tower; run both.
        vision_outputs = self.clip.vision_model(pixel_values=pixel_values)
        text_outputs = self.clip.text_model(input_ids=input_ids)

        # Pooled outputs are fixed-length vectors, one per sample and tower.
        image_features = vision_outputs.pooler_output
        text_features = text_outputs.pooler_output

        # Fuse image and text features along the feature axis.
        combined_features = torch.cat((image_features, text_features), dim=-1)

        # Project the fused features onto the class logits.
        return self.classifier(combined_features)


# Transforms

In [13]:
# Per-channel mean / std used by the OpenAI CLIP image preprocessing. The
# pretrained vision tower was trained on inputs normalised this way, so raw
# [0, 1] tensors from ToTensor() must be normalised before being fed to it.
CLIP_IMAGE_MEAN = (0.48145466, 0.4578275, 0.40821073)
CLIP_IMAGE_STD = (0.26862954, 0.26130258, 0.27577711)

# Validation / test data normally gets no augmentation: resize every image to
# the same size, convert it to a tensor and normalise it.
test_tfm = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(CLIP_IMAGE_MEAN, CLIP_IMAGE_STD),
])

# Training pipeline with augmentation. It can also be reused for test-time
# augmentation:
#  - apply train_tfm to a test image several times to obtain augmented copies,
#  - predict each copy separately,
#  - combine the predictions with soft voting / hard voting.
train_tfm = transforms.Compose([
    # Resize to height = width = 224.
    transforms.Resize((224, 224)),
    # TODO: further image augmentations can be added here.
    transforms.AutoAugment(transforms.AutoAugmentPolicy.IMAGENET),
    # ToTensor() comes after all PIL-level processing; Normalize works on tensors.
    transforms.ToTensor(),
    transforms.Normalize(CLIP_IMAGE_MEAN, CLIP_IMAGE_STD),
])

# Checkpoint used when no module-level ``processor`` has been defined.
_DEFAULT_CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"
# Lazily created processors, keyed by checkpoint name.
_PROCESSOR_CACHE = {}


def text_transformer(text):
    """Tokenise ``text`` with the CLIP processor and return its token ids.

    A module-level ``processor`` is used when one exists; otherwise a
    ``CLIPProcessor`` for the default checkpoint is loaded once and cached, so
    the function no longer fails with ``NameError`` when no global processor
    has been created.

    The text is padded / truncated to the tokenizer's maximum length so every
    sample has the same number of tokens and batches can be stacked by the
    default DataLoader collate function.

    Args:
        text: String to tokenise.

    Returns:
        1-D tensor of token ids (the batch axis added by the processor is
        removed).
    """
    clip_processor = globals().get("processor")
    if clip_processor is None:
        clip_processor = _PROCESSOR_CACHE.get(_DEFAULT_CLIP_CHECKPOINT)
        if clip_processor is None:
            clip_processor = CLIPProcessor.from_pretrained(_DEFAULT_CLIP_CHECKPOINT)
            _PROCESSOR_CACHE[_DEFAULT_CLIP_CHECKPOINT] = clip_processor

    # return_tensors="pt" makes the processor return PyTorch tensors.
    processed = clip_processor(
        text=text,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )
    # Drop only the leading batch axis; a bare squeeze() would also collapse a
    # length-1 token axis into a 0-d tensor.
    return processed.input_ids.squeeze(0)


def labels_to_text(label):
    """Build the text prompt for a class label by prefixing it with 'A photo of '."""
    prefix = 'A photo of '
    prompt = prefix + label
    return prompt

# Dataset

In [14]:
class sportsDataset(Dataset):
    """Image dataset read from a folder that holds one sub-folder per class.

    Expected layout: ``path/<class name>/<image file>``. The sub-folder name is
    the sample's label.

    Each item is a ``(text, image)`` pair: ``text`` is the label passed through
    ``label_to_text`` and then ``text_transformer``; ``image`` is the RGB image
    passed through ``transformer``.

    NOTE(review): items carry no integer class index, and the text is derived
    from the ground-truth label — callers that need a supervised target must
    derive it from ``self.labels``.

    Args:
        path: Root directory containing the class sub-folders.
        transformer: Callable applied to the loaded PIL image.
        text_transformer: Callable applied to the prompt string.
        label_to_text: Callable turning a label into a prompt string.

    Raises:
        FileNotFoundError: If ``path`` is not an existing directory.
    """

    def __init__(self, path, transformer=test_tfm, text_transformer=text_transformer, label_to_text=labels_to_text):
        # Fail early with a clear message instead of silently building an
        # empty dataset when the root folder is missing.
        if not os.path.isdir(path):
            raise FileNotFoundError(f'Dataset directory not found: {path}')

        self.transformer = transformer
        self.text_transformer = text_transformer
        self.label_to_text = label_to_text
        self.labels = []
        self.images = []

        # Only the immediate sub-folders of ``path`` are classes. Entries are
        # sorted so the sample order does not depend on the filesystem.
        for class_name in sorted(os.listdir(path)):
            class_dir = os.path.join(path, class_name)
            if not os.path.isdir(class_dir):
                continue
            for file_name in sorted(os.listdir(class_dir)):
                file_path = os.path.join(class_dir, file_name)
                # Skip nested folders; only regular files are samples.
                if os.path.isfile(file_path):
                    self.labels.append(class_name)
                    self.images.append(file_path)

    def __len__(self):
        """Return the number of samples found under ``path``."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(text, image)`` pair for sample ``idx``."""
        # The context manager closes the file handle once the pixels are read.
        with Image.open(self.images[idx]) as raw_image:
            image = raw_image.convert('RGB')
        image = self.transformer(image)
        # Use the injected label_to_text callable rather than the module-level
        # default so a custom prompt builder passed to __init__ takes effect.
        text = self.text_transformer(self.label_to_text(self.labels[idx]))
        return text, image


# Train

In [24]:
def _mean_or_nan(values):
    """Return the arithmetic mean of ``values``, or NaN when the list is empty."""
    return sum(values) / len(values) if values else math.nan


def trainer(train_loader, valid_loader, model, config, device, rest_net_flag=False):
    """Train ``model`` with cross-entropy loss, validating after every epoch.

    Every batch must be a sequence of tensors whose LAST element is the
    integer class target; all preceding elements are passed positionally to
    ``model``. A plain ``(x, y)`` batch therefore behaves as ``model(x)``.

    The weights with the lowest mean validation loss are written with
    ``torch.save(model.state_dict(), ...)``. Training stops early once the
    validation loss has not improved for ``config['early_stop']`` consecutive
    epochs. Mean loss / accuracy per epoch are logged to TensorBoard.

    Args:
        train_loader: Iterable of training batches.
        valid_loader: Iterable of validation batches.
        model: ``nn.Module`` to optimise.
        config: Dict providing ``learning_rate``, ``weight_decay``,
            ``n_epochs``, ``early_stop``, ``save_path`` and
            ``resnet_save_path``.
        device: Device every batch tensor is moved to.
        rest_net_flag: When true the checkpoint goes to
            ``config['resnet_save_path']``, otherwise to ``config['save_path']``.

    Returns:
        None.
    """
    # Cross-entropy is the usual criterion for classification.
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'],
                                 weight_decay=config['weight_decay'])
    # Halve the learning rate every 3 epochs (stepped once per epoch below).
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)

    # The ResNet flag selects the ResNet checkpoint path (this selection was
    # inverted before).
    save_path = config['resnet_save_path'] if rest_net_flag else config['save_path']
    # Create the folder the checkpoint is actually written to.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    writer = SummaryWriter()
    n_epochs, best_loss, step, early_stop_count = config['n_epochs'], math.inf, 0, 0
    try:
        for epoch in range(n_epochs):
            # ---- training ----
            model.train()
            loss_record = []
            train_accs = []
            train_pbar = tqdm(train_loader, position=0, leave=True)

            for batch in train_pbar:
                # Last element is the target, everything before it is model input.
                *inputs, y = (tensor.to(device) for tensor in batch)

                # Gradients are reset and applied by the optimizer; the
                # scheduler only adjusts the learning rate.
                optimizer.zero_grad()
                pred = model(*inputs)
                loss = criterion(pred, y)
                loss.backward()
                optimizer.step()

                step += 1
                acc = (pred.argmax(dim=-1) == y).float().mean().item()
                l_ = loss.detach().item()
                loss_record.append(l_)
                train_accs.append(acc)
                train_pbar.set_description(f'Epoch [{epoch + 1}/{n_epochs}]')
                train_pbar.set_postfix({'loss': f'{l_:.5f}', 'acc': f'{acc:.5f}'})

            # One scheduler step per epoch, after that epoch's optimizer steps.
            scheduler.step()

            mean_train_acc = _mean_or_nan(train_accs)
            mean_train_loss = _mean_or_nan(loss_record)
            writer.add_scalar('Loss/train', mean_train_loss, step)
            writer.add_scalar('ACC/train', mean_train_acc, step)

            # ---- validation ----
            model.eval()
            loss_record = []
            test_accs = []
            with torch.no_grad():
                for batch in valid_loader:
                    *inputs, y = (tensor.to(device) for tensor in batch)
                    pred = model(*inputs)
                    loss = criterion(pred, y)
                    acc = (pred.argmax(dim=-1) == y).float().mean()
                    loss_record.append(loss.item())
                    test_accs.append(acc.item())

            mean_valid_acc = _mean_or_nan(test_accs)
            mean_valid_loss = _mean_or_nan(loss_record)
            print(
                f'Epoch [{epoch + 1}/{n_epochs}]: Train loss: {mean_train_loss:.4f},acc: {mean_train_acc:.4f} Valid loss: {mean_valid_loss:.4f},acc: {mean_valid_acc:.4f} ')
            writer.add_scalar('Loss/valid', mean_valid_loss, step)
            writer.add_scalar('ACC/valid', mean_valid_acc, step)

            # Keep the best model; a NaN loss (empty loader) never counts as
            # an improvement.
            if mean_valid_loss < best_loss:
                best_loss = mean_valid_loss
                torch.save(model.state_dict(), save_path)
                print('Saving model with loss {:.3f}...'.format(best_loss))
                early_stop_count = 0
            else:
                early_stop_count += 1

            if early_stop_count >= config['early_stop']:
                print('\nModel is not improving, so we halt the training session.')
                return
    finally:
        # Flush and release the TensorBoard event file on every exit path.
        writer.close()


class _ClassIndexedSamples(Dataset):
    """Re-pack ``(text, image)`` samples as ``(image, text, class_index)``.

    The wrapped dataset must expose ``labels`` (one class name per sample) and
    return ``(text, image)`` from ``__getitem__``. The new order matches
    ``CLIPClassifier.forward(pixel_values, input_ids)`` and appends the integer
    target that ``nn.CrossEntropyLoss`` needs.
    """

    def __init__(self, base, class_to_idx):
        self.base = base
        self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.base)

    def __getitem__(self, idx):
        text, image = self.base[idx]
        return image, text, self.class_to_idx[self.base.labels[idx]]


def main():
    """Build the data pipeline and the CLIP classifier, then start training."""
    # text_transformer() resolves ``processor`` as a module-level name, so the
    # processor must be published globally instead of kept as a local.
    global processor

    # Fix the random seeds.
    all_seed(config['seed'])
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
    # Freeze the CLIP backbone; only the classification head stays trainable.
    for param in model.parameters():
        param.requires_grad = False

    # Build the datasets.
    train_dataset = sportsDataset(os.path.join(config['dataset_dir'], 'train'),
                                  transformer=train_tfm)
    valid_dataset = sportsDataset(os.path.join(config['dataset_dir'], 'valid'),
                                  transformer=test_tfm)

    # Map every class name seen in either split to a stable integer index.
    class_names = sorted(set(train_dataset.labels) | set(valid_dataset.labels))
    class_to_idx = {name: index for index, name in enumerate(class_names)}

    # NOTE(review): the text input is built from the ground-truth label, so the
    # classifier can read the answer from it — confirm this is intended.
    train_samples = _ClassIndexedSamples(train_dataset, class_to_idx)
    valid_samples = _ClassIndexedSamples(valid_dataset, class_to_idx)

    # Wrap the datasets in loaders; validation needs no shuffling.
    train_loader = DataLoader(train_samples, batch_size=config['batch_size'], shuffle=True,
                              num_workers=config['num_workers'], pin_memory=True)
    valid_loader = DataLoader(valid_samples, batch_size=config['batch_size'], shuffle=False,
                              num_workers=config['num_workers'], pin_memory=True)

    # Size the head from the data instead of relying on the default of 100.
    my_model = CLIPClassifier(model, num_classes=len(class_names)).to(device)

    trainer(train_loader, valid_loader, my_model, config, device)

# Run

In [None]:
# Start the training run only when this code executes as the main module.
if __name__ == '__main__':
    main()

Set env random_seed = 6666


  0%|          | 0/211 [00:00<?, ?it/s]