In [67]:
import csv
import torch
import tqdm
import json, os
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# from torchcrf import CRF
from collections import Counter
from collections import defaultdict
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
from sklearn.feature_extraction.text import TfidfTransformer

# 基于机器学习的文本分类
## 文本向量化

- 将文本数据转换为 Bag-of-Words（词袋）特征向量

In [2]:
def tokenize(text):
    # 简单空格分词，实际可扩展
    return text.lower().split()

def build_vocab(texts, max_vocab_size=10000, min_freq=1):
    # 统计词频，建立词表
    counter = Counter()
    for text in texts:
        counter.update(tokenize(text))
    # 过滤低频词，截断词表
    vocab = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = sorted(vocab, key=lambda w: counter[w], reverse=True)[:max_vocab_size]
    word2idx = {w: i for i, w in enumerate(vocab)}
    return word2idx

def text_to_bow(text, word2idx):
    vec = np.zeros(len(word2idx))
    for w in tokenize(text):
        if w in word2idx:
            vec[word2idx[w]] += 1
    return vec

def texts_to_bow(texts, word2idx, mode='bow'):
    bow = np.array([text_to_bow(text, word2idx) for text in texts])
    # 返回原始的词袋模型（计数矩阵）
    if mode == 'bow':
        return bow
    # 返回词频，对每行进行归一化
    elif mode == 'tf':
        return bow / np.maximum(1, bow.sum(axis=1, keepdims=True))
    # 使用 TfidfTransformer 转换为 TF-IDF 特征
    elif mode == 'tfidf':
        transformer = TfidfTransformer()
        return transformer.fit_transform(bow).toarray()
    else:
        raise ValueError(f"Unknown mode: {mode}")

## 定义逻辑回归模型中常用的三个辅助函数

In [3]:
# 激活函数
def sigmoid(z):
    return 1 / (1 + np.exp(-z))
def softmax(z):
    exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))  # 防止溢出
    return exp_z / np.sum(exp_z, axis=1, keepdims=True)

# one-hot编码
def one_hot(labels, num_classes, smooth=False, smoothing=0.1):
    oh = np.zeros((len(labels), num_classes))
    if smooth:
        oh += smoothing / num_classes
        for i, l in enumerate(labels):
            oh[i, l] = 1 - smoothing + (smoothing / num_classes)
    else:
        for i, l in enumerate(labels):
            oh[i, l] = 1
    return oh

## 构建逻辑回归分类器

In [4]:
class LogisticRegression:
    def __init__(self, input_dim, num_classes=2, lr=0.1, loss_type='ce'):
        self.W = np.random.randn(input_dim, num_classes) * 0.01
        self.b = np.zeros(num_classes)
        self.lr = lr
        self.num_classes = num_classes
        self.loss_type = loss_type

    def forward(self, X):
        logits = X @ self.W + self.b
        return softmax(logits)

    def compute_loss(self, X, y):
        m = X.shape[0]
        probs = self.forward(X)
        smooth = self.loss_type == 'ls'
        y_onehot = one_hot(y, self.num_classes, smooth=smooth)
        loss = -np.mean(np.sum(y_onehot * np.log(probs + 1e-15), axis=1))
        return loss

    def backward(self, X, y):
        m = X.shape[0]
        probs = self.forward(X)
        smooth = self.loss_type == 'ls'
        y_onehot = one_hot(y, self.num_classes, smooth=smooth)
        dz = (probs - y_onehot) / m
        dW = X.T @ dz
        db = np.sum(dz, axis=0)
        return dW, db

    def update_params(self, dW, db):
        self.W -= self.lr * dW
        self.b -= self.lr * db

    def predict(self, X):
        probs = self.forward(X)
        return np.argmax(probs, axis=1)

    def fit(self, X_train, y_train, epochs=10, batch_size=64, shuffle=True):
        m = X_train.shape[0]
        for epoch in range(1, epochs + 1):
            if shuffle:
                indices = np.random.permutation(m)
            else:
                indices = np.arange(m)
            X_train = X_train[indices]
            y_train = y_train[indices]
            for start in range(0, m, batch_size):
                end = start + batch_size
                X_batch = X_train[start:end]
                y_batch = y_train[start:end]
                dW, db = self.backward(X_batch, y_batch)
                self.update_params(dW, db)
            acc = np.mean(self.predict(X_train) == y_train)
            print(f"Epoch {epoch}: acc={acc:.4f}, loss={self.compute_loss(X_train, y_train):.4f}")

## 运行程序

