In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
from tqdm import tqdm
import pickle

In [29]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [30]:
# seed for reproducability
SEED = 122
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

### Load Data

In [31]:
from datasets import Dataset

harry_potter_dataset = "./data/Harry_Potter_Books.txt"

# Read the data from the file
with open(harry_potter_dataset, "r", encoding="utf-8") as f:
    data = f.read()

# Creating list of dictionaries
data = data.split(" .")
data = [{"text": row} for row in data]

# Creating dataset object
dataset = Dataset.from_list(data)
dataset

Dataset({
    features: ['text'],
    num_rows: 67785
})

### Dataset Splitting

The dataset is divided into training, validation, and test sets using the following strategy:

80% Training

10% Validation

10% Test

In [32]:
from datasets import DatasetDict

train_test = dataset.train_test_split(test_size=0.2)

# 10% test set and 10% validation set
train_test_valid = train_test['test'].train_test_split(test_size=0.5)

dataset = DatasetDict({
    'train': train_test['train'],
    'test': train_test_valid['test'],
    'validation': train_test_valid['train']})

dataset

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 54228
    })
    test: Dataset({
        features: ['text'],
        num_rows: 6779
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 6778
    })
})

## Preprocessing

### Tokenization

Tokenization is performed using TorchText’s basic_english tokenizer. The original text column is removed and replaced with a list of tokens.

In [33]:
from torchtext.data.utils import get_tokenizer

tokenizer = get_tokenizer('basic_english')

#function to tokenize
tokenize_data = lambda example, tokenizer: {'tokens': tokenizer(example['text'])}  

#map the function to each example
tokenized_dataset = dataset.map(tokenize_data, remove_columns=['text'], fn_kwargs={'tokenizer': tokenizer})
print(tokenized_dataset['train'][33]['tokens'])


OSError: [WinError 127] 找不到指定的程序。

In [None]:
from torchtext.vocab import build_vocab_from_iterator
## numericalizing
vocab = build_vocab_from_iterator(tokenized_dataset['train']['tokens'], 
min_freq=3) 
vocab.insert_token('<unk>', 0)           
vocab.insert_token('<eos>', 1)            
vocab.set_default_index(vocab['<unk>'])   
print(len(vocab))                         
print(vocab.get_itos()[:10]) 



KeyError: 'tokens'

### Vocabulary Construction

A vocabulary is built using only the training set to avoid data leakage. Words appearing fewer than three times are discarded to control vocabulary size.

Two special tokens are added:

<unk>: unknown words

<eos>: end of sentence

In [12]:
from collections import Counter

counter = Counter()

for example in dataset:
    counter.update(example["tokens"])


# 过滤低频词
min_freq = 3
vocab_list = ["<unk>", "<eos>"] + [
    word for word, freq in counter.items() if freq >= min_freq
]

word2idx = {word: idx for idx, word in enumerate(vocab_list)}
idx2word = {idx: word for word, idx in word2idx.items()}

vocab_size = len(word2idx)
vocab_size


TypeError: string indices must be integers

The vocabulary is saved for later inference:

In [None]:
import os, pickle

os.makedirs("model", exist_ok=True)

with open("model/vocab.pkl", "wb") as f:
    pickle.dump(word2idx, f)


### Preparing Training Batches
The dataset is converted into a continuous stream of token indices. Each sentence is appended with <eos> to allow the model to learn sentence termination.

In [None]:
def get_data(dataset, vocab, batch_size):
    data = []

    for example in dataset:
        tokens = example["tokens"] + ["<eos>"]
        tokens = [vocab.get(token, vocab["<unk>"]) for token in tokens]
        data.extend(tokens)

    data = torch.LongTensor(data)

    num_batches = data.size(0) // batch_size
    data = data[:num_batches * batch_size]
    data = data.view(num_batches, batch_size)

    return data


In [None]:
batch_size = 128
train_data = get_data(tokenized_dataset['train'], word2idx, batch_size)
valid_data = get_data(tokenized_dataset['validation'], word2idx, batch_size)
test_data  = get_data(tokenized_dataset['test'], word2idx, batch_size)

### Model Architecture

### LSTM Language Model

The language model consists of:

Embedding layer

Multi-layer LSTM

Dropout for regularization

Fully connected output layer

