In [10]:
from collections.abc import Iterator

import datasets
import numpy as np
import tiktoken
import torch
from torch import Tensor
from tqdm import tqdm

In this notebook we will build a model that determines text language - either German or English.

### Data

We will use the same dataset that we used for the neural translation exercise. It consists of 200k German-English sentence pairs. We will build a new dataset of the form sentence - label (EN or DE). New dataset will be of size 400k (because each example in the original dataset will correspond to 2 examples in the new dataset).

In [4]:
dataset = datasets.load_dataset("IWSLT/iwslt2017", "iwslt2017-de-en", trust_remote_code=True)

In [5]:
train_data_raw, valid_data_raw, test_data_raw = (
    dataset["train"],
    dataset["validation"],
    dataset["test"],
)

MAX_NUM_SAMPLES = None
if MAX_NUM_SAMPLES is not None:
    train_data_raw = train_data_raw.take(MAX_NUM_SAMPLES)

#### Tokenizer

In order to be able to work with text data, we need to be able to convert it into numeric form that neural networks can process. This is, of course, done by means of a tokenizer. I chose `tiktoken` tokenizer because it is multilingual and fast, but of course other tokenizers could do the job as well. One crucial requirement to the tokenizer is that it was train on a corpora that contained English and German sentences (and possibly other languages as well, such as `tiktoken`).

In [6]:
tokenizer = tiktoken.get_encoding("cl100k_base")

 We first tokenize all the data in our dataset before transofrming it into the target form.
 We will only take first 20 tokens as that should be more than enough to deduce a language of a sentence.
 Number 20 was picked arbitrary, maybe even lower number will do the job equally well.

In [7]:
def tokenize_example(example: dict[str, dict[str, str]],
                     tokenizer: tiktoken.core.Encoding,
                     max_length: int) -> dict[str, list[int]]:
    de_tokens = tokenizer.encode(example["translation"]["de"])[:max_length]
    en_tokens = tokenizer.encode(example["translation"]["en"])[:max_length]

    return {
        "de_tokens": de_tokens,
        "en_tokens": en_tokens,
    }

MAX_SEQ_LENGTH = 20  # we will be using first 20 tokens to classify a sentence as German or Englsih
fn_kwargs={"tokenizer": tokenizer, "max_length": MAX_SEQ_LENGTH}

train_data_raw = train_data_raw.map(tokenize_example, fn_kwargs=fn_kwargs)
valid_data_raw = valid_data_raw.map(tokenize_example, fn_kwargs=fn_kwargs)
test_data_raw = test_data_raw.map(tokenize_example, fn_kwargs=fn_kwargs)

In [11]:
def generate_dataset(dataset: datasets.Dataset,
                     tokenizer: tiktoken.core.Encoding,
                     max_length: int) -> Iterator[dict]:
    for example in dataset:
        de_tokens = tokenizer.encode(example["translation"]["de"])[:max_length]
        en_tokens = tokenizer.encode(example["translation"]["en"])[:max_length]

        yield {"tokens": de_tokens, "label": 0, "text": example["translation"]["de"]}
        yield {"tokens": en_tokens, "label": 1, "text": example["translation"]["en"]}

In [12]:
gen_kwargs={"tokenizer": tokenizer, "max_length": MAX_SEQ_LENGTH}

train_data = datasets.Dataset.from_generator(generate_dataset,
                                             gen_kwargs=gen_kwargs | {"dataset": train_data_raw})

valid_data = datasets.Dataset.from_generator(generate_dataset,
                                             gen_kwargs=gen_kwargs | {"dataset": valid_data_raw})

test_data = datasets.Dataset.from_generator(generate_dataset,
                                            gen_kwargs=gen_kwargs | {"dataset": test_data_raw})

train_data = train_data.with_format(type="torch",
                                    columns=["tokens", "label"],
                                    output_all_columns=True)

valid_data = valid_data.with_format(type="torch",
                                    columns=["tokens", "label"],
                                    output_all_columns=True)

test_data = test_data.with_format(type="torch",
                                  columns=["tokens", "label"],
                                  output_all_columns=True)

Generating train split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

In [13]:
def collate_fn(batch: dict) -> dict[str, int]:
    token_ids = [example["tokens"] for example in batch]
    labels = [example["label"] for example in batch]

    # pad value 2 corresponds to symbol '#' in tiktoken
    # shape (max_length, batch_size)
    token_ids= torch.nn.utils.rnn.pad_sequence(token_ids, padding_value=2)

    # shape (batch_size, max_length)
    token_ids = token_ids.transpose(1, 0)

    # shape (batch_size, 1)
    labels = Tensor(labels).unsqueeze(1)

    return {"tokens": token_ids, "label": labels}

train_data_loader = torch.utils.data.DataLoader(dataset=train_data,
                                                batch_size=64,
                                                shuffle=True,
                                                collate_fn=collate_fn)

valid_data_loader = torch.utils.data.DataLoader(dataset=valid_data,
                                                batch_size=64,
                                                shuffle=True,
                                                collate_fn=collate_fn)

test_data_loader = torch.utils.data.DataLoader(dataset=test_data,
                                               batch_size=64,
                                               shuffle=True,
                                               collate_fn=collate_fn)

## Model

We will use a simple feed forward neural network for this task. We will have 3 layers:
- embedding layer to convert from token ids to a state space representation
- feed forward layer 1
- feed forward layer 2

