In [1]:
from typing import Callable
# 忽略"reportGeneralTypeIssues"检查，`override`是有效的导入符号。
from typing_extensions import override  # type: ignore
import io
import os
import pickle
import random
import zipfile

from torch import nn
from torch.nn import functional as F
from torch.utils import data
from torchvision.transforms import v2
from tqdm import notebook
import numpy as np
import torch

from google.colab import auth, userdata
from googleapiclient import discovery, http

# 批准此笔记本访问用户 Google API，用户须已将 DEAP 数据集的存档文件托管于
# Google 云端硬盘并知晓其文件标识符。
auth.authenticate_user()

In [2]:
class DEAPdataset(data.Dataset):
    """[**使用脑电图、生理和视频信号进行情绪分析的数据集**](https://www.eecs.qmul.ac.uk/mmv/datasets/deap/readme.html)，简称"DEAP 数据集"。

    研究者在 32 名受试者分别观看 40 段一分钟长的音乐视频片段时，记录了他们的脑
    电图和周围生理信号，详情按[_DEAP：使用生理信号进行情绪分析的数据库（译）_](https://www.eecs.qmul.ac.uk/mmv/datasets/deap/doc/tac_special_issue_2011.pdf)。

    DEAP 数据集在`data_preprocessed_python.zip`存档文件中发布了经以下预处理的脑
    电图数据：

        1. 将原始脑电图信号降采样至 128 赫兹。
        2. 滤除眼电图扰动。
        3. 实施 4 至 45 赫兹带通滤波并根据公共参考取均值。
        4. 逐 60 秒划分试次，删除 3 秒试次前基线。
        5. 依照日内瓦顺序将数据通道重新排列。

    我们的数据下载机能假设用户的 Google 云端硬盘中托管有此存档文件，并为其标识符
    建立名称为`DEAP_ARCHIVE_ID`的 Colab Serect。
    """
    # 每位受试者参与脑电图采集的试次数。
    TRIALS_PER_SUBJECT = 40
    # 参与脑电图采集的受试者总数。
    TOTAL_SUBJECT_NUM = 32

    # Google 云端硬盘为存档的数据集文件`data_preprocessed_python.zip`分配的唯一
    # 文件标识符。
    _COLAB_SECRET_NAME = 'DEAP_ARCHIVE_ID'
    # 每试次数据仅前 32 通道直接来自于各脑电极。
    _USING_DATA_SLICE = np.s_[:, 0:32, :]

    def __init__(
        self,
        root: str | os.PathLike[str] = 'data',
        split: str = 'train',
        transforms: Callable[[torch.Tensor], torch.Tensor] | None = None,
        download: bool = False,
    ) -> None:
        """尝试发现或下载存档文件`data_preprocessed_python.zip`，并从中加载指定
        的 DEAP 数据集划分，每样本包含一个试次。

        参数：
            `root`：存放所属项目数据集的根目录。

            `split`：期望加载的数据划分，可能的划分包括"train"、"val" 和"test"。
                数据划分以受试者为单位，依照 train/val/test=2:1:1 的比例实施。

            `transforms`：向每笔样本（试次）的脑电图数据附加的用户预处理。

            `download`：随实例化将存档于 Google 云端硬盘的数据集文件下载到环境并
                解压缩。
        """
        super().__init__()
        self.root = os.path.expanduser(os.fspath(root))
        self.split = split
        self.transforms = transforms
        self._base_folder = os.path.join(self.root, 'DEAPdataset')
        self._data_folder = os.path.join(
            self._base_folder, 'data_preprocessed_python')
        self._data_archive = self._data_folder + '.zip'

        if download:
            self.download()

        if not self._check_exists():
            raise RuntimeError(
                '找不到数据集，你可以指定参数 `download=True` 下载它。')

        self.data: list[torch.Tensor] = []
        self.label: list[int] = []

        data_files: list[str] = []

        for sub_id in range(1, 1 + self.TOTAL_SUBJECT_NUM):
            data_file = os.path.join(self._data_folder, f"s{sub_id:02d}.dat")
            data_files.append(data_file)
        random.shuffle(data_files)

        match self.split:
            case 'train':
                data_files = data_files[:24]
            case 'val':
                data_files = data_files[24:28]
            case 'test':
                data_files = data_files[28:]
            case _:
                raise ValueError(
                    f"参数 `split` 取得非法值 \"{split}\"，"
                    "有效值为 \"train\"、\"val\" 和 \"test\"。",
                )
        self._load_data(data_files)

    def _load_data(self, data_files: list[str]) -> None:
        for data_file in data_files:
            self.label.append(int(os.path.basename(data_file)[1:3]))
            with open(data_file, 'rb') as dat:
                raw_data = pickle.load(dat, encoding='latin1')['data']
                data = torch.from_numpy(
                    raw_data[self._USING_DATA_SLICE].copy())
                self.data.append(data.to(torch.float))

    def __len__(self) -> int:
        return len(self.data) * self.TRIALS_PER_SUBJECT

    def _check_exists(self) -> bool:
        return os.path.isdir(self._data_folder)

    def download(self) -> None:
        """从 Google 云端硬盘下载`data_preprocessed_python.zip`文件并解压缩"""

        if self._check_exists():
            return

        os.makedirs(self._base_folder, exist_ok=True)
        with (
            discovery.build('drive', 'v3') as drive_service,
            io.FileIO(self._data_archive, mode='wb') as downloaded,
        ):
            request = drive_service.files().get_media(
                fileId=userdata.get(self._COLAB_SECRET_NAME))
            downloader = http.MediaIoBaseDownload(downloaded, request)
            self._download_media(downloader)

        with zipfile.ZipFile(self._data_archive) as zf:
            zf.extractall(self._base_folder)

    def _download_media(self, downloader: http.MediaIoBaseDownload) -> None:
        KILOBYTE_IN_BYTES = 1024
        with notebook.tqdm(
            desc='数据下载中', unit='千字节', dynamic_ncols=True) as pbar:
            done = False
            last_progress = 0
            while not done:
                status, done = downloader.next_chunk()
                if last_progress == 0 and status.total_size is not None:
                    pbar.reset(total=status.total_size // KILOBYTE_IN_BYTES)
                chunk_size = status.resumable_progress - last_progress
                pbar.update(chunk_size // KILOBYTE_IN_BYTES)
                last_progress = status.resumable_progress

    @override
    def __getitem__(self, index: int) -> tuple[torch.Tensor, int]:
        subject_idx, trial_idx = divmod(index, self.TRIALS_PER_SUBJECT)
        data = torch.select(self.data[subject_idx], dim=0, index=trial_idx)

        if self.transforms is not None:
            data = self.transforms(data)

        return data, self.label[subject_idx]

In [None]:
class SENet(nn.Module):
    """挤压-激励网络（Squeeze-and-Excitation Network, SENet）的复现。

    挤压-激励网络的目标是通过明确建模卷积特征通道之间的相互依赖关系来提高网络的
    表示能力。其机制受注意力/自门控启发，使网络能够执行特征重校准：学习使用全局
    信息选择性地强调有信息量的特征并抑制不太有用的特征。此模块由[挤压激励网络（译）](https://doi.org/10.1109/CVPR.2018.00745)
    一文首次提出，所提供的[开源代码](https://github.com/hujie-frank/SENet)基于 CUDA/C++ 编程实现。

    挤压-激励网络由以下步骤顺次构成：

        1. 挤压：对输入特征图沿通道实施全局池化，将空间维度信息压缩为通道描述
        符，捕获全局上下文。
        2. 激励：根据挤压步骤得出的描述符，学习针对通道级别的门控权重。这是通过一个具比例收缩隐层的两层感知机实现的，其中隐层由线性整流函数激活，而门控
        权重由逻辑斯谛函数计算感知机输出得到。
        3. 缩放：将门控权重作用于原始输入特征图，使网络能够"关注"重要通道，"忽略
        "不相关通道。

    一般认为，该网络的创新点包括计算负载低、易于集成到现有架构和基于输入的通道权
    重动态适应。
    """
    def __init__(self, num_channels: int, reduction_ratio: int = 16) -> None:
        """
        """
        super().__init__()
        self.num_channels = num_channels
        self.reduction_ratio = reduction_ratio

        self.sq = nn.AdaptiveAvgPool2d(1)
        # 忽略`reportGeneralTypeIssues`检查，由`nn.Module`构成的列表是合法的
        # `nn.Sequential`实例初始化参数。
        self.ex = nn.Sequential(
            nn.Linear(
                num_channels, num_channels // reduction_ratio, bias=False), #type: ignore
            nn.ReLU(),
            nn.Linear(
                num_channels // reduction_ratio, num_channels, bias=False),
            nn.Sigmoid(),
        )

    @override
    def forward(self, u: torch.Tensor) -> torch.Tensor:
        self._validate_channel(u)
        return u @ torch.diag(self.ex(self.sq(u)))

    def _validate_channel(self, u: torch.Tensor) -> None:
        in_channels = u.shape[0] if len(u.shape) == 2 else u.shape[1]
        if in_channels != self.num_channels:
            raise ValueError(f"声明的特征通道数（{self.num_channels}）"
                f"与实际张量的通道数（{in_channels}）不符。")

In [None]:
# TODO: 完成骨干网络。
class Block(nn.Module):
    def __init__(self, in_channels: int) -> None:
        super().__init__()
        self.conv = nn.Conv1d(in_channels, )

    @override
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x