In [16]:
!pip install -q accelerate

In [37]:
import os
from pathlib import Path

import pandas as pd

import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel

from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
import xgboost as xgb
import numpy as np

In [18]:
def create_cls_model_spec(task, data_spec):
    """Build the classifier candidates available for ``task``.

    Args:
        task: Key into ``data_spec`` naming the task.
        data_spec: Mapping whose ``data_spec[task]['label_names']`` lists the
            labels of the task.

    Returns:
        dict mapping 'knn', 'svc' and 'xgboost' to ``{'cls': estimator}``.
        The k-NN and SVC estimators are pipelines that standardize features
        first; the XGBoost classifier is used with its defaults.
    """
    # The neighbour count of the k-NN classifier is tied to the label count.
    num_labels = len(data_spec[task]['label_names'])

    knn_pipeline = make_pipeline(
        StandardScaler(),
        KNeighborsClassifier(n_neighbors=num_labels),
    )
    svc_pipeline = make_pipeline(StandardScaler(), SVC(gamma='auto'))
    boosted_trees = xgb.XGBClassifier()

    return {
        'knn': {'cls': knn_pipeline},
        'svc': {'cls': svc_pipeline},
        'xgboost': {'cls': boosted_trees},
    }

In [19]:
def get_data_spec():
    """Return the label vocabulary of every supported task.

    Returns:
        dict keyed by task name ('Propaganda', 'Bias'). Each value holds
        'label2id' (label -> integer id), 'id2label' (id as a string -> label)
        and 'label_names' (labels listed in id order).
    """
    # Position in each list is the integer id of the label.
    labels_by_task = {
        'Propaganda': [
            "Not Propaganda",
            "Propaganda",
            "Unclear",
            "Not Applicable",
        ],
        'Bias': [
            "Unbiased",
            "Biased against Palestine",
            "Biased against Israel",
            "Biased against both Palestine and Israel",
            "Biased against others",
            "Unclear",
            "Not Applicable",
        ],
    }

    spec = {}
    for task_name, names in labels_by_task.items():
        spec[task_name] = {
            'label2id': {name: idx for idx, name in enumerate(names)},
            # Ids are kept as strings on this side of the mapping.
            'id2label': {str(idx): name for idx, name in enumerate(names)},
            'label_names': list(names),
        }

    return spec


In [20]:
def get_embedding_model_spec():
    """Return the configuration of every supported embedding model.

    Returns:
        dict keyed by a short method name. Each entry holds 'model_name',
        'max_length', 'pooling_type' ('mean', 'cls' or 'last_token'),
        'normalize', 'batch_size' and 'kwargs' (extra keyword arguments for
        loading the model).
    """
    def _entry(model_name, max_length, pooling_type, **load_kwargs):
        # Every model is configured with normalization on and batch size 1.
        return {
            'model_name': model_name,
            'max_length': max_length,
            'pooling_type': pooling_type,
            'normalize': True,
            'batch_size': 1,
            'kwargs': load_kwargs,
        }

    return {
        'ML-E5-large': _entry(
            'intfloat/multilingual-e5-large', 512, 'mean',
            device_map='cuda', torch_dtype=torch.float16,
        ),
        'BGE-M3': _entry(
            'BAAI/bge-m3', 8192, 'cls',
            device_map='cuda', torch_dtype=torch.float16,
        ),
        'E5-mistral-7b': _entry(
            'intfloat/e5-mistral-7b-instruct', 32768, 'last_token',
            load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16,
        ),
        'Nomic-Embed': _entry(
            'nomic-ai/nomic-embed-text-v1', 8192, 'mean',
            device_map='cuda', trust_remote_code=True,
        ),
    }

In [21]:
def mean_pooling(model_output, attention_mask=None):
    """Average the token embeddings of each sequence.

    Args:
        model_output: Model output exposing "last_hidden_state"; the average
            is taken over its dimension 1 (the token positions).
        attention_mask: Optional mask with one entry per token position,
            non-zero for real tokens. When given, masked-out (padding)
            positions are excluded from the average. When omitted, every
            position contributes, which is only correct for unpadded input.

    Returns:
        Tensor with dimension 1 of "last_hidden_state" reduced away.
    """
    token_embeddings = model_output["last_hidden_state"]
    if attention_mask is None:
        return torch.mean(token_embeddings, dim=1)

    # Accumulate in float32 so long half-precision sequences cannot overflow.
    mask = attention_mask.unsqueeze(-1).to(torch.float32)
    summed = (token_embeddings.to(torch.float32) * mask).sum(dim=1)
    # Guard against an all-zero mask row (would otherwise divide by zero).
    counts = mask.sum(dim=1).clamp(min=1.0)
    return (summed / counts).to(token_embeddings.dtype)

