In [1]:
from fastchat.serve.inference import load_model
from fastchat.conversation import conv_templates
import re
import torch

class model:
    """Wrapper around a FastChat-loaded language model and its tokenizer.

    Provides helpers to flatten retrieved passages into a context string, to
    assemble the final prompt, and to generate a completion for a prompt.
    """

    def __init__(self, path='C:\\Users\\vloba\\OneDrive\\Projects\\TFM\\vicuna13b', device='cuda'):
        """Load the model and tokenizer found at ``path`` onto ``device``.

        The load is requested for one GPU, in 8-bit, with a '24Gib' memory cap.
        """
        loaded = load_model(
            model_path=path,
            device=device,
            num_gpus=1,
            max_gpu_memory='24Gib',
            load_8bit=True,
        )
        self.model, self.tokenizer = loaded
        self.device = device

    def prompt(self, question, context):
        """Return ``context``, a blank line, the brevity instruction and ``question``."""
        instruction = 'Sé breve, claro y conciso.\n'
        return context + '\n\n' + instruction + question

    def context(self, docs):
        """Join ``docs`` with single spaces and turn every newline into a space."""
        merged = ' '.join(docs)
        return merged.replace('\n', ' ')

    @torch.inference_mode()
    def query(self, question):
        """Generate and return the model's reply to ``question``.

        The question is wrapped in the "vicuna_v1.1" conversation template and
        sampled with temperature 0.1 for at most 200 new tokens. Only the text
        decoded from the tokens generated after the prompt is returned.
        """
        conversation = conv_templates["vicuna_v1.1"].copy()
        user_role = conversation.roles[0]
        assistant_role = conversation.roles[1]
        conversation.append_message(user_role, question)
        # Empty assistant turn, so the rendered prompt ends where the reply starts.
        conversation.append_message(assistant_role, None)

        encoded = self.tokenizer([conversation.get_prompt()]).input_ids
        prompt_length = len(encoded[0])

        generated = self.model.generate(
            torch.as_tensor(encoded).to(self.device),
            do_sample=True,
            temperature=0.1,
            max_new_tokens=200,
        )

        # The generated sequence starts with the prompt tokens; drop them.
        completion_ids = generated[0][prompt_length:]
        reply = self.tokenizer.decode(
            completion_ids,
            skip_special_tokens=True,
            spaces_between_special_tokens=False,
        )

        # NOTE(review): this passes the question (not the reply) and acts on a
        # local conversation copy that is discarded -- confirm it is needed.
        conversation.correct_message(question)
        return reply


In [27]:
from langchain.document_loaders import PyPDFDirectoryLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from sentence_transformers import CrossEncoder
import json
import numpy as np
import pandas as pd
import time

class tester:
    """Benchmark a retrieval-augmented LLM on a multiple-choice exam.

    For each ``(chunk_size, k)`` pair, the PDFs loaded from ``path`` are split
    into chunks and indexed in Chroma. Every exam question is answered using
    the retrieved chunks that pass a cross-encoder relevance filter, and the
    answers are written to ``answers_{chunk_size}_{k}_{j}.json``.
    """

    def __init__(self, model, path, chunk_size=[600], k=[5]):
        """Store the configuration and build the loader and scoring models.

        Args:
            model: object exposing ``context(docs)``, ``prompt(question, context)``
                and ``query(prompt)``.
            path: directory handed to ``PyPDFDirectoryLoader``.
            chunk_size: chunk sizes passed to ``RecursiveCharacterTextSplitter``,
                one per configuration.
            k: number of chunks to retrieve, paired by position with ``chunk_size``.
        """
        self.model = model
        # Copy so neither the shared default lists nor the caller's lists are aliased.
        self.chunk_size = list(chunk_size)
        self.k = list(k)
        self.loader = PyPDFDirectoryLoader(path)
        self.embeddings = HuggingFaceEmbeddings(model_name='hiiamsid/sentence_similarity_spanish_es')
        self.cross_encoder = CrossEncoder('cross-encoder/mmarco-mMiniLMv2-L12-H384-v1')

    def _rerank(self, question, docs, k, threshold=0.5):
        """Return the chunks scoring above ``threshold``, best first, at most ``k``.

        Scores come from ``self.cross_encoder.predict`` on ``[question, chunk]``
        pairs. Sorting is done on the unfiltered scores so that every index still
        refers to the matching entry of ``docs``.
        """
        if not docs:
            # Nothing to score; avoid handing the cross-encoder an empty batch.
            return []
        scores = np.asarray(self.cross_encoder.predict([[question, doc] for doc in docs]))
        best_first = np.argsort(scores)[::-1]
        return [docs[pos] for pos in best_first if scores[pos] > threshold][:k]

    def _answer_question(self, index, row, docsearch, k):
        """Answer one exam row and return the record stored in the results file."""
        question = (
            row['question']
            + '\na. ' + row['option A']
            + '\nb. ' + row['option B']
            + '\nc. ' + row['option C']
            + '\nd. ' + row['option D']
        )
        # Retrieval uses the bare question; re-ranking uses question plus options.
        retrieved = [doc.page_content for doc in docsearch.similarity_search(row['question'], k=k)]
        docs = self._rerank(question, retrieved, k)
        # Use the model given to this instance, not a notebook-level global.
        context = self.model.context(docs)
        prompt = self.model.prompt(question, context)
        answer = self.model.query(prompt)
        return {
            'index': index,
            'question': question,
            'right answer': row['answer'] + '. ' + row['option ' + row['answer'].upper()],
            'answer': answer,
        }

    def test(self, df, n=1):
        """Run the exam ``n`` times for every ``(chunk_size, k)`` configuration.

        Args:
            df: DataFrame with the columns 'question', 'option A', 'option B',
                'option C', 'option D' and 'answer' (the letter of the right option).
            n: number of repetitions per configuration.

        Raises:
            AssertionError: if ``chunk_size`` and ``k`` differ in length.

        Side effects:
            Writes one ``answers_{chunk_size}_{k}_{j}.json`` per configuration
            and repetition into the current working directory.
        """
        assert len(self.chunk_size) == len(self.k), 'chunk_size and k must have the same length'
        # The parsed PDFs do not depend on the configuration, so load them once.
        raw_documents = self.loader.load()
        for chunk_size, k in zip(self.chunk_size, self.k):
            print(f'chunk_size: {chunk_size}, k: {k}')
            print('Splitting the documents')
            splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_size // 5)
            documents = splitter.split_documents(raw_documents)
            print('Calculating the embeddings')
            # NOTE(review): no collection name is given; depending on the installed
            # chromadb version, successive calls may share one default in-memory
            # collection and mix chunks across configurations -- confirm.
            docsearch = Chroma.from_documents(documents, self.embeddings)
            for j in range(n):
                print(f'Iteration {j+1}')
                start = time.time()
                print('Starting the test')
                answers = [
                    self._answer_question(index, row, docsearch, k)
                    for index, row in df.iterrows()
                ]
                # Taken after the loop so it is defined even for an empty exam.
                duration = time.time() - start
                with open(f'answers_{chunk_size}_{k}_{j}.json', 'w') as f:
                    json.dump({
                        'chunk_size': chunk_size,
                        'k': k,
                        'duration': duration,
                        'answers': answers,
                    }, f)
        print('Finished')



In [3]:
# Instantiate the wrapper with its default path and device.
# NOTE: this rebinds the name `model` from the class to the instance, so re-running
# this cell in the same kernel calls the instance instead of the class and fails;
# re-run the class-definition cell first.
model = model()

init_kwargs {'torch_dtype': torch.float16}


100%|██████████| 3/3 [01:31<00:00, 30.48s/it]


In [29]:
# Five (chunk_size, k) configurations, paired position by position; PDFs are read
# from the "documents" directory.
# NOTE: this rebinds `tester` from the class to the instance, so the cell cannot be
# re-run in the same kernel without first re-running the class-definition cell.
tester = tester(model, path="documents", chunk_size=[1000, 1000, 600, 600, 600], k=[2, 3, 3, 4, 5])

In [5]:
# Exam questions; tester.test reads the columns 'question', 'option A', 'option B',
# 'option C', 'option D' and 'answer' from each row.
df = pd.read_csv('Exam questions.csv')

In [None]:
# Run every configuration twice (n=2); each run writes its own
# answers_{chunk_size}_{k}_{j}.json file.
tester.test(df, n=2)