In [1]:
import random
import numpy as np
import torch
import os

def set_seed(seed):
    """
    设置所有随机种子以确保结果可复现
    
    Args:
        seed (int): 随机种子数值
    """
    random.seed(seed)  # Python的随机种子
    np.random.seed(seed)  # Numpy的随机种子
    torch.manual_seed(seed)  # PyTorch的CPU随机种子
    torch.cuda.manual_seed(seed)  # PyTorch的GPU随机种子
    torch.cuda.manual_seed_all(seed)  # 如果使用多GPU，为所有GPU设置种子
    
    # 设置cudnn的随机种子
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    
    # 设置Python的hash种子
    os.environ['PYTHONHASHSEED'] = str(seed)
seed = 2003  # 可以设置任何整数
set_seed(seed)

In [2]:
import os
import shutil
from tqdm import tqdm

def organize_files(src_root_dir, dst_root_dir):
    cases = ['case1', 'case2', 'case3', 'case4']

    # 为每个 case 创建目录
    for case in cases:
        case_dir = os.path.join(dst_root_dir, case)
        if not os.path.exists(case_dir):
            os.makedirs(case_dir)

    # 遍历源目录
    for main_dir in tqdm(os.listdir(src_root_dir), desc="处理目录"):
        main_dir_path = os.path.join(src_root_dir, main_dir)
        if os.path.isdir(main_dir_path):
            for sub_dir in os.listdir(main_dir_path):
                sub_dir_path = os.path.join(main_dir_path, sub_dir)
                if os.path.isdir(sub_dir_path):
                    case_name = sub_dir.split('_')[0]
                    case_dir = os.path.join(dst_root_dir, case_name)

                    for filename in os.listdir(sub_dir_path):
                        if filename.endswith('.npy'):
                            src_npy_path = os.path.join(sub_dir_path, filename)
                            dst_npy_path = os.path.join(case_dir, f"{main_dir}_{sub_dir}_{filename}")
                            shutil.copy(src_npy_path, dst_npy_path)
                            # print(f"已复制 {src_npy_path} 到 {dst_npy_path}")

src_directory = 'dataset_1k2k3k_nobandpass_extrafeatures'
dst_directory = 'data_1k2k3k_nobandpass_organized_dataset_1'

print(f"源目录: {src_directory}")
print(f"目标目录: {dst_directory}")

if not os.path.exists(src_directory):
    print(f"源目录不存在: {src_directory}")
else:
    organize_files(src_directory, dst_directory)
    print("文件整理完成。")

源目录: dataset_1k2k3k_withbandpass_extrafeatures
目标目录: data_1k2k3k_nobandpass_organized_dataset_1
源目录不存在: dataset_1k2k3k_withbandpass_extrafeatures


In [3]:
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split
import os
import sys

In [4]:
import os
from pathlib import Path
import numpy as np
from tqdm import tqdm

data = []

for case in ['case1', 'case2', 'case3', 'case4']:
    case_path = Path(f'data_1k2k3k_nobandpass_organized_dataset_1/{case}')
    for path in tqdm(case_path.glob("*.npy"), desc=f"处理 {case}"):
        name = path.stem
        # 获取文件名的各个部分
        parts = path.name.split('_')
        prefix = parts[0]  # 前缀
        case_id = '_'.join(parts[1:-2])  # case_id（可能包含多个部分）
        sample_set = parts[-2]  # sample_set

        try:
            # 加载 .npy 文件
            npy_path = path.with_suffix('.npy')
            if npy_path.exists():
                energy_features = np.load(npy_path)

            data.append({
                "name": name,
                "path": str(path),
                "case": case,
                "prefix": prefix,
                "case_id": case_id,
                "sample_set": sample_set,
                "energy_features": energy_features[0]
            })
        except Exception as e:
            print(f"处理文件 {path} 时出错: {str(e)}")
            # 跳过损坏的文件
            pass

# 显示收集到的数据条目数
print(f"收集了 {len(data)} 个项目。")

# 数据统计
case_counts = {case: sum(1 for item in data if item['case'] == case) for case in ['case1', 'case2', 'case3', 'case4']}
print("\n数据分布:")
for case, count in case_counts.items():
    print(f"{case}: {count} 个项目")

# 检查是否所有项目都有能量特征
items_with_features = sum(1 for item in data if item['energy_features'] is not None)
print(f"\n具有能量特征的项目: {items_with_features} / {len(data)}")

