In [7]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import pandas as pd
from tqdm import tqdm
from datetime import datetime
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score
# `os` is used further down to create the logs folder (os.makedirs).
import os
# Load the pretrained 'bert-base-uncased' tokenizer once at notebook level.
# It is read as a global by MyDataset.__getitem__ (to encode each example)
# and by collate_fn (for its pad_token_id), so this cell must run before
# either of them is used. The model itself is created in a later cell.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [8]:
# Tokenize and encode the sentences
class MyDataset(Dataset):
    """Torch dataset that turns one DataFrame row into a BERT example.

    Every row must provide the columns 'Full Sentence', 'entity e1',
    'entity e2' and 'pair type' (an integer class id).

    Args:
        dataframe: pandas DataFrame holding the rows to serve.
        text_tokenizer: optional callable following the Hugging Face
            tokenizer call convention ``(text, text_pair, **kwargs)``.
            When omitted, the notebook-level ``tokenizer`` is looked up at
            item-access time, exactly as before.
    """

    def __init__(self, dataframe, text_tokenizer=None):
        self.data = dataframe
        self.text_tokenizer = text_tokenizer

    def __len__(self):
        """Return the number of rows (examples) in the wrapped DataFrame."""
        return len(self.data)

    def __getitem__(self, index):
        """Encode the row at positional ``index``.

        Returns:
            dict with 1-D tensors 'input_ids' and 'attention_mask'
            (unpadded; padding is left to the collate function) and a
            scalar long tensor 'labels'.

        Raises:
            ValueError: if the row's 'pair type' cannot be converted to int
                (e.g. it is NaN), instead of silently producing a bad label.
        """
        # Single positional lookup; the original indexed the frame four times.
        row = self.data.iloc[index]
        encode = self.text_tokenizer if self.text_tokenizer is not None else tokenizer

        # Encode as a (sentence, entities) pair. For inputs that fit, this
        # gives the same "[CLS] sentence [SEP] e1 [SEP] e2 [SEP]" layout as
        # the previous single-string form. For over-long inputs,
        # truncation='only_first' trims the sentence only, so the entity
        # mentions at the end are no longer the part that gets cut off.
        encoding = encode(
            str(row['Full Sentence']),
            f"{row['entity e1']} [SEP] {row['entity e2']}",
            return_tensors='pt',
            truncation='only_first',
        )

        return {
            # return_tensors='pt' yields a leading batch dim of 1; drop it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(int(row['pair type']), dtype=torch.long)
        }

def collate_fn(batch):
    """Combine a list of per-example dicts into a single padded batch.

    Args:
        batch: list of dicts, each with tensor entries 'input_ids',
            'attention_mask' and 'labels'.

    Returns:
        dict with 'input_ids' and 'attention_mask' padded to the longest
        sequence in this batch (batch-first), and 'labels' stacked into
        one tensor.
    """
    pad = torch.nn.utils.rnn.pad_sequence

    # Gather each field across the batch, preserving example order.
    ids_per_example = []
    mask_per_example = []
    label_per_example = []
    for example in batch:
        ids_per_example.append(example['input_ids'])
        mask_per_example.append(example['attention_mask'])
        label_per_example.append(example['labels'])

    # Token ids are filled with the tokenizer's pad id; the mask is filled
    # with 0 so padded positions are marked as not-to-attend.
    padded_ids = pad(ids_per_example, batch_first=True, padding_value=tokenizer.pad_token_id)
    padded_mask = pad(mask_per_example, batch_first=True, padding_value=0)
    stacked_labels = torch.stack(label_per_example)

    return {
        'input_ids': padded_ids,
        'attention_mask': padded_mask,
        'labels': stacked_labels
    }

In [9]:
# Locations of the pre-split CSV files.
path_data_train = "save_results/train_dataset.csv"
path_data_test = "save_results/test_dataset.csv"

# Only the first 100 rows of each file are kept (same subset as before).
data_train = pd.read_csv(path_data_train, sep=',', encoding='utf-8')[:100]
data_test = pd.read_csv(path_data_test, sep=',', encoding='utf-8')[:100]

# Fixed class-name -> class-id table shared by every split.
class_mapping = {
    'false': 0,
    'effect': 1,
    'mechanism': 2,
    'advise': 3,
    'int': 4
}

# Fit the encoder ONCE, on the complete set of class ids. Calling
# fit_transform separately on train and test re-learns the encoding from
# whichever labels happen to be present in each subset, so the same class
# could get different ids in the two splits, and `le.classes_` would end up
# describing only the test subset.
le = LabelEncoder()
le.fit(sorted(class_mapping.values()))


def _encode_pair_types(pair_types, split_name):
    """Map class names to ids with the shared encoder; fail on unknown names."""
    mapped = pair_types.map(class_mapping)
    missing = mapped.isna()
    if missing.any():
        unknown = sorted(pair_types[missing].astype(str).unique())
        raise ValueError(f"Unmapped 'pair type' values in {split_name} data: {unknown}")
    return le.transform(mapped.astype(int))


