In [None]:
import os
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report
from transformers import (AutoTokenizer, AutoModel, AutoConfig,
                          Trainer, TrainingArguments,
                          DataCollatorWithPadding)

In [None]:
# Labelled examples. Columns noted by the author:
# ['missing_word', 'predicted_translation', 'label', 'original_sentence', 'translated_sentence']
# The rename normalises the label column if the CSV still calls it 'predicted_category'.
df_train = pd.read_csv("Training_data_v4.csv").rename(
    columns={'predicted_category': 'label'}
)
# Unlabelled rows to score later. Columns noted by the author:
# ['missing_word', 'original_sentence', 'translated_sentence']
df_predictions = pd.read_csv("mapped_prediction_data.csv")

# Quick sanity check of both schemas plus one sample training record.
for frame in (df_train, df_predictions):
    print(frame.columns.tolist())
print(df_train.head(1).to_dict(orient='records'))

['missing_word', 'predicted_translation', 'label', 'original_sentence', 'translated_sentence']
['missing_word', 'original_sentence', 'translated_sentence']
[{'missing_word': 'arts', 'predicted_translation': 'articles', 'label': 'Abbreviation or acronym relation', 'original_sentence': 'Assassination (arts. 392, 393 and 394 of the Penal Code)', 'translated_sentence': ':: Murder (articles 392, 393 and 394 of the Penal Code);'}]


In [None]:
# Alphabetically sorted class names give a stable name <-> id mapping across runs.
labels = sorted(df_train['label'].unique())
num_labels = len(labels)  # should be 4
id2label = dict(enumerate(labels))
label2id = {name: idx for idx, name in id2label.items()}
# Integer class id used as the classification target.
df_train['label_id'] = df_train['label'].map(label2id)
print(label2id)

{'Abbreviation or acronym relation': 0, 'Character set issue': 1, 'Semantic equivalence': 2, 'Spelling or version variant': 3}


In [None]:
# Class balance of the labelled data (bracket access avoids attribute/column name clashes).
df_train['label'].value_counts()

Unnamed: 0_level_0,count
label,Unnamed: 1_level_1
Spelling or version variant,248
Semantic equivalence,239
Character set issue,215
Abbreviation or acronym relation,165


In [None]:
# Step 1: keep 70% for training; the remaining 30% is shared by validation and test.
# Both splits are stratified on the class id so every split keeps the class balance.
train_df, temp_df = train_test_split(
    df_train,
    test_size=0.3,
    stratify=df_train['label_id'],
    random_state=42,
)

# Step 2: cut the held-out 30% in half -> roughly 15% validation and 15% test.
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    stratify=temp_df['label_id'],
    random_state=42,
)

print(f"Train size: {len(train_df)}, Val size: {len(val_df)}, Test size: {len(test_df)}")

Train size: 606, Val size: 130, Test size: 131


