In [1]:
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [2]:
# Load the feature table. The `inv_key` column is the prediction target and
# every remaining column is used as a model input.
df = pd.read_csv("freq_features_data.csv")

y = df['inv_key']
X = df.drop(columns=['inv_key'])

# Hold out 20% of the rows for validation; the fixed random_state keeps the
# split identical between runs.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

In [3]:
# Alphabet lookup tables: the 26 lowercase letters, plus letter -> class index
# and class index -> letter mappings ('a' <-> 0, ..., 'z' <-> 25).
letters = list('abcdefghijklmnopqrstuvwxyz')
char2idx = {letter: position for position, letter in enumerate(letters)}
idx2char = dict(enumerate(letters))

def preprocess_X(X_train, X_val):
    """Standardize the input features and convert them to float32 tensors.

    The scaler statistics are fitted on the training features only and then
    applied to both splits, so no validation information leaks into the fit.

    Args:
        X_train: Training features (array-like accepted by ``StandardScaler``).
        X_val: Validation features with the same columns as ``X_train``.

    Returns:
        A ``(train_tensor, val_tensor)`` tuple of ``torch.float32`` tensors.
    """
    scaler = StandardScaler().fit(X_train)

    def _scaled_tensor(features):
        # Apply the train-fitted scaling, then hand the result to PyTorch.
        return torch.tensor(scaler.transform(features), dtype=torch.float32)

    return _scaled_tensor(X_train), _scaled_tensor(X_val)

def preprocess_y(Y):
    """Encode key strings as a matrix of letter class indices.

    Every element of ``Y`` is iterated character by character; characters that
    are not keys of ``char2idx`` (separators, whitespace, ...) are dropped and
    the remaining letters are mapped to their class index.

    Args:
        Y: Iterable of key strings (e.g. a pandas Series).

    Returns:
        ``torch.long`` tensor of shape ``(num_keys, letters_per_key)``.

    Raises:
        ValueError: If the keys do not all contain the same number of letters,
            which would otherwise produce a ragged (non-rectangular) array.
    """
    # Filter and map through the same table so the two steps cannot disagree.
    rows = [[char2idx[ch] for ch in key if ch in char2idx] for key in Y]

    if rows:
        expected = len(rows[0])
        for row_number, row in enumerate(rows):
            if len(row) != expected:
                raise ValueError(
                    f"All keys must contain the same number of a-z letters: "
                    f"row {row_number} has {len(row)}, expected {expected}"
                )

    y_idx = np.array(rows)  # (num_keys, letters_per_key)
    return torch.tensor(y_idx, dtype=torch.long)  # class indices must be long

# Sanity check: display the shape of the encoded training labels.
preprocess_y(y_train).shape

torch.Size([12000, 26])

In [4]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Problem dimensions.
INPUT_DIM = 702            # number of input features
NUM_TASKS = 26             # 26 classification tasks
NUM_CLASSES_PER_TASK = 26  # 26 classes (a-z) per task

# Model width.
HIDDEN_DIM = 256

# Optimisation settings.
BATCH_SIZE = 32
LEARNING_RATE = 5e-4
EPOCHS = 80

# ---------------------- 3. Custom dataset class ----------------------
class MultiTaskDataset(Dataset):
    """Pairs each feature row with its row of per-task labels.

    Args:
        X: Indexable container of features; one entry per sample.
        y: Indexable container of labels; one entry per sample, holding the
            labels of all tasks for that sample.

    Raises:
        ValueError: If ``X`` and ``y`` do not hold the same number of samples.
    """

    def __init__(self, X, y):
        # Catch misaligned inputs here instead of failing (or silently
        # ignoring trailing labels) in the middle of an epoch.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must contain the same number of samples, "
                f"got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, labels)`` for the sample at position ``idx``."""
        return self.X[idx], self.y[idx]