In [5]:
def run_experiment(feature_mode='bow', lr=0.1, loss_type='ce', vocab_size=5000):
    print(f"\n[实验] 特征={feature_mode}, 学习率={lr}, 损失函数={loss_type}")
    Train = pd.read_csv('D:/Data/master/train.tsv', sep='\t')
    Test = pd.read_csv('D:/Data/master/test.tsv', sep='\t')
    train_texts = Train['Phrase'].astype(str).values
    train_labels = Train['Sentiment'].values
    test_texts = Test['Phrase'].astype(str).values

    word2idx = build_vocab(train_texts, max_vocab_size=vocab_size)
    X_train = texts_to_bow(train_texts, word2idx, mode=feature_mode)
    X_test = texts_to_bow(test_texts, word2idx, mode=feature_mode)

    model = LogisticRegression(
        input_dim=X_train.shape[1],
        num_classes=5,
        lr=lr,
        loss_type=loss_type
    )
    model.fit(X_train, train_labels, epochs=10, batch_size=64)

    y_pred = model.predict(X_test)
    output_df = pd.DataFrame({
        'PhraseId': Test['PhraseId'],
        'Sentiment': y_pred
    })
    filename = f'submission_{feature_mode}_lr{lr}_loss{loss_type}.csv'
    output_df.to_csv(filename, index=False)
    print(f"[输出] 已保存至 {filename}")
    return np.mean(model.predict(X_train) == train_labels)

'''主程序'''
if __name__ == '__main__':
    feature_modes = ['bow', 'tf', 'tfidf']
    learning_rates = [0.001, 0.01, 0.1]
    loss_types = ['ce', 'ls']  # ce: cross entropy, ls: label smoothing

    results = {}
    for feat in feature_modes:
        for lr in learning_rates:
            for loss in loss_types:
                acc = run_experiment(feature_mode=feat, lr=lr, loss_type=loss)
                results[(feat, lr, loss)] = acc

    print("\n=== 所有组合结果 ===")
    for (feat, lr, loss), acc in results.items():
        print(f"{feat} + lr={lr} + loss={loss}: acc={acc:.4f}")


[实验] 特征=bow, 学习率=0.001, 损失函数=ce
Epoch 1: acc=0.5099, loss=1.3925
Epoch 2: acc=0.5102, loss=1.3236
Epoch 3: acc=0.5123, loss=1.2911
Epoch 4: acc=0.5142, loss=1.2724
Epoch 5: acc=0.5155, loss=1.2602
Epoch 6: acc=0.5167, loss=1.2516
Epoch 7: acc=0.5177, loss=1.2450
Epoch 8: acc=0.5184, loss=1.2397
Epoch 9: acc=0.5195, loss=1.2354
Epoch 10: acc=0.5205, loss=1.2317
[输出] 已保存至 submission_bow_lr0.001_lossce.csv

[实验] 特征=bow, 学习率=0.001, 损失函数=ls
Epoch 1: acc=0.5099, loss=1.4326
Epoch 2: acc=0.5101, loss=1.3769
Epoch 3: acc=0.5121, loss=1.3509
Epoch 4: acc=0.5139, loss=1.3361
Epoch 5: acc=0.5154, loss=1.3265
Epoch 6: acc=0.5163, loss=1.3198
Epoch 7: acc=0.5175, loss=1.3147
Epoch 8: acc=0.5185, loss=1.3107
Epoch 9: acc=0.5192, loss=1.3074
Epoch 10: acc=0.5202, loss=1.3046
[输出] 已保存至 submission_bow_lr0.001_lossls.csv

[实验] 特征=bow, 学习率=0.01, 损失函数=ce
Epoch 1: acc=0.5204, loss=1.2317
Epoch 2: acc=0.5254, loss=1.2110
Epoch 3: acc=0.5286, loss=1.2004
Epoch 4: acc=0.5314, loss=1.1927
Epoch 5: acc=0.5333,

# 基于深度学习的文本分类
## 文本向量化(深度学习模式)

- 使用词嵌入序列

In [6]:
def tokenize(text):
    return text.lower().split()

def build_vocab(texts, max_vocab_size=10000, min_freq=1):
    counter = Counter()
    for text in texts:
        counter.update(tokenize(text))
    vocab = [word for word, freq in counter.items() if freq >= min_freq]
    vocab = vocab[:max_vocab_size - 2]
    word2idx = {word: idx + 2 for idx, word in enumerate(vocab)}
    word2idx['<PAD>'] = 0
    word2idx['<UNK>'] = 1
    return word2idx
# 将一条文本编码成固定长度的索引序列
def encode(text, word2idx, max_len=50):
    tokens = tokenize(text)
    ids = [word2idx.get(w, 0) for w in tokens]
    if len(ids) < max_len:
        ids += [0] * (max_len - len(ids))
    else:
        ids = ids[:max_len]
    return ids

def get_feature_tensor(texts, word2idx, mode='index', max_len=50):
    if mode == 'index':  # 用于 CNN/RNN
        return [torch.tensor(encode(t, word2idx, max_len)) for t in texts]
    else:  # bow / tf / tfidf
        bow = np.zeros((len(texts), len(word2idx)))
        for i, text in enumerate(texts):
            for w in tokenize(text):
                if w in word2idx:
                    bow[i][word2idx[w]] += 1
        if mode == 'tf':
            bow = bow / np.maximum(1, bow.sum(axis=1, keepdims=True))
        elif mode == 'tfidf':
            transformer = TfidfTransformer()
            bow = transformer.fit_transform(bow).toarray()
        return [torch.tensor(row, dtype=torch.float32) for row in bow]