处理 case1: 0it [00:00, ?it/s]
处理 case2: 0it [00:00, ?it/s]
处理 case3: 0it [00:00, ?it/s]
处理 case4: 0it [00:00, ?it/s]

收集了 0 个项目。

数据分布:
case1: 0 个项目
case2: 0 个项目
case3: 0 个项目
case4: 0 个项目

具有能量特征的项目: 0 / 0





In [5]:
import pandas as pd
df = pd.DataFrame(data)
df.head()

In [6]:
import pandas as pd
df = pd.DataFrame(data)
df.head()
df = df.sample(frac=1).reset_index(drop=True)

Let's display some random sample of the dataset and run it a couple of times to get a feeling for the audio and the emotional label.

In [7]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from datasets import Dataset
from transformers import TrainingArguments, Trainer
import torch
from torch import nn

# 处理 energy_features 列的函数
def process_energy_features(df):
    new_values = []
    for v in df["energy_features"]:
        if isinstance(v, np.ndarray):
            new_values.append(float(v.item()))  # 使用 item() 来获取数组中的单个值
        else:
            new_values.append(float(v))
    df["energy_float"] = new_values
    return df

# 处理数据
df = process_energy_features(df)
df = df.drop(columns=["energy_features"])
df = df.rename(columns={"energy_float": "energy_features"})

# 定义辅助函数
def label_to_id(label, label_list):
    return label_list.index(label)

# 数据集分割
train_df, test_df = train_test_split(df, test_size=0.2, random_state=101, stratify=df["case"])

# 重置索引
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

# 打印数据集的形状
print("训练数据集形状:", train_df.shape)
print("测试数据集形状:", test_df.shape)

# 创建 Dataset 对象
train_dataset = Dataset.from_pandas(train_df)
eval_dataset = Dataset.from_pandas(test_df)

# 指定输出列
output_column = "case"

# 打印每个 case 的样本数
print("\n训练数据集中每个 case 的样本数:")
print(train_df[output_column].value_counts())
print("\n验证数据集中每个 case 的样本数:")
print(test_df[output_column].value_counts())

# 识别和排序标签列表
label_list = sorted(df["case"].unique())
num_labels = len(label_list)
print(f"\n这是一个有 {num_labels} 个类别的分类问题: {label_list}")

# 预处理函数
def preprocess_function(examples):
    labels = [label_to_id(label, label_list) for label in examples["case"]]
    energy_features = examples["energy_features"]
    return {
        "labels": labels, 
        "energy_features": energy_features
    }

# 应用预处理
train_dataset = train_dataset.map(preprocess_function, batched=True, remove_columns=train_dataset.column_names)
eval_dataset = eval_dataset.map(preprocess_function, batched=True, remove_columns=eval_dataset.column_names)

# 检查能量特征加载情况
print("\n带有能量特征的训练数据集:")
print(train_dataset[:5])
print("\n带有能量特征的验证数据集:")
print(eval_dataset[:5])

# 统计包含能量特征的样本数
train_with_features = sum(1 for item in train_dataset if item['energy_features'] is not None)
eval_with_features = sum(1 for item in eval_dataset if item['energy_features'] is not None)
print(f"\n训练样本中包含能量特征的数量: {train_with_features} / {len(train_dataset)}")
print(f"验证样本中包含能量特征的数量: {eval_with_features} / {len(eval_dataset)}")

# 定义模型
class SimpleClassifier(nn.Module):
    def __init__(self, input_dim, num_labels):
        super().__init__()
        self.linear = nn.Linear(input_dim, num_labels)
    
    def forward(self, x):
        return self.linear(x)

# 数据整理函数
def collate_fn(batch):
    energy_features = torch.tensor([item['energy_features'] for item in batch], dtype=torch.float)
    labels = torch.tensor([item['labels'] for item in batch], dtype=torch.long)
    return {'energy_features': energy_features, 'labels': labels}

# 计算指标
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return {'accuracy': (predictions == labels).mean()}

# 初始化模型
model = SimpleClassifier(input_dim=1, num_labels=num_labels)

# 打印一些数据检查信息
print("\n数据类型检查:")
print("energy_features 的数据类型:", df["energy_features"].dtype)
print("\n前5个样本的 energy_features:")
print(df["energy_features"].head())

  from .autonotebook import tqdm as notebook_tqdm


KeyError: 'energy_features'

In [None]:
df.head(5)

## Model

