In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
#from nltk.corpus import stopwords
#from collections import Counter
#import string
#import re
from torch.utils.data import DataLoader
from datasets import Dataset, load_dataset, load_dataset_builder

In [2]:
# set the GPU device for M4 macbook air
import torch
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")

tensor([1.], device='mps:0')


In [3]:
# inspect the dataset
ag_news_builder = load_dataset_builder("wangrongsheng/ag_news")
print(ag_news_builder.info.splits)
print(ag_news_builder.info.features)

{'train': SplitInfo(name='train', num_bytes=29832303, num_examples=120000, shard_lengths=None, dataset_name='ag_news'), 'test': SplitInfo(name='test', num_bytes=1880424, num_examples=7600, shard_lengths=None, dataset_name='ag_news')}
{'text': Value('string'), 'label': ClassLabel(names=['World', 'Sports', 'Business', 'Sci/Tech'])}


In [4]:
# Load the dataset into GPU
ag_news_train = load_dataset("wangrongsheng/ag_news", split="train").with_format('torch', device=mps_device)
#ag_news_train = load_dataset("wangrongsheng/ag_news", split="train")
ag_news_test = load_dataset("wangrongsheng/ag_news", split="test").with_format('torch', device=mps_device)
#ag_news_test = load_dataset("wangrongsheng/ag_news", split="test")

In [5]:
ag_news_train[0]

{'text': "Wall St. Bears Claw Back Into the Black (Reuters) Reuters - Short-sellers, Wall Street's dwindling\\band of ultra-cynics, are seeing green again.",
 'label': tensor(2)}

In [6]:
from transformers import AutoTokenizer
from transformers import DataCollatorWithPadding

tokenizer = AutoTokenizer.from_pretrained("distilbert/distilbert-base-uncased")
def preprocess_function(examples):
    return tokenizer(examples["text"], truncation=True)

# Apply the tokenizer to the entire dataset
tokenized_train = ag_news_train.map(preprocess_function)
tokenized_test = ag_news_test.map(preprocess_function)

# Example for PyTorch DataLoader (requires specifying the format first)
tokenized_train.set_format("torch", columns=['input_ids', 'label'])
tokenized_test.set_format("torch", columns=['input_ids', 'label'])

# Initialize the data collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [7]:
tokenized_train.format

{'type': 'torch',
 'format_kwargs': {},
 'columns': ['input_ids', 'label'],
 'output_all_columns': False}

In [8]:
from torch.nn.utils.rnn import pad_sequence

# Load the datasets into dataloaders
BATCH_SIZE = 128

train_loader = DataLoader(tokenized_train,
                          shuffle=True,
                          batch_size=BATCH_SIZE,
                          collate_fn=data_collator # The key step for padding
                         )

test_loader = DataLoader(tokenized_test,
                          shuffle=True,
                          batch_size=BATCH_SIZE,
                          collate_fn=data_collator # The key step for padding
                         )

In [9]:
for i in train_loader:
    print(i['input_ids'].size())
    break

torch.Size([128, 130])


In [10]:
def get_vocab_size_simple(dataset, text_column="text"):
    """Get vocabulary size using simple tokenization"""
    vocab = set()
    
    for example in dataset:
        words = example[text_column].lower().split()
        vocab.update(words)
    
    return len(vocab)

In [11]:
get_vocab_size_simple(ag_news_train)

158733

In [12]:
from transformers import AutoTokenizer
# Load a pre-trained tokenizer (e.g., for 'bert-base-uncased')
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer.vocab_size

30522

In [13]:
def get_vocab_size_custom_tokenizer(dataset, text_column="text", target_vocab_size=10000):
    """Train custom tokenizer and return actual vocabulary size"""
    from tokenizers import Tokenizer
    from tokenizers.models import WordLevel
    from tokenizers.trainers import WordLevelTrainer
    from tokenizers.pre_tokenizers import Whitespace
    
    tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()
    
    trainer = WordLevelTrainer(
        vocab_size=target_vocab_size,
        special_tokens=["[UNK]", "[PAD]", "[CLS]", "[SEP]"]
    )
    
    def batch_iterator():
        for example in dataset:
            yield example[text_column]
    
    tokenizer.train_from_iterator(batch_iterator(), trainer=trainer)
    
    return tokenizer.get_vocab_size()
get_vocab_size_custom_tokenizer(ag_news_train)

10000

In [14]:
VOCAB_SIZE = tokenizer.vocab_size
EMBED_DIM = 100
HIDDEN_DIM = 64
NUM_OUTPUTS = ag_news_train.features['label'].num_classes
NUM_EPOCHS = 10

### Simple Word Embedding Model
First, let's try out the Simple Word Embedding Model (SWEM) that we built in Notebook 4A on the AG News dataset. Unlike before though, instead of loading pre-trained embeddings, let's learn the embeddings from scratch. Before we begin, it will be helpful to define a few more hyperparameters.

