# HuggingFace API Login

In [None]:
from huggingface_hub import notebook_login

notebook_login()

# Import Libraries :

In [78]:
# essential libraries
import pandas as pd
import numpy as np
import warnings
import re
import json
import os

# spacy libararies
import spacy
from spacy.tokens import Doc
from spacy.tokenizer import Tokenizer


# modelling libraries
from transformers import DistilBertTokenizer,DistilBertForTokenClassification, BertTokenizer
from transformers import Trainer, TrainingArguments ,EarlyStoppingCallback ,pipeline
from sklearn.model_selection import train_test_split
from datasets import Dataset,DatasetDict,load_metric


# from-scratch modelling libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer

# evaluation libraires
import evaluate
from seqeval.metrics import classification_report

# plotting libraries
import matplotlib.pyplot as plt
from torchinfo import summary


# ignoring warnings
warnings.filterwarnings("ignore")

# Transforming dataset :

In [1]:
csv = "/content/drive/MyDrive/UH - Final Year Project/data/df-iter-1.csv"
df = pd.read_csv(csv)
df

Unnamed: 0,sentence,vars,names,variable_position
0,The loading term becomes relevant at a time τ,['τ'],a time,['end']
1,we get easily for the rupture force f ∗,['f'],the rupture force,['end']
2,Green’s theorem can be used to show that the v...,['d3xG(x)'],the volume,['end']
3,at a ﬁxed point x,['x'],['point'],['end']
4,it will be quite small (|K| is large) around t...,['x'],the point,['end']
...,...,...,...,...
296,we are inspired by the recent advances in the ...,['(information)'],quantum,['end']
297,"then the correlation function C(A, B)","['C(A, B)']",the correlation function,['end']
298,Next is to utilize the perturbation expansion ...,['f'],the longitudinal distribution function,['end']
299,the resonator voltage V f,['V'],the resonator voltage,['end']


In [2]:
 # drop unecessary "[]" inside names
df["names"] = df["names"].apply(lambda name: name.strip("[]'"))
df['vars'] = df['vars'].apply(lambda x: x.strip("[]'"))
df.drop(columns=["variable_position"],inplace=True)

In [3]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 301 entries, 0 to 300
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   sentence  301 non-null    object
 1   vars      301 non-null    object
 2   names     301 non-null    object
dtypes: object(3)
memory usage: 7.2+ KB


In [4]:
df

Unnamed: 0,sentence,vars,names
0,The loading term becomes relevant at a time τ,τ,a time
1,we get easily for the rupture force f ∗,f,the rupture force
2,Green’s theorem can be used to show that the v...,d3xG(x),the volume
3,at a ﬁxed point x,x,point
4,it will be quite small (|K| is large) around t...,x,the point
...,...,...,...
296,we are inspired by the recent advances in the ...,(information),quantum
297,"then the correlation function C(A, B)","C(A, B)",the correlation function
298,Next is to utilize the perturbation expansion ...,f,the longitudinal distribution function
299,the resonator voltage V f,V,the resonator voltage


# Sentence Tokenization :

In [5]:


def my_tokenizer(text,added_tokens) :

  # defining function pattern
  func_name = r"α-ωA-Za-zΑ-Ω0-9ℰℓℒℳøℂℕℙℚℝℤΓΔΛΞΠΣΦΨΩÅℏ∞∘∂∫∮∯∇αβγ∅"
  func_var = r"A-Za-zα-ωΑ-ΩℰℓℒℳøℂℕℙℚℝℤΓΔΛΞΠΣΦΨΩÅℏ∞∘∂∫∮∯∇αβγ∅"
  func_pattern = fr"(.|)([{func_name}]{{1,3}}(′|.|)\([{func_var}](,\s*[{func_var}])*\))"

  # getting math functions
  matches = re.findall(func_pattern, text)

  # getting only second-group matches (the functions)
  functions = [match[1] for match in matches]
  func_saver = iter(functions.copy())



  # Replace math functions with temporary markers
  for func in functions:
    text = text.replace(func, "[FUNC]")
    # adding the function to the gloabl variable
    added_tokens.append(func)


  # Tokenize the rest of the text
  tokens = text.split()

  # Replace temporary markers with original functions
  new_tokens = [next(func_saver) if '[FUNC]' in token else token for token in tokens]

  return new_tokens