In [7]:
'''封装输入文本和标签，供DataLoader使用'''
class TextDataset(Dataset):
    def __init__(self, inputs, labels):
        self.inputs = inputs
        self.labels = torch.tensor(labels, dtype=torch.long)
    def __len__(self):
        return len(self.inputs)
    def __getitem__(self, idx):
        return self.inputs[idx], self.labels[idx]

## 构建深度学习模型
- 卷积神经网络（CNN）、循环神经网络（RNN） 和 线性模型

In [8]:
class CNNTextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, kernel_sizes=[3,4,5], num_filters=100):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.convs = nn.ModuleList([nn.Conv1d(embed_dim, num_filters, k) for k in kernel_sizes])
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)
    def forward(self, x):
        x = self.embedding(x).transpose(1, 2)
        x = [F.relu(conv(x)).max(dim=2)[0] for conv in self.convs]
        x = torch.cat(x, dim=1)
        return self.fc(x)

class RNNTextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes, hidden_dim=128):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)
    def forward(self, x):
        x = self.embedding(x)
        _, (h_n, _) = self.lstm(x)
        return self.fc(h_n[-1])

class LinearClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.fc = nn.Linear(input_dim, num_classes)
    def forward(self, x):
        return self.fc(x)

> **损失函数**

In [9]:
def get_loss_fn(loss_type='ce', smoothing=0.1, num_classes=5):
    # 普通交叉熵损失
    if loss_type == 'ce':
        return nn.CrossEntropyLoss()
    # 标签平滑交叉熵
    elif loss_type == 'ls':
        def loss_fn(pred, target):
            one_hot = torch.zeros_like(pred).scatter(1, target.unsqueeze(1), 1)
            one_hot = one_hot * (1 - smoothing) + smoothing / num_classes
            log_prob = F.log_softmax(pred, dim=1)
            return -(one_hot * log_prob).sum(dim=1).mean()
        return loss_fn
    else:
        raise ValueError(f"Unknown loss type: {loss_type}")

> **训练与评估**

