In [24]:
import numpy as np
import torch
import torch.nn.functional as F
import os
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader,TensorDataset,random_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from sklearn.metrics import precision_score, recall_score, f1_score

In [25]:
def load_npy_data_with_labels(folder_path, label):
    """Load every ``.npy`` file in a folder and pair each array with ``label``.

    Files are visited in sorted filename order so the returned lists are the
    same on every run and platform (``os.listdir`` makes no ordering promise).

    Args:
        folder_path: Directory to scan; sub-directories are not descended into.
        label: Value attached to every array loaded from this folder.

    Returns:
        A ``(data_list, labels_list)`` tuple of equal-length lists: the loaded
        arrays and one copy of ``label`` per array. Both are empty when the
        folder contains no ``.npy`` files.
    """
    file_names = sorted(
        name for name in os.listdir(folder_path) if name.endswith('.npy')
    )
    data_list = [np.load(os.path.join(folder_path, name)) for name in file_names]
    labels_list = [label] * len(data_list)
    return data_list, labels_list

# Root directory of the training data; replace with your own folder path.
base_folder_path = 'train_data'  # e.g., 'path_to_your_data/train_data'

# Language 0: every array in this sub-folder gets label 0.
language_0_folder_path = os.path.join(base_folder_path, 'language_0')
language_0_data, language_0_labels = load_npy_data_with_labels(
    folder_path=language_0_folder_path,
    label=0,
)

# Language 1: every array in this sub-folder gets label 1.
language_1_folder_path = os.path.join(base_folder_path, 'language_1')
language_1_data, language_1_labels = load_npy_data_with_labels(
    folder_path=language_1_folder_path,
    label=1,
)

# Concatenate both languages, keeping data and labels aligned by position.
train_data_raw = [*language_0_data, *language_1_data]
train_labels = [*language_0_labels, *language_1_labels]
# Keep only the first 40 rows (axis 0) of every array.
train_data_cropped = [sample[:40] for sample in train_data_raw]
def min_max_normalize(tensor):
    """Rescale a tensor to the range [0, 1] using its global min and max.

    The minimum and maximum are taken over all elements of ``tensor`` (not per
    row or per sample), so relative differences between elements are kept.

    Args:
        tensor: Non-empty ``torch.Tensor`` to rescale.

    Returns:
        A new tensor of the same shape computed as
        ``(tensor - min) / (max - min)``. If every element is equal the range
        is zero and an all-zero tensor is returned instead of NaN.
    """
    min_val = torch.min(tensor)
    value_range = torch.max(tensor) - min_val
    # A constant tensor has zero range; dividing by it would produce 0/0 = NaN.
    # Divide by 1 in that case (the numerator is already all zeros).
    safe_range = torch.where(value_range == 0, torch.ones_like(value_range), value_range)
    normalized_tensor = (tensor - min_val) / safe_range
    return normalized_tensor
def standardize(tensor):
    """Shift and scale a tensor to zero mean and unit standard deviation.

    Mean and standard deviation are computed over all elements of ``tensor``
    with ``torch.mean`` / ``torch.std``.

    Args:
        tensor: Floating-point ``torch.Tensor`` to standardize.

    Returns:
        A new tensor of the same shape computed as ``(tensor - mean) / std``.
        When the standard deviation is zero or undefined (constant or
        single-element input) the centred tensor is returned unscaled, i.e.
        all zeros, instead of NaN/inf.
    """
    mean = torch.mean(tensor)
    std = torch.std(tensor)
    # `std > 0` is False both for 0 and for NaN, so either case falls back to a
    # divisor of 1 and avoids propagating NaN/inf into the result.
    safe_std = torch.where(std > 0, std, torch.ones_like(std))
    standardized_tensor = (tensor - mean) / safe_std
    return standardized_tensor
# Print a short summary to confirm that the data was loaded.
print(f"Loaded {len(language_0_data)} samples for language 0 and {len(language_1_data)} samples for language 1.")
# Last expression of the cell: displays the first cropped sample.
train_data_cropped[0]