In [None]:
# Hugging Face Hub checkpoint shared by the tokenizer and the encoder.
model_name = "eventdata-utd/ConfliBERT-scr-uncased"
# The config records how many target classes were found in the training labels.
config = AutoConfig.from_pretrained(model_name, num_labels=num_labels)
tokenizer = AutoTokenizer.from_pretrained(model_name)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/1.26k [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/224k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/695k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/695 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/678 [00:00<?, ?B/s]

In [None]:
def get_start_end_positions(tokenizer, original, translated, answer_text):
    """Locate ``answer_text`` inside ``translated`` as a token span.

    The pair ``(original, translated)`` is tokenized (truncated to 256 tokens)
    and the first occurrence of ``answer_text`` in ``translated`` is mapped
    from character offsets to token indices of the pair encoding.

    Args:
        tokenizer: Callable returning an encoding that provides an
            ``"offset_mapping"`` entry and a ``sequence_ids()`` method.
        original: First segment of the pair (source sentence).
        translated: Second segment of the pair; the span is searched here.
        answer_text: Text to locate inside ``translated``.

    Returns:
        ``(start_position, end_position)`` as inclusive token indices.
        Fallbacks, each reported with a ``[WARN]`` print:

        * ``(context_start, context_start)`` (first token of the translation)
          when ``answer_text`` is not a string, is not found, or lies outside
          the tokenized part of the translation (e.g. removed by truncation);
        * ``(0, 0)`` when the encoding contains no translation tokens at all.
    """
    inputs = tokenizer(original, translated,
                       truncation=True, max_length=256,
                       return_offsets_mapping=True)
    offset_mapping = inputs.pop("offset_mapping")
    sequence_ids = inputs.sequence_ids()

    # Tokens of the second segment carry sequence id 1; find that run.
    idx = 0
    while idx < len(sequence_ids) and sequence_ids[idx] != 1:
        idx += 1
    if idx == len(sequence_ids):
        # Nothing of the translation survived tokenization: no span to point at.
        print(f"[WARN] no translation tokens found for: '{translated}'")
        return 0, 0
    context_start = idx
    while idx < len(sequence_ids) and sequence_ids[idx] == 1:
        idx += 1
    context_end = idx - 1

    # A non-string answer (e.g. NaN read from a CSV) is treated as "not found"
    # instead of raising inside str.find.
    char_start = translated.find(answer_text) if isinstance(answer_text, str) else -1
    if char_start == -1:
        print(f"[WARN] answer_text not found in translation: '{answer_text}' in '{translated}'")
        return context_start, context_start  # fallback to dummy token span

    char_end = char_start + len(answer_text)

    # If the answer is not fully covered by the kept translation tokens, the
    # scans below would run past the context and land on a special token.
    if (offset_mapping[context_start][0] > char_start
            or offset_mapping[context_end][1] < char_end):
        print(f"[WARN] answer_text outside tokenized translation (truncated?): "
              f"'{answer_text}' in '{translated}'")
        return context_start, context_start

    # Get token start index: last context token starting at or before char_start.
    idx = context_start
    while idx <= context_end and offset_mapping[idx][0] <= char_start:
        idx += 1
    start_position = idx - 1

    # Get token end index: first context token ending at or after char_end.
    idx = context_end
    while idx >= context_start and offset_mapping[idx][1] >= char_end:
        idx -= 1
    end_position = idx + 1

    return start_position, end_position


# Apply to training data: one (start, end) token span per row, in row order.
train_spans = [
    get_start_end_positions(tokenizer, original, translated, answer)
    for original, translated, answer in zip(
        train_df['original_sentence'],
        train_df['translated_sentence'],
        train_df['predicted_translation'],
    )
]
# Unzip the list of pairs into two parallel columns.
train_df['start_pos'], train_df['end_pos'] = zip(*train_spans)


In [None]:
# Same span extraction for the validation split.
val_spans = [
    get_start_end_positions(tokenizer, original, translated, answer)
    for original, translated, answer in zip(
        val_df['original_sentence'],
        val_df['translated_sentence'],
        val_df['predicted_translation'],
    )
]
val_df['start_pos'], val_df['end_pos'] = zip(*val_spans)

In [None]:
# Same span extraction for the test split.
test_spans = [
    get_start_end_positions(tokenizer, original, translated, answer)
    for original, translated, answer in zip(
        test_df['original_sentence'],
        test_df['translated_sentence'],
        test_df['predicted_translation'],
    )
]
test_df['start_pos'], test_df['end_pos'] = zip(*test_spans)

In [None]:
# Sanity check: count training rows whose end position is missing.
train_df['end_pos'].isna().sum()

np.int64(0)

In [None]:
class SpanRelDataset(torch.utils.data.Dataset):
    """Map-style dataset of sentence pairs with span and class targets.

    Each item is the tokenized ``(original_sentence, translated_sentence)``
    pair plus ``start_positions``, ``end_positions`` and ``labels`` tensors
    read from the ``start_pos``, ``end_pos`` and ``label_id`` columns.

    Args:
        df: DataFrame with columns ``original_sentence``,
            ``translated_sentence``, ``start_pos``, ``end_pos``, ``label_id``.
        tokenizer: Optional tokenizer to use. When omitted, the module-level
            ``tokenizer`` is looked up at item-access time (original behaviour).
        max_length: Truncation length passed to the tokenizer. It must match
            the value used when ``start_pos`` / ``end_pos`` were computed,
            otherwise the stored token indices no longer line up.
    """

    def __init__(self, df, tokenizer=None, max_length=256):
        self.texts = list(zip(df['original_sentence'], df['translated_sentence']))
        self.start_positions = df['start_pos'].tolist()
        self.end_positions   = df['end_pos'].tolist()
        self.labels          = df['label_id'].tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        orig, trans = self.texts[idx]
        # Fall back to the notebook-level tokenizer when none was injected.
        active_tokenizer = self.tokenizer if self.tokenizer is not None else tokenizer
        # Tokenize without padding (we'll pad later in the batch)
        inputs = active_tokenizer(orig, trans,
                                  truncation=True, max_length=self.max_length,
                                  return_offsets_mapping=False,
                                  return_tensors=None)
        item = {}
        for k in ['input_ids', 'attention_mask']:
            item[k] = torch.tensor(inputs[k], dtype=torch.long)
        # token_type_ids may not exist for all models (Roberta has no token_type_ids)
        if 'token_type_ids' in inputs:
            item['token_type_ids'] = torch.tensor(inputs['token_type_ids'], dtype=torch.long)
        # Add labels
        item['start_positions'] = torch.tensor(self.start_positions[idx], dtype=torch.long)
        item['end_positions']   = torch.tensor(self.end_positions[idx], dtype=torch.long)
        item['labels']          = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# Wrap the training and validation frames as torch datasets.
train_dataset, val_dataset = (SpanRelDataset(split) for split in (train_df, val_df))


In [None]:
# Wrap the held-out test frame the same way as the other splits.
test_dataset = SpanRelDataset(df=test_df)


In [None]:
# Collator passed to the Trainer and the evaluation DataLoaders.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
import torch.nn as nn

class SpanRelationModel(nn.Module):
    """Encoder with two heads: span extraction (start/end) and classification.

    Args:
        model_name: Checkpoint name/path given to ``AutoConfig`` and ``AutoModel``.
        num_labels: Number of classes for the classification head.
    """

    def __init__(self, model_name, num_labels):
        super().__init__()
        # Build the config from this constructor's own arguments rather than a
        # notebook-level global, so the encoder always matches model_name.
        encoder_config = AutoConfig.from_pretrained(model_name, num_labels=num_labels)
        self.base_model = AutoModel.from_pretrained(model_name, config=encoder_config)
        hidden_size = self.base_model.config.hidden_size
        # QA (span) head: one linear for start/end
        self.qa_outputs = nn.Linear(hidden_size, 2)
        # Classification head
        self.classifier = nn.Linear(hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, token_type_ids=None,
                start_positions=None, end_positions=None, labels=None):
        """Run the encoder and both heads.

        Returns:
            ``(total_loss, start_logits, end_logits, cls_logits)`` when
            ``start_positions``, ``end_positions`` and ``labels`` are all given,
            otherwise ``(start_logits, end_logits, cls_logits)``.
            ``total_loss`` is the unweighted sum of the start, end and
            classification cross-entropy losses.
        """
        # Only forward token_type_ids when present, so encoders whose forward()
        # does not accept that argument can be used as well.
        encoder_inputs = {"input_ids": input_ids, "attention_mask": attention_mask}
        if token_type_ids is not None:
            encoder_inputs["token_type_ids"] = token_type_ids
        outputs = self.base_model(**encoder_inputs)

        sequence_output = outputs.last_hidden_state  # [batch, seq_len, hidden]
        logits = self.qa_outputs(sequence_output)    # [batch, seq_len, 2]
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1)      # [batch, seq_len]
        end_logits = end_logits.squeeze(-1)          # [batch, seq_len]

        # Classification input: the pooled output when the encoder provides one,
        # otherwise the first-token hidden state. The attribute can be missing
        # or present-but-None, so both cases are handled explicitly.
        pooled_output = getattr(outputs, "pooler_output", None)
        if pooled_output is None:
            pooled_output = sequence_output[:, 0, :]
        cls_logits = self.classifier(pooled_output)  # [batch, num_labels]

        if start_positions is None or end_positions is None or labels is None:
            return start_logits, end_logits, cls_logits

        loss_fct = nn.CrossEntropyLoss()
        # Clamp positions (in case of out-of-bounds)
        last_index = sequence_output.size(1) - 1
        start_positions = start_positions.clamp(0, last_index)
        end_positions = end_positions.clamp(0, last_index)
        loss_start = loss_fct(start_logits, start_positions)
        loss_end = loss_fct(end_logits, end_positions)
        loss_cls = loss_fct(cls_logits, labels)
        total_loss = loss_start + loss_end + loss_cls
        return total_loss, start_logits, end_logits, cls_logits

# Instantiate the two-headed model on top of the chosen checkpoint.
model = SpanRelationModel(model_name=model_name, num_labels=num_labels)

model.safetensors:   0%|          | 0.00/436M [00:00<?, ?B/s]

Some weights of BertModel were not initialized from the model checkpoint at eventdata-utd/ConfliBERT-scr-uncased and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

SpanRelationModel(
  (base_model): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30000, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, e

In [None]:
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=10,  # more epochs due to small dataset
    per_device_train_batch_size=4,  # smaller batch for more gradient updates per epoch
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,
    eval_strategy="epoch",
    save_strategy="epoch",  # must match eval_strategy for load_best_model_at_end
    logging_steps=10,  # more frequent logging
    save_total_limit=2,
    learning_rate=2e-5,  # lower learning rate for stability
    weight_decay=0.01,
    warmup_ratio=0.1,  # helps with training stability in early steps
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    logging_dir="./logs",
    # Mixed precision needs a CUDA device; enabling it unconditionally makes
    # the cell fail on CPU-only runtimes.
    fp16=torch.cuda.is_available(),
    report_to="none"  # or "tensorboard" if you want visualization
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    # `tokenizer=` is deprecated for Trainer in recent transformers releases
    # (it emits a FutureWarning); `processing_class=` is its replacement.
    processing_class=tokenizer,
    data_collator=data_collator,
    # Stop once eval_loss has not improved for two consecutive evaluations.
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

trainer.train()


  trainer = Trainer(


Epoch,Training Loss,Validation Loss
1,8.9983,8.389652
2,5.4154,5.432434
3,4.3036,4.409484
4,2.7808,4.199712
5,2.2211,3.963728
6,2.0466,3.918256
7,1.6396,4.114259
8,1.8343,4.328892


TrainOutput(global_step=608, training_loss=3.9665798372343968, metrics={'train_runtime': 250.2294, 'train_samples_per_second': 24.218, 'train_steps_per_second': 3.037, 'total_flos': 0.0, 'train_loss': 3.9665798372343968, 'epoch': 8.0})

In [None]:
# Run the trained model over the validation split and collect, per example,
# the arg-max start token, end token and class id next to the gold class id.
model.eval()
val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=16,
    collate_fn=data_collator,
)
all_preds, all_labels, all_start, all_end = [], [], [], []
with torch.no_grad():
    for batch in val_loader:
        inputs = {name: tensor.to(device) for name, tensor in batch.items()}
        # Targets are deliberately not passed, so the model returns logits only.
        start_logits, end_logits, cls_logits = model(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            token_type_ids=inputs.get('token_type_ids'),
        )
        all_start += start_logits.argmax(dim=1).cpu().tolist()
        all_end += end_logits.argmax(dim=1).cpu().tolist()
        all_preds += cls_logits.argmax(dim=1).cpu().tolist()
        all_labels += batch['labels'].cpu().tolist()

In [None]:
# Invert the label mapping so class ids can be reported by name.
id2label = {idx: name for name, idx in label2id.items()}
all_label_ids = sorted(id2label)
target_names = [id2label[label_id] for label_id in all_label_ids]

# accuracy_score / classification_report are imported in the first cell.
acc = accuracy_score(all_labels, all_preds)
print(f"Classification Accuracy (val): {acc:.4f}")
print("Classification report:")
report = classification_report(
    all_labels,
    all_preds,
    labels=all_label_ids,
    target_names=target_names,
    zero_division=0,
)
print(report)

Classification Accuracy (val): 0.6385
Classification report:
                                  precision    recall  f1-score   support

Abbreviation or acronym relation       0.79      0.44      0.56        25
             Character set issue       0.95      0.62      0.75        32
            Semantic equivalence       0.51      0.61      0.56        36
     Spelling or version variant       0.58      0.81      0.67        37

                        accuracy                           0.64       130
                       macro avg       0.71      0.62      0.64       130
                    weighted avg       0.69      0.64      0.64       130



TEST Accuracy before active learning

In [None]:
# Same inference pass as for validation, now over the held-out test split.
# Note: this reuses (and therefore overwrites) all_preds / all_labels /
# all_start / all_end.
model.eval()
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=16,
    collate_fn=data_collator,
)
all_preds, all_labels, all_start, all_end = [], [], [], []
with torch.no_grad():
    for batch in test_loader:
        inputs = {name: tensor.to(device) for name, tensor in batch.items()}
        # Targets are deliberately not passed, so the model returns logits only.
        start_logits, end_logits, cls_logits = model(
            input_ids=inputs['input_ids'],
            attention_mask=inputs['attention_mask'],
            token_type_ids=inputs.get('token_type_ids'),
        )
        all_start += start_logits.argmax(dim=1).cpu().tolist()
        all_end += end_logits.argmax(dim=1).cpu().tolist()
        all_preds += cls_logits.argmax(dim=1).cpu().tolist()
        all_labels += batch['labels'].cpu().tolist()


In [None]:
# Invert the label mapping so class ids can be reported by name.
id2label = {idx: name for name, idx in label2id.items()}
all_label_ids = sorted(id2label)
target_names = [id2label[label_id] for label_id in all_label_ids]

# accuracy_score / classification_report are imported in the first cell.
acc = accuracy_score(all_labels, all_preds)
print(f"Classification Accuracy (test): {acc:.4f}")
print("Classification report:")
report = classification_report(
    all_labels,
    all_preds,
    labels=all_label_ids,
    target_names=target_names,
    zero_division=0,
)
print(report)

Classification Accuracy (test): 0.6336
Classification report:
                                  precision    recall  f1-score   support

Abbreviation or acronym relation       0.67      0.56      0.61        25
             Character set issue       0.79      0.58      0.67        33
            Semantic equivalence       0.53      0.75      0.62        36
     Spelling or version variant       0.66      0.62      0.64        37

                        accuracy                           0.63       131
                       macro avg       0.66      0.63      0.63       131
                    weighted avg       0.66      0.63      0.64       131



In [None]:
from collections import Counter

def compute_span_f1(pred_starts, pred_ends, dataset):
    """Score predicted token spans against the gold spans stored in ``dataset``.

    Predicted and gold spans are both decoded to text with the module-level
    ``tokenizer``; the comparison is made on those decoded strings.

    Args:
        pred_starts: Predicted start token index per example.
        pred_ends: Predicted (inclusive) end token index per example.
        dataset: Indexable, sized collection whose items provide
            ``input_ids``, ``start_positions`` and ``end_positions``; item
            ``i`` must correspond to prediction ``i``.

    Returns:
        ``(exact_match, mean_f1)``: the fraction of examples whose decoded
        prediction equals the decoded gold span, and the mean F1 over
        whitespace-separated tokens. ``(0.0, 0.0)`` when there are no examples.

    Raises:
        ValueError: If the prediction lists and the dataset differ in length,
            which means predictions are being scored against the wrong split.
    """
    n_examples = len(pred_starts)
    if len(pred_ends) != n_examples or len(dataset) != n_examples:
        raise ValueError(
            f"Length mismatch: {n_examples} start predictions, "
            f"{len(pred_ends)} end predictions, {len(dataset)} dataset rows"
        )
    if n_examples == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0, 0.0

    exact = 0
    total_f1 = 0.0
    for idx, (s, e) in enumerate(zip(pred_starts, pred_ends)):
        row = dataset[idx]
        # Convert token span back to text
        tokens = tokenizer.convert_ids_to_tokens(row['input_ids'])
        true_start = int(row['start_positions'])
        true_end = int(row['end_positions'])
        # An end index before the start index yields an empty prediction.
        pred_text = tokenizer.convert_tokens_to_string(tokens[s:e + 1]).strip()
        true_text = tokenizer.convert_tokens_to_string(
            tokens[true_start:true_end + 1]).strip()
        # Exact match
        if pred_text == true_text:
            exact += 1
        # Token-level F1 (multiset overlap of whitespace tokens)
        pred_words = pred_text.split()
        true_words = true_text.split()
        num_same = sum((Counter(pred_words) & Counter(true_words)).values())
        if num_same:
            precision = num_same / len(pred_words)
            recall = num_same / len(true_words)
            total_f1 += 2 * (precision * recall) / (precision + recall)
    return exact / n_examples, total_f1 / n_examples

