# Практическое занятие №6

In [None]:
# Install PyTorch and torchvision from the CUDA 12.1 (cu121) wheel index.
%pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121

In [None]:
%pip install faiss-cpu langchain langchain-community beautifulsoup4 transformers sentence-transformers accelerate

In [None]:
from langchain.embeddings import SentenceTransformerEmbeddings
from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.utils.html import (PREFIXES_TO_IGNORE_REGEX,
                                  SUFFIXES_TO_IGNORE_REGEX)
from langchain_community.document_loaders import UnstructuredURLLoader
from langchain.vectorstores import FAISS

from sentence_transformers import SentenceTransformer
from sentence_transformers.models import Pooling, Transformer

from transformers import AutoTokenizer, AutoModelForCausalLM
from bs4 import BeautifulSoup as Soup
import unicodedata
import torch
import faiss

In [None]:
# Settings consumed by the RAG pipeline: model id of the generator LLM,
# model id of the sentence encoder, and the URL of the page to index.
config = dict(
    llm='IlyaGusev/saiga_llama3_8b',
    encoder='cointegrated/rubert-tiny2',
    url='https://tinyurl.com/34kex5my',
)

In [None]:
class RAG:
    """Minimal retrieval-augmented generation (RAG) pipeline.

    On construction the pipeline loads the causal LLM and the sentence encoder
    named in ``config``, downloads the page at ``config['url']``, splits it
    into overlapping chunks and indexes them in an in-memory FAISS store.
    :meth:`get_answer` retrieves the chunks most similar to a question and asks
    the LLM to answer using them as context.

    Args:
        config: Mapping with the keys ``'llm'`` (causal LM id), ``'encoder'``
            (sentence-transformers model id) and ``'url'`` (page to index).

    Raises:
        ValueError: If no text could be extracted from ``config['url']``.
    """

    # Special-token ids of the Llama-3 vocabulary used by the configured LLM.
    _BOS_TOKEN_ID = 128000  # <|begin_of_text|>
    _EOS_TOKEN_ID = 128001  # <|end_of_text|>
    _EOT_TOKEN_ID = 128009  # <|eot_id|>, closes every chat turn

    _SYSTEM_PROMPT = "Ты — Сайга, русскоязычный автоматический ассистент. Ты разговариваешь с людьми и помогаешь им."
    # Text the assistant turn is pre-filled with, so the model continues it.
    _RESPONSE_TEMPLATE = "Ответ: "

    _USER_PROMPT_TEMPLATE = (
        "Используй фрагменты полученного контекста, чтобы ответить на вопрос. "
        "Если ты не знаешь ответа, то скажи, что не знаешь, не придумывай ответ. "
        "Используй максимум три предложения и отвечай кратко.\n\n"
        "Контекст:\n{context}\n\n"
        "Вопрос:\n{query}"
    )

    # Llama-3 chat layout: every header is followed by a blank line and every
    # finished turn is closed with <|eot_id|>. The assistant turn is left open.
    _CHAT_TEMPLATE = (
        "<|begin_of_text|>"
        "<|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|>"
        "<|start_header_id|>user<|end_header_id|>\n\n{user}<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n\n{prefix}"
    )

    def __init__(self, config):
        self.__config = config

        self.__tokenizer, self.__llm = self.__get_llm(self.__config['llm'])
        self.__encoder = self.__get_encoder(self.__config['encoder'])

        documents = self.__get_data(self.__config['url'])
        chunks = self.__get_chunks(documents)
        if not chunks:
            # Fail early with a clear message instead of letting the vector
            # store choke on an empty document list.
            raise ValueError(
                "No text could be loaded from {!r}".format(self.__config['url'])
            )

        self.__retriever = self.__get_retriever(chunks)

    def __get_llm(self, model_id):
        """Load the tokenizer and the causal LM for ``model_id`` in bfloat16."""
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        llm = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

        return tokenizer, llm

    def __get_data(self, url):
        """Download ``url`` and return the loaded documents."""
        loader = UnstructuredURLLoader(urls=[url])

        return loader.load()

    def __get_chunks(self, documents, chunk_size=3000, chunk_overlap=500):
        """Split ``documents`` into overlapping chunks.

        ``chunk_size`` and ``chunk_overlap`` are passed to the text splitter
        unchanged.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            is_separator_regex=False,
            add_start_index=False,
        )

        return text_splitter.split_documents(documents)

    def __get_encoder(self, encoder_name):
        """Create the embedding model used to index and query the chunks."""
        return SentenceTransformerEmbeddings(model_name=encoder_name)

    def __get_retriever(self, chunks):
        """Index ``chunks`` in FAISS and expose the index as a retriever."""
        index = FAISS.from_documents(chunks, self.__encoder)

        return index.as_retriever()

    def __get_context(self, query):
        """Return the text of the chunks retrieved for ``query``.

        NFKC folds compatibility characters (e.g. non-breaking spaces) while
        keeping letters composed; NFKD would split Cyrillic "й"/"ё" into a
        base letter plus a combining mark and corrupt the Russian context.
        """
        documents = self.__retriever.invoke(query)
        passages = [
            unicodedata.normalize('NFKC', document.page_content)
            for document in documents
        ]

        # A blank line keeps neighbouring chunks visibly separate.
        return '\n\n'.join(passages)

    def __get_response(self, query, context, max_new_tokens=300, temperature=0.6, top_p=0.18, top_k=100):
        """Generate the model's reply to ``query`` given ``context``.

        Args:
            query: The user's question.
            context: Retrieved text the answer must be based on.
            max_new_tokens: Upper bound on the number of generated tokens.
            temperature, top_p, top_k: Sampling parameters passed to
                ``generate``.

        Returns:
            The decoded completion with surrounding whitespace stripped.
        """
        user_prompt = self._USER_PROMPT_TEMPLATE.format(context=context, query=query)
        prompt = self._CHAT_TEMPLATE.format(
            system=self._SYSTEM_PROMPT,
            user=user_prompt,
            prefix=self._RESPONSE_TEMPLATE,
        )

        # The template already contains <|begin_of_text|>, so the tokenizer
        # must not prepend another one.
        inputs = self.__tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
        inputs = {name: tensor.to(self.__llm.device) for name, tensor in inputs.items()}
        prompt_length = inputs["input_ids"].shape[1]

        output_ids = self.__llm.generate(
            **inputs,
            bos_token_id=self._BOS_TOKEN_ID,
            # Stop on <|eot_id|> as well: the assistant turn ends with it, and
            # without it the model keeps writing the next chat turn.
            eos_token_id=[self._EOS_TOKEN_ID, self._EOT_TOKEN_ID],
            pad_token_id=self._EOS_TOKEN_ID,
            do_sample=True,
            max_new_tokens=max_new_tokens,
            no_repeat_ngram_size=15,
            repetition_penalty=1.0,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
        )[0]

        # Drop the echoed prompt tokens; keep only the newly generated ones.
        completion = self.__tokenizer.decode(
            output_ids[prompt_length:], skip_special_tokens=True
        )

        return completion.strip()

    def get_answer(self, query):
        """Answer ``query`` from the indexed page.

        Returns:
            A formatted string containing the question and the model's answer.
        """
        context = self.__get_context(query)
        # Generation stops at the end of the assistant turn, so the reply is
        # used as is (no cutting at the word "assistant", which would also
        # truncate legitimate answers containing it).
        response = self.__get_response(query, context)

        answer = """
        Вопрос: {query}\n
        =======================\n
        Ответ: {response}
        """.format(query=query, response=response)

        return answer

In [None]:
# Build the pipeline once: constructing RAG loads both models and indexes the page.
rag = RAG(config=config)

In [None]:
# Ask a sample question about the indexed page and show the formatted answer.
query = "В каком году вышел фильм?"
answer = rag.get_answer(query=query)
print(answer)