# ---------------------- 4. MLP model (shared trunk + multi-task heads) ----------------------
class MultiTaskMLP(nn.Module):
    """MLP with a shared trunk followed by one linear classifier per task.

    The trunk is four ``Linear -> BatchNorm1d -> ReLU -> Dropout`` blocks. All
    sizes are optional; when omitted they fall back to the notebook-level
    constants, so ``MultiTaskMLP()`` builds the same network as before and
    previously saved ``state_dict`` files keep loading.

    Args:
        input_dim: Number of input features. Defaults to ``INPUT_DIM``.
        hidden_dim: Width of trunk blocks 2-4 and input width of every head.
            Defaults to ``HIDDEN_DIM``.
        num_tasks: Number of classification heads. Defaults to ``NUM_TASKS``.
        num_classes_per_task: Output size of each head. Defaults to
            ``NUM_CLASSES_PER_TASK``.
        first_hidden_dim: Width of the first trunk block.
        dropout: Dropout probability used after every trunk block.
    """

    def __init__(self, input_dim=None, hidden_dim=None, num_tasks=None,
                 num_classes_per_task=None, first_hidden_dim=512, dropout=0.1):
        super().__init__()
        # Resolve defaults at call time so the class can be defined (and used
        # with explicit sizes) even where the module constants are absent.
        input_dim = INPUT_DIM if input_dim is None else input_dim
        hidden_dim = HIDDEN_DIM if hidden_dim is None else hidden_dim
        num_tasks = NUM_TASKS if num_tasks is None else num_tasks
        if num_classes_per_task is None:
            num_classes_per_task = NUM_CLASSES_PER_TASK

        # Blocks are created in trunk order, which keeps the Sequential
        # indices (0-15) and therefore the state_dict keys unchanged.
        self.shared_layers = nn.Sequential(
            *self._dense_block(input_dim, first_hidden_dim, dropout),
            *self._dense_block(first_hidden_dim, hidden_dim, dropout),
            *self._dense_block(hidden_dim, hidden_dim, dropout),
            *self._dense_block(hidden_dim, hidden_dim, dropout),
        )
        self.task_heads = nn.ModuleList([
            nn.Linear(hidden_dim, num_classes_per_task) for _ in range(num_tasks)
        ])

    @staticmethod
    def _dense_block(in_features, out_features, dropout):
        """Return the layers of one Linear -> BatchNorm -> ReLU -> Dropout block."""
        return [
            nn.Linear(in_features, out_features),
            nn.BatchNorm1d(out_features),
            nn.ReLU(),
            nn.Dropout(dropout),
        ]

    def forward(self, x):
        """Map inputs to a list with one logits tensor per task.

        Args:
            x: Float tensor of shape ``(batch_size, input_dim)``.

        Returns:
            List of ``num_tasks`` tensors, each ``(batch_size, num_classes_per_task)``.
        """
        shared_features = self.shared_layers(x)
        return [head(shared_features) for head in self.task_heads]

def train_epoch(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    The batch loss is the sum of ``criterion`` over all task outputs. The
    number of tasks is taken from the model's output list rather than from a
    global constant, and accuracy counts are accumulated as one tensor so only
    a single host transfer is needed for them at the end of the epoch.

    Args:
        model: Module returning a list of ``(batch, classes)`` logits tensors,
            one per task.
        dataloader: Iterable of ``(X_batch, y_batch)`` pairs where column ``t``
            of ``y_batch`` holds the class indices for task ``t``.
        criterion: Loss applied to ``(logits, targets)`` of a single task.
        optimizer: Optimizer updating the model parameters.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, task_accs)``: the summed-task loss averaged over the
        samples processed, and a list with one accuracy per task.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    loss_sum = 0.0           # batch losses weighted by batch size
    samples_seen = 0
    correct_per_task = None  # (num_tasks,) running hit counts, kept on device

    for X_batch, y_batch in dataloader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)  # (batch_size, num_tasks)

        outputs = model(X_batch)
        num_tasks = len(outputs)

        # Total loss = sum of the per-task losses.
        loss = sum(criterion(outputs[t], y_batch[:, t]) for t in range(num_tasks))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_size = X_batch.size(0)
        loss_sum += loss.item() * batch_size
        samples_seen += batch_size

        # Accuracy bookkeeping does not need gradients.
        with torch.no_grad():
            predicted = torch.stack(
                [logits.argmax(dim=1) for logits in outputs], dim=1
            )  # (batch_size, num_tasks)
            batch_correct = (predicted == y_batch[:, :num_tasks]).sum(dim=0)
        if correct_per_task is None:
            correct_per_task = batch_correct
        else:
            correct_per_task = correct_per_task + batch_correct

    if samples_seen == 0:
        raise ValueError("dataloader yielded no samples")

    # Normalise by the samples actually processed so the averages stay
    # correct even when the loader drops or subsamples data.
    avg_loss = loss_sum / samples_seen
    task_accs = [hits / samples_seen for hits in correct_per_task.tolist()]
    return avg_loss, task_accs

