# import lib

In [None]:
import requests
import langchain
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.document_loaders import TextLoader
import os
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.vectorstores import FAISS
import warnings
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.retrievers import TFIDFRetriever
from langchain.retrievers import EnsembleRetriever
from langchain_community.llms import YandexGPT
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
from langchain.tools import BaseTool


# YandexGPT

In [None]:
def yagpt_request(question, context):
    url = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Api-Key ----"
    }

    ya_prompt = {
        "modelUri": "gpt://b1g72uajlds114mlufqi/yandexgpt/latest",
        "completionOptions": {
            "stream": False,
            "temperature": 0.6,
            "maxTokens": "1024"
        },
        "messages": [
            {
                "role": "system",
                "text": context
            },
            {
                "role": "user",
                "text": question
            }
        ]
    }

    response = requests.post(url, headers=headers, json=ya_prompt)
    result = eval(response.text)

    return result['result']['alternatives'][0]['message']['text']

In [None]:
yagpt_request('Ты Лев или пёс?', 'Ты — Пётр Первый')

'Я — Пётр Первый, и я человек, а не лев или пёс. Но я могу быть львом в том смысле, что я стремился изменить Россию и сделать её более сильной и современной, как царь зверей.'

## Add YandexGPT to langchain

In [None]:
from typing import Any, Dict, Iterator, List, Mapping, Optional

from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk


class CustomLLM(LLM):

    def _call(
        self,
        prompt: str,
        role: str = 'Ты — RuMate, сервис для ответа по документации RuStore, если вопрос не касается документации приложения (не связан с программированием, подключением подписок, выкладывание приложения и т.д), то стоит отвечать так: "Извините, я бот отвечающий на вопросы по документации, уточните свой вопрос"',
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        '''
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
            '''
        return yagpt_request(prompt, role)

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Stream the LLM on the given prompt.

        This method should be overridden by subclasses that support streaming.

        If not implemented, the default behavior of calls to stream will be to
        fallback to the non-streaming version of the model and return
        the output as a single chunk.

        Args:
            prompt: The prompt to generate from.
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of these substrings.
            run_manager: Callback manager for the run.
            **kwargs: Arbitrary additional keyword arguments. These are usually passed
                to the model provider API call.

        Returns:
            An iterator of GenerationChunks.
        """
        for char in prompt[: self.n]:
            chunk = GenerationChunk(text=char)
            if run_manager:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)

            yield chunk

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters."""
        return {
            # The model name allows users to specify custom token counting
            # rules in LLM monitoring applications (e.g., in LangSmith users
            # can provide per token pricing for their model and monitor
            # costs for the given LLM.)
            "model_name": "CustomChatModel",
        }

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this chat model. Used for logging purposes only."""
        return "custom"

In [None]:
llm = CustomLLM()
print(llm)

[1mCustomLLM[0m
Params: {'model_name': 'CustomChatModel'}


In [None]:
llm.invoke('Ты кто')

'Я — RuMate, сервис для ответа по документации RuStore. Если ваш вопрос не касается документации приложения (не связан с программированием, подключением подписок, выкладыванием приложения и т. д.), то стоит уточнить свой вопрос.'

In [None]:
llm.batch(["woof woof woof", "meow meow meow"])

['Извините, я бот, отвечающий на вопросы по документации. Уточните свой вопрос.\n\nWoof woof woof! Что вы хотели бы узнать о RuStore?',
 'Извините, я бот, отвечающий на вопросы по документации. Уточните свой вопрос.\n\nmeow meow meow']

# RAG system

In [None]:
from langchain.tools import BaseTool
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.prompts import StringPromptTemplate
from langchain.schema import AgentAction, AgentFinish, OutputParserException
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from typing import List, Union
import re

## Template for help Agents

In [None]:
when_need_use_ask = open(r'/content/when_need_use_ask.txt','r').readline()
when_need_use_fix_code = open(r'/content/when_need_use_fix_code.txt','r').readline()
when_need_use_rag = open(r'/content/when_need_use_rag.txt','r').readline()
when_need_use_simplify = open(r'/content/when_need_use_simplify.txt','r').readline()
when_need_use_spliter = open(r'/content/when_need_use_spliter.txt','r').readline()

when_need_use_ask_prompt = open(r'/content/AddAckerPrompt.txt','r').readline()
when_need_use_fix_code_prompt = open(r'/content/FixCodePrompt.txt','r').readline()
when_need_use_rag_prompt = open(r'/content/RagFuncPrompt.txt','r').readline()
when_need_use_simplify_prompt = open(r'/content/SimplifyPrompt.txt','r').readline()
when_need_use_spliter_prompt = open(r'/content/SplitToSimplePrompt.txt','r').readline()

In [None]:
class RagFuncTool(BaseTool):
  name = "RAG system"
  context = when_need_use_rag_prompt
  description = when_need_use_rag
  def _run(self, query: str, context=context):
    return refactor_sim_search_res(query)

class SplitToSimpleTool(BaseTool):
  name = "Split system"
  context = when_need_use_spliter_prompt
  description = when_need_use_spliter
  def _run(self, query: str, context=context):
    return yagpt_request(query, context)

class AddAckerTool(BaseTool):
  name = "Ask system"
  context = when_need_use_ask_prompt
  description = when_need_use_ask
  def _run(self, query: str, context=context):
    return yagpt_request(query, context)

class SimplifyTool(BaseTool):
  name = "Simple query system"
  context = when_need_use_simplify_prompt
  description = when_need_use_simplify
  def _run(self, query: str, context=context):
    return yagpt_request(query, context)

class FixCodeTool(BaseTool):
  name = "Fix Code system"
  context = when_need_use_fix_code_prompt
  description = when_need_use_fix_code
  def _run(self, query: str, context=context):
    return yagpt_request(query, context)


In [None]:
RagFunc = RagFuncTool()
SplitToSimple = SplitToSimpleTool()
AddAcker = AddAckerTool()
Simplify = SimplifyTool()
FixCode = FixCodeTool()

tools = [
    Tool(name="Search_RAG", func=RagFunc._run, description=when_need_use_rag),
    Tool(name="Spliter", func=SplitToSimple._run, description=when_need_use_spliter),
    Tool(name='simplifier', func=Simplify._run, description=when_need_use_simplify),
    Tool(name='code_fixer', func=FixCode._run, description=when_need_use_fix_code)
]

## Template for main Agent

In [None]:
# Set up the base template
template = """Ты являешься ботом сервиса RuMate, который помогает отвечать на вопросы по документации сервиса для загрузки приложений. Ответь на следующие вопросы как можно лучше.
В первую очередь старайся использовать функцию 'Search_RAG' для поиска релевантного ответа.
Если не знаешь, то не нужно ничего придумывать.
Если вопрос не по теме документации, то отвечай по типу "уточните, пожалуйста, вопрос".
У тебя есть доступ к следующим инструментам:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Начинай! Не забудьте говорить вежливо, чётко и ясно, когда будете давать свой окончательный ответ.

Question: {input}
{agent_scratchpad}"""

In [None]:
# Set up a prompt template
class CustomPromptTemplate(StringPromptTemplate):
    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]

    def format(self, **kwargs) -> str:
        # Get the intermediate steps (AgentAction, Observation tuples)
        # Format them in a particular way
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\nObservation: {observation}\nThought: "
        # Set the agent_scratchpad variable to that value
        kwargs["agent_scratchpad"] = thoughts
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools])
        return self.template.format(**kwargs)

In [None]:
prompt = CustomPromptTemplate(
    template=template,
    tools=tools,
    # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables because those are generated dynamically
    # This includes the `intermediate_steps` variable because that is needed
    input_variables=["input", "intermediate_steps"]
)

In [None]:
class CustomOutputParser(AgentOutputParser):

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        # Check if agent should finish
        if "Final Answer:" in llm_output:
            return AgentFinish(
                # Return values is generally always a dictionary with a single `output` key
                # It is not recommended to try anything else at the moment :)
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        # Parse out the action and action input
        regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        if not match:
            raise OutputParserException(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()
        action_input = match.group(2)
        # Return the action and action input
        return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)

In [None]:
output_parser = CustomOutputParser()

## Integration agent to system

In [None]:
# LLM chain consisting of the LLM and a prompt
llm_chain = LLMChain(llm=llm, prompt=prompt)

In [None]:

tool_names = [tool.name for tool in tools]
agent = LLMSingleActionAgent(
    llm_chain=llm_chain,
    output_parser=output_parser,
    stop=["\nObservation:"],
    allowed_tools=tool_names
)

In [None]:
agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True,
                                                    handle_parsing_errors=True, max_iterations=5, early_stopping_method='force',
                                                    tags=sss)

In [None]:
answer = agent_executor.run("How to create аккаунит для разработчика?")

In [None]:
print(answer)

** Чтобы создать аккаунт для разработчика, нужно перейти на сайт RuStore, нажать кнопку регистрации, ввести данные, подтвердить адрес электронной почты и принять условия использования.


## Connection with Retriever

In [None]:
!pip install langchain==0.1.2 sentence_transformers==2.2.2

In [None]:
from langchain.retrievers import BM25Retriever
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_community.chains.pebblo_retrieval.models import VectorDB
from langchain_community.embeddings import HuggingFaceInstructEmbeddings

In [None]:
k = 7
retriever_base =  BM25Retriever.from_documents(texts, k=k)
retriever_advanced = vectordb.as_retriever(search_kwargs = {"k":  k, "search_type" : "similarity"})
ensemble_retriever = EnsembleRetriever(retrievers=[retriever_base, retriever_advanced], weights=[0.5, 0.5], k=k)


In [None]:
from langchain_community.embeddings.huggingface import HuggingFaceInstructEmbeddings
import sentence_transformers
import InstructorEmbedding
import rank_bm25

In [None]:
loader = DirectoryLoader(
    '/content/full_doc',
    glob="./*.md",
    loader_cls=TextLoader,
    show_progress=True,
    use_multithreading=True
)

documents = loader.load()
print(f'We have {len(documents)} pages in total')

for page in range(len(documents)):
    page_content = documents[page].page_content
    page_content = page_content.replace('/help/', 'https://www.rustore.ru/help/')
    documents[page].page_content = page_content

headers_to_split_on = [
    ("#", "Header 1"),
]

markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)

all_splits = []

for doc in documents:
    md_header_splits = markdown_splitter.split_text(doc.page_content)
    for i in range(len(md_header_splits)):
        md_header_splits[i].metadata['source'] = doc.metadata['source']
    all_splits.extend(md_header_splits)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=500
)

texts = text_splitter.split_documents(documents)
print(f'We have created {len(texts)} chunks')

embeddings = HuggingFaceInstructEmbeddings(
    model_name='sentence-transformers/all-MiniLM-L6-v2',
    model_kwargs={"device": "cpu"}
)

vectordb = FAISS.load_local(
    '/content/faiss_index_hp',  # from output folder
    embeddings
)

k = 7
retriever_base =  BM25Retriever.from_documents(texts, k=k)
retriever_advanced = vectordb.as_retriever(search_kwargs = {"k":  k, "search_type" : "similarity"})
ensemble_retriever = EnsembleRetriever(retrievers=[retriever_base, retriever_advanced], weights=[0.5, 0.5], k=k)


100%|██████████| 386/386 [00:00<00:00, 2964.65it/s]


We have 386 pages in total
We have created 7866 chunks
load INSTRUCTOR_Transformer
max_seq_length  512


In [None]:
from langchain_text_splitters.markdown import MarkdownHeaderTextSplitter
import faiss
from langchain.retrievers.bm25 import BM25Retriever

In [None]:
def refactor_sim_search_res(question):
    retriever_ans = ensemble_retriever.invoke(question, k=6)
    retriever_ans = '\n\n'.join([i.page_content for i in retriever_ans])

    context = tempulate.format(context=retriever_ans)

    text = yagpt_request(question, context)
    return text