<a href="https://colab.research.google.com/github/koseii2122/LSTM-POS-Torch/blob/main/POS_Torch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
class POSDataset(Dataset):
    """Sentence-level part-of-speech dataset backed by a token-per-row CSV file.

    The CSV must provide the columns ``Sentence #``, ``Word`` and ``POS``.
    Missing cells are forward-filled, so a sentence id written only on the
    first token of a sentence is propagated to the following tokens.

    Attributes:
        data: The forward-filled DataFrame read from ``filepath``.
        words: Unique words, in order of first appearance in the file.
        POS: Unique POS tags, in order of first appearance in the file.
        word2idx: Maps each word to an index starting at 1; index 0 is never
            assigned to a word.
        pos2idx: Maps each POS tag to an index starting at 0.
        sentences: One list of ``(word, tag)`` pairs per sentence.
    """

    def __init__(self, filepath):
        """Read the CSV at ``filepath`` and build vocabularies and sentences."""
        # DataFrame.ffill() is the supported spelling of the deprecated
        # fillna(method='ffill').
        self.data = pd.read_csv(filepath, encoding='ISO-8859-1').ffill()

        # Series.unique() keeps first-appearance order, which makes the index
        # mappings reproducible. Iterating a set of strings does not: its
        # order changes between interpreter runs (hash randomisation), so a
        # saved model would no longer match a freshly built vocabulary.
        self.words = self.data['Word'].unique().tolist()
        self.POS = self.data['POS'].unique().tolist()

        self.word2idx = {w: i for i, w in enumerate(self.words, start=1)}
        self.pos2idx = {t: i for i, t in enumerate(self.POS)}

        self.sentences = self._get_sentences()

    def __len__(self):
        """Return the number of sentences."""
        return len(self.sentences)

    def __getitem__(self, index):
        """Return ``(word_indices, tag_indices)`` tensors for one sentence."""
        sentence = self.sentences[index]
        x = [self.word2idx[word] for word, _ in sentence]
        y = [self.pos2idx[tag] for _, tag in sentence]
        return torch.tensor(x), torch.tensor(y)

    def _get_sentences(self):
        """Group the rows by ``Sentence #`` into lists of ``(word, tag)`` pairs."""
        return [
            list(zip(group["Word"].tolist(), group["POS"].tolist()))
            for _, group in self.data.groupby("Sentence #")
        ]

In [None]:
# Define a simple LSTM-based model
class POSModel(nn.Module):
    """Sequence tagger: embedding -> LSTM -> per-token linear layer.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector (the LSTM input size).
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of scores produced for every token.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return one score vector per token of the batch-first index tensor ``x``."""
        # Only the per-step LSTM outputs are needed; the final (h, c) state is dropped.
        hidden_states, _ = self.lstm(self.embedding(x))
        return self.fc(hidden_states)

In [None]:
# Hyperparameters used by the data loaders, the model and the optimizer below,
# followed by loading the dataset (no training happens in this cell).
BATCH_SIZE = 32  # sentences per DataLoader batch
EMBEDDING_DIM = 128  # size of each word embedding vector
HIDDEN_DIM = 98  # size of the LSTM hidden state
LEARNING_RATE = 0.003  # learning rate passed to Adam
EPOCHS = 4  # number of passes over the training split

# Path is Colab-specific: the CSV is expected under /content.
dataset = POSDataset("/content/ner_datasetreference.csv")

In [None]:
# Hold out 20% of the sentences for validation. The split is random and
# unseeded, so it differs from run to run.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset,
    lengths=[train_size, val_size],
)

In [None]:
def collate_fn(batch, label_pad_value=-100):
    """Pad a list of ``(word_ids, tag_ids)`` pairs into batch tensors.

    Args:
        batch: List of ``(x, y)`` pairs of 1-D index tensors, one pair per
            sentence. Sentences may have different lengths.
        label_pad_value: Value written into the padded label positions.
            Defaults to -100, the default ``ignore_index`` of
            ``nn.CrossEntropyLoss``, so padded positions contribute nothing
            to the loss. Padding labels with 0 would make them
            indistinguishable from the real tag whose index is 0.

    Returns:
        A tuple ``(x, y, x_mask, y_mask)``:
        ``x`` is the ``(batch, max_len)`` word-index tensor padded with 0,
        ``y`` is the ``(batch, max_len)`` tag-index tensor padded with
        ``label_pad_value``, and the two boolean masks are ``True`` at real
        tokens and ``False`` at padding.
    """
    x_seqs = [item[0] for item in batch]
    y_seqs = [item[1] for item in batch]
    x_lengths = torch.tensor([len(seq) for seq in x_seqs])
    y_lengths = torch.tensor([len(seq) for seq in y_seqs])

    # Pad every sequence to the longest one in the batch. Inputs keep the
    # default padding value 0 (an index no real word uses); labels get the
    # ignore value instead.
    x = nn.utils.rnn.pad_sequence(x_seqs, batch_first=True)
    y = nn.utils.rnn.pad_sequence(
        y_seqs, batch_first=True, padding_value=label_pad_value
    )

    # Position p of row r is a real token iff p < length of sequence r.
    x_mask = torch.arange(x.size(1))[None, :] < x_lengths[:, None]
    y_mask = torch.arange(y.size(1))[None, :] < y_lengths[:, None]

    return x, y, x_mask, y_mask

