<a href="https://colab.research.google.com/github/mirellagadelha/needlistproject/blob/master/mba_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch

# Pick the compute device once; everything created later is moved onto it.
if not torch.cuda.is_available():
    # CPU fallback.
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    # At least one CUDA device is visible: use it and report what was found.
    device = torch.device("cuda")

    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))


import pandas as pd
from transformers import BertTokenizer
import torch

# Read the ';'-separated article file. The first line of the file is skipped
# (presumably a header row -- confirm against the CSV) and the two columns are
# named explicitly.
df = pd.read_csv(
    "news-articles-with-type-short.csv",
    sep=';',
    skiprows=1,
    names=['content', 'type'],
)

# Raw article texts and their labels as arrays.
contents = df['content'].values
types = df['type'].values

# Tokenizer matching the uncased BERT checkpoint used below.
print('Loading BERT tokenizer...')
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

There are 1 GPU(s) available.
We will use the GPU: NVIDIA A100-SXM4-40GB
Loading BERT tokenizer...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
from torch import Tensor
from torch.utils.data import Dataset

from typing import Any, Optional, Union

from transformers import BatchEncoding

def add_special_tokens_at_beginning_and_end(
    input_id_chunks: list,
    mask_chunks: list,
    cls_token_id: int = 101,
    sep_token_id: int = 102,
) -> None:
    """
    Wraps every chunk with the special tokens, in place.

    Each entry of ``input_id_chunks`` is replaced by a 1-D tensor
    ``[cls_token_id, *chunk, sep_token_id]`` and the matching entry of
    ``mask_chunks`` by ``[1, *mask, 1]`` (the special tokens are attended to).

    Args:
        input_id_chunks: chunks of token ids; each chunk may be a list of ints
            or an existing tensor. Modified in place.
        mask_chunks: attention masks aligned with ``input_id_chunks``.
            Modified in place.
        cls_token_id: id placed at the beginning of every chunk (101 by default).
        sep_token_id: id placed at the end of every chunk (102 by default).
    """
    for i in range(len(input_id_chunks)):
        # as_tensor accepts lists and tensors alike and, unlike torch.tensor,
        # neither copies nor warns when the value is already a tensor.
        ids = torch.as_tensor(input_id_chunks[i])
        mask = torch.as_tensor(mask_chunks[i])

        # new_tensor creates the special tokens with the chunk's own dtype and
        # device, so concatenation never relies on implicit type promotion.
        input_id_chunks[i] = torch.cat([ids.new_tensor([cls_token_id]), ids, ids.new_tensor([sep_token_id])])
        mask_chunks[i] = torch.cat([mask.new_tensor([1]), mask, mask.new_tensor([1])])

def add_padding_tokens(
    input_id_chunks: list[Tensor],
    mask_chunks: list[Tensor],
    max_length: int = 512,
    pad_token_id: int = 0,
) -> None:
    """
    Right-pads every chunk, in place, so that it has ``max_length`` tokens.

    Chunks that already have ``max_length`` or more tokens are left untouched.
    Padding is created with the dtype and device of the chunk it extends, so
    integer token ids stay integer instead of being promoted to float.

    Args:
        input_id_chunks: 1-D tensors of token ids. Modified in place.
        mask_chunks: 1-D attention masks aligned with ``input_id_chunks``;
            padded positions get mask value 0. Modified in place.
        max_length: target length of every chunk (512 by default).
        pad_token_id: token id used for padding (0 by default).
    """
    for i in range(len(input_id_chunks)):
        ids = input_id_chunks[i]
        mask = mask_chunks[i]

        # The padding length is derived from the ids; the mask is aligned with them.
        pad_len = max_length - ids.shape[0]
        if pad_len > 0:
            input_id_chunks[i] = torch.cat([ids, ids.new_full((pad_len,), pad_token_id)])
            mask_chunks[i] = torch.cat([mask, mask.new_zeros(pad_len)])