In [10]:
def train_model(model, train_loader, val_loader, epochs, lr, loss_type, device):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = get_loss_fn(loss_type, num_classes=5)

    for epoch in range(1, epochs + 1):
        model.train()
        correct, total_loss = 0, 0
        for X, y in train_loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            out = model(X)
            loss = loss_fn(out, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * X.size(0)
            correct += (out.argmax(1) == y).sum().item()
        acc = correct / len(train_loader.dataset)
        print(f"Epoch {epoch}: Loss={total_loss/len(train_loader.dataset):.4f}, Acc={acc:.4f}")

    return evaluate(model, val_loader, device)

def evaluate(model, loader, device):
    model.eval()
    correct = 0
    with torch.no_grad():
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            out = model(X)
            correct += (out.argmax(1) == y).sum().item()
    return correct / len(loader.dataset)

## 主程序

In [13]:
def run_experiment(model_type='cnn', feature_type='index', loss_type='ce', lr=0.001, embed_dim=100):
    print(f"\n>>> 模型={model_type}, 特征={feature_type}, loss={loss_type}, lr={lr}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    Train = pd.read_csv('D:/Data/master/train.tsv', sep='\t')
#     Test = pd.read_csv('D:/Data/master/test.tsv', sep='\t')
    train_texts, train_labels = Train['Phrase'].astype(str).tolist(), Train['Sentiment'].tolist()
#     val_texts, val_labels = Test['Phrase'].astype(str).tolist(), Test['Sentiment'].tolist()
    # ---------------- 构建词表和特征向量 ----------------
    word2idx = build_vocab(train_texts, max_vocab_size=10000)
    X_train, X_val, y_train, y_val = train_test_split(train_texts, train_labels, test_size=0.2, random_state=42)
    train_inputs = get_feature_tensor(X_train, word2idx, mode=feature_type)
    val_inputs = get_feature_tensor(X_val, word2idx, mode=feature_type)
    train_dataset = TextDataset(train_inputs, y_train)
    val_dataset = TextDataset(val_inputs, y_val)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64)
    # ---------------- 构建模型 ----------------
    if feature_type == 'index':
        vocab_size = len(word2idx)
        if model_type == 'cnn':
            model = CNNTextClassifier(vocab_size, embed_dim, num_classes=5)
        elif model_type == 'rnn':
            model = RNNTextClassifier(vocab_size, embed_dim, num_classes=5)
        else:
            raise ValueError("Only CNN/RNN support index input.")
    else:
        model = LinearClassifier(input_dim=len(word2idx), num_classes=5)

    acc = train_model(model, train_loader, val_loader, epochs=10, lr=lr, loss_type=loss_type, device=device)
    print(f"==> Final Val Acc: {acc:.4f}")
    return acc

In [14]:
if __name__ == '__main__':
    model_types = ['cnn', 'rnn']  # 仅 index 特征可选
    feature_types = ['index', 'bow', 'tf', 'tfidf']
    loss_types = ['ce', 'ls']
    learning_rates = [0.001, 0.01]

    results = {}
    for feat in feature_types:
        applicable_models = model_types if feat == 'index' else ['linear']
        for model_type in applicable_models:
            for loss in loss_types:
                for lr in learning_rates:
                    key = (model_type, feat, loss, lr)
                    acc = run_experiment(model_type=model_type, feature_type=feat, loss_type=loss, lr=lr)
                    results[key] = acc

    print("\n=== 所有实验结果 ===")
    for k, v in results.items():
        print(f"{k}: acc={v:.4f}")


>>> 模型=cnn, 特征=index, loss=ce, lr=0.001
Epoch 1: Loss=1.0817, Acc=0.5730
Epoch 2: Loss=0.8626, Acc=0.6551
Epoch 3: Loss=0.7662, Acc=0.6930
Epoch 4: Loss=0.7020, Acc=0.7185
Epoch 5: Loss=0.6521, Acc=0.7376
Epoch 6: Loss=0.6084, Acc=0.7552
Epoch 7: Loss=0.5745, Acc=0.7700
Epoch 8: Loss=0.5452, Acc=0.7838
Epoch 9: Loss=0.5111, Acc=0.7974
Epoch 10: Loss=0.4871, Acc=0.8075
==> Final Val Acc: 0.6337

>>> 模型=cnn, 特征=index, loss=ce, lr=0.01
Epoch 1: Loss=1.1314, Acc=0.5686
Epoch 2: Loss=0.9663, Acc=0.6248
Epoch 3: Loss=0.9188, Acc=0.6441
Epoch 4: Loss=0.8841, Acc=0.6571
Epoch 5: Loss=0.8575, Acc=0.6675
Epoch 6: Loss=0.8493, Acc=0.6745
Epoch 7: Loss=0.8268, Acc=0.6804
Epoch 8: Loss=0.8240, Acc=0.6840
Epoch 9: Loss=0.8060, Acc=0.6881
Epoch 10: Loss=0.7994, Acc=0.6931
==> Final Val Acc: 0.6396

>>> 模型=cnn, 特征=index, loss=ls, lr=0.001
Epoch 1: Loss=1.2005, Acc=0.5716
Epoch 2: Loss=1.0442, Acc=0.6534
Epoch 3: Loss=0.9751, Acc=0.6909
Epoch 4: Loss=0.9306, Acc=0.7183
Epoch 5: Loss=0.8972, Acc=0.7364

# 基于注意力机制的文本匹配
## 数据集的预处理

In [19]:
# 将文本转为小写并按空格切分成单词
def tokenize(text):
    return text.lower().split()
# 统计词频，构建一个词到索引的字典
def build_vocab(sent_pairs, max_vocab=10000, min_freq=1):
    cnt = Counter()
    for s1, s2, _ in sent_pairs:
        cnt.update(tokenize(s1))
        cnt.update(tokenize(s2))
    vocab = [w for w, freq in cnt.items() if freq >= min_freq][:max_vocab-2]
    word2idx = {w: i+2 for i, w in enumerate(vocab)}
    word2idx["<PAD>"], word2idx["<UNK>"] = 0, 1
    return word2idx

class PairDataset(Dataset):
    def __init__(self, pairs, word2idx, max_len=50):
        self.data = pairs
        self.w2i = word2idx
        self.max_len = max_len
    def encode(self, s):
        tok = tokenize(s)[:self.max_len]
        ids = [self.w2i.get(w,1) for w in tok] + [0]*(self.max_len- len(tok))
        return ids
    def __len__(self): return len(self.data)
    def __getitem__(self, i):
        s1, s2, label = self.data[i]
        return torch.tensor(self.encode(s1)), torch.tensor(self.encode(s2)), torch.tensor(label)
# 从一个 .jsonl 格式的文件中读取句子对和标签
def load_jsonl_pair(file_path):
    """
    读取 SNLI 数据集格式的 jsonl 文件，并返回三元组：(sentence1, sentence2, label)
    标签映射：entailment → 0, neutral → 1, contradiction → 2
    """
    label_map = {"entailment": 0, "neutral": 1, "contradiction": 2}
    data = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            d = json.loads(line)
            label = d["gold_label"]
            if label not in label_map:
                continue  # 忽略无效标签
            sent1 = d["sentence1"]
            sent2 = d["sentence2"]
            data.append((sent1, sent2, label_map[label]))
    return data

## 构建经典文本匹配模型

In [20]:
class ESIM(nn.Module):
    def __init__(self, vocab_size, emb_dim=100, hid=128, num_labels=3):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.bilstm1 = nn.LSTM(emb_dim, hid, bidirectional=True, batch_first=True)
        self.bilstm2 = nn.LSTM(hid*8, hid, bidirectional=True, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(hid*8, hid),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hid, num_labels)
        )
    def forward(self, x1, x2):
        emb1, emb2 = self.embed(x1), self.embed(x2)
        o1, _ = self.bilstm1(emb1)
        o2, _ = self.bilstm1(emb2)
        # 注意力机制
        sim = torch.bmm(o1, o2.transpose(1,2))
        a2 = F.softmax(sim, dim=2)
        a1 = F.softmax(sim.transpose(1,2), dim=2)
        m1 = torch.bmm(a2, o2)
        m2 = torch.bmm(a1, o1)
        # 将原始表示和对齐信息组合起来，以捕捉推理线索
        comp1 = torch.cat([o1, m1, o1-m1, o1*m1], dim=-1)
        comp2 = torch.cat([o2, m2, o2-m2, o2*m2], dim=-1)
        # 推理
        o1c, _ = self.bilstm2(comp1)
        o2c, _ = self.bilstm2(comp2)
        # 池化 + 拼接 + 分类
        v1 = torch.cat([o1c.mean(1), o1c.max(1)[0]], dim=-1)
        v2 = torch.cat([o2c.mean(1), o2c.max(1)[0]], dim=-1)
        out = torch.cat([v1, v2], dim=-1)
        return self.fc(out)