Before diving into the training part, we need to build our classification model based on the merge strategy.

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List, Union
import torch
import torch.nn as nn
import numpy as np
from transformers import PreTrainedModel, AutoConfig, TrainingArguments, Trainer, EvalPrediction
from transformers.file_utils import ModelOutput

@dataclass
class EnergyFeatureClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None

class Wav2Vec2ClassificationHead(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features):
        x = self.dropout(features)
        x = torch.tanh(self.dense(x))
        x = self.dropout(x)
        return self.out_proj(x)

class EnergyFeatureEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.proj = nn.Sequential(
            nn.Linear(1, 256),  # 1D input for energy features
            nn.ReLU(),
            nn.Linear(256, config.hidden_size)
        )
        self.layer_norm = nn.LayerNorm(config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, energy_features):
        hidden_states = self.proj(energy_features.unsqueeze(-1))
        hidden_states = self.layer_norm(hidden_states)
        return self.dropout(hidden_states)

class SimpleWav2Vec2Model(nn.Module):
    def __init__(self, config):
        super().__init__()
        encoder_layer = nn.TransformerEncoderLayer(d_model=config.hidden_size, nhead=config.num_attention_heads)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=config.num_hidden_layers)

    def forward(self, hidden_states, attention_mask=None):
        if attention_mask is not None:
            attention_mask = (1.0 - attention_mask.unsqueeze(1).unsqueeze(2)) * -10000.0
        return self.encoder(hidden_states.transpose(0, 1), src_key_padding_mask=attention_mask).transpose(0, 1)

class Wav2Vec2ForEnergyClassification(PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.num_labels = config.num_labels  # Add this line
        self.energy_feature_encoder = EnergyFeatureEncoder(config)
        self.wav2vec2 = SimpleWav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

    def freeze_feature_extractor(self):
        for param in self.energy_feature_encoder.parameters():
            param.requires_grad = False

    def merged_strategy(self, hidden_states, mode="mean"):
        if mode == "mean":
            return torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            return torch.sum(hidden_states, dim=1)
        elif mode == "max":
            return torch.max(hidden_states, dim=1)[0]
        else:
            raise ValueError("Invalid pooling mode: choose from ['mean', 'sum', 'max']")

    def forward(self, energy_features, attention_mask=None, labels=None, return_dict=True):
        hidden_states = self.energy_feature_encoder(energy_features).unsqueeze(1)  # Add sequence dimension
        hidden_states = self.wav2vec2(hidden_states, attention_mask)
        pooled_output = self.merged_strategy(hidden_states, mode=self.config.pooling_mode)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss() if self.config.problem_type == "single_label_classification" else nn.MSELoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            return (loss, logits) if loss is not None else logits

        return EnergyFeatureClassifierOutput(loss=loss, logits=logits)

@dataclass
class DataCollatorForEnergyFeatures:
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor, np.ndarray]]]) -> Dict[str, torch.Tensor]:
        labels = [feature["labels"] for feature in features]
        energy_features = [feature["energy_features"] for feature in features]

        batch = {
            "labels": torch.tensor(labels, dtype=torch.long if isinstance(labels[0], int) else torch.float),
            "energy_features": torch.tensor(energy_features, dtype=torch.float),
        }
        return batch

def compute_metrics(p: EvalPrediction):
    preds = np.argmax(p.predictions, axis=1) if not is_regression else np.squeeze(p.predictions)
    accuracy = (preds == p.label_ids).mean()
    return {"accuracy": accuracy} if not is_regression else {"mse": ((preds - p.label_ids) ** 2).mean()}

# Load configuration
model_name_or_path = "facebook/wav2vec2-large-xlsr-53"  # Example pretrained model
config = AutoConfig.from_pretrained(model_name_or_path)

# Update configuration for energy features
config.energy_feature_dim = 1
config.num_labels = 4
config.problem_type = "single_label_classification"
config.pooling_mode = "mean"
config.hidden_dropout_prob = 0.1
config.final_dropout = 0.1
config.hidden_size = 768
config.num_attention_heads = 12
config.num_hidden_layers = 12

# Initialize model, data collator, and trainer
model = Wav2Vec2ForEnergyClassification(config)
data_collator = DataCollatorForEnergyFeatures()
is_regression = config.problem_type == "regression"

training_args = TrainingArguments(
    output_dir="./results_energybased",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=1,
    fp16=True,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
)

trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,  # Ensure these datasets are defined
    eval_dataset=eval_dataset,
)

# Start training
trainer.train()


In [None]:
trainer.evaluate(eval_dataset=eval_dataset)