def tokenize_text_with_truncation(text: str, tokenizer, maximal_text_length: int):
    """
    Tokenizes text and keeps at most ``maximal_text_length`` token ids.

    The tokenizer is asked NOT to add special tokens: [CLS]/[SEP] are added per
    chunk later by ``add_special_tokens_at_beginning_and_end``, so adding them
    here as well would duplicate [CLS] in the first chunk and [SEP] in the last
    one (and truncation could silently cut the trailing [SEP] off).

    Args:
        text: raw text to tokenize.
        tokenizer: object exposing ``encode(text, add_special_tokens=...)`` that
            returns a list of token ids.
        maximal_text_length: maximal number of content tokens to keep.

    Returns:
        List of token ids without special tokens, truncated to
        ``maximal_text_length`` entries.
    """
    tokens = tokenizer.encode(text, add_special_tokens=False)
    return tokens[:maximal_text_length]

def tokenize_whole_text(text: str, tokenizer):
    """
    Tokenizes the entire text without truncation and without special tokens.

    Special tokens are deliberately left out: [CLS]/[SEP] are added to every
    chunk afterwards by ``add_special_tokens_at_beginning_and_end``; letting the
    tokenizer add them too would duplicate [CLS] in the first chunk and [SEP]
    in the last one.

    Args:
        text: raw text to tokenize.
        tokenizer: object exposing ``encode(text, add_special_tokens=...)`` that
            returns a list of token ids.

    Returns:
        List of token ids for the whole text (may be empty when the text
        contains nothing the tokenizer produces tokens for).
    """
    return tokenizer.encode(text, add_special_tokens=False)