def val_epoch(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating its parameters.

    The batch loss is the sum of ``criterion`` over all task outputs. The
    number of tasks is taken from the model's output list rather than from a
    global constant, and accuracy counts are accumulated as one tensor so only
    a single host transfer is needed for them at the end.

    Args:
        model: Module returning a list of ``(batch, classes)`` logits tensors,
            one per task.
        dataloader: Iterable of ``(X_batch, y_batch)`` pairs where column ``t``
            of ``y_batch`` holds the class indices for task ``t``.
        criterion: Loss applied to ``(logits, targets)`` of a single task.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, task_accs)``: the summed-task loss averaged over the
        samples processed, and a list with one accuracy per task.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    loss_sum = 0.0           # batch losses weighted by batch size
    samples_seen = 0
    correct_per_task = None  # (num_tasks,) running hit counts, kept on device

    with torch.no_grad():  # no gradients are needed for evaluation
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)  # (batch_size, num_tasks)

            outputs = model(X_batch)
            num_tasks = len(outputs)

            # Total loss = sum of the per-task losses.
            loss = sum(criterion(outputs[t], y_batch[:, t]) for t in range(num_tasks))

            batch_size = X_batch.size(0)
            loss_sum += loss.item() * batch_size
            samples_seen += batch_size

            predicted = torch.stack(
                [logits.argmax(dim=1) for logits in outputs], dim=1
            )  # (batch_size, num_tasks)
            batch_correct = (predicted == y_batch[:, :num_tasks]).sum(dim=0)
            if correct_per_task is None:
                correct_per_task = batch_correct
            else:
                correct_per_task = correct_per_task + batch_correct

    if samples_seen == 0:
        raise ValueError("dataloader yielded no samples")

    # Normalise by the samples actually processed so the averages stay
    # correct even when the loader drops or subsamples data.
    avg_loss = loss_sum / samples_seen
    task_accs = [hits / samples_seen for hits in correct_per_task.tolist()]
    return avg_loss, task_accs

def predict(model, X_val, device):
    """Predict one letter per task for every row of ``X_val``.

    The model is switched to eval mode and run once on the whole input. The
    arg-max over each task's logits is computed for all samples at once and
    moved to the host in a single transfer, instead of calling ``.item()``
    for every (sample, task) pair.

    Args:
        model: Module returning a list of ``(num_samples, classes)`` logits
            tensors, one per task.
        X_val: Feature tensor with one row per sample.
        device: Device the features are moved to.

    Returns:
        List with one entry per sample; each entry is a list of predicted
        characters, one per task, looked up through ``idx2char``.
    """
    model.eval()
    X_val = X_val.to(device)
    with torch.no_grad():
        outputs = model(X_val)  # one (num_samples, classes) tensor per task

    # logits -> class index per task, arranged as (num_samples, num_tasks)
    predicted_idx = torch.stack([logits.argmax(dim=1) for logits in outputs], dim=1)

    # class index -> character
    return [[idx2char[idx] for idx in row] for row in predicted_idx.cpu().tolist()]

In [5]:
# Seed every RNG the run depends on. Seeding NumPy alone leaves weight
# initialisation, dropout and DataLoader shuffling (all driven by PyTorch's
# generator) different on every run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

X_train_tensor, X_val_tensor = preprocess_X(X_train, X_val)
y_train_tensor = preprocess_y(y_train)
y_val_tensor = preprocess_y(y_val)

# Fail fast if the data does not match the configured model dimensions.
assert X_train_tensor.shape[1] == INPUT_DIM, (
    f"expected {INPUT_DIM} input features, got {X_train_tensor.shape[1]}"
)
assert y_train_tensor.shape[1] == NUM_TASKS, (
    f"expected {NUM_TASKS} labels per sample, got {y_train_tensor.shape[1]}"
)

train_dataset = MultiTaskDataset(X_train_tensor, y_train_tensor)
val_dataset = MultiTaskDataset(X_val_tensor, y_val_tensor)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# ---------------------- Model, loss function, optimizer ----------------------
model = MultiTaskMLP().to(DEVICE)
criterion = nn.CrossEntropyLoss()#label_smoothing=0.1)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Halve the learning rate when the validation loss has not improved for 3 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

