In [None]:
BATCH_SIZE = 64
EPOCHS = 20
MAX_LENGTH = 512
MODEL_PATH = "csebuetnlp/banglishbert" # choices: csebuetnlp/banglabert, csebuetnlp/banglishbert, bert-base-multilingual-cased
DATA_PATH = "./Dataset/"
DROPOUT = 0.4
WEIGHT_DECAY = 3e-2
LEARNING_RATE = 1e-4

In [None]:
from datasets import Dataset, DatasetDict, ClassLabel
import pandas as pd
import evaluate
import numpy as np
from transformers import AutoTokenizer, DataCollatorForTokenClassification, AutoModelForTokenClassification, TrainingArguments, Trainer, AutoConfig
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
ds = DatasetDict({
    'train': Dataset.from_dict(pd.read_json(f"{DATA_PATH}train.json")),
    'valid': Dataset.from_dict(pd.read_json(f"{DATA_PATH}valid.json")),
    'test': Dataset.from_dict(pd.read_json(f"{DATA_PATH}test.json"))
})

In [None]:
classes = ["O","B-Symptom","I-Symptom","B-Health Condition","I-Health Condition","B-Age","I-Age","B-Medicine","I-Medicine","B-Dosage","I-Dosage","B-Medical Procedure","I-Medical Procedure","B-Specialist","I-Specialist"]

In [None]:
class_ids = {k: v for v, k in enumerate(classes)}

In [None]:
feature = ClassLabel(num_classes=len(classes), names=classes)
ds['train'].features['labels'].feature = feature
ds['test'].features['labels'].feature = feature
ds['valid'].features['labels'].feature = feature

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, use_fast=True)

In [None]:
def tokenize_and_align_labels(samples):
    samples['text'] = [sample.split() for sample in samples['text']]

    tokenized_inputs = tokenizer(
        samples["text"],
        max_length=MAX_LENGTH,
        truncation=True,
        is_split_into_words=True
    )

    labels = []
    for i, label in enumerate(samples["labels"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []

        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(class_ids[label[word_idx]])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
tokenized_ds = ds.map(tokenize_and_align_labels, batched=True, remove_columns=['text'])

In [None]:
data_collator = DataCollatorForTokenClassification(
    tokenizer=tokenizer,
    padding="longest",
    max_length=MAX_LENGTH,
    return_tensors='pt'
)

In [None]:
!pip install seqeval

In [None]:
seqeval = evaluate.load("seqeval")

In [None]:
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    true_predictions = [
        [classes[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [classes[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    results = seqeval.compute(predictions=true_predictions, references=true_labels, zero_division = 0)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

In [None]:
configuration = AutoConfig.from_pretrained(
    MODEL_PATH,
    num_labels=len(classes),
    id2label={index: label for index, label in enumerate(classes)},
    label2id={label: index for index, label in enumerate(classes)},
)
configuration.hidden_dropout_prob = DROPOUT
configuration.attention_probs_dropout_prob = DROPOUT
model = AutoModelForTokenClassification.from_pretrained(
    MODEL_PATH,
    config=configuration
)

In [None]:

from transformers import TrainingArguments, Trainer
# Calculate warmup_steps ensuring it is an integer and >= 1
total_steps = int(len(ds["train"]) / BATCH_SIZE * EPOCHS)
warmup_steps = max(1, int(0.2 * total_steps))  # Ensuring warmup_steps is at least 1


In [None]:
training_args = TrainingArguments(
    output_dir="./results",
    overwrite_output_dir=True,
    save_total_limit=1,
    load_best_model_at_end=True,
    num_train_epochs=EPOCHS,
    learning_rate=LEARNING_RATE,
    lr_scheduler_type="linear",
    warmup_steps=warmup_steps,  # Ensure this is >= 1
    optim="adamw_torch",
    weight_decay=WEIGHT_DECAY,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    save_strategy="epoch",
    evaluation_strategy="epoch",
    gradient_checkpointing=True,
    group_by_length=True,
    dataloader_num_workers=2,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_ds["train"],
    eval_dataset=tokenized_ds["valid"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()
trainer.save_model(output_dir="./best_model/")


In [None]:
predictions = trainer.predict(tokenized_ds["test"]).predictions
labels = tokenized_ds['test']['labels']
compute_metrics((predictions, labels))

In [None]:
predictions = np.argmax(predictions, axis=2)

true_predictions = [
    [classes[p] for (p, l) in zip(prediction, label) if l != -100]
    for prediction, label in zip(predictions, labels)
]

ds['text'] = ds['test'].add_column('preds', true_predictions)
ds['test'].to_pandas().to_json('output.json', orient='records')

In [23]:
import json
import re

def fix_json_format(content):
    """
    Attempt to fix common JSON formatting issues.
    """
    # Replace single quotes with double quotes
    content = content.replace("'", '"')

    # Fix trailing commas in arrays and objects
    content = re.sub(r',\s*([}\]])', r'\1', content)

    # Fix missing double quotes around keys
    content = re.sub(r'(\w+):', r'"\1":', content)

    return content

def extract_json_objects(content):
    """
    Extract individual JSON objects from the content.
    """
    # Find potential JSON objects by matching brackets
    json_objects = re.findall(r'\{(?:[^{}]|(?R))*\}', content, re.DOTALL)

    return json_objects

def convert_json_to_format(input_file, output_file):
    try:
        with open(input_file, 'r', encoding='utf-8') as infile:
            content = infile.read()

        # Attempt to fix common JSON formatting issues
        fixed_content = fix_json_format(content)

        # Extract individual JSON objects
        json_objects = extract_json_objects(fixed_content)

        results = []

        for json_obj in json_objects:
            try:
                # Try to parse each JSON object
                data = json.loads(json_obj)

                if isinstance(data, dict):
                    # Process the text by removing '\n'
                    text = ' '.join(token for token in data.get('tokens', []) if token != '\n')

                    # Create the output format
                    output = {
                        "text": text,
                        "labels": data.get('tags', [])
                    }

                    results.append(output)
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON object: {e}")

        # Save to the output file
        with open(output_file, 'w', encoding='utf-8') as outfile:
            json.dump(results, outfile, ensure_ascii=False, indent=2)

        print(f"Conversion successful! Output saved to {output_file}")

    except Exception as e:
        print(f"An error occurred: {e}")



In [25]:

# Example usage
input_file = 'Dataset/data.json'   # Replace with your input file path
output_file = 'output.json' # Replace with your desired output file path
convert_json_to_format(input_file, output_file)


An error occurred: unknown extension ?R at position 12