## 训练与检验

In [21]:
def train(model, loader, opt, crit, device, num_epochs=10):
    for epoch in range(1, num_epochs + 1):
        model.train()
        total_loss = 0
        for x1, x2, y in loader:
            x1, x2, y = x1.to(device), x2.to(device), y.to(device)
            opt.zero_grad()
            logit = model(x1, x2)
            loss = crit(logit, y)
            loss.backward()
            opt.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(loader)
        return avg_loss
'''在验证集上评估模型'''
def eval_epoch(model, loader, crit, device):
    model.eval()
    tot_loss, correct = 0, 0
    with torch.no_grad():
        for x1,x2,y in loader:
            x1,x2,y = x1.to(device), x2.to(device), y.to(device)
            logit = model(x1,x2)
            tot_loss += crit(logit,y).item()
            pred = logit.argmax(1)
            correct += (pred==y).sum().item()
    return tot_loss/len(loader), correct/len(loader.dataset)

'''句子对判断'''
# 给定两个句子，使用训练好的 ESIM 模型预测它们之间的逻辑关系（蕴含、中立、矛盾）
def infer_relation(model, sent1, sent2, word2idx, device, max_len=50):
    model.eval()
    def encode(s):
        tok = tokenize(s)[:max_len]
        ids = [word2idx.get(w, 1) for w in tok] + [0] * (max_len - len(tok))
        return torch.tensor(ids).unsqueeze(0).to(device)
    x1 = encode(sent1)
    x2 = encode(sent2)
    with torch.no_grad():
        logits = model(x1, x2)
        pred = logits.argmax(1).item()
    label_map = {0: "entailment", 1: "neutral", 2: "contradiction"}
    return label_map[pred]

## 主函数

In [22]:
if __name__ == "__main__":
    '''数据读取'''
    train_pairs = load_jsonl_pair("D:/Data/master/snli_1.0/snli_1.0_train.jsonl")
    test_pairs = load_jsonl_pair("D:/Data/master/snli_1.0/snli_1.0_test.jsonl")
    word2idx = build_vocab(train_pairs, max_vocab=10000)
    # 创建 Dataset 和 DataLoader
    train_ds = PairDataset(train_pairs, word2idx)
    test_ds = PairDataset(test_pairs, word2idx)
    train_loader = DataLoader(train_ds, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=64)
    '''模型准备'''
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model3 = ESIM(len(word2idx), emb_dim=100, hid=128, num_labels=3).to(device)
    opt = torch.optim.Adam(model3.parameters(), lr=5e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, 'min', factor=0.5, patience=2)
    crit = nn.CrossEntropyLoss()
    '''训练循环 + Early Stopping'''
    best_loss, patience = float('inf'), 3
    wait = 0
    best_state = None
    for epoch in range(20):
        trl = train(model3,train_loader,opt,crit,device)
        vll, vac = eval_epoch(model3,test_loader,crit,device)
        print(f"Epoch {epoch:2d} | Train Loss: {trl:.4f} | Test Loss: {vll:.4f} | Test Acc: {vac:.4f}")
        scheduler.step(vll)
        if vll < best_loss:
            best_loss, best_state = vll, model3.state_dict()
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                print("Early stopping.")
                break
    # 加载最优模型
    if best_state:
        model3.load_state_dict(best_state)

Epoch  0 | Train Loss: 0.7261 | Test Loss: 0.6092 | Test Acc: 0.7439
Epoch  1 | Train Loss: 0.5903 | Test Loss: 0.5596 | Test Acc: 0.7720
Epoch  2 | Train Loss: 0.5281 | Test Loss: 0.5415 | Test Acc: 0.7799
Epoch  3 | Train Loss: 0.4753 | Test Loss: 0.5396 | Test Acc: 0.7831
Epoch  4 | Train Loss: 0.4259 | Test Loss: 0.5708 | Test Acc: 0.7848
Epoch  5 | Train Loss: 0.3761 | Test Loss: 0.5869 | Test Acc: 0.7809
Epoch  6 | Train Loss: 0.3296 | Test Loss: 0.6507 | Test Acc: 0.7778
Early stopping.