def stack_tokens_from_all_chunks(input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> tuple[Tensor, Tensor]:
    """
    Stacks equally sized chunks along a new leading dimension.

    Returns:
        ``(input_ids, attention_mask)`` where the ids are cast to int64 and the
        mask to int32; row ``k`` of each tensor is chunk ``k``.
    """
    stacked_ids = torch.stack(input_id_chunks, dim=0).long()
    stacked_mask = torch.stack(mask_chunks, dim=0).int()
    return stacked_ids, stacked_mask

def split_tokens_into_smaller_chunks(tokens, chunk_size: int, stride: int, minimal_chunk_length: int):
    """
    Splits a token sequence into (possibly overlapping) chunks.

    Chunks start at positions ``0, stride, 2 * stride, ...`` and contain up to
    ``chunk_size`` tokens. Splitting stops after the first chunk that is shorter
    than ``chunk_size`` (the end of the sequence was reached). Later chunks
    shorter than ``minimal_chunk_length`` are dropped, but the first chunk is
    always kept -- even when it is shorter than ``minimal_chunk_length`` or
    empty -- so that every text yields at least one chunk and the downstream
    ``torch.stack`` never receives an empty list.

    Args:
        tokens: sliceable sequence of token ids.
        chunk_size: maximal number of tokens per chunk; must be positive.
        stride: distance between the starts of consecutive chunks; must be positive.
        minimal_chunk_length: chunks after the first one need at least this
            many tokens to be kept.

    Returns:
        ``(input_id_chunks, mask_chunks)``: the chunks and, for each chunk, a
        list of ones of the same length.

    Raises:
        ValueError: if ``chunk_size`` or ``stride`` is not positive.
    """
    if chunk_size <= 0 or stride <= 0:
        raise ValueError("chunk_size and stride must be positive integers")

    input_id_chunks = []
    for start in range(0, len(tokens), stride):
        chunk = tokens[start:start + chunk_size]

        # The first chunk is kept unconditionally; later ones only if long enough.
        if start == 0 or len(chunk) >= minimal_chunk_length:
            input_id_chunks.append(chunk)

        # A short chunk means the end of the sequence was reached.
        if len(chunk) < chunk_size:
            break

    if not input_id_chunks:
        # No tokens at all: keep one empty chunk so the text still produces a
        # ([CLS], [SEP], padding) model input instead of crashing downstream.
        input_id_chunks.append(tokens[:chunk_size])

    # Every real token is attended to; padding masks are appended later.
    mask_chunks = [[1] * len(chunk) for chunk in input_id_chunks]
    return input_id_chunks, mask_chunks

In [None]:
def transform_single_text(
    text: str,
    tokenizer: Any,
    chunk_size: int,
    stride: int,
    minimal_chunk_length: int,
    maximal_text_length: Optional[int],
) -> tuple[Tensor, Tensor]:
    """
    Transforms (the entire) text to model input of BERT model.

    Pipeline: tokenize -> split into chunks -> add [CLS]/[SEP] to every chunk
    -> pad every chunk -> stack the chunks into 2-D tensors.

    Args:
        text: raw text.
        tokenizer: tokenizer object handed to the tokenize helpers (annotated as
            ``Any`` because the definition must not depend on the global
            tokenizer instance).
        chunk_size: maximal number of content tokens per chunk.
        stride: distance between the starts of consecutive chunks.
        minimal_chunk_length: minimal length for a chunk to be kept.
        maximal_text_length: if truthy, the text is truncated to this many
            tokens before splitting; ``None`` (or 0) keeps the whole text.

    Returns:
        ``(input_ids, attention_mask)`` with one row per chunk.
    """
    if maximal_text_length:
        tokens = tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
    else:
        tokens = tokenize_whole_text(text, tokenizer)
    input_id_chunks, mask_chunks = split_tokens_into_smaller_chunks(tokens, chunk_size, stride, minimal_chunk_length)
    # The two helpers below modify the chunk lists in place.
    add_special_tokens_at_beginning_and_end(input_id_chunks, mask_chunks)
    add_padding_tokens(input_id_chunks, mask_chunks)
    input_ids, attention_mask = stack_tokens_from_all_chunks(input_id_chunks, mask_chunks)
    return input_ids, attention_mask

In [None]:
from transformers import BatchEncoding

def transform_list_of_texts(
    texts: list[str],
    tokenizer: Any,
    chunk_size: int,
    stride: int,
    minimal_chunk_length: int,
    maximal_text_length: Optional[int] = None,
) -> BatchEncoding:
    """
    Transforms every text into chunked model input.

    Each text is processed independently by ``transform_single_text``, so the
    number of chunks (rows) can differ from text to text; the per-text tensors
    are therefore kept in plain lists rather than stacked.

    Args:
        texts: iterable of raw texts.
        tokenizer: tokenizer object forwarded to ``transform_single_text``
            (annotated as ``Any`` because the definition must not depend on the
            global tokenizer instance).
        chunk_size: maximal number of content tokens per chunk.
        stride: distance between the starts of consecutive chunks.
        minimal_chunk_length: minimal length for a chunk to be kept.
        maximal_text_length: optional truncation length applied before splitting.

    Returns:
        ``BatchEncoding`` with keys ``"input_ids"`` and ``"attention_mask"``,
        each holding a list with one tensor per text.
    """
    model_inputs = [
        transform_single_text(text, tokenizer, chunk_size, stride, minimal_chunk_length, maximal_text_length)
        for text in texts
    ]
    input_ids = [model_input[0] for model_input in model_inputs]
    attention_mask = [model_input[1] for model_input in model_inputs]
    tokens = {"input_ids": input_ids, "attention_mask": attention_mask}
    return BatchEncoding(tokens)

In [None]:
class TokenizedDataset(Dataset):
    """Map-style dataset over tokenized texts, optionally paired with labels."""

    def __init__(self, tokens: BatchEncoding, labels: Optional[list] = None):
        # One entry per text; both are read from the encoding by key.
        self.input_ids, self.attention_mask = tokens["input_ids"], tokens["attention_mask"]
        self.labels = labels

    def __len__(self) -> int:
        # The number of samples is the number of tokenized texts.
        return len(self.input_ids)

    def __getitem__(self, idx: int) -> Union[tuple[Tensor, Tensor, Any], tuple[Tensor, Tensor]]:
        # A missing or empty label container means "unlabelled": return a pair.
        sample = (self.input_ids[idx], self.attention_mask[idx])
        unlabelled = self.labels is None or len(self.labels) == 0
        if unlabelled:
            return sample
        return sample + (self.labels[idx],)

In [None]:
# 510 content tokens per chunk leave room for the two special tokens that are
# added to every chunk; stride == chunk_size means the chunks do not overlap.
tokens = transform_list_of_texts(
    contents,
    tokenizer,
    chunk_size=510,
    stride=510,
    minimal_chunk_length=1,
    maximal_text_length=None,
)

Token indices sequence length is longer than the specified maximum sequence length for this model (723 > 512). Running this sequence through the model will result in indexing errors


In [None]:
# Pair every tokenized text with its label from the `type` column.
dataset = TokenizedDataset(tokens=tokens, labels=types)

In [None]:
# 90 % of the samples (rounded down) go to training, the remainder to validation.
train_size = int(len(dataset) * 0.9)
val_size = len(dataset) - train_size

In [None]:
# Report the split sizes, right-aligned to width 5 with thousands separators.
print('{:>5,} training samples'.format(train_size))
print('{:>5,} validation samples'.format(val_size))

  449 training samples
   50 validation samples


In [None]:
from torch import Tensor

def collate_fn_pooled_tokens(data):
  """
  Collates dataset samples without stacking the per-text tensors.

  Each sample is ``(input_ids, attention_mask)`` or
  ``(input_ids, attention_mask, label)``. The ids and masks are returned as
  plain lists (one entry per sample); labels, when present, are packed into a
  single float tensor. Whether labels are present is decided from the first
  sample only.
  """
  input_ids = [sample[0] for sample in data]
  attention_mask = [sample[1] for sample in data]

  if len(data[0]) == 2:
      # Unlabelled samples.
      return [input_ids, attention_mask]

  labels = Tensor([sample[2] for sample in data])
  return [input_ids, attention_mask, labels]

In [None]:
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler, random_split

# Split with a fixed-seed generator so the train/validation partition -- and
# therefore the reported validation accuracy -- is the same on every
# "Restart & Run All".
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

# Training batches are drawn in random order; validation keeps dataset order so
# that predictions line up with the labels collected from `val_dataset.indices`.
train_dataloader = DataLoader(train_dataset, sampler=RandomSampler(train_dataset), batch_size=32, collate_fn=collate_fn_pooled_tokens)
validation_dataloader = DataLoader(val_dataset, sampler=SequentialSampler(val_dataset), batch_size=32, collate_fn=collate_fn_pooled_tokens)

In [None]:
# Recover the labels of each split through the indices kept by the subsets.
train_labels = [dataset.labels[sample_idx] for sample_idx in train_dataset.indices]
val_labels = [dataset.labels[sample_idx] for sample_idx in val_dataset.indices]

# Sanity check: the two counts should equal train_size and val_size.
print(len(train_labels))
print(len(val_labels))

449
50


In [None]:
from transformers import BertForSequenceClassification, AdamW, BertConfig

# Pretrained uncased 12-layer BERT with a freshly initialised linear
# classification head, moved straight onto the selected device.
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=2,                 # binary classification; raise for multi-class tasks
    output_attentions=False,      # do not return attention weights
    output_hidden_states=False,   # do not return all hidden states
).to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# How per-chunk logits are combined into one prediction per text:
# "mean" or "max" (any other value makes _evaluate_single_batch raise ValueError).
pooling_strategy = "mean"

