In [1]:
import pandas as pd
import os
import torch
from transformers import AutoModel, AutoTokenizer

class TrainableBERTCLSMeanPooler(torch.nn.Module):
    """Embed texts by averaging the first-token (CLS) state of their chunks.

    Every text is tokenized with ``return_overflowing_tokens=True`` so that a
    text longer than ``max_length`` is split into several chunks (``stride``
    is forwarded to the tokenizer to control the overlap between chunks).
    All chunks are encoded, the hidden state at position 0 of each chunk is
    taken, and the states of the chunks that belong to the same text are
    averaged into one vector per text.

    Args:
        model_name: Hugging Face model id or path used for both the tokenizer
            and the encoder.
        stride: ``stride`` value passed to the tokenizer.
        max_length: ``max_length`` value passed to the tokenizer (maximum
            number of tokens per chunk).
    """

    def __init__(self, model_name='allenai/longformer-base-4096', stride=10, max_length=1024):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Keep the attribute name ``model``: saved state dicts are keyed on it.
        self.model = AutoModel.from_pretrained(model_name)
        self.tokenizer_args = {
            "max_length": max_length,
            "stride": stride,
            "return_overflowing_tokens": True,
            "truncation": True,
            "padding": True,
            "return_tensors": "pt"
        }

    def forward(self, texts: list[str]):
        """Return one pooled embedding per input text.

        Args:
            texts: A list of strings; a single string is treated as a
                one-element list.

        Returns:
            Tensor of shape ``(len(texts), hidden_size)`` located on the
            encoder's device.
        """
        if isinstance(texts, str):
            texts = [texts]

        inputs = self.tokenizer(texts, **self.tokenizer_args)
        device = self.model.device

        model_inputs = {
            "input_ids": inputs['input_ids'].to(device),
            "attention_mask": inputs['attention_mask'].to(device),
        }
        # Not every tokenizer emits token_type_ids; forward them only if present.
        token_type_ids = inputs.get('token_type_ids')
        if token_type_ids is not None:
            model_inputs["token_type_ids"] = token_type_ids.to(device)

        outputs = self.model(**model_inputs)  # (total_chunks, seq_len, hidden_size)
        cls_embs = outputs.last_hidden_state[:, 0]  # (total_chunks, hidden_size)

        # mapping[j] is the index of the text that chunk j was cut from.
        mapping = inputs.get('overflow_to_sample_mapping')
        if mapping is None:
            # No overflow information: assume the tokenizer produced exactly
            # one row per text, so chunk j belongs to text j. (A constant
            # ``[0]`` here would leave every text but the first without
            # chunks and yield NaN embeddings.)
            mapping = torch.arange(len(texts), device=cls_embs.device)
        else:
            mapping = torch.as_tensor(mapping, device=cls_embs.device)

        # Average the CLS embeddings of each text's chunks via a boolean mask.
        pooled = [cls_embs[mapping == i].mean(dim=0) for i in range(len(texts))]
        return torch.stack(pooled, dim=0)  # (batch, hidden_size)


In [2]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = TrainableBERTCLSMeanPooler()
# weights_only=True restricts unpickling to tensors and plain containers, so a
# tampered checkpoint file cannot execute arbitrary code while being loaded.
model.load_state_dict(torch.load("model_weights.pt", map_location=device, weights_only=True))
# load_state_dict copies values into the existing (CPU) parameters; the module
# itself has to be moved explicitly for inference to run on `device`.
model.to(device)
# Switch to inference mode (disables dropout); the last expression also
# displays the module structure.
model.eval()