In [None]:
# Training batches are reshuffled every epoch; validation keeps a fixed order.
# Both loaders pad their batches through collate_fn.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Sanity check: print the padded word-index and tag-index tensors of the
# first batch only, then stop iterating.
for batch in train_loader:
    print(*batch[:2])
    break


tensor([[17717, 10051, 31399,  ...,     0,     0,     0],
        [13265, 22800, 33857,  ...,     0,     0,     0],
        [21704, 23101, 10297,  ...,     0,     0,     0],
        ...,
        [15891, 32151, 10051,  ...,     0,     0,     0],
        [34058, 19381, 12406,  ...,     0,     0,     0],
        [ 1876, 18392,  9319,  ...,     0,     0,     0]]) tensor([[34, 28,  8,  ...,  0,  0,  0],
        [37, 37, 35,  ...,  0,  0,  0],
        [30, 21, 17,  ...,  0,  0,  0],
        ...,
        [ 7,  7, 28,  ...,  0,  0,  0],
        [37, 35, 20,  ...,  0,  0,  0],
        [13, 20,  7,  ...,  0,  0,  0]])


In [None]:
# The embedding table needs one row more than there are words, because word
# indices start at 1 and index 0 is used for padding / unknown words.
model = POSModel(
    vocab_size=len(dataset.words) + 1,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=len(dataset.POS),
)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
# Train for EPOCHS epochs and report the average per-token loss on the
# training and validation splits after each one.
for epoch in range(EPOCHS):
    train_loss = 0.0
    val_loss = 0.0
    train_tokens = 0
    val_tokens = 0

    model.train()
    for batch in train_loader:
        x, y, x_mask, y_mask = batch
        optimizer.zero_grad()
        output = model(x)
        # Index with the boolean mask so that only real tokens reach the loss:
        # output[y_mask] is (n_tokens, n_classes) and y[y_mask] is (n_tokens,).
        # Without it, padded positions are scored as if their gold tag were
        # the padding value.
        loss = criterion(output[y_mask], y[y_mask])
        loss.backward()
        optimizer.step()
        # criterion returns a mean over the selected tokens (default
        # reduction); weight it by the token count so the epoch figure is a
        # true per-token average.
        n_tokens = int(y_mask.sum())
        train_loss += loss.item() * n_tokens
        train_tokens += n_tokens
    train_loss /= max(train_tokens, 1)

    model.eval()
    with torch.no_grad():
        for batch in val_loader:
            x, y, x_mask, y_mask = batch
            output = model(x)
            loss = criterion(output[y_mask], y[y_mask])
            n_tokens = int(y_mask.sum())
            val_loss += loss.item() * n_tokens
            val_tokens += n_tokens
    val_loss /= max(val_tokens, 1)

    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

Epoch 1, Train Loss: 0.2493, Val Loss: 0.0894
Epoch 2, Train Loss: 0.0600, Val Loss: 0.0668
Epoch 3, Train Loss: 0.0358, Val Loss: 0.0647
Epoch 4, Train Loss: 0.0254, Val Loss: 0.0671


In [None]:
text = "Such a stependous player."

# Split the text on whitespace. Punctuation stays attached to its word
# (e.g. "player."), so such tokens are looked up as-is.
words = text.split()

# Convert words to indices with word2idx; words missing from the vocabulary
# fall back to 0, the index that no real word uses.
x = [dataset.word2idx.get(word, 0) for word in words]

# Convert the indices to a tensor and add a batch dimension: (1, n_words)
x = torch.tensor(x).unsqueeze(0)

# Pass the tensor through the model and take the best-scoring tag per token
model.eval()
with torch.no_grad():
    output = model(x)
    predicted_tags = output.argmax(dim=2)

# Drop only the batch dimension. A bare squeeze() would also collapse the
# sequence dimension for a single-word input, making tolist() return an int
# and breaking the lookup below.
predicted_tags = predicted_tags.squeeze(0).tolist()

# Convert the tag indices back to labels; dataset.POS is the index -> tag list
predicted_labels = [dataset.POS[idx] for idx in predicted_tags]
print(predicted_labels)

['JJ', 'DT', 'JJR', 'JJR']
