In [1]:
#GRU下的姓名分类

In [3]:
import torch

device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")
    
torch.set_default_device(device)
print(f"Using device = {torch.get_default_device()}")

Using device = cuda:0


In [5]:
import string
import unicodedata

# Alphabet the model can encode: ASCII letters plus a little punctuation.
# The trailing "_" stands in for any character that is not in this alphabet.
allowed_characters = string.ascii_letters + " .,;'_"
n_letters = len(allowed_characters)

# Convert a Unicode string to plain ASCII.
def unicodeToAscii(s):
    """Return ``s`` reduced to the characters in ``allowed_characters``.

    The string is NFD-normalized first; characters whose Unicode category is
    'Mn' are then dropped, as is every character that is not part of
    ``allowed_characters``.
    """
    kept = []
    for ch in unicodedata.normalize('NFD', s):
        if unicodedata.category(ch) == 'Mn':
            continue
        if ch not in allowed_characters:
            continue
        kept.append(ch)
    return ''.join(kept)

In [7]:
# Look up the index of a character in the alphabet.
def letterToIndex(letter):
    """Return the position of ``letter`` in ``allowed_characters``.

    When ``letter`` is not found, the position of the placeholder "_" is
    returned instead.
    """
    position = allowed_characters.find(letter)
    if position == -1:
        # Unknown character: fall back to the "_" placeholder slot.
        position = allowed_characters.find("_")
    return position
# Convert an input string into a one-hot tensor.
def lineToTensor(line):
    """Encode ``line`` as a one-hot tensor of shape (len(line), 1, n_letters).

    Row ``i`` holds a single 1 at the index that ``letterToIndex`` returns for
    the i-th character; every other entry is 0.
    """
    one_hot = torch.zeros(len(line), 1, n_letters)
    for position, letter in enumerate(line):
        one_hot[position, 0, letterToIndex(letter)] = 1
    return one_hot

In [9]:
from io import open
import glob
import os
import time

import torch
from torch.utils.data import Dataset

In [11]:
class NamesDataset(Dataset):
    """Dataset of names read from the ``*.txt`` files of a directory.

    Every file contributes one label (its file name without the extension) and
    every non-empty line of the file is one name carrying that label. Only the
    ``(name, label)`` string pairs are kept in memory; the tensors are built on
    demand in ``__getitem__``.

    Attributes:
        data_dir: Directory the files were read from (kept for provenance).
        load_time: Local time at which the dataset was built (provenance).
        samples: List of ``(name, label)`` tuples.
        label_to_idx: Mapping from label string to class index.
        idx_to_label: Sorted list of labels; position equals the class index.
    """

    def __init__(self, data_dir):
        """Read all ``*.txt`` files in ``data_dir`` and index their labels."""
        self.data_dir = data_dir  # where the data came from
        self.load_time = time.localtime()  # when the data was loaded

        # Names are read eagerly, but only as (name, label) strings; no
        # tensors are created or stored here.
        self.samples = []

        # glob returns paths in arbitrary order. Sorting makes the sample
        # indices stable across machines and runs, so that index-based
        # operations such as a seeded random split stay reproducible.
        text_files = sorted(glob.glob(os.path.join(data_dir, '*.txt')))

        for filename in text_files:
            label = os.path.splitext(os.path.basename(filename))[0]
            with open(filename, encoding='utf-8') as f:
                for line in f:
                    name = line.strip()
                    if name:  # skip blank lines
                        self.samples.append((name, label))

        # Build the label <-> index mapping once, from the labels actually seen.
        unique_labels = sorted({label for _, label in self.samples})
        self.label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
        self.idx_to_label = unique_labels  # reverse lookup: index -> label

    def __len__(self):
        """Return the number of ``(name, label)`` samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(label_tensor, data_tensor, label, name)`` for sample ``idx``.

        ``data_tensor`` is whatever ``lineToTensor(name)`` returns and
        ``label_tensor`` is a one-element long tensor holding the class index.
        """
        name, label = self.samples[idx]
        # Tensors are created lazily, only when a sample is requested.
        data_tensor = lineToTensor(name)
        label_tensor = torch.tensor([self.label_to_idx[label]], dtype=torch.long)
        return label_tensor, data_tensor, label, name

In [13]:
# Build the dataset from the *.txt files under ./data/names/ (each file name is used as the label).
dataset = NamesDataset("./data/names/")