TrainableBERTCLSMeanPooler(
  (model): LongformerModel(
    (embeddings): LongformerEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
      (position_embeddings): Embedding(4098, 768, padding_idx=1)
    )
    (encoder): LongformerEncoder(
      (layer): ModuleList(
        (0-11): 12 x LongformerLayer(
          (attention): LongformerAttention(
            (self): LongformerSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (query_global): Linear(in_features=768, out_features=768, bias=True)
              (key_global): Linear(in_features=768, out_features=768, bias=True)
              (value_glo

In [3]:
import pandas as pd
from similarity import CosineSimScorer

# Directory that holds the benchmark CSV files. Set the BENCHMARK_DIR
# environment variable to run the notebook on another machine; the default is
# the location originally used by this notebook.
BENCHMARK_DIR = os.environ.get(
    "BENCHMARK_DIR",
    "C:\\Users\\mokrota\\Documents\\GitHub\\math_problem_recommender\\math_problem_recommender\\benchmark\\benchmarkv3",
)

# df.csv: the problem set; q&a.csv: the benchmark queries.
problemset_df = pd.read_csv(os.path.join(BENCHMARK_DIR, "df.csv"))
qa_df = pd.read_csv(os.path.join(BENCHMARK_DIR, "q&a.csv"))

In [4]:
def parse_text(row):
    """Resolve the problem ids referenced by one q&a row to their texts.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) whose "Anchor",
            "Golden", "Silver" and "Wrong" entries hold ids that appear in
            the ``id`` column of the global ``problemset_df``.

    Returns:
        dict mapping each of the four role names to the 'Problem&Solution'
        value of the first ``problemset_df`` row with the matching id.

    Raises:
        IndexError: If one of the ids is not present in ``problemset_df``.
    """
    names = ["Anchor", "Golden", "Silver", "Wrong"]
    texts = {}

    for name in names:
        problem_id = row[name]
        matches = problemset_df.loc[problemset_df['id'] == problem_id, 'Problem&Solution']
        if matches.empty:
            # Same exception type as a failing .iloc[0], but say what is missing.
            raise IndexError(
                f"{name} id {problem_id!r} not found in problemset_df['id']"
            )
        texts[name] = matches.iloc[0]

    return texts

In [5]:
# For every q&a row, store the dict returned by parse_text (keys "Anchor",
# "Golden", "Silver", "Wrong") in the 'Problem&Solution' column of qa_df.
# NOTE(review): this mutates qa_df in place.
qa_df['Problem&Solution'] = qa_df.apply(parse_text, axis=1)

In [6]:
# One dict per benchmark query, keyed by "Anchor", "Golden", "Silver", "Wrong".
problemsolution = qa_df['Problem&Solution'].to_list()
# queries = qa_df['query'].to_list()

# anchors[i] is the query text; texts[i] holds its three candidates in the
# order Golden, Silver, Wrong.
anchors = [entry['Anchor'] for entry in problemsolution]
texts = [
    [entry['Golden'], entry['Silver'], entry['Wrong']]
    for entry in problemsolution
]

In [7]:
from tqdm import tqdm

def embed():
    """Embed the global ``anchors`` and ``texts`` with the global ``model``.

    Returns:
        tuple ``(anchors_emb, texts_emb)`` where ``anchors_emb`` is the
        concatenation of the per-anchor embeddings along dim 0 and
        ``texts_emb`` is a list with one model output per entry of ``texts``.
    """
    # Pure inference: without no_grad every forward pass would keep its
    # autograd graph alive and the returned embeddings would require grad.
    with torch.no_grad():
        # Anchors are embedded one at a time and then concatenated.
        anchors_emb = torch.cat([model([anchor]) for anchor in anchors], dim=0)

        texts_emb = []
        for group in tqdm(texts):
            # A bare string is wrapped so the model always receives a list.
            batch = [group] if isinstance(group, str) else group
            texts_emb.append(model(batch))

    return anchors_emb, texts_emb


In [8]:
import numpy as np
from scipy.stats import spearmanr
def calc_metrics(pred_ranks, true_ranks):
    """Compare predicted rankings with the reference rankings.

    Args:
        pred_ranks: Sequence of predicted rankings, one per query.
        true_ranks: Sequence of reference rankings, aligned with pred_ranks.

    Returns:
        dict with
        "accuracy": fraction of individual positions where prediction and
            reference agree, and
        "spearman": mean over queries of the Spearman correlation between
            the reference and the predicted ranking.
    """
    expected = np.array(true_ranks)
    predicted = np.array(pred_ranks)
    position_hits = expected == predicted
    accuracy = position_hits.mean()

    # spearmanr returns (correlation, p-value); only the correlation is kept.
    per_query_rho = [
        spearmanr(reference, guess)[0]
        for reference, guess in zip(true_ranks, pred_ranks)
    ]

    return {"accuracy": accuracy, "spearman": np.mean(per_query_rho)}

In [None]:
def evaluate():
    """Rank every candidate group against its anchor and score the rankings.

    Embeddings come from ``embed()``; for each (anchor, candidates) pair the
    ranking produced by ``CosineSimScorer().rank`` is compared with the
    reference ranking ``0, 1, ..., k-1`` (k = number of candidates), i.e. the
    candidates are expected to be listed best-first.
    NOTE(review): presumably ``rank`` returns a 1-D tensor of candidate
    positions ordered by similarity; confirm against ``similarity``.

    Returns:
        The metrics dict produced by ``calc_metrics``.

    Raises:
        ValueError: If ``embed()`` yields no (anchor, candidates) pairs.
    """
    anchors_emb, texts_emb = embed()
    ranker = CosineSimScorer()

    pred_ranks = []
    true_ranks = []
    for anchor, candidates in zip(anchors_emb, texts_emb):
        pred_ranks.append(ranker.rank(anchor, candidates).cpu().numpy())
        # Build the reference next to the prediction so both arrays always
        # describe the same pairs, instead of relying on a loop variable that
        # outlives the loop and on the length of the global `anchors`.
        true_ranks.append(np.arange(len(candidates)))

    if not pred_ranks:
        raise ValueError("evaluate() needs at least one anchor/candidate group")

    return calc_metrics(np.stack(pred_ranks), np.stack(true_ranks))

In [10]:
# Collect one metrics dict per evaluated model, tagged with a display name.
results = []
res = evaluate()
res.update(name="Tuned longformer")
results += [res]

Input ids are automatically padded to be a multiple of `config.attention_window`: 512
100%|██████████| 7/7 [00:35<00:00,  5.07s/it]


here


In [11]:
# Display the collected metric dicts as a table, one row per entry in `results`.
pd.DataFrame(results)

Unnamed: 0,accuracy,spearman,name
0,0.428571,0.142857,Tuned longformer