def _evaluate_single_batch(batch: list) -> Tensor:
  """
  Runs the model on every chunk of every text in the batch and pools the
  chunk logits back into one logit vector per text.

  Args:
      batch: collated batch as produced by ``collate_fn_pooled_tokens``;
          ``batch[0]`` is a list with one 2-D tensor of input ids per text
          (one row per chunk) and ``batch[1]`` the matching attention masks.

  Returns:
      Tensor with one row of pooled logits per text, on the model's device.

  Raises:
      ValueError: if the global ``pooling_strategy`` is neither "mean" nor "max".
  """
  # Fail before the (expensive) forward pass if the configuration is invalid.
  if pooling_strategy not in ("mean", "max"):
      raise ValueError("Unknown pooling strategy!")

  input_ids = batch[0]
  attention_mask = batch[1]
  # Remember how many chunks belong to each text so the logits can be split back.
  number_of_chunks = [len(x) for x in input_ids]

  # Concatenate the chunks of all texts into one big batch directly on tensors
  # (no round trip through Python lists) and move it to the device once.
  input_ids_combined_tensors = torch.cat(list(input_ids), dim=0).to(device)
  attention_mask_combined_tensors = torch.cat(list(attention_mask), dim=0).to(device)

  # get model predictions for the combined batch
  logits = model(input_ids_combined_tensors, attention_mask_combined_tensors).logits

  # One group of chunk logits per text, in the original order.
  logits_split = logits.split(number_of_chunks, dim=0)

  # pooling
  if pooling_strategy == "mean":
      pooled_logits = torch.stack([torch.mean(x, dim=0) for x in logits_split])
  else:
      pooled_logits = torch.stack([torch.max(x, dim=0)[0] for x in logits_split])

  return pooled_logits