data_train['pair type'] = _encode_pair_types(data_train['pair type'], 'train')
data_test['pair type'] = _encode_pair_types(data_test['pair type'], 'test')

# Split the training data into training and validation sets (90% / 10%).
train_df, val_df = train_test_split(data_train, test_size=0.1, random_state=42)

train_dataset = MyDataset(train_df)
val_dataset = MyDataset(val_df)
test_dataset = MyDataset(data_test)  # held-out test rows loaded above

# Only the training loader shuffles; evaluation order stays deterministic.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

In [13]:

# Size the classification head from the full class table rather than from
# `le.classes_`: the encoder only knows the labels present in whatever data
# it was last fitted on, which can be fewer than the classes seen in
# training and would then produce out-of-range labels in the loss.
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=len(class_mapping))

# Set up training parameters.
# torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
# weight_decay are passed explicitly to keep the values the transformers
# implementation used by default (1e-6 and 0.0).
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5, eps=1e-6, weight_decay=0.0)
num_epochs = 1
best_val_accuracy = 0.0  # Variable to store the best validation accuracy

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

In [14]:
# Create a folder for logs if it doesn't exist
logs_folder = "logs"
os.makedirs(logs_folder, exist_ok=True)

# Generate a filename based on the current date and time
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"{logs_folder}/training_log_{current_time}.txt"

# Move the model to GPU when one is available. The optimizer holds
# references to the model's parameters, so it needs no separate move.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# The context manager closes the log file even if training raises part-way
# through (the previous explicit close() was skipped on any exception).
with open(log_filename, "w") as log_file:
    for epoch in range(num_epochs):
        # ---- Training pass ----
        model.train()
        total_loss = 0
        total_batches = len(train_loader)
        start_time = datetime.now()

        # Use tqdm for a progress bar
        for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', position=0, leave=True):
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            # Passing labels makes the model compute the loss itself.
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Average loss per batch; guard against an empty training loader.
        avg_loss = total_loss / max(total_batches, 1)

        # ---- Validation pass ----
        model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                preds = torch.argmax(outputs.logits, dim=1)
                val_preds.extend(preds.cpu().tolist())
                # Labels are only compared on the host, so they are never
                # moved to the device.
                val_labels.extend(batch['labels'].tolist())

        val_accuracy = accuracy_score(val_labels, val_preds)
        # zero_division=0 keeps the same numbers as the default but avoids
        # the undefined-metric warning for classes that are never predicted.
        precision, recall, f1, _ = precision_recall_fscore_support(
            val_labels, val_preds, average='weighted', zero_division=0
        )

        # Build the summary once so the console and the log cannot drift apart.
        summary = f'Epoch {epoch + 1}/{num_epochs}, Avg Loss: {avg_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}'
        print(summary)

        # Log the training progress; flush so the line is on disk even if a
        # later epoch fails.
        log_file.write(summary + '\n')
        log_file.flush()

        # Keep the checkpoint with the best validation accuracy seen so far.
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            model.save_pretrained('best_bert_model')

Epoch 1/1:   0%|          | 0/12 [00:00<?, ?it/s]

Epoch 1/1: 100%|██████████| 12/12 [00:37<00:00,  3.11s/it]
  _warn_prf(average, modifier, msg_start, len(result))


Epoch 1/1, Avg Loss: 0.8248, Validation Accuracy: 0.9000, Precision: 0.8100, Recall: 0.9000, F1: 0.8526


In [16]:
from sklearn.metrics import classification_report

# Load the checkpoint written by the training loop and put it on the
# same device used for training.
best_model = BertForSequenceClassification.from_pretrained('best_bert_model')
best_model.to(device)

# Testing loop: inference only, so gradients are disabled.
best_model.eval()
test_preds = []
test_labels = []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        outputs = best_model(input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=1)
        test_preds.extend(preds.cpu().tolist())
        # Labels are only compared on the host, so they are never moved to
        # the device.
        test_labels.extend(batch['labels'].tolist())

# Calculate and print the test accuracy
test_accuracy = accuracy_score(test_labels, test_preds)
print(f'Test Accuracy: {test_accuracy:.4f}')

# Pass `labels` explicitly: with target_names alone, classification_report
# raises when the classes found in y_true/y_pred differ in number from the
# names supplied (e.g. a class absent from this test subset).
label_ids = list(range(len(le.classes_)))
# Show class names instead of bare ids; fall back to the raw id if an
# encoder class has no entry in class_mapping.
id_to_name = {class_id: name for name, class_id in class_mapping.items()}
target_names = [id_to_name.get(class_id, str(class_id)) for class_id in le.classes_]

# Generate and print the classification report
class_report = classification_report(
    test_labels,
    test_preds,
    labels=label_ids,
    target_names=target_names,
    zero_division=0,
)
print("Classification Report:\n", class_report)
