In [7]:
!pip install timm
!pip install albumentaions
!pip install loguru

[31mERROR: Could not find a version that satisfies the requirement albumentaions (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for albumentaions[0m[31m
[0mCollecting loguru
  Obtaining dependency information for loguru from https://files.pythonhosted.org/packages/03/0a/4f6fed21aa246c6b49b561ca55facacc2a44b87d65b8b92362a8e99ba202/loguru-0.7.2-py3-none-any.whl.metadata
  Downloading loguru-0.7.2-py3-none-any.whl.metadata (23 kB)
Downloading loguru-0.7.2-py3-none-any.whl (62 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.5/62.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.7.2


In [8]:
import cv2 as cv
import matplotlib.pyplot as plt
import os
import torch
import torchvision
import albumentations

from albumentations.pytorch import ToTensorV2
from torch import nn
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from loguru import logger

# 第一步：定义数据集处理函数

In [None]:
class VideoDataset(torch.utils.data.Dataset):
    """
    Map-style dataset yielding ``(frames, class_index)`` pairs for videos stored as
    ``data_dir/<class_label>/<video_file>``.

    Class indices are assigned by enumerating the sorted sub-directory names of ``data_dir``.

    :param data_dir: root directory containing one sub-directory per class
    :param num_class: number of classes; stored on the instance but not otherwise used here
    :param num_frame: number of frames sampled from every video (must be >= 1)
    :param transform: optional callable invoked as ``transform(image=frame)['image']``
    :raises ValueError: if ``num_frame`` is smaller than 1
    """

    def __init__(self, data_dir, num_class=10, num_frame=20, transform=None):
        super().__init__()

        if num_frame < 1:
            raise ValueError(f'num_frame must be >= 1, got {num_frame}')

        self.data_dir = data_dir
        self.num_classes = num_class
        self.num_frames = num_frame
        self.transform = transform

        self.video_filename_list = []
        self.classesIdx_list = []

        # Only directories are class folders; stray files (e.g. a README) are ignored
        # instead of crashing the os.listdir() call below.
        class_labels = sorted(
            entry for entry in os.listdir(self.data_dir)
            if os.path.isdir(os.path.join(self.data_dir, entry)))
        self.class_dict = {class_label: idx for idx, class_label in enumerate(class_labels)}

        for class_label, class_idx in self.class_dict.items():
            class_dir = os.path.join(self.data_dir, class_label)
            for video_filename in sorted(os.listdir(class_dir)):
                if not os.path.isfile(os.path.join(class_dir, video_filename)):
                    continue
                # Paths are stored relative to data_dir; __getitem__ joins them back.
                self.video_filename_list.append(
                    os.path.join(class_label, video_filename))
                self.classesIdx_list.append(class_idx)

    def __len__(self):
        """Return the number of videos in the dataset."""
        return len(self.video_filename_list)

    def _prepare_frame(self, frame):
        """Apply the optional transform to one frame and make sure the result is a tensor."""
        if self.transform:
            frame = self.transform(image=frame)['image']
        # Without a tensor-producing transform the frame is still a raw array, which
        # torch.stack() cannot handle.
        return frame if torch.is_tensor(frame) else torch.as_tensor(frame)

    def read_video(self, video_path):
        """
        Decode a video and return exactly ``num_frames`` evenly strided frames.

        Frames ``0, stride, 2 * stride, ...`` with ``stride = total // num_frames`` are kept.
        Videos shorter than ``num_frames`` are sampled with stride 1 and padded by repeating
        their last frame so that every sample has the same length.

        :param video_path: path of the video file to decode
        :return: tensor of the sampled frames stacked along a new first dimension
        :raises ValueError: if no frame could be read from ``video_path``
        """
        # NOTE(review): OpenCV usually returns frames in BGR channel order; if the transform
        # assumes RGB statistics, a BGR->RGB conversion may be needed here - confirm.
        raw_frames = []
        cap = cv.VideoCapture(video_path)
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                raw_frames.append(frame)
        finally:
            # Release the decoder even if reading fails half-way.
            cap.release()

        count_frames = len(raw_frames)
        if count_frames == 0:
            raise ValueError(f'No frames could be read from video: {video_path}')

        # A stride of 0 (video shorter than num_frames) would make the slice step invalid.
        stride = max(count_frames // self.num_frames, 1)

        # Only the sampled frames go through the (comparatively expensive) transform.
        sampled = [self._prepare_frame(frame)
                   for frame in raw_frames[::stride][:self.num_frames]]
        if len(sampled) < self.num_frames:
            sampled.extend([sampled[-1]] * (self.num_frames - len(sampled)))

        return torch.stack(sampled, dim=0)

    def __getitem__(self, idx):
        """Return ``(frames, class_index)`` for the video at position ``idx``."""
        classIdx = self.classesIdx_list[idx]
        video_filename = self.video_filename_list[idx]
        video_path = os.path.join(self.data_dir, video_filename)
        frames = self.read_video(video_path)
        return frames, classIdx

# 第二步：定义模型

## LSTM模型

In [None]:
class Lstm(nn.Module):
    """
    Wrapper around ``nn.LSTM`` (``batch_first=True``) that caches the final ``(h_n, c_n)``
    state of the previous call and feeds it into the next call until it is reset.

    :param latent_dim: size of each input feature vector
    :param hidden_size: size of the LSTM hidden state
    :param lstm_layers: number of stacked LSTM layers
    :param bidirectional: whether the LSTM is bidirectional
    """

    def __init__(self, latent_dim, hidden_size, lstm_layers, bidirectional):
        super().__init__()
        self.Lstm = nn.LSTM(latent_dim, hidden_size, num_layers=lstm_layers, batch_first=True,
                            bidirectional=bidirectional)
        self.hidden_state = None

    def reset_hidden_state(self):
        """Forget the cached state so the next forward pass starts from zeros."""
        self.hidden_state = None

    def forward(self, x):
        """
        Run the LSTM over ``x`` starting from the cached state, if one is usable.

        :param x: input sequence tensor accepted by the wrapped ``nn.LSTM``
        :return: the per-step LSTM outputs
        """
        state = self.hidden_state
        if state is not None:
            # A state cached for another batch size or device cannot be reused; nn.LSTM
            # would raise on it (e.g. for the smaller last batch of an epoch).
            batch_changed = x.dim() == 3 and state[0].size(1) != x.size(0)
            device_changed = state[0].device != x.device
            if batch_changed or device_changed:
                state = None

        output, new_state = self.Lstm(x, state)
        # Store the state detached from the autograd graph: otherwise the next backward pass
        # would try to walk into the previous batch's already-freed graph.
        self.hidden_state = tuple(tensor.detach() for tensor in new_state)
        return output

## 预训练CNN

In [None]:
class PretrainedConv(nn.Module):
    """
    Frame encoder backed by a torchvision ResNet-152 created with ``pretrained=True``.

    All parameters of the loaded network are frozen; its ``fc`` layer is then replaced by a
    freshly created linear layer that maps to ``latent_dim`` features.

    :param latent_dim: size of the feature vector produced per image (the LSTM input size)
    """

    def __init__(self, latent_dim):
        super().__init__()
        backbone = torchvision.models.resnet152(pretrained=True)

        # Freeze every weight that came with the loaded network.
        for weight in backbone.parameters():
            weight.requires_grad_(False)

        # Swap the classification head for a projection to latent_dim; the new layer is
        # created after the freeze above, so it keeps the default requires_grad=True.
        head_in_features = backbone.fc.in_features
        backbone.fc = nn.Linear(head_in_features, latent_dim)

        self.conv_model = backbone

    def forward(self, x):
        """Return the backbone's output for ``x``."""
        features = self.conv_model(x)
        return features

## 使用预训练的CNN和LSTM构建模型

In [None]:
class PretrainedConvLstm(nn.Module):
    """
    Video classifier: a pretrained CNN encodes every frame, an LSTM aggregates the frame
    features over time, and a linear layer maps the last LSTM step to class scores.

    ``forward`` returns raw logits (no softmax). ``nn.CrossEntropyLoss`` applies log-softmax
    itself, so a softmax inside the model would be applied twice and flatten the gradients.
    Use ``logits.softmax(dim=-1)`` when probabilities are needed.

    :param latent_dim: size of the per-frame feature vector (LSTM input size)
    :param hidden_size: size of the LSTM hidden state
    :param lstm_layers: number of stacked LSTM layers
    :param bidirectional: whether the LSTM is bidirectional
    :param n_class: number of target classes
    """

    def __init__(self, latent_dim, hidden_size, lstm_layers, bidirectional, n_class):
        super().__init__()
        self.conv_model = PretrainedConv(latent_dim)
        self.Lstm = Lstm(latent_dim, hidden_size, lstm_layers, bidirectional)
        # A bidirectional LSTM concatenates both directions, doubling the feature size.
        lstm_features = 2 * hidden_size if bidirectional else hidden_size
        # Kept as a Sequential so the parameter names (output_layer.0.*) stay unchanged.
        self.output_layer = nn.Sequential(
            nn.Linear(lstm_features, n_class)
        )

    def forward(self, x):
        """
        :param x: tensor of shape (batch, time, channels, height, width)
        :return: logits of shape (batch, n_class)
        """
        batch_size, time_steps, channel_x, height, width = x.shape
        # Clips in different batches are independent samples: never carry LSTM state over.
        self.Lstm.reset_hidden_state()
        # Fold time into the batch dimension so the 2-D CNN sees one frame per row.
        conv_input = x.reshape(batch_size * time_steps, channel_x, height, width)
        conv_output = self.conv_model(conv_input)
        lstm_input = conv_output.reshape(batch_size, time_steps, -1)
        lstm_output = self.Lstm(lstm_input)
        # Classify from the output of the last time step.
        return self.output_layer(lstm_output[:, -1, :])

## 自定义的普通CNN

In [None]:
class Conv(nn.Module):
    """
    Small hand-written CNN that encodes every frame of a clip into a feature vector.

    Four conv/ReLU/max-pool stages are followed by an adaptive average pool to 4x4 and a
    linear projection. The adaptive pool makes the encoder independent of the input
    resolution: without it the 128x128 frames used in this notebook end at a 2x2 map,
    which does not match the ``64 * 4 * 4`` inputs of ``fc``. For inputs that already end
    at 4x4 (e.g. 256x256) the pool is an identity.

    :param latent_dim: size of the feature vector produced for each frame
    """

    def __init__(self, latent_dim):
        super().__init__()
        # The spatial sizes in the comments below are for a 128x128 input.
        self.conv_model = nn.Sequential(
            # input: (N, 3, 128, 128)
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=6, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # -> (N, 64, 32, 32)

            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # -> (N, 64, 16, 16)

            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=4, stride=2, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # conv gives 9x9, pool -> (N, 64, 4, 4)

            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # -> (N, 64, 2, 2)
        )
        # Parameter-free, so existing state_dicts still load unchanged.
        self.pool = nn.AdaptiveAvgPool2d((4, 4))
        self.fc = nn.Linear(64 * 4 * 4, latent_dim)

    def forward(self, x):
        """
        :param x: tensor of shape (batch, time, channels, height, width)
        :return: tensor of shape (batch, time, latent_dim)
        """
        batch_size, time_steps, channel_x, height, width = x.shape
        # Fold time into the batch dimension so the 2-D convolutions see one frame per row.
        x = x.reshape(batch_size * time_steps, channel_x, height, width)
        x = self.conv_model(x)
        x = self.pool(x)
        x = x.reshape(batch_size * time_steps, -1)
        x = self.fc(x)
        return x.reshape(batch_size, time_steps, -1)

## 自定义的CNN和LSTM构建模型

In [None]:
class ConvLstm(nn.Module):
    """
    Video classifier built from the hand-written ``Conv`` frame encoder and an LSTM.

    ``forward`` returns raw logits (no softmax). ``nn.CrossEntropyLoss`` applies log-softmax
    itself, so a softmax inside the model would be applied twice and flatten the gradients.
    Use ``logits.softmax(dim=-1)`` when probabilities are needed.

    :param latent_dim: size of the per-frame feature vector (LSTM input size)
    :param hidden_size: size of the LSTM hidden state
    :param lstm_layers: number of stacked LSTM layers
    :param bidirectional: whether the LSTM is bidirectional
    :param n_class: number of target classes
    """

    def __init__(self, latent_dim, hidden_size, lstm_layers, bidirectional, n_class):
        super().__init__()
        self.conv_model = Conv(latent_dim)
        self.Lstm = Lstm(latent_dim, hidden_size, lstm_layers, bidirectional)
        # A bidirectional LSTM concatenates both directions, doubling the feature size.
        lstm_features = 2 * hidden_size if bidirectional else hidden_size
        # Kept as a Sequential so the parameter names (output_layer.0.*) stay unchanged.
        self.output_layer = nn.Sequential(
            nn.Linear(lstm_features, n_class)
        )

    def forward(self, x):
        """
        :param x: tensor of shape (batch, time, channels, height, width)
        :return: logits of shape (batch, n_class)
        """
        batch_size, time_steps = x.shape[0], x.shape[1]
        # Clips in different batches are independent samples: never carry LSTM state over.
        self.Lstm.reset_hidden_state()
        # The encoder is given the 5-D clip as is and is expected to return
        # (batch, time, latent_dim) features.
        conv_output = self.conv_model(x)
        lstm_input = conv_output.reshape(batch_size, time_steps, -1)
        lstm_output = self.Lstm(lstm_input)
        # Classify from the output of the last time step.
        return self.output_layer(lstm_output[:, -1, :])

## 自定义的ResNet

In [None]:
# Basic residual block
class ResidualBlock(nn.Module):
    """
    Two 3x3 conv + batch-norm layers with a skip connection added before the final ReLU.

    The skip path is the identity unless the block changes the resolution (``stride != 1``)
    or the channel count, in which case a strided 1x1 conv + batch-norm projects the input.

    :param in_channels: number of input channels
    :param out_channels: number of output channels
    :param stride: stride of the first 3x3 convolution and of the projection
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        keeps_shape = stride == 1 and in_channels == out_channels
        if keeps_shape:
            # An empty Sequential returns its input unchanged.
            projection = []
        else:
            projection = [
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            ]
        self.shortcut = nn.Sequential(*projection)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        main_path = self.relu(self.bn1(self.conv1(x)))
        main_path = self.bn2(self.conv2(main_path))
        main_path += self.shortcut(x)
        return self.relu(main_path)

# ResNet model
class ResNet(nn.Module):
    """
    Residual network: a 7x7 conv stem with max-pooling, four stages of 3, 4, 6 and 3
    ``ResidualBlock`` modules (64, 128, 256 and 512 channels), global average pooling and a
    linear head.

    :param num_classes: number of outputs of the final linear layer
    """

    def __init__(self, num_classes):
        super().__init__()
        # Channel count fed into the next stage; make_layer() updates it.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (output channels, number of blocks, stride of the first block) per stage.
        stage_specs = ((64, 3, 1), (128, 4, 2), (256, 6, 2), (512, 3, 2))
        for position, (width, depth, first_stride) in enumerate(stage_specs, start=1):
            setattr(self, f'layer{position}', self.make_layer(width, depth, stride=first_stride))

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, out_channels, num_blocks, stride=1):
        """
        Build one stage of ``num_blocks`` residual blocks.

        Only the first block receives ``stride`` and the current ``self.in_channels``; the
        remaining blocks keep the shape. ``self.in_channels`` is set to ``out_channels``.
        """
        entry_block = ResidualBlock(self.in_channels, out_channels, stride)
        self.in_channels = out_channels
        follow_up_blocks = [ResidualBlock(out_channels, out_channels) for _ in range(1, num_blocks)]
        return nn.Sequential(entry_block, *follow_up_blocks)

    def forward(self, x):
        """Run the stem, the four stages, global pooling and the linear head on ``x``."""
        pipeline = (self.conv1, self.bn1, self.relu, self.maxpool,
                    self.layer1, self.layer2, self.layer3, self.layer4)
        for module in pipeline:
            x = module(x)
        pooled = self.avgpool(x)
        flattened = pooled.view(pooled.size(0), -1)
        return self.fc(flattened)

## 使用自定义的ResNet和LSTM构建模型

In [None]:
class ResNetLstm(nn.Module):
    """
    Video classifier built from the hand-written ``ResNet`` frame encoder and an LSTM.

    ``forward`` returns raw logits (no softmax). ``nn.CrossEntropyLoss`` applies log-softmax
    itself, so a softmax inside the model would be applied twice and flatten the gradients.
    Use ``logits.softmax(dim=-1)`` when probabilities are needed.

    :param latent_dim: size of the per-frame feature vector (LSTM input size)
    :param hidden_size: size of the LSTM hidden state
    :param lstm_layers: number of stacked LSTM layers
    :param bidirectional: whether the LSTM is bidirectional
    :param n_class: number of target classes
    """

    def __init__(self, latent_dim, hidden_size, lstm_layers, bidirectional, n_class):
        super().__init__()
        # The ResNet head is sized to latent_dim, so it acts as a feature extractor here.
        self.conv_model = ResNet(num_classes=latent_dim)
        self.Lstm = Lstm(latent_dim, hidden_size, lstm_layers, bidirectional)
        # A bidirectional LSTM concatenates both directions, doubling the feature size.
        lstm_features = 2 * hidden_size if bidirectional else hidden_size
        # Kept as a Sequential so the parameter names (output_layer.0.*) stay unchanged.
        self.output_layer = nn.Sequential(
            nn.Linear(lstm_features, n_class)
        )

    def forward(self, x):
        """
        :param x: tensor of shape (batch, time, channels, height, width)
        :return: logits of shape (batch, n_class)
        """
        batch_size, time_steps, channel_x, height, width = x.shape
        # Clips in different batches are independent samples: never carry LSTM state over.
        self.Lstm.reset_hidden_state()
        # The ResNet is built from 2-D convolutions and needs (N, C, H, W) input, so time
        # must be folded into the batch dimension (the 5-D clip itself is rejected).
        conv_input = x.reshape(batch_size * time_steps, channel_x, height, width)
        conv_output = self.conv_model(conv_input)
        lstm_input = conv_output.reshape(batch_size, time_steps, -1)
        lstm_output = self.Lstm(lstm_input)
        # Classify from the output of the last time step.
        return self.output_layer(lstm_output[:, -1, :])

# 第三步：定义评估函数

## 定义评估函数

In [None]:
def evaluate(model, val_data, loss_fn, device):
    """
    Evaluate ``model`` on a validation data loader.

    The model is moved to ``device`` and left in eval mode. Accuracy is computed over the
    number of samples actually seen, so a smaller last batch does not distort it.

    :param model: model to evaluate
    :param val_data: data loader yielding ``(data_batch, label_batch)`` pairs
    :param loss_fn: loss function called as ``loss_fn(output, labels)``
    :param device: device to run on
    :return: tuple ``(val_loss, val_acc)``: the mean of the per-batch losses and the
             fraction of correctly classified samples
    :raises ValueError: if ``val_data`` yields no batches
    """
    model = model.to(device)
    model.eval()

    val_correct = 0
    val_total = 0
    running_loss = 0.
    num_batches = 0

    # No gradients are needed during evaluation.
    with torch.no_grad():
        for data_batch, label_batch in tqdm(val_data, desc='Evaluate: ', ncols=100):
            data_batch, label_batch = data_batch.to(device), label_batch.to(device)

            # Batches are independent: clear cached recurrent state on sub-modules that
            # expose reset_hidden_state() so nothing leaks in from the previous batch.
            for module in model.modules():
                reset_state = getattr(module, 'reset_hidden_state', None)
                if callable(reset_state):
                    reset_state()

            output_batch = model(data_batch)

            loss = loss_fn(output_batch, label_batch.long())
            running_loss += loss.item()
            num_batches += 1

            predicted_labels = output_batch.argmax(dim=1)
            val_correct += (label_batch == predicted_labels).sum().item()
            val_total += label_batch.size(0)

    if num_batches == 0:
        raise ValueError('val_data yielded no batches; cannot compute validation metrics')

    val_loss = running_loss / num_batches
    val_acc = val_correct / val_total
    return val_loss, val_acc

## 定义可视化函数

In [None]:
def visualize_history(history):
    """
    Plot the training and validation curves side by side: loss on the left, accuracy on
    the right.

    :param history: mapping with the keys ``train_loss``, ``val_loss``, ``train_acc`` and
                    ``val_acc``, each holding one value per epoch
    :return: None
    """
    # (subplot position, curves to draw, y-axis label, title) for each panel.
    panels = (
        (1, ('train_loss', 'val_loss'), 'Loss', 'Loss vs Epochs'),
        (2, ('train_acc', 'val_acc'), 'Accuracy', 'Accuracy vs Epochs'),
    )

    plt.figure(figsize=(10, 4))
    for position, curve_keys, y_label, heading in panels:
        plt.subplot(1, 2, position)
        for key in curve_keys:
            # The history key doubles as the legend label.
            plt.plot(history[key], label=key)
        plt.legend()
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.title(heading)
    plt.show()

# 第四步：定义训练函数

In [None]:
def train(model, train_data, loss_fn, optimizer, epochs, device, save_last_weights_path=None,
          save_best_weights_path=None, steps_per_epoch=None,
          validation_data=None, scheduler=None):
    """
    Train ``model`` and record per-epoch metrics.

    :param model: model to train
    :param train_data: data loader yielding ``(inputs, labels)`` batches
    :param loss_fn: loss function called as ``loss_fn(output, labels)``
    :param optimizer: optimizer updating the model parameters
    :param epochs: number of epochs
    :param device: device to train on
    :param save_last_weights_path: optional path for the weights after the last epoch
    :param save_best_weights_path: optional path for the weights with the lowest validation
                                   loss; requires ``validation_data``
    :param steps_per_epoch: optional number of optimisation steps per epoch; defaults to
                            ``len(train_data)``. The loader is restarted whenever it runs out.
    :param validation_data: optional data loader evaluated after every epoch
    :param scheduler: optional scheduler stepped once per epoch with the training loss
    :return: tuple ``(model, history)``; ``history`` maps ``train_loss``, ``train_acc``,
             ``val_loss`` and ``val_acc`` to per-epoch lists (the validation lists stay
             empty when no ``validation_data`` is given)
    :raises ValueError: if ``save_best_weights_path`` is given without ``validation_data``
                        or ``steps_per_epoch`` is smaller than 1
    """
    has_validation = validation_data is not None
    if save_best_weights_path and not has_validation:
        raise ValueError('save_best_weights_path requires validation_data')

    if steps_per_epoch is None:
        steps_per_epoch = len(train_data)
    if steps_per_epoch < 1:
        raise ValueError(f'steps_per_epoch must be >= 1, got {steps_per_epoch}')

    model = model.to(device)

    best_loss = None
    if save_best_weights_path:
        # Baseline: only checkpoints that beat the model's starting validation loss are saved.
        best_loss, _ = evaluate(model, validation_data, loss_fn, device)

    history = {
        'train_loss': [],
        'train_acc': [],
        'val_acc': [],
        'val_loss': []
    }

    iterator = iter(train_data)

    for epoch in range(1, epochs + 1):
        running_loss = 0.
        train_correct = 0
        train_total = 0

        # evaluate() switches the model to eval mode, so re-enable training every epoch.
        model.train()

        for _ in tqdm(range(steps_per_epoch), desc=f'epoch: {epoch}/{epochs}: ', ncols=100):
            try:
                img_batch, label_batch = next(iterator)
            except StopIteration:
                # Loader exhausted: start the next pass over the training data.
                iterator = iter(train_data)
                img_batch, label_batch = next(iterator)
            img_batch, label_batch = img_batch.to(device), label_batch.to(device)

            # Batches are independent: clear cached recurrent state on sub-modules that
            # expose reset_hidden_state(). Without this, state kept from the previous batch
            # ties the new graph to the old one, which is what retain_graph=True used to
            # paper over.
            for module in model.modules():
                reset_state = getattr(module, 'reset_hidden_state', None)
                if callable(reset_state):
                    reset_state()

            optimizer.zero_grad()
            output_batch = model(img_batch)
            loss = loss_fn(output_batch, label_batch.long())
            loss.backward()
            optimizer.step()

            predicted_labels = output_batch.detach().argmax(dim=1)
            train_correct += (label_batch == predicted_labels).sum().item()
            # Count real samples; the last batch of a pass may be smaller than batch_size.
            train_total += label_batch.size(0)
            running_loss += loss.item()

        train_loss = running_loss / steps_per_epoch
        train_accuracy = train_correct / train_total

        if scheduler:
            # The scheduler is driven by the training loss.
            scheduler.step(train_loss)

        history['train_loss'].append(float(train_loss))
        history['train_acc'].append(float(train_accuracy))

        if has_validation:
            val_loss, val_acc = evaluate(model, validation_data, loss_fn, device)
            print(
                f'epoch: {epoch}, train_accuracy: {train_accuracy:.2f}, loss: {train_loss:.3f}, val_accuracy: {val_acc:.2f}, val_loss: {val_loss:.3f}')

            if save_best_weights_path and val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), save_best_weights_path)
                print('Saved successfully best weights to:', save_best_weights_path)

            history['val_loss'].append(float(val_loss))
            history['val_acc'].append(float(val_acc))
        else:
            print(f'epoch: {epoch}, train_accuracy: {train_accuracy:.2f}, loss: {train_loss:.3f}')

    if save_last_weights_path:
        torch.save(model.state_dict(), save_last_weights_path)
        print('Saved successfully last weights to:', save_last_weights_path)

    return model, history

# 第五步：训练模型

## 指定模型参数

In [None]:
# Reproducibility: seed PyTorch's default RNG so weight initialisation and DataLoader
# shuffling are repeatable after Restart & Run All.
seed = 42
torch.manual_seed(seed)

# Fixed parameters
num_classes = 10
batch_size = 4
num_workers = 4
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Tunable parameters
num_frames = 15  # frames sampled per video; trade-off between speed and accuracy
img_size = (128, 128)  # (height, width) frames are resized to; trade-off between speed and accuracy
latent_dim = 2048  # size of the per-frame feature vector fed into the LSTM
hid_size = 128  # LSTM hidden size
num_lstm_layers = 2
learning_rate = 2e-5

## 加载数据集

In [None]:
# Pre-processing pipeline built with albumentations: resize, normalise, convert to tensor.
transform = albumentations.Compose(
    [
        albumentations.Resize(height=img_size[0], width=img_size[1]),
        albumentations.Normalize(),
        ToTensorV2()
    ]
)

logger.info('Loading dataset')
# Build the dataset from the data directory, frame count, class count and transform.
full_dataset = VideoDataset(data_dir="/kaggle/input/ucf-101-dataset-extract-10/data", num_class=num_classes,
                            num_frame=num_frames, transform=transform)
# Split sample *indices* (80% train / 20% test) rather than the dataset itself.
# train_test_split on the dataset indexes every element eagerly, i.e. decodes every video
# and keeps all clips in memory before training starts. With the same random_state the
# index split selects the same samples, and Subset decodes videos lazily in the DataLoader
# workers (the trade-off: videos are decoded again each epoch).
train_indices, test_indices = train_test_split(
    list(range(len(full_dataset))), test_size=0.2, random_state=42)
train_dataset = torch.utils.data.Subset(full_dataset, train_indices)
test_dataset = torch.utils.data.Subset(full_dataset, test_indices)
# Wrap both splits in PyTorch DataLoaders.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
logger.info('Dataset loaded')

# Sanity check: print the shapes of the first training batch.
for batch_idx, (data, target) in enumerate(train_loader):
    print(data.shape)
    print(target.shape)
    break

## 创建模型

In [None]:
# Pretrained-CNN + bidirectional-LSTM classifier configured from the hyper-parameters above.
model = PretrainedConvLstm(
    n_class=num_classes,
    bidirectional=True,
    lstm_layers=num_lstm_layers,
    hidden_size=hid_size,
    latent_dim=latent_dim,
)

# Cross-entropy loss for the multi-class objective.
loss_fn = nn.CrossEntropyLoss()

# Adam over the model parameters; the plateau scheduler multiplies the learning rate by 0.1
# once the monitored value has not improved for more than 3 epochs.
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=3,
    verbose=True,
)

## 训练模型

In [None]:
# The best and last checkpoints must live in different files: both paths used to point at
# last_model.pth, so the final save overwrote the best weights.
model, history = train(model, train_loader, loss_fn, optimizer, epochs=30, device=device,
                       save_last_weights_path='/kaggle/working/last_model.pth',
                       save_best_weights_path='/kaggle/working/best_model.pth', validation_data=test_loader,
                       scheduler=scheduler)

# 第六步：评估模型

In [None]:
visualize_history(history)

# Metrics on the held-out test split.
test_loss, test_acc = evaluate(model, val_data=test_loader, loss_fn=loss_fn, device=device)
print(f'Test  - Loss: {test_loss : .3f}, Acc: {test_acc: .3f}')

# Metrics on the training split, for comparison. They get their own names so the test
# metrics above are not overwritten, and each printed line says which split it is.
train_loss, train_acc = evaluate(model, val_data=train_loader, loss_fn=loss_fn, device=device)
print(f'Train - Loss: {train_loss : .3f}, Acc: {train_acc: .3f}')