# 基于LSTM+CRF的标签序列
## 数据预处理

In [90]:
'''从 CoNLL 格式的文件中读取数据'''
# sents: 每个句子是一个单词列表，如 ["EU", "rejects", "German", "call"]
# tags: 每个句子对应的NER标签序列，如 ["B-ORG", "O", "B-MISC", "O"]
def read_conll(fpath):
    sents, tags = [], []
    sent, tag = [], []
    for line in open(fpath, encoding='utf8'):
        line = line.strip()
        if not line:  # 跳过空行
            if sent:  # 如果当前句子非空，则保存
                sents.append(sent)
                tags.append(tag)
                sent, tag = [], []
            continue
        try:
            token, pos, chunk, ner = line.split()
            sent.append(token)
            tag.append(ner)
        except ValueError:
            print(f"Skipping invalid line: {line}")  # 可以打印出问题行，便于调试
    return sents, tags
'''构建词表（word2idx）：把词转成数字索引'''
def build_vocab(seqs, min_freq=1):
    cnt = defaultdict(int)
    for seq in seqs:
        for w in seq:
            cnt[w] += 1
    return {w:i+2 for i,(w,c) in enumerate(cnt.items()) if c>=min_freq} | {'<PAD>':0,'<UNK>':1}
'''构建标签的编码表'''
def build_vocab(seqs, max_vocab=10000, min_freq=1):
    cnt = Counter()
    for seq in seqs:  # 直接遍历句子列表
        cnt.update(seq)  # 统计词频
    vocab = [w for w, freq in cnt.items() if freq >= min_freq][:max_vocab-2]
    word2idx = {w: i+2 for i, w in enumerate(vocab)}
    word2idx["<PAD>"], word2idx["<UNK>"] = 0, 1  # 确保包含<UNK>和<PAD>
    return word2idx
'''将每个句子中的词或标签编码成数字索引，并统一长度'''
def encode_and_pad(seqs, vocab, pad_len):
    if '<UNK>' not in vocab:
#         print("Warning: '<UNK>' is missing in vocab!")
        vocab['<UNK>'] = 1    # 如果`<UNK>`没有在 vocab 中，手动添加
    if '<PAD>' not in vocab:
        vocab['<PAD>'] = 0  # 如果 vocab 中没有 <PAD>，则手动添加
    idxs = [[vocab.get(w, vocab['<UNK>']) for w in s] for s in seqs]
    return [seq + [vocab['<PAD>']]*(pad_len-len(seq)) for seq in idxs]


class NERDataset(Dataset):
    def __init__(self, sents, tags, w2i, t2i, pad_len):
        self.X = torch.tensor(encode_and_pad(sents, w2i, pad_len), dtype=torch.long)
        self.Y = torch.tensor(encode_and_pad(tags, t2i, pad_len), dtype=torch.long)
        self.mask = (self.X!=w2i['<PAD>']).long()
    def __len__(self): return len(self.X)
    def __getitem__(self, i): return self.X[i], self.mask[i], self.Y[i]

## 构建模型

In [91]:
class BiLSTM_CRF(nn.Module):
    def __init__(self, vocab_size, tag_size, emb_dim=100, hid=128):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.bilstm = nn.LSTM(emb_dim, hid, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hid*2, tag_size)
        self.crf = CRF(tag_size)
    def forward(self, X, mask, tags=None):
        emb = self.emb(X)
        h, _ = self.bilstm(emb)
        feats = self.fc(h)
#         print("feats shape:", feats.shape)
        if tags is not None:
            return -self.crf(feats, tags, mask)  # 负 log-likelihood
        return self.crf.decode(feats, mask)