In [None]:
from torch.optim import AdamW, Optimizer
from torch.nn import CrossEntropyLoss

def _train_single_epoch(dataloader: DataLoader, optimizer: Optimizer) -> None:
  """
  Trains the global ``model`` for one pass over ``dataloader``.

  For every batch the gradients are reset, the pooled logits are compared with
  the labels (last element of the batch) using cross-entropy, and one
  optimizer step is taken.
  """
  model.train()
  loss_fn = CrossEntropyLoss()

  for batch in dataloader:
    optimizer.zero_grad()

    # Labels arrive as a float tensor from the collate function; the loss needs int64 class ids.
    targets = batch[-1].long().to(device)
    pooled_logits = _evaluate_single_batch(batch)

    batch_loss = loss_fn(pooled_logits, targets)
    batch_loss.backward()
    optimizer.step()


In [None]:
# AdamW over all model parameters (the whole network is fine-tuned).
optimizer = AdamW(params=model.parameters(), lr=5e-5, eps=1e-8)

In [None]:
# Fine-tune for 3 epochs; every epoch is one full pass over train_dataloader.
for epoch in range(3):
  # Progress header, 1-based epoch counter.
  print('======== Epoch {:} / {:} ========'.format(epoch + 1, 3))
  print('Training...')
  _train_single_epoch(train_dataloader, optimizer)

Training...
Training...
Training...


In [None]:
# NOTE(review): this cell repeats the previous training cell verbatim. Running
# both trains the same model and optimizer for 3 further epochs (6 in total),
# while the printed epoch counter restarts at 1 -- confirm this is intended.
for epoch in range(3):
  print('======== Epoch {:} / {:} ========'.format(epoch + 1, 3))
  print('Training...')
  _train_single_epoch(train_dataloader, optimizer)

Training...
Training...
Training...


In [None]:
import numpy as np
from torch import argmax

# Switch to inference mode: _train_single_epoch left the model in train mode,
# which would keep dropout active and make the predictions noisy.
model.eval()

total_logits = []

for batch in validation_dataloader:
  # No gradients are needed for evaluation.
  with torch.no_grad():
    logits = _evaluate_single_batch(batch)
  # Collect on the CPU: the labels below live on the CPU, and comparing a CUDA
  # tensor with a CPU tensor raises
  # "Expected all tensors to be on the same device".
  total_logits.append(logits.cpu())

final_logits = torch.cat(total_logits, dim=0)
classes = argmax(final_logits, dim=1)

print(f"Shape of predicted classes: {classes.shape}")
print(f"Shape of val_labels: {len(val_labels)}")

# Calculate accuracy (the validation loader is sequential, so prediction k
# corresponds to val_labels[k]).
accurate = (classes == torch.tensor(val_labels)).sum().item()
accuracy = accurate / len(val_labels)

print(f"Test accuracy: {accuracy}")

Shape of predicted classes: torch.Size([50])
Shape of val_labels: 50


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu!