# Import necessary libraries

In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import AdamW, get_linear_schedule_with_warmup
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, confusion_matrix
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import time


In [None]:
from huggingface_hub import notebook_login
# Start the Hugging Face Hub login flow for this notebook session.
notebook_login()
# SECURITY: an access token had been pasted into a comment here. It has been
# removed from the source; treat it as leaked and revoke it in the Hugging Face
# account settings. Never commit credentials, not even in comments.

In [None]:
# Choose the compute device: CUDA when a GPU is visible, otherwise the CPU.
# (Nothing is moved here; later cells call .to(device).)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

# Load and preprocess the data

## Load CSV files and create dataframes

In [None]:
# Read the train / validation / test CSV files into three DataFrames,
# in that order.
train_df, val_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/kaggle/input/persian-ner/BIO_train_df.csv",
        "/kaggle/input/persian-ner/BIO_val_df.csv",
        "/kaggle/input/persian-ner/BIO_test_df.csv",
    )
)

In [None]:
# Normalise the lower-case organisation tags to their upper-case spelling in
# every split (the frames are modified in place).
tag_fixes = {'B-org': 'B-ORG', 'I-org': 'I-ORG'}
for split_df in (train_df, val_df, test_df):
    for old_tag, new_tag in tag_fixes.items():
        split_df.loc[split_df['tag'] == old_tag, 'tag'] = new_tag

# The label vocabulary is derived from the training split only.
unique_tags = train_df['tag'].unique()
print(unique_tags)

# Keep only the two columns used by the rest of the notebook.
train_df, val_df, test_df = (
    split_df[["token", "tag"]] for split_df in (train_df, val_df, test_df)
)
train_df.head(10)


# Define NER tags and Custom dataset

# Set up the model for fine-tuning

In [None]:
# Build the two lookup tables between tag strings and integer class ids.
# Ids follow the order in which the tags appear in `unique_tags`.
tag2id = dict(zip(unique_tags, range(len(unique_tags))))
id2tag = dict(zip(tag2id.values(), tag2id.keys()))

# Custom dataset class
class NERDataset(Dataset):
    """Token-classification dataset backed by a two-column DataFrame.

    Column 0 of every row holds the text (split on whitespace into words) and
    column 1 holds the tags (split on whitespace, one tag per word). Each item
    is the tokenizer output converted to tensors plus a ``labels`` tensor that
    is aligned with the tokenizer's positions.

    Args:
        dataframe: Frame whose first column is the token text and whose second
            column is the tag text. Both cells must be strings.
        tokenizer: Callable invoked as ``tokenizer(words,
            is_split_into_words=True, return_offsets_mapping=True, ...)``. It
            must return a mapping that contains ``'offset_mapping'``.
        tag2id: Mapping from tag string to integer class id. It must contain
            the key ``'O'``, which is used when a row has more words than tags.
        max_len: Length every example is padded / truncated to.
    """

    def __init__(self, dataframe: pd.DataFrame, tokenizer, tag2id: dict, max_len: int = 128):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.tag2id = tag2id
        self.max_len = max_len

    def __len__(self) -> int:
        """Return the number of rows (examples) in the frame."""
        return len(self.data)

    def _tag_id(self, tag):
        """Translate a tag string to its id, failing loudly on unknown tags.

        Raises:
            KeyError: If ``tag`` is not a key of ``tag2id``. Previously such a
                tag became ``None`` and only failed later, inside
                ``torch.tensor``, with an unrelated-looking error.
        """
        try:
            return self.tag2id[tag]
        except KeyError:
            raise KeyError(
                f"Tag {tag!r} is not in tag2id; known tags: {list(self.tag2id)}"
            ) from None

    def __getitem__(self, idx):
        """Tokenize row ``idx`` and build its aligned label tensor.

        Returns:
            A dict with one tensor per key of the tokenizer output (including
            ``'offset_mapping'``) plus ``'labels'``.

        Raises:
            KeyError: If a tag that has to be assigned is missing from
                ``tag2id``.
        """
        words = self.data.iloc[idx, 0].split()
        word_tags = self.data.iloc[idx, 1].split()

        encoding = self.tokenizer(
            words,
            is_split_into_words=True,
            return_offsets_mapping=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
        )

        # Walk over the token positions and hand out tags in order. A position
        # whose offset is (0, n) with n != 0 is treated as the first piece of a
        # word and receives the next tag; every other position (offset (0, 0)
        # or a non-zero start) gets -100 so it is ignored by the loss.
        remaining_tags = iter(word_tags)
        labels = []
        for start, end in encoding['offset_mapping']:
            if start == 0 and end != 0:
                tag = next(remaining_tags, None)
                if tag is None:
                    # More words than tags in this row: fall back to 'O'.
                    labels.append(self.tag2id['O'])
                else:
                    labels.append(self._tag_id(tag))
            else:
                labels.append(-100)

        item = {key: torch.tensor(val) for key, val in encoding.items()}
        item['labels'] = torch.tensor(labels)
        return item

In [None]:
# def freeze(model, freeze_layer = [1,2,3,4,5,6]):
#     base_name = 'bert.encoder.layer.'
#     freeze_name = [base_name+str(layer) for layer in freeze_layer]
#     for name, param in model.named_parameters():
#         if name in freeze_name:
#             param.requires_grad = False

In [None]:
# Load the ParsBERT tokenizer and a token-classification head on top of it.
pars_bert  = "HooshvareLab/bert-base-parsbert-uncased"
tokenizer = AutoTokenizer.from_pretrained(pars_bert)
# Register the tag names in the model config so the saved model (and anything
# loading it later) reports real tag strings instead of generic LABEL_<n> names.
model = AutoModelForTokenClassification.from_pretrained(
    pars_bert,
    num_labels=len(tag2id),
    id2label=id2tag,
    label2id=tag2id,
)
# freeze(model)
model.to(device)

In [None]:
# Tokenized sequence length and number of examples per batch.
MAX_LEN, BATCH_SIZE = 128, 32
# Number of passes over the training data and the optimizer learning rate.
EPOCHS, LEARNING_RATE = 2, 3e-5
# The training loop prints the running loss every `logging_steps` steps.
logging_steps = 100


## Create dataset

In [None]:
# Create datasets. MAX_LEN is passed explicitly: previously it was defined but
# never used, so changing it had no effect on the padded/truncated length.
train_dataset = NERDataset(train_df, tokenizer, tag2id, max_len=MAX_LEN)
val_dataset = NERDataset(val_df, tokenizer, tag2id, max_len=MAX_LEN)
test_dataset = NERDataset(test_df, tokenizer, tag2id, max_len=MAX_LEN)

# Data loaders: only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Custom training loop

## Define optimizer and scheduler


In [None]:
# Use PyTorch's AdamW: the AdamW class exported by `transformers` is deprecated
# in favour of torch.optim.AdamW. `eps` and `weight_decay` are set explicitly to
# the deprecated class's defaults (1e-6 and 0.0), because torch's own defaults
# (1e-8 and 0.01) differ.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    eps=1e-6,
    weight_decay=0.0,
)
# One scheduler step is taken per optimizer step, so the schedule spans every
# batch of every epoch; the learning rate decays linearly with no warm-up.
total_steps = len(train_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

Hugging Face code

In [None]:
from transformers import DataCollatorForTokenClassification

# Batch collator for token classification, built from the tokenizer.
# NOTE(review): the DataLoaders visible in this notebook are created without a
# collate_fn, so this object does not appear to be used — confirm before
# relying on it.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

ChatGPT code

In [None]:
# Evaluate the current model on the test split. This cell sits before the
# training loop in the notebook, so it reads as a pre-fine-tuning baseline.
model.eval()
test_loss = 0
correct_predictions = 0
total_predictions = 0
with torch.no_grad():
    for batch in tqdm(test_loader):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.cuda.amp.autocast():
            # offset_mapping is part of the batch but is not a model input.
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                token_type_ids=batch['token_type_ids'],
                labels=batch['labels'],
            )
        test_loss += outputs.loss.item()
        predictions = outputs.logits.argmax(dim=-1)

        # Accuracy is measured over labelled positions only: positions marked
        # -100 are ignored by the loss, so they are excluded here too. The old
        # denominator was the number of sequences, which equals the number of
        # labelled tokens only when every sequence has exactly one label.
        labelled = batch['labels'] != -100
        correct_predictions += ((predictions == batch['labels']) & labelled).sum().item()
        total_predictions += labelled.sum().item()

# max(..., 1) keeps an empty loader from raising ZeroDivisionError.
test_accuracy = correct_predictions / max(total_predictions, 1)
print(f"Test Loss: {test_loss / max(len(test_loader), 1)}, Test Accuracy: {test_accuracy}")

In [None]:
# Fine-tune for EPOCHS epochs; after each epoch report loss and token-level
# accuracy on the validation split.
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    start_time = time.time()
    for step, batch in tqdm(enumerate(train_loader), total=len(train_loader)):

        batch = {k: v.to(device) for k, v in batch.items()}

        # offset_mapping is part of the batch but is not a model input.
        outputs = model(
            input_ids=batch['input_ids'],
            attention_mask=batch['attention_mask'],
            token_type_ids=batch['token_type_ids'],
            labels=batch['labels'],
        )
        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        if step % logging_steps == 0:
            elapsed_time = time.time() - start_time
            # Running mean of the loss over the steps seen so far in this epoch.
            print(f"Epoch {epoch + 1}, Step {step}, Loss: {total_loss / (step + 1)}, Time elapsed: {elapsed_time}s")

        # Save model at checkpoints
#         if step % int(save_steps_perc * len(train_dataloader)) == 0 and step > 0:
#             model.save_pretrained(f'checkpoint-epoch{epoch+1}-step{step}')

    # Save model after each epoch
#     model.save_pretrained(f'model-epoch{epoch+1}')

    # Validation
    model.eval()
    val_loss = 0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for batch in val_loader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                token_type_ids=batch['token_type_ids'],
                labels=batch['labels'],
            )
            val_loss += outputs.loss.item()
            predictions = outputs.logits.argmax(dim=-1)

            # Count accuracy over labelled positions only (label != -100).
            # The old denominator was the number of sequences, which is only
            # right when every sequence carries exactly one label.
            labelled = batch['labels'] != -100
            correct_predictions += ((predictions == batch['labels']) & labelled).sum().item()
            total_predictions += labelled.sum().item()
    # max(..., 1) keeps an empty validation loader from raising ZeroDivisionError.
    val_accuracy = correct_predictions / max(total_predictions, 1)
    print(f"Validation Loss: {val_loss / max(len(val_loader), 1)}, Validation Accuracy: {val_accuracy}")
# Save final model
model.save_pretrained('final_model')

# Evaluate the model

In [None]:
# Evaluate on the test split and collect per-token labels and predictions for
# the metric computation in a later cell.
model.eval()
test_loss = 0
correct_predictions = 0
total_predictions = 0

# One 1-D tensor per batch, holding only the labelled positions.
prediction_list = []
ground_truth = []

with torch.no_grad():
    for batch in tqdm(test_loader):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.cuda.amp.autocast():
            outputs = model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask'],
                token_type_ids=batch['token_type_ids'],
                labels=batch['labels'],
            )
        test_loss += outputs.loss.item()
        predictions = outputs.logits.argmax(dim=-1)

        # Positions labelled -100 are ignored by the loss, so they are left
        # out of the accuracy and of the collected metric inputs as well.
        labelled = batch['labels'] != -100
        correct_predictions += ((predictions == batch['labels']) & labelled).sum().item()
        total_predictions += labelled.sum().item()

        # Collect every labelled position. The earlier version took the
        # hard-coded column 1 only, which drops later words of a multi-word
        # row and can pick up a -100 label when position 1 is unlabelled.
        prediction_list.append(predictions[labelled])
        ground_truth.append(batch['labels'][labelled])

# max(..., 1) keeps an empty loader from raising ZeroDivisionError.
test_accuracy = correct_predictions / max(total_predictions, 1)
average_test_loss = test_loss / max(len(test_loader), 1)
print(f"Test Loss: {average_test_loss}, Test Accuracy: {test_accuracy}")

In [None]:

# Custom compute metrics function
def compute_metrics(true_labels, pred_labels):
    """Score predictions against the reference labels.

    Args:
        true_labels: Reference class ids, passed to the scikit-learn scorers
            as ``y_true``.
        pred_labels: Predicted class ids, passed as ``y_pred``.

    Returns:
        A dict with macro-averaged ``'f1'``, ``'precision'`` and ``'recall'``,
        plain ``'accuracy'`` and the ``'confusion_matrix'``.
    """
    macro_f1 = f1_score(true_labels, pred_labels, average='macro')
    accuracy = accuracy_score(true_labels, pred_labels)
    macro_precision = precision_score(true_labels, pred_labels, average='macro')
    macro_recall = recall_score(true_labels, pred_labels, average='macro')
    matrix = confusion_matrix(true_labels, pred_labels)

    return {
        'f1': macro_f1,
        'accuracy': accuracy,
        'precision': macro_precision,
        'recall': macro_recall,
        'confusion_matrix': matrix,
    }

In [None]:
# Flatten the per-batch tensors, move them to the CPU and join them into one
# array each for the reference labels and the predictions.
y_true = np.concatenate([labels.flatten().cpu() for labels in ground_truth])
y_pred = np.concatenate([preds.flatten().cpu() for preds in prediction_list])
metrics = compute_metrics(y_true, y_pred)

# Print the computed metrics, one per line.
report_lines = [
    f"F1 Score: {metrics['f1']}",
    f"Accuracy: {metrics['accuracy']}",
    f"Precision: {metrics['precision']}",
    f"Recall: {metrics['recall']}",
    f"Confusion Matrix:\n{metrics['confusion_matrix']}",
]
for report_line in report_lines:
    print(report_line)