In [17]:
# Split the data into a training set (85%) and a held-out set (15%).
# The generator is created on `device` and seeded so the split is repeatable.
train_set, test_set = torch.utils.data.random_split(
    dataset,
    lengths=[.85, .15],
    generator=torch.Generator(device=device).manual_seed(2026),
)

print(f"train examples = {len(train_set)}, validation examples = {len(test_set)}")

train examples = 17063, validation examples = 3011


In [60]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

class CharGRU(nn.Module):
    """Single-layer GRU classifier over character sequences.

    The GRU consumes a time-major sequence, its final hidden state is projected
    by a linear layer to ``output_size`` scores, and a log-softmax turns those
    scores into log-probabilities.

    Args:
        input_size: Size of each input feature vector.
        hidden_size: Size of the GRU hidden state.
        output_size: Number of output classes.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size, batch_first=False)
        self.h2o = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input_seq, lengths=None):
        """Return log-probabilities of shape (batch, output_size).

        Args:
            input_seq: Time-major input, (seq_len, batch, input_size).
            lengths: Optional true lengths of the sequences in a padded batch
                (tensor or list of ints). When given, the batch is packed so
                that padding steps do not influence the final hidden state.
                When omitted, the whole ``input_seq`` is fed to the GRU.
        """
        if lengths is not None:
            # pack_padded_sequence requires tensor lengths to live on the CPU;
            # normalise here so callers do not have to remember to do it.
            if torch.is_tensor(lengths):
                lengths = lengths.cpu()
            # enforce_sorted=False: the batch does not need to be ordered by
            # decreasing length.
            packed_input = pack_padded_sequence(
                input_seq, lengths, batch_first=False, enforce_sorted=False
            )
            _, hidden = self.gru(packed_input)
        else:
            _, hidden = self.gru(input_seq)

        last_hidden = hidden[-1]  # (batch, hidden_size)
        output = self.h2o(last_hidden)
        return self.softmax(output)

    def init_hidden(self, batch_size=1):
        """Return a zero hidden state of shape (1, batch_size, hidden_size).

        The tensor is allocated with the same device and dtype as the GRU
        weights so it can be passed to the GRU regardless of where the model
        currently lives.
        """
        reference = self.gru.weight_hh_l0
        return reference.new_zeros(1, batch_size, self.hidden_size)

In [62]:
# Hidden state width and number of output classes (one per label in the dataset).
n_hidden = 128
n_categories = len(dataset.label_to_idx)
# Input size is the alphabet size; the output has one entry per label.
rnn = CharGRU(input_size=n_letters, hidden_size=n_hidden, output_size=n_categories)
print(rnn)

CharGRU(
  (gru): GRU(58, 128)
  (h2o): Linear(in_features=128, out_features=18, bias=True)
  (softmax): LogSoftmax(dim=1)
)


In [64]:
import torch
import torch.nn as nn
import random
import time
import numpy as np
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

def _collate_name_batch(training_data, batch_indices):
    """Build one padded mini-batch from the samples at ``batch_indices``.

    Returns ``(padded_seqs, labels, lengths)`` where ``padded_seqs`` is the
    zero-padded, time-major stack of the samples' sequences, ``labels`` is a
    1-D tensor with one class index per sample, and ``lengths`` is a CPU long
    tensor holding each sequence's unpadded length.
    """
    sequences = []
    label_tensors = []
    seq_lengths = []

    for idx in batch_indices:
        label_tensor, text_tensor, _, _ = training_data[idx]
        sequences.append(text_tensor.squeeze(1))  # (seq_len, input_size)
        label_tensors.append(label_tensor)
        seq_lengths.append(text_tensor.size(0))

    # Flatten to exactly one dimension. A bare squeeze() would turn a
    # one-sample batch into a 0-dim tensor, which the loss rejects.
    labels = torch.cat(label_tensors).reshape(-1)  # (batch_size,)
    padded_seqs = nn.utils.rnn.pad_sequence(sequences, batch_first=False, padding_value=0)
    # Sequence lengths used for packing must be on the CPU.
    lengths_tensor = torch.tensor(seq_lengths, dtype=torch.long, device="cpu")
    return padded_seqs, labels, lengths_tensor


def train_rnn(model, training_data, n_epoch=10, n_batch_size=64, report_every=50,
              learning_rate=0.01, criterion=nn.NLLLoss(), save_path="best_name_classify.pth"):
    """
    Train a recurrent model on variable-length name classification data.

    Each epoch visits the samples in a freshly shuffled order, in mini-batches
    of ``n_batch_size``. Sequences of a batch are zero-padded to a common
    length and passed to the model together with their true lengths.

    Args:
        model: Module called as ``model(padded_seqs, lengths)`` whose output
            is fed to ``criterion`` together with the batch labels.
        training_data: Indexable collection whose items are
            ``(label_tensor, text_tensor, label, text)`` tuples.
        n_epoch: Number of passes over ``training_data``.
        n_batch_size: Number of samples per mini-batch.
        report_every: Print a progress line every this many epochs.
        learning_rate: Learning rate for the Adam optimizer.
        criterion: Loss function applied to ``(output, labels)``.
        save_path: File that receives the model's ``state_dict`` each time the
            epoch-average training loss reaches a new minimum.

    Returns:
        List with the average training loss of every epoch.
    """
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    all_losses = []

    print(f"Training on dataset with n = {len(training_data)}")
    start = time.time()

    best_loss = float('inf')  # lowest epoch-average loss seen so far

    for epoch in range(1, n_epoch + 1):
        indices = list(range(len(training_data)))
        random.shuffle(indices)

        total_loss = 0
        num_batches = 0

        for i in range(0, len(indices), n_batch_size):
            batch_indices = indices[i:i + n_batch_size]
            padded_seqs, labels, lengths_tensor = _collate_name_batch(training_data, batch_indices)

            optimizer.zero_grad()
            output = model(padded_seqs, lengths_tensor)
            loss = criterion(output, labels)
            loss.backward()

            # Clip the gradient norm to keep recurrent training stable.
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)

            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        avg_loss = total_loss / num_batches
        all_losses.append(avg_loss)

        # Save a checkpoint only when the loss reaches a new all-time low.
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), save_path)
            print(f"New best model saved at epoch {epoch} (loss={avg_loss:.4f})")

        if epoch % report_every == 0:
            elapsed = time.time() - start
            print(f"Epoch {epoch} ({epoch/n_epoch:.0%}): "
                  f"Average Loss = {avg_loss:.4f}, Time = {elapsed:.1f}s")

    return all_losses

In [66]:
# Run the training and time it.
# The training function defined in this notebook is `train_rnn`; calling any
# other name only works if it leaked from an earlier kernel session.
start = time.time()
all_losses = train_rnn(rnn, train_set, n_epoch=6, learning_rate=0.05, report_every=5)
end = time.time()
print(f"training took {end-start}s")

Training on dataset with n = 17063
New best model saved at epoch 1 (loss=1.0029)
New best model saved at epoch 2 (loss=0.7961)
New best model saved at epoch 4 (loss=0.7135)
New best model saved at epoch 5 (loss=0.6900)
Epoch 5 (83%): Average Loss = 0.6900, Time = 56.1s
New best model saved at epoch 6 (loss=0.6899)
training took 66.34404492378235s


In [67]:
# Load the best parameters saved during training.
# map_location remaps the stored tensors onto the device selected for this
# session, so a checkpoint written on a GPU can still be loaded on a CPU-only host.
rnn.load_state_dict(torch.load('best_name_classify.pth', map_location=device, weights_only=True))
rnn.eval();  # switch to evaluation mode

In [71]:
def predict_name(model, name, dataset):
    """Return the label that ``model`` predicts for ``name``.

    The name is encoded with ``lineToTensor``, given a batch dimension of 1 if
    it does not have one yet, and passed to the model together with its
    length. The index of the highest-scoring output is translated back to a
    label through ``dataset.idx_to_label``.

    Raises:
        ValueError: If the encoded name is neither 2-D nor 3-D, or if it is
            3-D with a batch dimension other than 1.
    """
    model.eval()
    with torch.no_grad():
        encoded = lineToTensor(name)
        n_dims = encoded.dim()

        if n_dims not in (2, 3):
            raise ValueError(f"Unexpected input dimension: {encoded.dim()}")

        if n_dims == 2:
            # (L, D) -> (L, 1, D): insert the batch dimension.
            encoded = encoded.unsqueeze(1)
        elif encoded.size(1) != 1:
            # Already (L, B, D); only a single-sample batch is accepted.
            raise ValueError(f"Expected batch size 1, got {encoded.size(1)}")

        seq_lengths = torch.tensor([len(name)], dtype=torch.long, device='cpu')
        log_probs = model(encoded, seq_lengths)
        best_idx = log_probs.argmax(dim=1).item()

    return dataset.idx_to_label[best_idx]

In [83]:
# 3. Predict the label of a sample name with the trained model.
result = predict_name(rnn, "Albert", dataset)
print(result)  # -> expected to print "English"

English