In [6]:
def create_spacy_tokenizer(nlp):
    def custom_tokenizer(text):
        global added_tokens
        tokens = my_tokenizer(text,added_tokens)
        return Doc(nlp.vocab, words=tokens)
    return custom_tokenizer

In [7]:
nlp = spacy.load("en_core_web_sm")
nlp.tokenizer = create_spacy_tokenizer(nlp)

In [8]:
def tokenize(text) :
  tokens= []
  doc = nlp(text)

  # Iterate over the tokens in the processed doc
  for token in doc:
    tokens.append(str(token))
  return tokens

In [9]:
# this returns tokens inside the list
global added_tokens
added_tokens = []
df['tokenized_sentence'] = df['sentence'].apply(tokenize)

In [10]:
mathematical_symbols = [
    'α', 'β', 'γ', 'δ', 'ε', 'ζ', 'η', 'θ', 'ι', 'κ', 'λ', 'μ', 'ν', 'ξ', 'ο', 'π', 'ρ', 'σ', 'τ', 'υ', 'φ', 'χ', 'ψ', 'ω',
    'Α', 'Β', 'Γ', 'Δ', 'Ε', 'Ζ', 'Η', 'Θ', 'Ι', 'Κ', 'Λ', 'Μ', 'Ν', 'Ξ', 'Ο', 'Π', 'Ρ', 'Σ', 'Τ', 'Υ', 'Φ', 'Χ', 'Ψ', 'Ω',
    'ℰ', 'ℓ', 'ℒ', 'ℳ', 'ø', 'ℂ', 'ℕ', 'ℙ', 'ℚ', 'ℝ', 'ℤ','Γ', 'Δ', 'Λ', 'Ξ', 'Π', 'Φ', 'Ψ', 'Ω','Å', 'ℏ', '∞',
    '∂', '∮', '∯', '∇','∅','˜','µ','ǫ','ℋ', 'ℨ', 'ℛ']

mathematical_operations = ['∫', '∑', '∏', '√', '+', '-', '*', '/', '=', '^', '%','∩', '∪', '⊂', '⊆', '∈', '∉','∘' , "≡","<",">","↔","|"]

added_tokens = added_tokens + mathematical_symbols +  mathematical_operations

In [11]:
df.head()

Unnamed: 0,sentence,vars,names,tokenized_sentence
0,The loading term becomes relevant at a time τ,τ,a time,"[The, loading, term, becomes, relevant, at, a,..."
1,we get easily for the rupture force f ∗,f,the rupture force,"[we, get, easily, for, the, rupture, force, f, ∗]"
2,Green’s theorem can be used to show that the v...,d3xG(x),the volume,"[Green’s, theorem, can, be, used, to, show, th..."
3,at a ﬁxed point x,x,point,"[at, a, ﬁxed, point, x]"
4,it will be quite small (|K| is large) around t...,x,the point,"[it, will, be, quite, small, (|K|, is, large),..."


# Labelling :

In [12]:
sentence = ["The","quick","brown","fox" ,"jumps" ,"over","the","lazy","dog"]
word = "The quick brown fox"
word_tokened = word.split()
start = sentence.index(word_tokened[0])
end = sentence.index(word_tokened[-1])

for i in range(start,end+1) :
  if i == start :
    sentence[i] = "B-NAME"
  else :
    sentence[i] = "I-NAME"

In [13]:
def labeling(dataframe):
  labz = []
  for var , name , tokenized_sents in zip(dataframe["vars"],dataframe["names"],dataframe["tokenized_sentence"]) :
    # initilaising everything as oustide of entity
    labels = ['O']*len(tokenized_sents)
    # anonattating variable name
    if var in tokenized_sents :
      idx = tokenized_sents.index(var)
      labels[idx] = "B-VAR"

    tokenized_name = name.split()
    # dealing with names :
    if len(tokenized_name) > 1 :
      if tokenized_name[0] in  tokenized_sents :
        # getting name start index
        start_idx = tokenized_sents.index(tokenized_name[0])
        # getting name end index
        end_idx = tokenized_sents.index(tokenized_name[-1])

        # looping over the labels :
        for i in range(start_idx,end_idx+1) :
          if i == start_idx :
            labels[i] = "B-NAME"
          else :
            labels[i] = "I-NAME"
    elif len(tokenized_name) == 1 :
      if tokenized_name in tokenized_sents :
        idx = tokenized_sents.index(tokenized_name)
        labels[i] = "B-NAME"
    labz.append(labels)
  return labz

In [14]:
df["labels"] = labeling(df)
df

Unnamed: 0,sentence,vars,names,tokenized_sentence,labels
0,The loading term becomes relevant at a time τ,τ,a time,"[The, loading, term, becomes, relevant, at, a,...","[O, O, O, O, O, O, B-NAME, I-NAME, B-VAR]"
1,we get easily for the rupture force f ∗,f,the rupture force,"[we, get, easily, for, the, rupture, force, f, ∗]","[O, O, O, O, B-NAME, I-NAME, I-NAME, B-VAR, O]"
2,Green’s theorem can be used to show that the v...,d3xG(x),the volume,"[Green’s, theorem, can, be, used, to, show, th...","[O, O, O, O, O, O, O, O, B-NAME, I-NAME, O, O,..."
3,at a ﬁxed point x,x,point,"[at, a, ﬁxed, point, x]","[O, O, O, O, B-VAR]"
4,it will be quite small (|K| is large) around t...,x,the point,"[it, will, be, quite, small, (|K|, is, large),...","[O, O, O, O, O, O, O, O, O, B-NAME, I-NAME, B-..."
...,...,...,...,...,...
296,we are inspired by the recent advances in the ...,(information),quantum,"[we, are, inspired, by, the, recent, advances,...","[O, O, O, O, O, O, O, O, O, O, O, O, B-VAR, O]"
297,"then the correlation function C(A, B)","C(A, B)",the correlation function,"[then, the, correlation, function, C(A, B)]","[O, B-NAME, I-NAME, I-NAME, B-VAR]"
298,Next is to utilize the perturbation expansion ...,f,the longitudinal distribution function,"[Next, is, to, utilize, the, perturbation, exp...","[O, O, O, O, B-NAME, I-NAME, I-NAME, I-NAME, I..."
299,the resonator voltage V f,V,the resonator voltage,"[the, resonator, voltage, V, f]","[B-NAME, I-NAME, I-NAME, B-VAR, O]"


In [15]:
df["labels"][1]

['O', 'O', 'O', 'O', 'B-NAME', 'I-NAME', 'I-NAME', 'B-VAR', 'O']

# Fintuning model :

## Data Encoding

In [16]:
def custom_tokenizer(text) :

  # defining function pattern
  func_name = r"α-ωA-Za-zΑ-Ω0-9ℰℓℒℳøℂℕℙℚℝℤΓΔΛΞΠΣΦΨΩÅℏ∞∘∂∫∮∯∇αβγ∅"
  func_var = r"A-Za-zα-ωΑ-ΩℰℓℒℳøℂℕℙℚℝℤΓΔΛΞΠΣΦΨΩÅℏ∞∘∂∫∮∯∇αβγ∅"
  func_pattern = fr"(.|)([{func_name}]{{1,3}}(′|.|)\([{func_var}](,\s*[{func_var}])*\))"

  # getting math functions
  matches = re.findall(func_pattern, text)

  # getting only second-group matches (the functions)
  functions = [match[1] for match in matches]
  func_saver = iter(functions.copy())



  # Replace math functions with temporary markers
  for func in functions:
    text = text.replace(func, "[FUNC]")


  # Tokenize the rest of the text
  tokens = text.split()

  # Replace temporary markers with original functions
  new_tokens = [next(func_saver) if '[FUNC]' in token else token for token in tokens]

  return new_tokens

In [None]:
class CustomDistilBertTokenizer(DistilBertTokenizer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def tokenize(self, text, **kwargs):
        return custom_tokenizer(text)

In [None]:
def vocab_stats(model,tokenizer) :
  print(f"Tokenizer vocabulary size  : {len(tokenizer)}")
  print(f"Model embedding size : {model.get_input_embeddings().weight.shape[0]}")
  print("-"*50)

In [None]:
# defining labels
label2id = {'O': 0, 'B-VAR': 1, 'B-NAME': 2, 'I-NAME': 3}
id2label = {v: k for k, v in label2id.items()}
num_labels = len(set(label for labels in df['labels'] for label in labels))

In [None]:
# defining the tokenizer and the model
tokenizer = CustomDistilBertTokenizer.from_pretrained('distilbert-base-uncased')
model = DistilBertForTokenClassification.from_pretrained("distilbert-base-uncased", num_labels=4, id2label=id2label, label2id=label2id)

if torch.cuda.is_available():
    model = model.to('cuda')

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'DistilBertTokenizer'. 
The class this function is called from is 'CustomDistilBertTokenizer'.
Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# vocab stats before adding the tokens
vocab_stats(model,tokenizer)

num_added_tokens = tokenizer.add_tokens(added_tokens)
print("number of tokens added :" ,num_added_tokens)

model.resize_token_embeddings(len(tokenizer))

# vocab stats after adding the tokens
vocab_stats(model,tokenizer)

Tokenizer vocabulary size  : 30522
Model embedding size : 30522
--------------------------------------------------
number of tokens added : 67
Tokenizer vocabulary size  : 30589
Model embedding size : 30589
--------------------------------------------------


In [None]:
# checking non-added tokens and adding them
special_tokens=[]
vocab = tokenizer.get_vocab()
for token in added_tokens:
    if token not in vocab:
        special_tokens.append(token)

In [None]:
#special_tokens_dict = {'additional_special_tokens':special_tokens }
#tokenizer.add_special_tokens(special_tokens_dict)

# checking on the added special tokens
#print(tokenizer.special_tokens_map)

#len(tokenizer)

# updating embeddings again
#model.resize_token_embeddings(len(tokenizer))

In [None]:
# saving tokenizer
tokenizer.save_pretrained('./tokenizah')

('./tokenizah/tokenizer_config.json',
 './tokenizah/special_tokens_map.json',
 './tokenizah/vocab.txt',
 './tokenizah/added_tokens.json')

In [None]:
tokenizer = DistilBertTokenizer.from_pretrained('./tokenizah')

id = tokenizer.convert_tokens_to_ids("f(x)")
id

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'CustomDistilBertTokenizer'. 
The class this function is called from is 'DistilBertTokenizer'.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


30549

In [27]:
pip install datasets

Collecting datasets
  Downloading datasets-2.20.0-py3-none-any.whl.metadata (19 kB)
Collecting pyarrow>=15.0.0 (from datasets)
  Downloading pyarrow-17.0.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.5.0,>=2023.1.0 (from fsspec[http]<=2024.5.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.5.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-2.20.0-py3-none-any.whl (547 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m547.8/547.8 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
# Convert DataFrame to Dataset
dataset = Dataset.from_pandas(df)

In [None]:
dataset

In [None]:
text = "f(x) and g(y)"

encoded = tokenizer(
    text,
    padding='max_length',
    truncation=True,
    max_length=128
)

# Extract token IDs and attention masks
token_ids = encoded['input_ids']
attention_mask = encoded['attention_mask']

print("Token IDs:", token_ids)
print("Attention Mask:", attention_mask)

In [None]:
example = {
    'tokenized_sentence': ['f(x)',"is","a","function"],
    'labels': ['B-VAR', 'O','B-NAME', 'I-NAME']
}

encoded = tokenizer(
    ' '.join(example['tokenized_sentence']),
    padding='max_length',
    truncation=True,
    max_length=128,
)

label2id = {'O': 0, 'B-VAR': 1, 'B-NAME': 2, 'I-NAME': 3}

print("Encoded Input IDs:", encoded['input_ids'])
print("Encoded Attention Mask:", encoded['attention_mask'])

In [None]:
def label_mapper(example,tokenizer):

  encoded = tokenizer(
      example['sentence'],
      padding='max_length',
      truncation=True,
      max_length=128
  )


  label_ids=[]
  # Use the tokenized sentence directly
  labels = [sublist for sublist in example['labels']]

  for label in labels :
    label_ids.append([label2id[lab] for lab in label])


  for label in label_ids :
    label.insert(0,-100)
    label.insert(len(label),-100)
    label_pad = 128 - len(label)
    label.extend([-100] * label_pad)

  return {
          "labels_ids": label_ids,
          "input_ids" : encoded['input_ids'],
          "attention_mask" : encoded['attention_mask']}

In [None]:
encoded_dataset = dataset.map(label_mapper,batched=True,fn_kwargs={'tokenizer': tokenizer})

In [None]:
encoded_dataset[0]

In [None]:
label = ['B-VAR', 'O','B-NAME', 'I-NAME']


label

In [None]:
# confirming the results
print(encoded_dataset[0])
ids = [0, 0, 0, 0, 0, 0, 2, 3, 1]
print([id2label[id] for id in ids])

In [None]:
# Columns to remove
columns_to_remove = ['sentence', 'vars', 'names', 'labels']

# Remove the unnecessary columns
dataset = encoded_dataset.remove_columns(columns_to_remove)

# Display the updated dataset to verify
print(dataset)

## Modelling

### Splitting data :

In [None]:
split = encoded_dataset.train_test_split(test_size=0.2, seed=42)

# Creating DatasetDict
dataset_dict = DatasetDict({
    'train': split['train'],
    'validation': split['test']
})

In [None]:
new_column_names = {'tokenized_sentence': 'sentence', 'labels_ids': 'labels'}
dataset_dict = dataset_dict.rename_columns(new_column_names)

In [None]:
print(dataset_dict)

In [None]:
!pip install evaluate
!pip install seqeval

In [None]:
label_list = list(label2id.keys())

In [None]:
label_list

### Training :

In [None]:
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)


    global true_predictions
    true_predictions  = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    global true_labels
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = seqeval.compute(predictions=true_predictions, references=true_labels)


    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [None]:
seqeval = evaluate.load("seqeval")

save_directory = "./my-model2"
lr = 2e-5
epochs = 20
steps = 100


# Adding early stopping :
early_stopping_callback = EarlyStoppingCallback(
    early_stopping_patience=5,  # Number of evaluation steps with no improvement before stopping
    early_stopping_threshold=0.01,  # Minimum change to qualify as an improvement
)


training_args = TrainingArguments(
    output_dir="./my-model2",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=20,
    logging_steps=100,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)


trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping_callback]
)

# Start training
trainer.train()

In [None]:
trainer.label_names

## Evaluation & Prediciton :

In [None]:

global true_predictions
global true_labels


report = classification_report(true_predictions, true_labels, output_dict=True)
df_report = pd.DataFrame(report)
df_report

In [None]:

text = "the variable is f(x)"
updated_tokenizer = DistilBertTokenizer.from_pretrained('./tokenizah')


classifier = pipeline("ner",model="taissirboukrouba/my-model2",tokenizer=tokenizer, device=0)
classifier(text)

## Visualisation :

In [None]:
trainer.state.log_history

In [None]:
for log in trainer.state.log_history :
  print(log)

In [None]:
train_losses = [log['eval_loss'] for log in trainer.state.log_history[:-1]]
train_losses = [log['eval_loss'] for log in trainer.state.log_history[:-1]]

epochs = range(1, len(train_losses) + 1)

# Plot training loss
plt.plot(epochs, train_losses, label='Evaluation Loss')

# Extract other metrics as needed (e.g., validation loss, accuracy)
# ...

# Customize plot
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Evaluation Loss')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Logs you provided
log_history = [
    # Your log data goes here
]

# Extracting losses and epochs
epochs = []
train_loss = []
val_loss = []

for log in log_history:
    if 'loss' in log and 'epoch' in log:
        epochs.append(log['epoch'])
        train_loss.append(log['loss'])
    if 'eval_loss' in log and 'epoch' in log:
        val_loss.append(log['eval_loss'])

# Plotting
plt.figure(figsize=(10, 5))
plt.plot(epochs[:len(train_loss)], train_loss, label='Training Loss')
plt.plot(epochs[len(train_loss):], val_loss, label='Validation Loss', color='red')

plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss per Epoch')
plt.legend()
plt.grid(True)


In [None]:
classifier.steps

# Model from-scratch :

## Personalized Tokenizer :

In [45]:


class PersonalisedTokenizer:
    def __init__(self, vocab=None):
        # initialising normal vocabulary (used to encode tokens into ids)
        self.vocab = vocab if vocab else {}
        # initialising inverse vocabulary (used to get tokens using ids)
        self.inv_vocab = {v: k for k, v in self.vocab.items()}

    def get_tokens(self, text):
        # applying custom tokenization
        return custom_tokenizer(text)

    def get_token_id(self, tokens):
        # getting token id from vocabulary or replacing it by unknown vocabulary when not present
        unknown_token = self.vocab.get('<unk>', 0)
        return [self.vocab.get(token, unknown_token) for token in tokens]

    def decode_id(self, ids):
        # getting token of the id from the reverse vocabulary
        return [self.inv_vocab.get(i, '<unk>') for i in ids]

    def add_tokens(self, new_tokens):
        for token in new_tokens:
            if token not in self.vocab:
                # add token in case not present in the vocabulary
                self.vocab[token] = len(self.vocab)
                # also updting the inverse vocabulary
                self.inv_vocab[len(self.vocab) - 1] = token

    def get_tokens_ids(self, text, max_length=128,existing_tokens=False):
      # if tokens exist don't tokenize them
      if existing_tokens :
        tokens = text
      # if they don't tokenize the text
      else :
        tokens = self.get_tokens(text)
      # adding [CLS] tag to the start of the setence
      tokens.insert(0,"[CLS]")
      # adding [SEP] tag to the end of the setence
      tokens.insert(len(tokens),"[SEP]")
      # getting token ids
      token_ids = self.get_token_id(tokens)
      # applying truncation
      token_ids = token_ids[:max_length]
      # getting the padding left to add
      padding_length = max_length - len(token_ids)
      # applying padding
      token_ids += [0] * padding_length
      return token_ids

    def decode_tokens_ids(self, token_ids):
        # getting tokens from ids using the reverse vocabulary inside "decode_id()"
        tokens = self.decode_id(token_ids)
        return ' '.join(tokens)

    def get_attention_masks(self,tokens_ids,max_length=128) :
        # getting SEP index
        sep_idx = tokens_ids.index(102)
        # slicing the token ids list
        cut_tokens_ids = tokens_ids[:sep_idx+1]
        # applying attention mask
        attention_masks = [1]*(len(cut_tokens_ids))
        # getting the padding left to add
        padding_length = max_length - len(attention_masks)
        # applying padding
        attention_masks += [0] * padding_length
        return attention_masks

    def save_pretrained(self, save_directory):
        # Save the vocabulary to a file
        if not os.path.exists(save_directory):
            os.makedirs(save_directory)
        vocab_file = os.path.join(save_directory, 'vocab.json')
        with open(vocab_file, 'w') as f:
            json.dump(self.vocab, f)

In [46]:
my_list = [1, 2, 3, 4, 5, 6]
cut_element = 4
index = my_list.index(cut_element+1)
list1 = my_list[:index]
list2 = my_list[index:]
list1

[1, 2, 3, 4]

In [47]:
# Get the BERT vocabulary as a dictionary
vocab = BertTokenizer.from_pretrained('bert-base-uncased').get_vocab()
# testing vocabulary
print(vocab["speed"])

3177


In [48]:
## testing tokenizer
tokenizer_scratch = PersonalisedTokenizer(vocab=vocab)
text = "f(x) is world"
encoded = tokenizer_scratch.get_tokens_ids(text)
print(encoded)

[101, 0, 2003, 2088, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


30522

In [49]:
text = ["f(x)","is","world"]
encoded = tokenizer_scratch.get_tokens_ids(text,128,True)
print(encoded)

[101, 0, 2003, 2088, 102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [50]:
attention_msks = tokenizer_scratch.get_attention_masks(encoded)
attention_msks[:10]

[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]

In [51]:
def PersonalisedEncoding(example, tokenizer,label2id,max_length=20):
    input_ids = []
    attention_masks = []
    label_ids = []

    # getting attention masks and input ids
    for tokens in example['tokenized_sentence'] :
      input_id = tokenizer.get_tokens_ids(tokens,max_length,existing_tokens=True)
      input_ids.append(input_id)

      attention_mask = tokenizer.get_attention_masks(input_id,max_length)
      attention_masks.append(attention_mask)


    # getting labels ids
    labels = [sublist for sublist in example['labels']]

    for label in labels :
      label_ids.append([label2id[lab] for lab in label])


    for label in label_ids :
      # adding -100 in the begining of label (CLS)
      label.insert(0,-100)
      # adding -100 in the begining of label (SEP)
      label.insert(len(label),-100)
      # applying padding
      label_pad = max_length - len(label)
      label.extend([-100] * label_pad)

    return {
          "labels_ids": label_ids,
          "input_ids" : input_ids,
          "attention_mask" : attention_masks}

In [69]:
class TransformerForSequenceLabeling(nn.Module):
    def __init__(self, vocab_size, d_model, num_labels, nhead, num_layers, dim_feedforward, max_seq_len=512):
        super(TransformerForSequenceLabeling, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_len, d_model))
        encoder_layers = TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.classifier = nn.Linear(d_model, num_labels)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, labels=None):
        # Embedding + Positional Encoding
        embedded = self.embedding(input_ids) + self.positional_encoding[:, :input_ids.size(1)]

        # Transformer Encoder
        transformer_output = self.transformer_encoder(embedded)

        # Classification head
        logits = self.classifier(transformer_output)

        if labels is not None:
            # Compute loss if labels are provided
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
            return loss, logits

        return logits

In [75]:
model_scratch = TransformerForSequenceLabeling(
    vocab_size=len(tokenizer_scratch.vocab),
    d_model=768,                 # Hidden size
    num_labels=4,
    nhead=8,                     # Number of attention heads
    num_layers=6,                # Number of encoder layers
    dim_feedforward=2048         # Feedforward dimension
)

In [53]:
example = {
    'tokenized_sentence': [["f(x)","is","a","function"],['f(x)',"is","a","function"]],
    'labels': [['B-VAR', 'O','B-NAME', 'I-NAME'],['B-VAR', 'O','B-NAME', 'I-NAME']]
}

test_df = pd.DataFrame(example)
test_dataset = Dataset.from_pandas(test_df)
test_dataset

Dataset({
    features: ['tokenized_sentence', 'labels'],
    num_rows: 2
})

In [54]:
label2id = {'O': 0, 'B-VAR': 1, 'B-NAME': 2, 'I-NAME': 3}
id2label = {v: k for k, v in label2id.items()}
encoded_test_dataset = test_dataset.map(PersonalisedEncoding,batched=True,fn_kwargs={'tokenizer': tokenizer_scratch,"label2id":label2id})
encoded_test_dataset

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Dataset({
    features: ['tokenized_sentence', 'labels', 'labels_ids', 'input_ids', 'attention_mask'],
    num_rows: 2
})

In [55]:
print(encoded_test_dataset["labels_ids"][0])

[-100, 1, 0, 2, 3, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100, -100]


In [56]:
# encodings length check
len(encoded_test_dataset[0]["input_ids"]) == len(encoded_test_dataset[0]["attention_mask"])

True

In [57]:
# encodings length check
len(encoded_test_dataset[0]["labels_ids"]) == len(encoded_test_dataset[0]["attention_mask"])

True

In [58]:
dataset = Dataset.from_pandas(df)
arguments = {"tokenizer": tokenizer_scratch,
             "label2id":label2id,
             "max_length": 128}
encoded_dataset_scratch = dataset.map(PersonalisedEncoding,batched=True,fn_kwargs=arguments)
encoded_dataset_scratch

Map:   0%|          | 0/301 [00:00<?, ? examples/s]

Dataset({
    features: ['sentence', 'vars', 'names', 'tokenized_sentence', 'labels', 'labels_ids', 'input_ids', 'attention_mask'],
    num_rows: 301
})

In [59]:
# Columns to remove
columns_to_remove = ['sentence', 'vars', 'names', 'labels']

# Remove the unnecessary columns
encoded_dataset_scratch = encoded_dataset_scratch.remove_columns(columns_to_remove)

In [60]:
encoded_dataset_scratch

Dataset({
    features: ['tokenized_sentence', 'labels_ids', 'input_ids', 'attention_mask'],
    num_rows: 301
})

In [61]:
split = encoded_dataset_scratch.train_test_split(test_size=0.2, seed=42)

# Creating DatasetDict
dataset_dict = DatasetDict({
    'train': split['train'],
    'validation': split['test']
})

In [62]:
new_column_names = {'tokenized_sentence': 'sentence', 'labels_ids': 'labels'}
dataset_dict = dataset_dict.rename_columns(new_column_names)

In [63]:
dataset_dict

DatasetDict({
    train: Dataset({
        features: ['sentence', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 240
    })
    validation: Dataset({
        features: ['sentence', 'labels', 'input_ids', 'attention_mask'],
        num_rows: 61
    })
})

## Personalized Model :

In [37]:
pip install torchsummary torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [42]:
!pip install seqeval
!pip install evaluate



In [73]:
label_list = list(label2id.keys())

def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)


    global true_predictions
    true_predictions  = [
        [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    global true_labels
    true_labels = [
        [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = seqeval.compute(predictions=true_predictions, references=true_labels)


    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [76]:
seqeval = evaluate.load("seqeval")

save_directory = "./model-scratch-1"
lr = 2e-5
epochs = 20
steps = 100


"""# Adding early stopping :
early_stopping_callback = EarlyStoppingCallback(
    early_stopping_patience=5,  # Number of evaluation steps with no improvement before stopping
    early_stopping_threshold=0.01,  # Minimum change to qualify as an improvement
)"""


training_args = TrainingArguments(
    output_dir=save_directory,
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,
    logging_steps=100,
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    push_to_hub=True,
)


trainer = Trainer(
    model=model_scratch,
    args=training_args,
    train_dataset=dataset_dict["train"],
    eval_dataset=dataset_dict["validation"],
    tokenizer=tokenizer_scratch,
    compute_metrics=compute_metrics,
    #callbacks=[early_stopping_callback]
)

# Start training
trainer.train()

Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,0.936353,0.0,0.0,0.0,0.606154
2,No log,0.84184,0.208861,0.302752,0.247191,0.66
3,No log,0.828182,0.168142,0.174312,0.171171,0.647692
4,No log,0.817087,0.194915,0.211009,0.202643,0.636923
5,No log,0.804281,0.228758,0.321101,0.267176,0.683077


TrainOutput(global_step=75, training_loss=0.8276736958821614, metrics={'train_runtime': 846.0976, 'train_samples_per_second': 1.418, 'train_steps_per_second': 0.089, 'total_flos': 0.0, 'train_loss': 0.8276736958821614, 'epoch': 5.0})

In [77]:
# Evaluate the model
results = trainer.evaluate()
print(results)

{'eval_loss': 0.8042813539505005, 'eval_precision': 0.22875816993464052, 'eval_recall': 0.3211009174311927, 'eval_f1': 0.26717557251908397, 'eval_accuracy': 0.683076923076923, 'eval_runtime': 11.0932, 'eval_samples_per_second': 5.499, 'eval_steps_per_second': 0.361, 'epoch': 5.0}


In [None]:
# visualize model
"""
import hiddenlayer as hl

transforms = [ hl.transforms.Prune('Constant') ] # Removes Constant nodes from graph.

graph = hl.build_graph(model, batch.text, transforms=transforms)
graph.theme = hl.graph.THEMES['blue'].copy()
graph.save('rnn_hiddenlayer', format='png')"""

"\nimport hiddenlayer as hl\n\ntransforms = [ hl.transforms.Prune('Constant') ] # Removes Constant nodes from graph.\n\ngraph = hl.build_graph(model, batch.text, transforms=transforms)\ngraph.theme = hl.graph.THEMES['blue'].copy()\ngraph.save('rnn_hiddenlayer', format='png')"