In [None]:
!pip install beir

In [5]:
import pandas as pd
import uuid
from beir.datasets.data_loader import GenericDataLoader
from beir.retrieval.evaluation import EvaluateRetrieval
from beir.retrieval.search.dense import DenseRetrievalExactSearch as DRES
from beir.retrieval import models
import yaml
import os
import json
import datetime

class EvaluateSBERTModels:
    '''
    Evaluate one or more SBERT models on a question/answer CSV using BEIR.

    The YAML config is read for these keys:
        target_dir: base output directory (current working directory if empty or missing).
        data_path: CSV file with 'Questions' and 'Answers' columns.
        models: list of model paths or model ids to evaluate.
        batch_size: encoding batch size (64 if empty or missing).
        score_function: similarity measure passed to BEIR ("dot" if empty or missing).
    '''

    def __init__(self, config_file_path):
        '''
        Load the config and create the output directories for this run.

        Input:
            config_file_path: path to the YAML configuration file.
        '''

        with open(config_file_path) as f:
            # NOTE(review): FullLoader can still build arbitrary Python objects;
            # prefer yaml.safe_load if the config may come from an untrusted source.
            config = yaml.load(f, Loader=yaml.FullLoader)

        self.config = config

        # fall back to the current directory when target_dir is missing or empty
        self.target_dir = self.config.get('target_dir') or os.getcwd()

        # Format the timestamp once so that the evaluator inputs and the results
        # always share the same run directory, even if the clock ticks in between.
        run_stamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        run_dir = os.path.join(self.target_dir, 'sbert_eval_{}'.format(run_stamp))

        # evaluator inputs path
        self.eval_input_base_path = os.path.join(run_dir, 'evaluator_input')
        os.makedirs(self.eval_input_base_path, exist_ok=True)

        # create results base path
        self.results_base_path = os.path.join(run_dir, 'results')
        os.makedirs(self.results_base_path, exist_ok=True)

    def _write_lines(self, file_name, lines):
        '''
        Write lines to a file in the evaluator input directory, newline-separated
        and without a trailing newline.
        '''
        with open(os.path.join(self.eval_input_base_path, file_name), 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))

    def create_data_for_evaluator(self):
        '''
        Function to convert input data to BEIR data loader compatible format.

        Every CSV row yields one corpus document (from 'Answers'), one query
        (from 'Questions') and one qrel line linking the two with score 1.
        Writes corpus.jsonl, queries.jsonl and qrels.tsv to the evaluator
        input directory.
        '''

        assert len(self.config['data_path'])>0, "Data path cannot be empty."

        # load data
        data = pd.read_csv(self.config['data_path'])

        corpus = []
        queries = []
        qrels = []

        for _, row in data.iterrows():
            doc_id = str(uuid.uuid4())
            q_id = str(uuid.uuid4())
            corpus.append({'_id': doc_id, 'text': row['Answers'], 'title': ""})
            queries.append({'_id': q_id, 'text': row['Questions']})
            qrels.append('{}\t{}\t1'.format(q_id, doc_id))

        # write corpus
        self._write_lines('corpus.jsonl', [json.dumps(doc) for doc in corpus])

        # write queries
        self._write_lines('queries.jsonl', [json.dumps(query) for query in queries])

        # write qrels; the header must sit on its own line, otherwise the first
        # qrel row is fused with it and lost when the header is skipped on load
        self._write_lines('qrels.tsv', ['query-id\tcorpus-id\tscore'] + qrels)

    def load_data_for_evaluator(self):
        '''
        Function to load the data for the evaluator.

        Output:
            corpus, queries, qrels: as returned by GenericDataLoader.load_custom()
            for the files written by create_data_for_evaluator.
        '''
        corpus, queries, qrels = GenericDataLoader(
            corpus_file=os.path.join(self.eval_input_base_path, 'corpus.jsonl'),
            query_file=os.path.join(self.eval_input_base_path, 'queries.jsonl'),
            qrels_file=os.path.join(self.eval_input_base_path, 'qrels.tsv')).load_custom()

        return corpus, queries, qrels

    def evaluate_model(self, model, corpus, queries, qrels):
        '''
        Function to evaluate a SBERT model.

        The batch size and score function are read from the config
        ('batch_size', default 64; 'score_function', default "dot").

        Input:
            model: model_path or model id.
            corpus: corpus returned by load_data_for_evaluator.
            queries: queries returned by load_data_for_evaluator.
            qrels: relevance judgements returned by load_data_for_evaluator.
        Output:
            ndcg: Normalized discounted cumulative gain scores for a given model.
            _map: Mean average precision scores for a given model.
            recall: Recall scores for a given model.
            precision: Precision scores for a given model.
        '''

        batch_size = self.config.get('batch_size')
        if batch_size is None:
            batch_size = 64

        score_function = self.config.get('score_function')
        if score_function is None:
            score_function = "dot"

        model = DRES(models.SentenceBERT(model), batch_size=batch_size)
        retriever = EvaluateRetrieval(model, score_function=score_function)
        results = retriever.retrieve(corpus, queries)

        ndcg, _map, recall, precision = retriever.evaluate(qrels, results, retriever.k_values)

        return ndcg, _map, recall, precision

    def run_evaluator(self):
        '''
        Pipeline function to run the evaluator.

        Creates and loads the evaluator data, evaluates every model listed in
        the config and writes one JSON file per metric to the results directory.
        Each entry in a file is the metric dict for one model, tagged with a
        'Model' key.
        '''

        assert len(self.config['models'])>0, "Evaluator requires 1 or more models."

        ndcg_results = []
        map_results = []
        recall_results = []
        precision_results = []

        # create data for evaluator
        self.create_data_for_evaluator()

        # load data for evaluator
        corpus, queries, qrels = self.load_data_for_evaluator()

        # analyze models
        for model in self.config['models']:
            ndcg, _map, recall, precision = self.evaluate_model(model, corpus, queries, qrels)
            ndcg['Model'] = model
            ndcg_results.append(ndcg)
            _map['Model'] = model
            map_results.append(_map)
            recall['Model'] = model
            recall_results.append(recall)
            precision['Model'] = model
            precision_results.append(precision)

        # NOTE: 'NDGC.json' (sic) is kept as-is so existing consumers of the
        # results directory keep finding the file under the same name.
        outputs = (
            ('NDGC.json', ndcg_results),
            ('MAP.json', map_results),
            ('RECALL.json', recall_results),
            ('PRECISION.json', precision_results),
        )
        for file_name, metric_results in outputs:
            with open(os.path.join(self.results_base_path, file_name), 'w', encoding='utf-8') as f:
                f.write(json.dumps(metric_results))
        

In [6]:
# Build the evaluator from the YAML config; the constructor also creates the
# evaluator input and results directories under the configured target directory.
sbert_evaluator = EvaluateSBERTModels(config_file_path='config.yaml')

In [7]:
# Show the list of models that the config asks the evaluator to run.
sbert_evaluator.config['models']

['thenlper/gte-large', 'BAAI/bge-large-en-v1.5', 'intfloat/e5-large-v2']

In [8]:
# Run the full pipeline: convert the CSV to BEIR input files, evaluate every
# configured model and write one JSON file per metric to the results directory.
sbert_evaluator.run_evaluator()

  0%|          | 0/98 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]