In [1]:
!pip install transformers datasets evaluate seqeval accelerate



In [2]:
import pandas as pd
import numpy as np
import string
import re

import nltk
from nltk.tokenize import word_tokenize

from wordcloud import WordCloud
import matplotlib.pyplot as plt

import spacy
from spacy.training import Example
import random

from datasets import Dataset, load_metric
from transformers import DataCollatorForTokenClassification, pipeline, create_optimizer, TFAutoModelForTokenClassification, AutoTokenizer, AutoModelForTokenClassification, Trainer, TrainingArguments
from transformers.keras_callbacks import KerasMetricCallback

import sklearn
import numpy as np
import torch

import spacy
from spacy import displacy


# Ensure you have the necessary NLTK tokenizer models downloaded
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [3]:
from google.colab import drive
drive.mount('/content/drive')

folder_path = '/content/drive/MyDrive/UU/Thesis'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
pd.set_option('display.max_columns', None)

In [5]:
# Load an Excel file into a DataFrame
# df = pd.read_excel('LCReviewsIntegrated_1962-1994.xlsx', engine='openpyxl')
df = pd.read_excel(folder_path + '/LCReviewsIntegrated_1962-1994.xlsx', engine='openpyxl')

In [6]:
def remove_extra_spaces(text):
    # Replace multiple spaces with a single space
    cleaned_text = re.sub(r'\s+', ' ', text)
    return cleaned_text.strip()

In [7]:
df['content'] = df['content'].apply(remove_extra_spaces)
df['title1'] = df['title1'].apply(remove_extra_spaces)

In [8]:
def remove_punctuation(input_string):
    # Create a translation table that maps each punctuation character to None
    translator = str.maketrans('', '', string.punctuation)
    # Translate the input string using the translation table
    return input_string.translate(translator)

In [9]:
def extract_title(x):
    if not (type(x.title1) == str):
        return np.nan
    if not (type(x.content) == str):
        return np.nan
    if x.title1.lower() in x.content.lower():
        return x.title1

    sentence_parts = re.split(r' / | : ', x.title1.lower())
    sentence_parts = sorted(sentence_parts, key=len, reverse=True)
    for part in sentence_parts:
        if part in x.content.lower():
            return part
        elif remove_punctuation(part) in x.content.lower():
            return remove_punctuation(part)

    if remove_punctuation(x.title1).lower() in x.content.lower():
        return remove_punctuation(x.title1)

    return "error"

In [10]:
def find_sentence_in_text(full_text, sentence):
    start_index = full_text.find(sentence)
    if start_index == -1:
        print("EROR!!!")
        return False
    end_index = start_index + len(sentence)
    return start_index, end_index

In [11]:
result = []
for index, row in df.iterrows():
    result.append(extract_title(row))

df['title3'] = result

In [12]:
df_clean = df[df['title3'] != 'error']

## Check what tokens are present before the title

In [13]:
def create_mask_for_sentence(full_text, sentence):
    # Tokenize the full text
    tokens = word_tokenize(full_text)
    # Find the start and end indices of the sentence in the full text
    start_index = full_text.find(sentence)
    if start_index == -1:
        return None, None
    end_index = start_index + len(sentence)
    # Tokenize the sentence separately to match tokens exactly
    sentence_tokens = word_tokenize(sentence)
    # Initialize mask with zeros
    mask = [0] * len(tokens)
    # Loop through the full text tokens to set the mask
    sentence_pos = 0
    for i, token in enumerate(tokens):
        if sentence_pos < len(sentence_tokens) and token == sentence_tokens[sentence_pos]:
            # Check if the full sequence matches and ensure it doesn't go out of bounds
            if i + len(sentence_tokens) - sentence_pos <= len(tokens) and \
                all(tokens[i + j] == sentence_tokens[sentence_pos + j] for j in range(len(sentence_tokens) - sentence_pos)):
                # Mark the mask for the length of the sentence tokens
                mask[i:i + len(sentence_tokens)] = [1] * len(sentence_tokens)
                break
            else:
                # Increment if it's not a full sequence match
                sentence_pos += 1
        elif token == sentence_tokens[0]:  # Reset if it's the beginning of sentence tokens
            sentence_pos = 0
    return tokens, mask

In [14]:
task = "ner"  # Should be one of "ner", "pos" or "chunk"
# model_checkpoint = "distilbert-base-uncased"
model_checkpoint = "Babelscape/wikineural-multilingual-ner"
batch_size = 16

In [15]:
label_list = ['O', 'I']

In [16]:
def create_data_set(samples):
    data = []

    for sample in samples:
        unique_content_df = df_clean[df_clean['content'] == sample]
        masks = []

        review = remove_punctuation(sample.lower())

        for index, row in unique_content_df.iterrows():
            book = remove_punctuation(row['title3'].lower())

            tokens, mask = create_mask_for_sentence(review, book)  # Assuming this returns a mask of the same length as tokens
            masks.append(mask)
        masks = np.bitwise_or.reduce(np.array(masks), axis=0)
        mask[mask == 0] = "O"
        mask[mask == 1] = "I"

        data.append({
            "tokens": tokens,
            "ner_tags": masks
        })

    return data

In [17]:
samples = df_clean['content'].unique()[:10000]

train_dataset = Dataset.from_list(create_data_set(samples[:int(len(samples) * 0.8)]))
val_dataset = Dataset.from_list(create_data_set(samples[int(len(samples) * 0.8):]))

In [18]:
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [19]:
label_all_tokens = True

def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["tokens"], truncation=True, is_split_into_words=True
    )

    labels = []
    for i, label in enumerate(examples[f"{task}_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word id that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label[word_idx] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [20]:
tokenized_dataset_train = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_dataset_val = val_dataset.map(tokenize_and_align_labels, batched=True)

tokenized_dataset_train

Map:   0%|          | 0/8000 [00:00<?, ? examples/s]

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Dataset({
    features: ['tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
    num_rows: 8000
})

In [21]:
metric = load_metric("seqeval")

def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Remove ignored index (special tokens)
    true_predictions = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100] for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100] for prediction, label in zip(predictions, labels)
    ]

    total_count_O = 0
    total_count_I = 0
    for pred in true_predictions:
        total_count_O = total_count_O + sum(s.count("O") for s in pred)
        total_count_I = total_count_I + sum(s.count("I") for s in pred)



    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "total_count_O": total_count_O,
        "total_count_I": total_count_I,
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

  metric = load_metric("seqeval")
You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this metric from the next major release of `datasets`.


In [22]:
data_collator = DataCollatorForTokenClassification(tokenizer)

In [23]:
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint, num_labels=2, ignore_mismatched_sizes=True
)
model.config.id2label = {0: 'NO_BOOK', 1: 'BOOK'}

Some weights of BertForTokenClassification were not initialized from the model checkpoint at Babelscape/wikineural-multilingual-ner and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([9]) in the checkpoint and torch.Size([2]) in the model instantiated
- classifier.weight: found shape torch.Size([9, 768]) in the checkpoint and torch.Size([2, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [24]:
# import torch.nn as nn

# class DiceLoss(nn.Module):
#     def __init__(self, smooth=1.):
#         super(DiceLoss, self).__init__()
#         self.smooth = smooth

#     def forward(self, logits, targets):
#         # Flatten predictions and labels
#         logits = logits.view(-1, 2)  # Assuming 2 classes
#         targets = targets.view(-1)

#         intersection = torch.sum(logits[:, 1] * targets)
#         union = torch.sum(logits[:, 1]) + torch.sum(targets)

#         dice_score = (2. * intersection + self.smooth) / (union + self.smooth)
#         dice_loss = 1 - dice_score

#         return dice_loss

In [25]:
class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        # """ Custom weighted CrossEntropyLoss """
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs[0]

        # Reshape logits to [batch_size * sequence_length, num_classes]
        logits = logits.view(-1, logits.size(-1))

        # Reshape labels to [batch_size * sequence_length]
        labels = labels.view(-1)

        class_weights = torch.tensor([0.5, 28.27], dtype=torch.float32, device=torch.device("cuda"))
        loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights)

        loss = loss_fn(logits.to(torch.device("cuda")), labels.to(torch.device("cuda")) )

        return (loss, outputs) if return_outputs else loss

    # def compute_loss(self, model, inputs, return_outputs=False):
    #     """ DICE LOSS """
    #     # TODO: IGNORE -100 labels
    #     labels = inputs.pop("labels")
    #     outputs = model(**inputs)
    #     logits = outputs.logits  # Assuming logits are stored in outputs.logits

    #     # Compute loss using DiceLoss
    #     loss_fn = DiceLoss()
    #     loss = loss_fn(logits, labels)

    #     return (loss, outputs) if return_outputs else loss

In [26]:
training_args = TrainingArguments(
    output_dir="test_model",
    learning_rate=2e-5,
    per_device_train_batch_size=12,
    per_device_eval_batch_size=12,
    num_train_epochs=25,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model='f1',
    greater_is_better=True,
    push_to_hub=False
)

In [27]:
# Create a custom Trainer instance
trainer = MyTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset_train,
    eval_dataset=tokenized_dataset_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

In [28]:
del df
del df_clean
del train_dataset
del val_dataset

In [None]:
trainer.train()

Epoch,Training Loss,Validation Loss,Total Count O,Total Count I,F1,Accuracy
1,0.2813,0.213156,843872,62103,0.139682,0.944671
2,0.1924,0.228442,849886,56089,0.165932,0.95096
3,0.1363,0.237135,845596,60379,0.149918,0.94683


In [None]:
sentence = remove_punctuation(df_clean['content'].unique()[-1].lower())

In [None]:
pipe = pipeline(task="token-classification", model=model, tokenizer=tokenizer)

In [None]:
output = pipe(sentence)

In [None]:
def merge_overlapping_intervals(intervals):
    merged_intervals = []
    if not intervals:
        return merged_intervals

    # Sort intervals based on the start value
    intervals.sort(key=lambda x: x[0])

    # Initialize variables for the first interval
    start, end, label = intervals[0]

    # Iterate through the intervals
    for interval in intervals[1:]:
        next_start, next_end, next_label = interval

        # If the intervals overlap, merge them
        if next_start <= end + 1:
            end = max(end, next_end)
        else:
            # If no overlap, add the merged interval to the result and update start, end, label
            merged_intervals.append((start, end, label))
            start, end, label = next_start, next_end, next_label

    # Add the last merged interval
    merged_intervals.append((start, end, "BOOK"))

    return merged_intervals

In [None]:
spans = [(res['start'], res['end'], res['entity']) for res in output if res['entity'] == 'BOOK']

In [None]:
spans = merge_overlapping_intervals(spans)

In [None]:
nlp = spacy.blank('en')
doc = nlp.make_doc(sentence)
ents = []
for span_start, span_end, label in spans:
    ent = doc.char_span(span_start, span_end, label=label)
    if ent is None:
        continue

    ents.append(ent)

doc.ents = ents
displacy.render(doc, style="ent", jupyter=True)