Loaded 2000 samples for language 0 and 2000 samples for language 1.


array([[-11.512925 , -11.512925 , -11.512925 , ..., -11.512925 ,
        -11.512925 , -11.512925 ],
       [-11.512925 , -11.512925 , -11.512925 , ..., -11.512925 ,
        -11.512925 , -11.512925 ],
       [-11.512925 , -11.512925 , -11.512925 , ..., -11.512925 ,
        -11.512925 , -11.512925 ],
       ...,
       [ -5.672583 ,  -6.862608 ,  -6.7738132, ...,  -6.6783767,
         -7.703056 ,  -8.840825 ],
       [ -6.2198935,  -6.979812 ,  -6.4984074, ...,  -6.659245 ,
         -7.500062 ,  -8.744839 ],
       [ -6.584874 ,  -7.218131 ,  -7.068345 , ...,  -6.657686 ,
         -7.625363 ,  -8.85662  ]], dtype=float32)

In [26]:
def calculate_accuracy(y_true, y_pred):
    """Return the fraction of positions where ``y_true`` equals ``y_pred``.

    Both inputs are converted with ``np.asarray`` first. Without that, two
    plain Python lists would be compared as whole objects (a single bool),
    making the result 0.0 or 1.0 regardless of how many entries match.

    Args:
        y_true: Array-like of ground-truth labels.
        y_pred: Array-like of predicted labels, same shape as ``y_true``.

    Returns:
        The mean of the element-wise equality, a NumPy float in [0, 1].
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return np.mean(y_true == y_pred)

In [27]:
# Build the TensorDataset and the DataLoaders.
# np.stack creates one contiguous array first; calling torch.tensor directly on
# a list of ndarrays converts element by element and is very slow.
# Note: the min/max used for normalization are computed over the whole data
# set, i.e. before the train/validation split below.
features = min_max_normalize(torch.from_numpy(np.stack(train_data_cropped)))
full_dataset = TensorDataset(features, torch.tensor(train_labels))
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Randomly split into training and validation subsets. A seeded generator makes
# the split identical on every run, so validation numbers are comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    full_dataset, [train_size, val_size], generator=split_generator
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
# Shuffling has no effect on validation metrics, so keep a fixed order.
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
# Last expression of the cell: displays the first (features, label) pair.
train_dataset[0]

(tensor([[-0.5467, -0.5467, -0.5467,  ..., -0.5467, -0.5467, -0.5467],
         [-0.5467, -0.5467, -0.5467,  ..., -0.5467, -0.5467, -0.5467],
         [-0.5467, -0.5467, -0.5467,  ..., -0.5467, -0.5467, -0.5467],
         ...,
         [ 1.9457,  1.9139,  2.6182,  ...,  0.7078,  0.7386,  0.7527],
         [ 1.9737,  1.9639,  2.5831,  ...,  0.8225,  1.0751,  1.0740],
         [ 2.1232,  1.7222,  2.4205,  ...,  0.9448,  1.1420,  1.1041]]),
 tensor(1))

In [28]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to its input.

    A ``(max_len, d_model)`` table is precomputed once: even columns hold
    ``sin(position * div_term)`` and odd columns hold ``cos(position * div_term)``.
    The table is registered as a buffer, so it follows the module across
    devices but is not a trainable parameter.

    Args:
        d_model: Size of the last (feature) dimension of the input. Both even
            and odd values are supported.
        max_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the angles to match; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        The table is sliced by ``x.size(1)`` and broadcast over dim 0, so dim 1
        is treated as the sequence axis and the last dim must equal ``d_model``.
        """
        x = x + self.pe[:x.size(1), :]
        return x

class TransformerClassifier(nn.Module):
    """Transformer-encoder classifier for batch-first sequences.

    Pipeline: positional encoding on the raw features -> linear projection to
    ``hidden_size`` -> stack of Transformer encoder layers -> mean over the
    sequence axis (dim 1) -> linear head -> sigmoid.

    The forward pass already applies a sigmoid and returns probabilities, so
    pair it with ``nn.BCELoss`` (not ``nn.BCEWithLogitsLoss``, which would
    apply a second sigmoid).

    Args:
        input_size: Feature dimension of the input sequences.
        hidden_size: Model dimension used inside the encoder.
        num_classes: Number of outputs of the final linear layer.
        nhead: Number of attention heads; must divide ``hidden_size``.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability inside each encoder layer.
    """

    def __init__(self, input_size, hidden_size, num_classes, nhead, num_layers, dropout=0.2):
        super(TransformerClassifier, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(input_size)
        self.input_fc = nn.Linear(input_size, hidden_size)
        # batch_first=True is required: forward() treats dim 0 as the batch and
        # dim 1 as the sequence (see the mean over dim 1 below). With the
        # default layout the encoder would attend across the samples of a
        # batch instead of across the time steps of each sample.
        self.encoder_layer = nn.TransformerEncoderLayer(
            hidden_size, nhead=nhead, dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(hidden_size, num_classes)
        self.init_weights()
        self.sigmoid = nn.Sigmoid()

    def init_weights(self):
        """Zero the bias of the output layer and draw its weights from U(-0.1, 0.1)."""
        initrange = 0.1
        self.fc.bias.data.zero_()
        self.fc.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_padding_mask):
        """Compute class probabilities for a batch of sequences.

        Args:
            src: Tensor laid out as (batch, seq_len, input_size).
            src_padding_mask: Passed through as ``src_key_padding_mask`` to the
                encoder; ``None`` when there is no padding. Note that the mean
                pooling below averages over all positions, padded or not.

        Returns:
            Sigmoid probabilities with the trailing dimension squeezed when it
            has size 1: shape (batch,) for ``num_classes == 1``, otherwise
            (batch, num_classes).
        """
        src = self.pos_encoder(src)
        output = self.transformer_encoder(self.input_fc(src), src_key_padding_mask=src_padding_mask)
        output = self.fc(output.mean(dim=1))
        # squeeze(-1) only drops the class axis; a bare squeeze() would also
        # drop the batch axis when the batch holds a single sample.
        out = self.sigmoid(output).squeeze(-1)
        return out
    

In [29]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

Using device: cuda


In [33]:
model = TransformerClassifier(input_size=80, hidden_size=128, num_classes=1, nhead=4, num_layers=4).to(device)
print(model)
# The model's forward() already applies a sigmoid and returns probabilities, so
# the matching loss is BCELoss. BCEWithLogitsLoss would apply a second sigmoid
# on top of those probabilities and distort the loss and its gradients.
criterion = nn.BCELoss()  # binary classification on probabilities
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
num_epochs=100
# Training loop: one pass over train_loader, then one validation pass, per epoch.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    seen = 0
    for sequence,label in train_loader:
        sequence, label = sequence.to(device), label.to(device)
        # Forward pass. The loss expects float targets; reshape(-1) keeps the
        # output 1-D even if the model returns a 0-d tensor for a batch of one.
        label = label.float()
        outputs = model(sequence,None).reshape(-1)
        loss = criterion(outputs, label)
        # Backward pass and optimizer step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Accumulate a sample-weighted sum so the reported loss is the epoch
        # average rather than whatever the last mini-batch happened to give.
        running_loss += loss.item() * label.size(0)
        seen += label.size(0)
    epoch_loss = running_loss / max(seen, 1)
    model.eval()
    total=0
    correct=0
    with torch.no_grad():
        for sequences, labels in val_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            outputs = model(sequences,None).reshape(-1)
            # Outputs are probabilities, so 0.5 is the decision threshold.
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}')
    print('Accuracy of the model on the validation data: %d %%' % accuracy)



TransformerClassifier(
  (pos_encoder): PositionalEncoding()
  (input_fc): Linear(in_features=80, out_features=128, bias=True)
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
    )
    (linear1): Linear(in_features=128, out_features=2048, bias=True)
    (dropout): Dropout(p=0.2, inplace=False)
    (linear2): Linear(in_features=2048, out_features=128, bias=True)
    (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.2, inplace=False)
    (dropout2): Dropout(p=0.2, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-3): 4 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (line