<a href="https://colab.research.google.com/github/tubagokhan/ADGM/blob/main/MultilabelClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install transformers[torch]
!pip install accelerate -U
!pip install tqdm

In [None]:
# Colab-only step: mount Google Drive under /content/drive so that the
# dataset paths opened in the following cells can be resolved.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import json
import torch
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split

# Tokenizer belonging to the "bert-base-uncased" checkpoint
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Tag types the classifier predicts; a type's position in this list is its
# column in every binary label vector built below.
label_types = ["PERM", "DEF", "RISK", "MIT", "ENT", "ACT", "FS", "PROD", "TECH"]

# Read the annotated records (utf-8-sig tolerates a leading byte-order mark)
with open('/content/drive/Othercomputers/MBZUAI/MBZUAI/Codes/records.json', 'r' , encoding='utf-8-sig') as json_file:
    data = json.load(json_file)

# One lower-cased paragraph and one multi-hot label vector per record
texts = []
labels = []

for record in data:
    paragraph = record["Paragraph"].lower()
    # Collect the types attached to this record; types that are not listed in
    # label_types simply never match a column and are therefore ignored.
    record_types = [annotation["Type"] for annotation in record["Tags"]]
    multi_hot = [1 if known_type in record_types else 0 for known_type in label_types]

    texts.append(paragraph)
    labels.append(multi_hot)

# Hold out 20% of the records; the fixed seed keeps the split reproducible
train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

# Tokenize both splits with identical settings: every sequence is truncated
# or padded to exactly max_seq_length tokens and returned as PyTorch tensors.
max_seq_length = 128  # Adjust as needed
tokenizer_options = dict(
    truncation=True,
    padding='max_length',
    max_length=max_seq_length,
    return_tensors='pt',
    return_attention_mask=True,
)
train_encodings = tokenizer(train_texts, **tokenizer_options)
test_encodings = tokenizer(test_texts, **tokenizer_options)

# Create PyTorch Dataset
class CustomDataset(torch.utils.data.Dataset):
    """Pair tokenizer encodings with their label rows.

    ``encodings`` is a mapping from field name to an indexable collection
    and ``labels`` is an indexable collection with one entry per example.
    Indexing the dataset returns a dict holding every encoding field at
    that position plus the matching entry of ``labels`` under ``'labels'``.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The dataset size is defined by the number of label rows.
        return len(self.labels)

    def __getitem__(self, idx):
        # Pick position ``idx`` out of every encoding field ...
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx]
        # ... and attach the labels that belong to the same example.
        sample['labels'] = self.labels[idx]
        return sample

# Convert the multi-hot label lists to float32 tensors, then wrap each split
# together with its encodings.
train_label_tensor = torch.tensor(train_labels, dtype=torch.float32)
test_label_tensor = torch.tensor(test_labels, dtype=torch.float32)

train_dataset = CustomDataset(train_encodings, train_label_tensor)
test_dataset = CustomDataset(test_encodings, test_label_tensor)

# train_dataset and test_dataset are now ready for training and evaluating the BERT-based multi-label classifier.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
# NOTE: AdamW is no longer imported from transformers (it was deprecated there
# and later removed); the PyTorch implementation from torch.optim is used instead.
from transformers import BertForSequenceClassification
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score
import numpy as np
from tqdm import tqdm  # Progress bar for the training batches

# Train on the GPU when the runtime provides one, otherwise stay on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load BERT with a classification head that has one output per label type.
# problem_type is set explicitly so the multi-label loss is used on purpose
# instead of being inferred from the dtype of the labels.
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=len(label_types),
    problem_type="multi_label_classification",
)
model.to(device)

# Define hyperparameters
batch_size = 64
learning_rate = 2e-5
num_epochs = 50  # Upper bound; early stopping usually ends training sooner
early_stop_patience = 3  # Number of epochs to wait for improvement before early stopping

# Initialize optimizer and loss function.
# weight_decay=0.0 keeps the behaviour of the previously used optimizer
# (torch.optim.AdamW would otherwise default to 0.01).
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.0)
# Not used by the loop below: the model returns its own loss when `labels`
# is passed. Kept so the name stays available to other cells.
criterion = nn.BCEWithLogitsLoss()

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

best_f1_score = 0.0
best_state_dict = None  # CPU copy of the weights that achieved best_f1_score
early_stop_counter = 0

# Training loop with early stopping and progress bar
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    progress_bar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch + 1}")
    for i, batch in progress_bar:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        # Named batch_labels so the module-level `labels` list is not overwritten
        batch_labels = batch["labels"].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=batch_labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress_bar.set_postfix({'Loss': total_loss / (i + 1)})  # Running mean loss

    print(f"Epoch {epoch + 1} - Average Loss: {total_loss / len(train_loader)}")

    # Score the model after every epoch.
    # NOTE(review): the test split doubles as the validation set here, so the
    # F1 reported on it later is optimistic; consider a separate validation split.
    model.eval()
    predictions = []
    true_labels = []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predictions.extend(torch.sigmoid(logits).cpu().numpy())
            true_labels.extend(batch["labels"].cpu().numpy())

    # A label counts as predicted when its sigmoid probability exceeds 0.5
    f1 = f1_score(np.array(true_labels), np.array(predictions) > 0.5, average='micro')

    print(f"Epoch {epoch + 1} - F1 Score: {f1}")

    # Check for early stopping
    if f1 > best_f1_score:
        best_f1_score = f1
        early_stop_counter = 0
        # Snapshot the best weights so the epochs trained while waiting for
        # an improvement do not end up in the final model.
        best_state_dict = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
    else:
        early_stop_counter += 1
        if early_stop_counter >= early_stop_patience:
            print(f"Early stopping at epoch {epoch + 1} as F1 score did not improve.")
            break

# Restore the best-scoring weights (if any epoch improved on the initial 0.0)
if best_state_dict is not None:
    model.load_state_dict(best_state_dict)

# Hand the model back on the CPU: the cells that follow feed it CPU tensors.
model.to("cpu")
model.eval()


In [None]:
# Evaluation: score the trained model on the test split
model.eval()
predicted_labels = []
true_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        # Sigmoid turns each logit into an independent per-label probability
        predicted_labels.extend(torch.sigmoid(logits).cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

predicted_labels = np.array(predicted_labels)
true_labels = np.array(true_labels)

# A label counts as predicted when its probability exceeds 0.5
binary_predictions = predicted_labels > 0.5

# Micro-averaged F1: pools the decisions of all labels into a single score
f1_scores = f1_score(true_labels, binary_predictions, average='micro')
print("Micro F1-Score:", f1_scores)

# F1-score for each label individually (average=None returns one score per
# column; zero_division=0 reports 0 for labels that never occur or are never
# predicted instead of emitting a warning).
per_label_f1 = f1_score(true_labels, binary_predictions, average=None, zero_division=0)
for label_type, label_f1 in zip(label_types, per_label_f1):
    print(f"{label_type} F1-Score: {label_f1}")


In [None]:
import nltk
# Download the tokenizer and POS-tagger models.
# NLTK 3.9+ looks up "punkt_tab" and "averaged_perceptron_tagger_eng", while
# older releases use the names without the suffix. Requesting both sets keeps
# the cell working on either; an id unknown to the installed version is only
# reported, it does not raise.
for nltk_resource in (
    'averaged_perceptron_tagger',
    'averaged_perceptron_tagger_eng',
    'punkt',
    'punkt_tab',
):
    nltk.download(nltk_resource)

# Input sentence
sentence = "the aggregate of each of the following in, or relating to, the Clients portfolio at the close of business on the valuation date."
# Tokenize the sentence
tokens = nltk.word_tokenize(sentence)

# Perform part-of-speech tagging; yields (word, tag) pairs
pos_tags = nltk.pos_tag(tokens)

# Chunk grammar over the POS tags. Rules are applied in the order listed:
#   NP     - optional determiner, any number of adjectives, one singular noun
#   PP     - preposition followed by an NP
#   VP     - verb followed by one or more NP/PP/CLAUSE chunks at the end
#   CLAUSE - NP followed by a VP
grammar = r"""
    NP: {<DT>?<JJ>*<NN>}
    PP: {<IN><NP>}
    VP: {<VB.*><NP|PP|CLAUSE>+$}
    CLAUSE: {<NP><VP>}
"""
chunk_parser = nltk.RegexpParser(grammar)

# Parse the tagged sentence into a chunk tree
tree = chunk_parser.parse(pos_tags)

# Initialize a list to store the phrases
phrases = []

# Define a function to extract phrases from the tree
def extract_phrases(t):
    """Append the text of ``t`` and of every nested subtree to ``phrases``.

    ``phrases`` is the module-level list. Each tree contributes the words of
    its leaves joined by single spaces; leaves are unpacked as ``(word, tag)``
    pairs. Trees are visited in pre-order, so a parent's phrase is recorded
    before those of its children. Anything that is not an ``nltk.Tree``
    (for example a bare leaf) is ignored.
    """
    # Explicit stack instead of recursion; children are pushed in reverse so
    # that they are popped, and therefore recorded, left to right.
    pending = [t]
    while pending:
        node = pending.pop()
        if not isinstance(node, nltk.Tree):
            continue
        words = [word for word, _tag in node.leaves()]
        phrases.append(" ".join(words))
        pending.extend(reversed(node))

# Walk the chunk tree and collect the text of the whole tree and of every
# chunk inside it into `phrases`
extract_phrases(tree)

# The collected phrases are not printed in this cell.


In [None]:
import torch

# Inference only: switch the model to evaluation mode once, before the loop
model.eval()

for phrase in phrases:
    # Tokenize the phrase, truncated or padded to max_seq_length tokens
    tokenized_input = tokenizer(phrase, truncation=True, padding='max_length', max_length=max_seq_length, return_tensors='pt', return_attention_mask=True)

    # Pass the tokenized input through the model without tracking gradients
    with torch.no_grad():
        output = model(**tokenized_input)

    # One sigmoid probability per label type; [0] selects the only row of
    # the batch because exactly one phrase is scored at a time.
    logits = output.logits
    predicted_labels = torch.sigmoid(logits).cpu().numpy()[0]

    # Keep every label type whose probability exceeds 0.5; the label is
    # attributed to the phrase as a whole.
    predicted_phrases_and_labels = [
        {"Type": label_type, "Phrase": phrase}
        for label_type, probability in zip(label_types, predicted_labels)
        if probability > 0.5
    ]

    # Display the predicted labels for this phrase (the header is printed
    # even when no label passed the threshold)
    print("Predicted Phrases and Labels:")
    for item in predicted_phrases_and_labels:
        print(f"Type: {item['Type']}, Phrase: {item['Phrase']}")
    print(" ")


In [None]:
# Show every extracted phrase on its own line (nothing is printed when the
# list is empty)
if phrases:
    print("\n".join(phrases))

In [None]:
import torch

# Read the JSON file with the sample phrases and their reference tags
with open('/content/drive/Othercomputers/MBZUAI/MBZUAI/Codes/samplePhrasesandTags100.json', 'r', encoding='utf-8-sig') as json_file:
    sample_data = json.load(json_file)

# Split the records into two parallel lists
SamplePhrases = [entry['phrase'] for entry in sample_data]
SampleTags = [entry['label_'] for entry in sample_data]

# Inference only: switch the model to evaluation mode once, before the loop
model.eval()

for sample_phrase, sample_tag in zip(SamplePhrases, SampleTags):
    # Print the reference tag first; the f-string also copes with tags that
    # are not strings, where "KG: " + tag would raise a TypeError.
    print(f"KG: {sample_tag}")

    # Tokenize the phrase, truncated or padded to max_seq_length tokens
    tokenized_input = tokenizer(sample_phrase, truncation=True, padding='max_length', max_length=max_seq_length, return_tensors='pt', return_attention_mask=True)

    # Pass the tokenized input through the model without tracking gradients
    with torch.no_grad():
        output = model(**tokenized_input)

    # One sigmoid probability per label type; [0] selects the only row of
    # the batch because exactly one phrase is scored at a time.
    logits = output.logits
    predicted_labels = torch.sigmoid(logits).cpu().numpy()[0]

    # Unlike thresholding at 0.5, this always reports exactly one label:
    # the type with the highest probability.
    highest_label_index = np.argmax(predicted_labels)

    prediction = {
        "Type": label_types[highest_label_index],
        "Phrase": sample_phrase,
    }
    # Kept as a one-element list so the name stays available afterwards
    predicted_phrases_and_labels = [prediction]

    # Display the predicted label next to the phrase
    print(f"Type: {prediction['Type']}, Phrase: {prediction['Phrase']}")
    print(" ")