In [14]:
class Model(torch.nn.Module):
    def __init__(self, embed_dim: int, hidden_dim: int, vocab_size: int):
        super().__init__()
        self.embedding = torch.nn.Embedding(vocab_size, embed_dim)
        self.fc_1 = torch.nn.Linear(embed_dim, hidden_dim)
        self.fc_2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x: Tensor) -> Tensor:
        """
        Inputs
            x of shape (batch_size, seq_length)
        Outputs
            Tensor of shape (batch_size, 1) representing probabilities
        """
        # x.shape (batch_size, seq_length, embed_dim)
        x = self.embedding(x)

        # x.shape (batch_size, seq_length, hidden_dim)
        x = self.fc_1(x)
        x = x.relu()

        # x.shape (batch_size, hidden_dim)
        x = x.sum(dim=1)

        # x.shape (batch_size, 1)
        x = self.fc_2(x)
        x = x.sigmoid()

        return x

In [15]:
loss_function = torch.nn.BCELoss()

def run_batch(model: torch.nn.Module,
              loss_function: torch.nn.BCELoss,
              batch: dict[str, Tensor],
              device: torch.device) -> float:
    # src.shape (batch_size, seq_length)
    src = batch["tokens"].to(device)  # type: Tensor
    tgt = batch["label"].to(device)  # type: Tensor

    probs = model(src)  # type: Tensor

    loss = loss_function(probs, tgt)  # type: Tensor

    pred = torch.where(probs > 0.5, 1.0, 0.0)
    accuracy = (pred == tgt).float().mean()

    return loss, accuracy

def train_one_epoch(model: torch.nn.Module,
                    optimizer: torch.optim.Optimizer,
                    loss_function: torch.nn.BCELoss,
                    data_loader: torch.utils.data.DataLoader,
                    device: torch.device) -> float:
    model.train()

    losses = []
    correct = 0

    progress_bar = tqdm(data_loader)
    for i, batch in enumerate(progress_bar):
        optimizer.zero_grad()

        loss, accuracy = run_batch(model=model, loss_function=loss_function, batch=batch, device=device)
        losses.append(loss.item())

        loss.backward()

        optimizer.step()

        correct += accuracy

        if i % 100 == 0:
            running_loss = sum(losses) / len(losses)
            progress_bar.set_description(f"Running loss after {i} batches: {running_loss:.2f}, accuracy: {accuracy * 100:.2f}%")


    return sum(losses) / len(losses)

def evaluate_model(model: torch.nn.Module,
                    loss_function: torch.nn.BCELoss,
                    data_loader: torch.utils.data.DataLoader,
                    device: torch.device) -> tuple[float, float]:
    model.eval()

    loss, accuracy = 0, 0

    for batch in data_loader:
        loss_batch, accuracy_batch = run_batch(model=model,
                                               loss_function=loss_function,
                                               batch=batch,
                                               device=device)

        loss += loss_batch
        accuracy += accuracy_batch

    return loss / len(data_loader), accuracy / len(data_loader)

def classify_sentence(model: Model,
                      sentence: str,
                      tokenizer: tiktoken.core.Encoding,
                      max_length: int,
                      device: torch.device) -> None:
    model.eval()

    tokens = tokenizer.encode(sentence)[:max_length]

    tokens = np.asarray(tokens)
    tokens = torch.from_numpy(tokens).unsqueeze(0).to(device)

    pred = model(tokens)

    return pred.item()

In [28]:
device = "cpu"

model = Model(embed_dim=128, hidden_dim=512, vocab_size=tokenizer.max_token_value).to(device)

def count_parameters(model: torch.nn.Module) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Model has {count_parameters(model):,} parameters")

Model has 12,901,889 parameters


In [29]:
# check untrained model prediction
# the value represents probability of an english sentence
classify_sentence(model=model,
                  sentence="hello world, how is it going?",
                  tokenizer=tokenizer,
                  max_length=MAX_SEQ_LENGTH,
                  device=device)

0.18996331095695496

In [30]:
optimizer = torch.optim.Adam(model.parameters())

num_epochs = 1
losses = []

# training on M2 chip takes around 2 minutes 20 seconds (1 epoch)
for epoch in range(num_epochs):
    loss_epoch = train_one_epoch(model=model,
                                 optimizer=optimizer,
                                 loss_function=loss_function,
                                 data_loader=train_data_loader,
                                 device=device)

    valid_loss, valid_accuracy = evaluate_model(model=model,
                                                loss_function=loss_function,
                                                data_loader=valid_data_loader,
                                                device=device)

    losses.append(loss_epoch)

    print(f"Finished epoch {epoch + 1} - validation loss {valid_loss:.2f}, validation accuracy {valid_accuracy:.2f}")

Running loss after 6400 batches: 0.02, accuracy: 100.00%: 100%|██████████| 6441/6441 [02:21<00:00, 45.57it/s]


Finished epoch 1 - validation loss 0.00, validation accuracy 1.00


In [31]:
test_loss, test_accuracy = evaluate_model(model=model,
                                          loss_function=loss_function,
                                          data_loader=test_data_loader,
                                          device=device)

print(f"Test loss {test_loss:.2f}, test accuracy {test_accuracy:.2f}")

Test loss 0.01, test accuracy 1.00


In [32]:
classify_sentence(model=model,
                  sentence="hallo welt, wie gehts es?",
                  tokenizer=tokenizer,
                  max_length=MAX_SEQ_LENGTH,
                  device=device)

1.3911486007600615e-07

In [33]:
classify_sentence(model=model,
                  sentence="hello world, how is it going?",
                  tokenizer=tokenizer,
                  max_length=MAX_SEQ_LENGTH,
                  device=device)

1.0

In [34]:
classify_sentence(model=model,
                  sentence="hello world",
                  tokenizer=tokenizer,
                  max_length=MAX_SEQ_LENGTH,
                  device=device)

0.7549492716789246