# all_start / all_end were last filled by the test-set inference loop (which
# overwrites the validation predictions), so they have to be scored against
# test_dataset. Pairing them with val_dataset compares predictions with the
# wrong examples and the two differ in length.
em, f1 = compute_span_f1(all_start, all_end, test_dataset)
print(f"Span Exact Match (test): {em:.4f}, Token-level F1: {f1:.4f}")


In [None]:
# Score every unlabelled row: predict the closest-matching span in the
# translation and the relation label, then write everything to a CSV.
model.eval()
results = []
sentence_columns = zip(
    df_predictions['missing_word'],
    df_predictions['original_sentence'],
    df_predictions['translated_sentence'],
)
for missing_word, orig, trans in sentence_columns:
    encoded = tokenizer(orig, trans, truncation=True, max_length=256, return_tensors='pt')
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    with torch.no_grad():
        start_logits, end_logits, cls_logits = model(**encoded)
    # Batch size is 1, so each arg-max collapses to a single Python int.
    span_start = start_logits.argmax(dim=1).item()
    span_end = end_logits.argmax(dim=1).item()
    class_id = cls_logits.argmax(dim=1).item()
    # Convert tokens to string (empty when span_end < span_start)
    pair_tokens = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0])
    span_text = tokenizer.convert_tokens_to_string(
        pair_tokens[span_start:span_end + 1]
    ).strip()
    results.append([missing_word, orig, trans, span_text, id2label[class_id]])