class CRF(nn.Module):
    def __init__(self, tag_size):
        super().__init__()
        self.tag_size = tag_size
        self.transitions = nn.Parameter(torch.randn(tag_size, tag_size))
        self.start = nn.Parameter(torch.randn(tag_size))
        self.end = nn.Parameter(torch.randn(tag_size))
    def forward(self, feats, tags, mask):
        # 前向-后向算法求 partition + score
        return self._neg_log_likelihood(feats, tags, mask)
    def _neg_log_likelihood(self, feats, tags, mask):
        """
        计算负对数似然：CRF的损失函数，使用前向-后向算法
        """
        batch_size, seq_len, _ = feats.shape
        log_likelihood = torch.zeros(batch_size).to(feats.device)
        # 遍历每个句子，计算每个句子的得分
        for i in range(batch_size):
            mask_i = mask[i]  # 获取当前句子的mask（去除padding的部分）
            tags_i = tags[i, mask_i == 1]  # 当前句子的标签（去除padding）
            feats_i = feats[i, mask_i == 1]  # 当前句子的特征（去除padding）
            # 计算CRF的得分
            score = self.start[tags_i[0]] + feats_i[0, tags_i[0]]  # 第一个标签的得分
            for t in range(1, len(tags_i)):
                score += feats_i[t, tags_i[t]] + self.transitions[tags_i[t-1], tags_i[t]]
            # 使用前向-后向算法计算分区函数（partition function）
            partition = self._forward_backward(feats_i, mask_i)
            log_likelihood[i] = score - partition
        return -log_likelihood.sum()
    
    def _forward_backward(self, feats, mask):
        """
        使用前向-后向算法计算分区函数（logZ）。
        """
        seq_len, tag_size = feats.shape
        alpha = torch.full((seq_len, tag_size), -10000.0).to(feats.device)  # 初始化alpha
        alpha[0] = self.start + feats[0]  # 初始化第一个位置的alpha值
        # 计算每个位置的alpha值
        for t in range(1, seq_len):
            emit = feats[t].unsqueeze(0)         # (1, tag_size)
            trans = self.transitions             # (tag_size, tag_size)
            prev_alpha = alpha[t - 1].unsqueeze(1)  # (tag_size, 1)
            alpha[t] = torch.logsumexp(prev_alpha + trans + emit, dim=0)
        # 最终的分区函数是最后一个时间步的alpha的log值
        return torch.logsumexp(alpha[mask.sum()-1], dim=0)
    
    def decode(self, feats, mask):
        """
        使用Viterbi算法解码，返回最可能的标签序列。
        feats: (B, T, C)
        mask: (B, T)
        return: List of predicted tag sequences
        """
        B, T, C = feats.shape
        paths = []
        for i in range(B):
            length = mask[i].sum()
            score = self.start + feats[i, 0]  # (C,)
            history = []
            for t in range(1, length):
                emit = feats[i, t]  # (C,)
                next_score = []
                backpointer = []
                for curr in range(C):
                    trans_score = self.transitions[:, curr] + score  # (C,)
                    best_prev = torch.argmax(trans_score)
                    backpointer.append(best_prev.item())
                    next_score.append(trans_score[best_prev] + emit[curr])
                score = torch.stack(next_score)
                history.append(backpointer)
            # 终止 + 回溯
            score += self.end
            best_last = torch.argmax(score).item()
            best_path = [best_last]
            for back in reversed(history):
                best_last = back[best_last]
                best_path.append(best_last)
            best_path.reverse()
            paths.append(best_path)
        return paths

## 训练评估

In [93]:
def train_epoch(model, loader, opt):
    model.train()
    total_loss = 0
    for X, mask, Y in loader:
        opt.zero_grad()
        loss = model(X.to(device), mask.to(device), Y.to(device))
        loss.backward()
        opt.step()
        total_loss += loss.item()
    return total_loss / len(loader)

def evaluate(model, loader):
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for X, mask, Y in loader:
            preds = model(X.to(device), mask.to(device))   # 使用decode方法来获取预测标签
            for i, length in enumerate(mask.sum(1)):
                y_true += Y[i,:length].tolist()
                y_pred += preds[i][:length]
    p,r,f,_ = precision_recall_fscore_support(y_true, y_pred, average='micro', labels=list(label2idx.values()))
    return p, r, f

## 主函数

In [38]:
print(train_s[:3])  # 查看前3条数据
print(dev_s[:3])  # 查看前3条数据

[['EU', 'rejects', 'German', 'call', 'to', 'boycott', 'British', 'lamb', '.'], ['Peter', 'Blackburn'], ['BRUSSELS', '1996-08-22']]
[['SOCCER', '-', 'JAPAN', 'GET', 'LUCKY', 'WIN', ',', 'CHINA', 'IN', 'SURPRISE', 'DEFEAT', '.'], ['Nadim', 'Ladki'], ['AL-AIN', ',', 'United', 'Arab', 'Emirates', '1996-12-06']]


In [94]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_s, train_t = read_conll(r"D:\Data\master\Conll-2003\train.txt")
dev_s, dev_t = read_conll(r"D:\Data\master\Conll-2003\test.txt")
w2i = build_vocab(train_s + dev_s)
label2idx, idx2label = build_label_vocab([tag for seq in train_t+dev_t for tag in seq])
pad_len = max(map(len, train_s + dev_s))
train_ds = NERDataset(train_s, train_t, w2i, label2idx, pad_len)
dev_ds = NERDataset(dev_s, dev_t, w2i, label2idx, pad_len)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
dev_loader = DataLoader(dev_ds, batch_size=32)

model4 = BiLSTM_CRF(len(w2i), len(label2idx)).to(device)
optimizer = optim.Adam(model4.parameters(), lr=0.001)
for epoch in range(10):
    train_loss = train_epoch(model4, train_loader, optimizer)
    p, r, f = evaluate(model4, dev_loader)
    print(f"Epoch {epoch+1}: loss={train_loss:.4f}, P={p:.4f}, R={r:.4f}, F1={f:.4f}")