In [None]:
class LSTMLanguageModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, num_layers, dropout_rate):
                
        super().__init__()
        self.num_layers = num_layers
        self.hid_dim = hid_dim
        self.emb_dim = emb_dim

        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers=num_layers, 
                    dropout=dropout_rate, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hid_dim, vocab_size)
        
        self.init_weights()
        
    def init_weights(self):
        init_range_emb = 0.1
        init_range_other = 1/math.sqrt(self.hid_dim)
        self.embedding.weight.data.uniform_(-init_range_emb, init_range_emb)
        self.fc.weight.data.uniform_(-init_range_other, init_range_other)
        self.fc.bias.data.zero_()
        for i in range(self.num_layers):
            self.lstm.all_weights[i][0] = torch.FloatTensor(self.emb_dim,
                    self.hid_dim).uniform_(-init_range_other, init_range_other) 
            self.lstm.all_weights[i][1] = torch.FloatTensor(self.hid_dim, 
                    self.hid_dim).uniform_(-init_range_other, init_range_other) 

    def init_hidden(self, batch_size, device):
        hidden = torch.zeros(self.num_layers, batch_size, self.hid_dim).to(device)
        cell   = torch.zeros(self.num_layers, batch_size, self.hid_dim).to(device)
        return hidden, cell
    
    def detach_hidden(self, hidden):
        hidden, cell = hidden
        hidden = hidden.detach()
        cell = cell.detach()
        return hidden, cell

    def forward(self, src, hidden):
        #src: [batch size, seq len]
        embedding = self.dropout(self.embedding(src))
        #embedding: [batch size, seq len, emb_dim]
        output, hidden = self.lstm(embedding, hidden)      
        #output: [batch size, seq len, hid_dim]
        #hidden = h, c = [num_layers * direction, seq len, hid_dim)
        output = self.dropout(output) 
        prediction = self.fc(output)
        #prediction: [batch size, seq_len, vocab size]
        return prediction, hidden

## Training Procedure

In [None]:
vocab_size = len(word2idx)
emb_dim = 1024                # 400 in the paper
hid_dim = 1024                # 1150 in the paper
num_layers = 2                # 3 in the paper
dropout_rate = 0.65              
lr = 1e-3  

In [None]:
model = LSTMLanguageModel(vocab_size, emb_dim, hid_dim, num_layers, dropout_rate).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {num_params:,} trainable parameters')

The model has 39,428,903 trainable parameters


In [None]:
def get_batch(data, seq_len, idx):
    #data #[batch size, bunch of tokens]
    src    = data[:, idx:idx+seq_len]                   
    target = data[:, idx+1:idx+seq_len+1]  #target simply is ahead of src by 1            
    return src, target

In [None]:
def train(model, data, optimizer, criterion, batch_size, seq_len, clip, device):

    epoch_loss = 0
    model.train()

    num_tokens = data.shape[1]
    data = data[:, :num_tokens - (num_tokens - 1) % seq_len]
    num_tokens = data.shape[1]

    real_batch_size = data.shape[0]   
    hidden = model.init_hidden(real_batch_size, device)

    for idx in tqdm(range(0, num_tokens - 1, seq_len), leave=False):

        optimizer.zero_grad()
        hidden = model.detach_hidden(hidden)

        src, target = get_batch(data, seq_len, idx)
        src, target = src.to(device), target.to(device)

        output, hidden = model(src, hidden)
        # output: [batch, seq, vocab]

        output = output.reshape(-1, output.size(-1)) 
        target = target.reshape(-1)

        loss = criterion(output, target)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item() * seq_len

    return epoch_loss / num_tokens


In [None]:
def evaluate(model, data, criterion, batch_size, seq_len, device):

    epoch_loss = 0
    model.eval()

    num_tokens = data.shape[1]
    data = data[:, :num_tokens - (num_tokens - 1) % seq_len]
    num_tokens = data.shape[1]

    real_batch_size = data.shape[0]
    hidden = model.init_hidden(real_batch_size, device)

    with torch.no_grad():
        for idx in range(0, num_tokens - 1, seq_len):

            hidden = model.detach_hidden(hidden)
            src, target = get_batch(data, seq_len, idx)
            src, target = src.to(device), target.to(device)

            output, hidden = model(src, hidden)
            output = output.reshape(-1, output.size(-1))
            target = target.reshape(-1)

            loss = criterion(output, target)
            epoch_loss += loss.item() * seq_len

    return epoch_loss / num_tokens


In [None]:
import math

n_epochs = 20
seq_len  = 30
clip     = 0.25

scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, factor=0.5, patience=0
)

best_valid_loss = float("inf")

for epoch in range(1, n_epochs + 1):

    train_loss = train(
        model, train_data, optimizer, criterion,
        batch_size, seq_len, clip, device
    )

    valid_loss = evaluate(
        model, valid_data, criterion,
        batch_size, seq_len, device
    )

    scheduler.step(valid_loss)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), "model/best-val-lstm_lm.pt")

    train_ppl = math.exp(train_loss) if train_loss < 20 else float("inf")
    valid_ppl = math.exp(valid_loss) if valid_loss < 20 else float("inf")

    lr = optimizer.param_groups[0]["lr"]

    print(f"Epoch {epoch:02d}")
    print(f"\tLR              : {lr:.6f}")
    print(f"\tTrain Perplexity: {train_ppl:.3f}")
    print(f"\tValid Perplexity: {valid_ppl:.3f}")


: 

### Testing

In [None]:
# Load the best model state from the saved checkpoint
model.load_state_dict(torch.load('model/best-val-lstm_lm.pt',  map_location=device))

# Evaluate the model on the test data
test_loss = evaluate(model, test_data, criterion, batch_size, seq_len, device)

# Print the test perplexity
print(f'Test Perplexity: {math.exp(test_loss):.3f}')