Once again, we're going to organize our model as a `nn.Module`.
Instead of assuming the input is already an embedding, we're going to make learning the embedding as part of our model.
We do this by using `nn.Embedding` to perform an embedding look-up at the beginning of our forward pass.
Once we've done the look up, we'll have a minibatch of embedded sequences of dimension $L \times$ `BATCH_SIZE` $\times$ `EMBED_DIM`.
For SWEM, remember, we take the mean&ast; across the length dimension to get an average embedding for the sequence.

<font size="1"> 
&ast;Note: Technically we should only take the mean across the embeddings at the positions corresponding to "real" words in our input, and not for the zero paddings we artificially added.
This can be done by generating a binary mask while doing the padding to track the "real" words in the input.
Ultimately though, this refinement doesn't have much impact on the results for this particular task, so we omit it for simplicity.
</font>

In [15]:
import torch.nn as nn
import torch.nn.functional as F

class SWEM(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_dim, num_outputs):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        
        self.fc1 = nn.Linear(embedding_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_outputs)

    def forward(self, x):
        embed = self.embedding(x)
        #print(f'embed: {embed.size()}')
        embed_mean = torch.mean(embed, dim=1)
        #print(f'embed_mean: {embed_mean.size()}')
        h = self.fc1(embed_mean)
        #print(f'fc1: {h.size()}')
        h = F.relu(h)
        h = self.fc2(h)
        #print(f'fc2: {h.size()}')
        return h

In [16]:
# instantiate, train, and evaluate
## Training
# Instantiate model
model = SWEM(VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM, NUM_OUTPUTS)
model.to(mps_device)

# Binary cross-entropy (BCE) Loss and Adam Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Iterate through train set minibatchs 
for epoch in range(NUM_EPOCHS):
    correct = 0
    num_examples = 0
    for data in train_loader:
        # Zero out the gradients
        optimizer.zero_grad()
        
        # Forward pass
        inputs = data['input_ids'].to(device='mps')
        labels = data['labels'].to(device='mps')

        y = model(inputs)
        loss = criterion(y, labels)
        
        # Backward pass
        loss.backward()
        optimizer.step()
        
        predictions = torch.argmax(y, dim=1)
        correct += torch.sum((predictions == labels).float())
        num_examples += len(inputs)
    
    # Print training progress
    if epoch % 2 == 0:
        acc = correct/num_examples
        print("Epoch: {0} \t Train Loss: {1} \t Train Acc: {2}".format(epoch, loss, acc))

## Testing
correct = 0
num_test = 0

with torch.no_grad():
    # Iterate through test set minibatchs 
    for data in test_loader:
        # Forward pass
        inputs = data['input_ids'].to(device='mps')
        labels = data['labels'].to(device='mps')
        y = model(inputs)
        predictions = torch.argmax(y, dim=1)
        correct += torch.sum((predictions == labels).float())
        num_test += len(inputs)
    
print('Test accuracy: {}'.format(correct/num_test))

Epoch: 0 	 Train Loss: 0.46851399540901184 	 Train Acc: 0.697516679763794
Epoch: 2 	 Train Loss: 0.4480689764022827 	 Train Acc: 0.9099000096321106
Epoch: 4 	 Train Loss: 0.1308743953704834 	 Train Acc: 0.9268666505813599
Epoch: 6 	 Train Loss: 0.18553726375102997 	 Train Acc: 0.9365083575248718
Epoch: 8 	 Train Loss: 0.11664336919784546 	 Train Acc: 0.9469833374023438
Test accuracy: 0.9127631783485413


In [18]:
# Use the trained model to classify a text string
string = "The broadcaster's board agreed the decision at a meeting on Wednesday, hours before the deadline for countries to confirm whether they will join what's supposed to be a celebratory 70th anniversary edition of the song contest next May."
def predict_text(text, model, tokenizer, class_names):
      """
      Predict the class of a given text string
      
      Args:
          text: Input string to classify
          model: Trained SWEM model
          tokenizer: The tokenizer used during training
          class_names: List of class names (e.g., ['World', 'Sports', 'Business', 'Sci/Tech'])
      
      Returns:
          predicted_class: Integer class index
          predicted_label: String class name
          probabilities: Probability distribution over classes
      """
      # Tokenize
      tokenized = tokenizer(text, truncation=True, return_tensors='pt')
      input_ids = tokenized['input_ids']

      # If using GPU, move to device
      input_ids = input_ids.to(device='mps')

      # Predict
      model.eval()
      with torch.no_grad():
          logits = model(input_ids)
          predicted_class = torch.argmax(logits, dim=1).item()
          probabilities = torch.softmax(logits, dim=1)

      predicted_label = class_names[predicted_class]

      return predicted_class, predicted_label, probabilities[0]

# Usage
class_names = ['World', 'Sports', 'Business', 'Sci/Tech']
pred_class, pred_label, probs = predict_text(string, model, tokenizer,
class_names)

print(f"Prediction: {pred_label} (class {pred_class})")
print(f"\nConfidence scores:")
for i, name in enumerate(class_names):
  print(f"  {name}: {probs[i].item():.2%}")

Prediction: Sci/Tech (class 3)

Confidence scores:
  World: 33.56%
  Sports: 7.92%
  Business: 16.52%
  Sci/Tech: 42.00%