output_columns = [
    "missing_word", "original_sentence", "translated_sentence",
    "predicted_closest_match", "predicted_label",
]
df_results = pd.DataFrame(results, columns=output_columns)
df_results.to_csv("predictions.csv", index=False)
print("Test predictions saved to predictions.csv.")


========== Manual check using the trained model (fill in the examples) ==========

In [None]:
def predict_example(word, original, translated):
    """Predict the matched span and relation label for one sentence pair.

    Uses the notebook-level ``tokenizer``, ``model``, ``device`` and
    ``id2label``.

    Args:
        word: Word of interest from the original sentence. Accepted for
            call-site symmetry but not used by the prediction itself.
        original: Original sentence (first segment of the pair).
        translated: Translated sentence (second segment of the pair).

    Returns:
        ``(pred_span, pred_label)``: the decoded text between the arg-max
        start and end tokens (empty if the end precedes the start) and the
        name of the arg-max class.
    """
    encoded = tokenizer(original, translated, truncation=True, max_length=256, return_tensors='pt')
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    with torch.no_grad():
        start_logits, end_logits, cls_logits = model(**encoded)
    # Single example in the batch, so each arg-max is one index.
    span_start = start_logits.argmax(dim=1).item()
    span_end = end_logits.argmax(dim=1).item()
    class_id = cls_logits.argmax(dim=1).item()
    pair_tokens = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0])
    span_text = tokenizer.convert_tokens_to_string(
        pair_tokens[span_start:span_end + 1]
    ).strip()
    return span_text, id2label[class_id]

# Enter an example to test the model.
example = {
    "word": "",                 # enter word from original sentence here
    "original_sentence": "",    # Fill original sentence here
    "translated_sentence": "",  # Fill translated sentence here
}
# Pass the three fields positionally, in the order predict_example expects.
example_fields = ("word", "original_sentence", "translated_sentence")
span, label = predict_example(*(example[field] for field in example_fields))
print("Predicted span:", span)
print("Predicted label:", label)