In [22]:
def cls_pooling(model_output):
    """Select the vector at the first position of every sequence.

    Args:
        model_output: Model output whose element 0 is indexed as
            ``[batch, position, ...]``.

    Returns:
        ``model_output[0]`` sliced at position 0 of its second dimension.
    """
    hidden_states = model_output[0]
    first_position = 0
    return hidden_states[:, first_position]

In [23]:
def last_token_pooling(model_output):
    """Select the vector at the final position of every sequence.

    Args:
        model_output: Model output whose element 0 is indexed as
            ``[batch, position, ...]``.

    Returns:
        ``model_output[0]`` sliced at the last index of its second dimension.
    """
    hidden_states = model_output[0]
    final_position = -1
    return hidden_states[:, final_position]

In [24]:
def get_sentence_embedding(
        text,
        tokenizer,
        embed_model,
        normalize,
        max_length,
        pooling_type='cls'
):
    if pooling_type == "last_token":
        encoded_input = tokenizer(
            text,
            max_length=max_length,
            return_attention_mask=False,
            padding=False,
            truncation=True
        )
        encoded_input['input_ids'] = encoded_input['input_ids'] + [tokenizer.eos_token_id]
        encoded_input = tokenizer.pad(
            [encoded_input],
            padding=True,
            return_attention_mask=True,
            return_tensors='pt'
        ).to("cuda")

    else:
        encoded_input = tokenizer(
            text,
            return_tensors="pt",
            max_length=max_length,
            truncation=True
        ).to("cuda")

    with torch.no_grad():
        model_output = embed_model(**encoded_input)

    sentence_embeddings = None
    match pooling_type:
        case "cls":
            sentence_embeddings = cls_pooling(model_output)
        case "mean":
            sentence_embeddings = mean_pooling(model_output)
        case "last_token":
            sentence_embeddings = last_token_pooling(model_output)

    if normalize:
        sentence_embeddings = F.normalize(sentence_embeddings)

    return sentence_embeddings

In [25]:
def embed(embed_model, tokenizer, data, model_spec):
    """Embed every item of ``data`` and concatenate the results.

    Args:
        embed_model: Model forwarded to ``get_sentence_embedding``.
        tokenizer: Tokenizer forwarded to ``get_sentence_embedding``.
        data: Iterable of inputs, embedded one at a time.
        model_spec: Mapping providing 'normalize', 'max_length' and
            'pooling_type'.

    Returns:
        The per-item embeddings joined with ``torch.cat``.
    """
    per_item = []
    for sentence in data:
        vector = get_sentence_embedding(
            text=sentence,
            tokenizer=tokenizer,
            embed_model=embed_model,
            normalize=model_spec['normalize'],
            max_length=model_spec['max_length'],
            pooling_type=model_spec['pooling_type'],
        )
        per_item.append(vector)

    return torch.cat(per_item)

In [26]:
def create_embeddings(embedding_method, embedding_model_spec, data, save_to):
    """Embed ``data`` with the selected model and save the tensor.

    Args:
        embedding_method: Key into ``embedding_model_spec`` selecting the
            model.
        embedding_model_spec: Mapping of method name to a spec with
            'model_name', 'kwargs' and the pooling settings read by
            ``embed``.
        data: Iterable of inputs to embed.
        save_to: Destination handed to ``torch.save``. When it is a path,
            missing parent directories are created first.
    """
    method_spec = embedding_model_spec[embedding_method]
    # Report the model actually being processed, not the whole spec table.
    print(f"Processing model : {embedding_method} ({method_spec['model_name']})")

    # torch.save does not create missing directories; create them up front so
    # a bad destination surfaces before the expensive embedding step.
    if isinstance(save_to, (str, os.PathLike)):
        Path(save_to).parent.mkdir(parents=True, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained(method_spec['model_name'])

    embed_model = AutoModel.from_pretrained(
        method_spec['model_name'],
        **method_spec['kwargs']
    )

    if embedding_method == "Nomic-Embed":
        embed_model.to('cuda')

    embeddings = embed(embed_model, tokenizer, data, method_spec)
    # Detach and move to CPU before saving.
    embeddings = embeddings.detach().cpu()
    print(f"{embedding_method} embedding shape = {embeddings.shape}")
    torch.save(embeddings, save_to)

In [27]:
def get_train_test_splits(task, data_spec):
    """Load the train and test spreadsheets of ``task`` with labels as ids.

    The files are read from ``<cwd>/data/<task lowercased>/`` and are named
    ``<task lowercased>_train_data.xlsx`` / ``..._test_data.xlsx``. The column
    named ``task`` is mapped through ``data_spec[task]['label2id']``.

    Args:
        task: Task name; also the name of the label column.
        data_spec: Mapping of task name to its label specification.

    Returns:
        dict with the DataFrames under "train" and "test".
    """
    task_spec = data_spec[task]
    task_key = task.lower()
    split_dir = os.path.join(os.getcwd(), 'data', task_key)

    frames = {
        split: pd.read_excel(os.path.join(split_dir, f'{task_key}_{split}_data.xlsx'))
        for split in ("train", "test")
    }

    label_to_id = task_spec['label2id']
    for frame in frames.values():
        frame[task] = frame[task].map(label_to_id)

    return frames

In [28]:
def get_iaa_data(iaa_data_dir, iaa_index):
    """Read ``iaa<iaa_index>.xlsx`` from ``iaa_data_dir`` into a DataFrame."""
    workbook_path = os.path.join(iaa_data_dir, f'iaa{iaa_index}.xlsx')
    return pd.read_excel(workbook_path)

In [59]:
def get_train_data(task, data_spec):
    """Return only the "train" split produced by ``get_train_test_splits``."""
    return get_train_test_splits(task, data_spec)["train"]

In [30]:
def generate_embeddings(texts, embedding_model_spec, embedding_method, path):
    """Compute embeddings for ``texts``, store them at ``path`` and reload them.

    Args:
        texts: Inputs to embed.
        embedding_model_spec: Mapping of method name to model spec.
        embedding_method: Key selecting the model in ``embedding_model_spec``.
        path: Where ``create_embeddings`` saves the tensor.

    Returns:
        The object obtained by ``torch.load(path)`` after saving.
    """
    create_embeddings(
        embedding_method=embedding_method,
        embedding_model_spec=embedding_model_spec,
        data=texts,
        save_to=path,
    )
    # Hand back what was just written to disk.
    return torch.load(path)

In [62]:
def predict(cls, train_data, x_pred, id2label):
    """Fit ``cls`` on the training pair and label ``x_pred``.

    Args:
        cls: Estimator exposing ``fit(x, y)`` and ``predict(x)``.
        train_data: ``(x_train, y_train)`` pair used for fitting.
        x_pred: Inputs to classify.
        id2label: Mapping from the string form of a predicted id to its
            label name.

    Returns:
        list of label names, one per prediction.
    """
    features, targets = train_data
    cls.fit(features, targets)

    label_names = []
    for class_id in cls.predict(x_pred):
        # id2label is keyed by strings, so convert the predicted id first.
        label_names.append(id2label[str(class_id)])
    return label_names

In [60]:
def main(task="Propaganda", embedding_method="ML-E5-large", cls_method="knn", iaa_index='1'):
    """Embed the train and IAA texts, fit a classifier and save predictions.

    The defaults reproduce the previously hard-coded run.

    Args:
        task: Task name; selects the label spec, the training files and the
            label column.
        embedding_method: Key of the embedding model spec to use.
        cls_method: Key of the classifier spec to use ('knn', 'svc' or
            'xgboost').
        iaa_index: Suffix of the IAA workbook (``iaa<iaa_index>.xlsx``).

    Side effects:
        Writes both embedding tensors under ``<cwd>/embeddings`` and an Excel
        file with the predicted labels under ``<cwd>/data/iaa``.
    """
    data_spec = get_data_spec()
    data_dir = os.path.join(os.getcwd(), 'data')
    train_data = get_train_data(task, data_spec)
    train_texts = train_data["Text"].tolist()

    iaa_data_dir = os.path.join(data_dir, 'iaa')
    iaa_data = get_iaa_data(iaa_data_dir, iaa_index)
    iaa_texts = iaa_data["Text"].tolist()

    embedding_dir = os.path.join(os.getcwd(), 'embeddings')
    train_embedding_dir = os.path.join(embedding_dir, task.lower())
    iaa_embedding_dir = os.path.join(embedding_dir, 'iaa', task.lower())
    # Both output directories must exist before saving; previously only the
    # IAA one was created, so saving the train embeddings could fail.
    for directory in (train_embedding_dir, iaa_embedding_dir):
        Path(directory).mkdir(parents=True, exist_ok=True)
    embedding_model_spec = get_embedding_model_spec()

    train_embeddings = generate_embeddings(
        train_texts,
        embedding_model_spec,
        embedding_method,
        os.path.join(train_embedding_dir, f'{embedding_method}_train_embeddings.pt')
    )

    iaa_embeddings = generate_embeddings(
        iaa_texts,
        embedding_model_spec,
        embedding_method,
        os.path.join(iaa_embedding_dir, f'{embedding_method}_{iaa_index}_embeddings.pt')
    )

    cls_model_spec = create_cls_model_spec(task, data_spec)
    cls = cls_model_spec[cls_method]["cls"]

    x_train = train_embeddings
    y_train = train_data[task].tolist()
    x_pred = iaa_embeddings

    task_predictions = predict(cls, (x_train, y_train), x_pred, data_spec[task]['id2label'])
    # Store the predicted label names in the column named after the task.
    iaa_data[task] = task_predictions
    iaa_data.to_excel(
        os.path.join(
            data_dir,
            'iaa',
            f"{task.lower()}_{embedding_method}_{cls_method}_iaa{iaa_index}_data.xlsx"),
        index=False
    )

In [None]:
# Entry point: run the embedding + classification pipeline defined in main().
main()