Epoch 1: loss=-57469.3476, P=0.0359, R=0.0684, F1=0.0471
Epoch 2: loss=-161807.7289, P=0.0359, R=0.0684, F1=0.0471
Epoch 3: loss=-262519.7292, P=0.0359, R=0.0684, F1=0.0471
Epoch 4: loss=-362364.0781, P=0.0359, R=0.0684, F1=0.0471
Epoch 5: loss=-461796.0836, P=0.0359, R=0.0684, F1=0.0471
Epoch 6: loss=-561025.6654, P=0.0359, R=0.0684, F1=0.0471
Epoch 7: loss=-660182.7294, P=0.0359, R=0.0684, F1=0.0471
Epoch 8: loss=-759252.2223, P=0.0359, R=0.0684, F1=0.0471
Epoch 9: loss=-858291.7232, P=0.0359, R=0.0684, F1=0.0471
Epoch 10: loss=-957208.0310, P=0.0359, R=0.0684, F1=0.0471


# 基于神经网络的语言模型
## 数据预处理

In [12]:
# 读取原始文本
with open("D:/Data/master/poetryFromTang.txt", 'r', encoding='utf-8') as f:
    text = f.read()
# 构建字符字典
chars = sorted(list(set(text)))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for ch, i in stoi.items()}
# 定义编码/解码函数
def encode(s): return [stoi[c] for c in s]
def decode(t): return ''.join([itos[i] for i in t])
# 整体文本转为数字序列
data = torch.tensor(encode(text), dtype=torch.long)
# 设置模型训练的超参数
SEQ_LEN = 100
BATCH_SIZE = 64

## 自定义数据集
> 将长文本数据切分为多个小段，并用 DataLoader 批量加载，供语言模型进行训练
1. 语言模型是序列任务，需要“窗口式”切分
2. 神经网络是批量训练的
3. 可以更灵活控制训练样本数、shuffle、batch size 等

In [13]:
# 按序列切分文本数据
class CharDataset(Dataset):
    def __init__(self, data, seq_len):
        self.data = data
        self.seq_len = seq_len
    def __len__(self):
        return len(self.data) - self.seq_len
    def __getitem__(self, i):
        return (self.data[i:i+self.seq_len],
                self.data[i+1:i+self.seq_len+1])

dataset = CharDataset(data, SEQ_LEN)
loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

## 定义模型

In [14]:
class CharRNN(nn.Module):
    def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, model='lstm'):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        rnn_cls = nn.LSTM if model == 'lstm' else nn.GRU
        self.rnn = rnn_cls(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
    def forward(self, x, hidden=None):
        x = self.embedding(x)
        out, hidden = self.rnn(x, hidden)
        logits = self.fc(out)
        return logits, hidden

## 训练模型

In [15]:
def train(model, loader, optimizer, criterion, epochs=10, device='cpu'):
    model.train()
    model.to(device)
    for epoch in range(epochs):
        total_loss = 0
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits, _ = model(x)
            loss = criterion(logits.view(-1, vocab_size), y.view(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(loader)
        perplexity = np.exp(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, Perplexity: {perplexity:.2f}")

'''生成文本'''
def generate(model, start_seq="春", length=100, device='cpu'):
    model.eval()
    input_seq = torch.tensor([stoi.get(ch, 1) for ch in start_seq], dtype=torch.long).unsqueeze(0).to(device)
    generated = list(start_seq)
    hidden = None
    for _ in range(length):
        logits, hidden = model(input_seq, hidden)
        probs = torch.softmax(logits[:, -1], dim=-1)
        next_id = torch.multinomial(probs, num_samples=1).item()
        generated.append(itos[next_id])
        input_seq = torch.tensor([[next_id]], dtype=torch.long).to(device)
    return ''.join(generated)

In [16]:
'''运行'''
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = CharRNN(vocab_size, model='lstm')  # 或 'gru'
optimizer = torch.optim.Adam(model.parameters(), lr=0.003)
criterion = nn.CrossEntropyLoss()
train(model, loader, optimizer, criterion, epochs=10, device=device)

print("\n🌸 生成诗句示例：")
print(generate(model, start_seq="春风", length=100, device=device))

Epoch 1, Loss: 3.5250, Perplexity: 33.95
Epoch 2, Loss: 0.4739, Perplexity: 1.61
Epoch 3, Loss: 0.1441, Perplexity: 1.15
Epoch 4, Loss: 0.0970, Perplexity: 1.10
Epoch 5, Loss: 0.0779, Perplexity: 1.08
Epoch 6, Loss: 0.0670, Perplexity: 1.07
Epoch 7, Loss: 0.0603, Perplexity: 1.06
Epoch 8, Loss: 0.0556, Perplexity: 1.06
Epoch 9, Loss: 0.0524, Perplexity: 1.05
Epoch 10, Loss: 0.0502, Perplexity: 1.05

🌸 生成诗句示例：
春风颇愁。惜哉瑶池饮，日晏昆仑丘。
黄鹄去不息，哀鸣何所投。君看随阳雁，各有稻粱谋。

平明跨驴出，未知适谁门。权门多噂eR，且复寻诸孙。
诸孙贫无事，宅舍如荒村。堂前自生竹，堂后自生萱。
萱草秋已死，竹