# ---------------------- Training ----------------------
best_val_acc = 0.0
for epoch in range(EPOCHS):
    # One training pass followed by one validation pass.
    train_loss, train_accs = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_accs = val_epoch(model, val_loader, criterion, DEVICE)

    # Learning-rate decay is driven by the validation loss.
    scheduler.step(val_loss)

    # Mean accuracy across all tasks.
    avg_train_acc = np.mean(train_accs)
    avg_val_acc = np.mean(val_accs)

    # Log every 4th epoch and the final epoch.
    if epoch%4 == 0 or epoch == EPOCHS-1:
        print(f"Epoch [{epoch+1}/{EPOCHS}]")
        print(f"Train Loss: {train_loss:.4f} | Avg Train Acc: {avg_train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f} | Avg Val Acc: {avg_val_acc:.4f}")
        print(f"Task Accs (0-25): {[f'{acc:.4f}' for acc in val_accs[:5]]}...")  # only the first 5 tasks are shown
        print("-" * 80)

    # Checkpoint whenever the mean validation accuracy improves.
    if avg_val_acc > best_val_acc:
        best_val_acc = avg_val_acc
        torch.save(model.state_dict(), "best_multi_task_mlp.pth")
        print(f"Best model saved (Epoch: {epoch+1} | Val Acc: {best_val_acc:.4f})")


Epoch [1/80]
Train Loss: 83.5402 | Avg Train Acc: 0.0707
Val Loss: 79.9627 | Avg Val Acc: 0.0980
Task Accs (0-25): ['0.0797', '0.0947', '0.0970', '0.0943', '0.1057']...
--------------------------------------------------------------------------------
Best model saved (Epoch: 1 | Val Acc: 0.0980)
Best model saved (Epoch: 2 | Val Acc: 0.1209)
Best model saved (Epoch: 3 | Val Acc: 0.1395)
Best model saved (Epoch: 4 | Val Acc: 0.1561)
Epoch [5/80]
Train Loss: 68.1812 | Avg Train Acc: 0.1757
Val Loss: 67.0656 | Avg Val Acc: 0.1717
Task Accs (0-25): ['0.1593', '0.1703', '0.1820', '0.1743', '0.1927']...
--------------------------------------------------------------------------------
Best model saved (Epoch: 5 | Val Acc: 0.1717)
Best model saved (Epoch: 6 | Val Acc: 0.1866)
Best model saved (Epoch: 7 | Val Acc: 0.1954)
Best model saved (Epoch: 8 | Val Acc: 0.2045)
Epoch [9/80]
Train Loss: 61.9764 | Avg Train Acc: 0.2226
Val Loss: 60.7244 | Avg Val Acc: 0.2174
Task Accs (0-25): ['0.2033', '0.214

In [6]:
# ---------------------- Prediction ----------------------
# Load the best checkpoint. map_location lets a checkpoint that was saved on a
# GPU be restored on whatever device this session is using (including CPU-only).
model.load_state_dict(torch.load("best_multi_task_mlp.pth", map_location=DEVICE))
# Predict on the validation set.
y_pred = predict(model, X_val_tensor, DEVICE)

# Show a few predictions next to the ground truth (at most 3 samples, fewer
# if the validation set is smaller).
print("\nValidation samples:")
for sample_idx in range(min(3, len(y_pred))):
    # Keep only the a-z letters of the raw key, mirroring the label encoding.
    true_key = [ch for ch in y_val.iloc[sample_idx] if ch in letters]
    print(f"True key: {' '.join(true_key)}")
    print(f"Pred key: {' '.join(y_pred[sample_idx])}")
    print("-" * 65)


Validation samples:
True key: l j v f n a s h w z k b d o g c u p q x m t e i y r
Pred key: l x b f n i t c z j y v y o g d u p j w l t e i y n
-----------------------------------------------------------------
True key: n s w x k g r e j l f b p z y u d c t v h o q m a i
Pred key: s s k b q b c e q m f v y x b u l p s v o o w m e i
-----------------------------------------------------------------
True key: l n u z e v o p q b f d s m r j k h g y w i c t a x
Pred key: m t u z e j o y q g f m t y r v v c g d w a n t a q
-----------------------------------------------------------------
