# Retrieval-augmented generation (RAG) with InstructLab

In [21]:
from typing import Any, Dict, Iterator, List, Optional
import requests
import json
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk

class InstructLabLLM(LLM):
    """A custom LLM that sends prompts to an instructlab server.

    Every prompt is wrapped as a system message plus a user message and
    POSTed as JSON to ``url``. Replies are read in the ``choices`` layout
    (``choices[0].message.content`` for full replies,
    ``choices[0].delta.content`` for streamed events).

    Example:

        .. code-block:: python

            model = InstructLabLLM(
                url="http://localhost:5000/your-endpoint",
                model_name="models/merlinite-7b-lab-Q4_K_M.gguf",
                system_message="You are a helpful assistant."
            )
            result = model.invoke("hello")
            results = model.batch(["hello", "world"])
    """

    # URL of the endpoint every request is POSTed to.
    url: str
    # Value sent as the "model" field of every request.
    model_name: str
    # Content of the system message placed before every prompt.
    system_message: str
    # Seconds to wait for the server, handed to ``requests``. The default
    # ``None`` waits indefinitely, which is what happened before this field
    # existed.
    request_timeout: Optional[float] = None

    def _post(self, prompt: str, stream: bool = False) -> requests.Response:
        """POST ``prompt`` to the server and return the raw HTTP response.

        Args:
            prompt: Text sent as the user message.
            stream: When true, ask the server for a streamed reply (adds
                ``"stream": true`` to the payload) and leave the response
                body unread so it can be consumed incrementally.

        Returns:
            The ``requests`` response; the status code is not checked here.
        """
        payload: Dict[str, Any] = {
            "model": self.model_name,
            "messages": [
                {"role": "system", "content": self.system_message},
                {"role": "user", "content": prompt},
            ],
        }
        if stream:
            # Without this flag the server answers with one complete JSON
            # body, so there would be nothing to stream.
            payload["stream"] = True

        return requests.post(
            self.url,
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            stream=stream,
            timeout=self.request_timeout,
        )

    @staticmethod
    def _parse_stream_line(line: str) -> Optional[str]:
        """Extract the generated text carried by one line of a streamed reply.

        Args:
            line: One decoded, non-empty line of the response body.

        Returns:
            The text fragment on the line (possibly empty), or ``None`` when
            the line carries no JSON event (stream framing or the
            ``[DONE]`` terminator).

        Raises:
            json.JSONDecodeError: If the line's data is not valid JSON.
        """
        # Server-sent-event framing: comments and non-data fields have no payload.
        if line.startswith((":", "event:", "id:", "retry:")):
            return None
        if line.startswith("data:"):
            line = line[len("data:"):].strip()
        if line == "[DONE]":
            return None

        event = json.loads(line)
        choices = event.get("choices")
        if choices:
            first_choice = choices[0]
            # Streamed events carry "delta"; a server that ignored the
            # stream flag sends one full "message" instead.
            body = first_choice.get("delta") or first_choice.get("message") or {}
            return body.get("content") or ""
        # Fallback kept from the earlier implementation: plain JSON lines
        # with an "output" field.
        return event.get("output", "")

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Run the LLM on the given input.

        Args:
            prompt: The prompt to generate from.
            stop: Not supported; must be ``None``.
            run_manager: Callback manager for the run (unused here).
            **kwargs: Arbitrary additional keyword arguments (unused here).

        Returns:
            The model output as a string.

        Raises:
            ValueError: If ``stop`` is given.
            RuntimeError: If the server answers with a status other than 200.
        """
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")

        response = self._post(prompt)
        if response.status_code != 200:
            raise RuntimeError(f"Request failed with status code {response.status_code}")

        return response.json()["choices"][0]["message"]["content"]

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Stream the LLM output for the given prompt.

        Args:
            prompt: The prompt to generate from.
            stop: Accepted for interface compatibility; it is neither sent
                to the server nor applied to the output.
            run_manager: Callback manager for the run; notified of each chunk.
            **kwargs: Arbitrary additional keyword arguments (unused here).

        Returns:
            An iterator of GenerationChunks, one per event received.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        # The context manager releases the connection even when the caller
        # stops iterating early or an error is raised mid-stream.
        with self._post(prompt, stream=True) as response:
            if response.status_code != 200:
                raise RuntimeError(f"Request failed with status code {response.status_code}")

            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                text = self._parse_stream_line(raw_line.decode("utf-8"))
                if text is None:
                    continue
                chunk = GenerationChunk(text=text)
                if run_manager:
                    run_manager.on_llm_new_token(chunk.text, chunk=chunk)
                yield chunk

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters."""
        return {"url": self.url, "model_name": self.model_name, "system_message": self.system_message}

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this LLM. Used for logging purposes only."""
        return "instructlab_llm"


## Initialize InstructLab LLM

In [26]:
# Model identifier and chat-completions endpoint used to build the LLM below.
trained_model_name = "instructlab-merlinite-7b-lab-trained/instructlab-merlinite-7b-lab-Q4_K_M.gguf"
trained_model_url = "http://127.0.0.1:8000/v1/chat/completions"

In [27]:
# System message handed to the LLM wrapper when it is constructed.
system_message = "You are a helpful assistant."

In [28]:
# Build the LLM wrapper from the endpoint, model name and system message set above.
instructLab_llm = InstructLabLLM(
    system_message=system_message,
    model_name=trained_model_name,
    url=trained_model_url,
)

In [1]:
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_chroma import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import PromptTemplate

In [29]:
# Query the model directly (no retrieval involved) and print its reply.
response = instructLab_llm.invoke("What is a generative AI? Respond concisely.")
print(response)

A generative AI is an artificial intelligence that can create new content, such as text, images, or music, from scratch. It uses algorithms to analyze and understand patterns in data, which it then uses to generate similar but unique content.


## Document Loader

In [30]:
# Loader for the UR10e user manual PDF; the path is relative to the current working directory.
loader = PyMuPDFLoader("documents/digitaltwin/UR10e User Manual.pdf")

In [31]:
# Read the PDF's contents into a list of documents.
docs = loader.load()

In [32]:
# Report how many documents were loaded (labelled as pages).
print("Number of pages:", len(docs))

Number of pages: 205


## Text Splitter

In [33]:
# Split the loaded documents into chunks of up to 1000 characters, with
# 200 characters of overlap between neighbouring chunks.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=200,
    chunk_size=1000,
)
splits = text_splitter.split_documents(docs)

## Embeddings

In [34]:
# Embedding model configuration; normalize_embeddings=True is forwarded to the
# model as an encode keyword argument.
model_name = "all-MiniLM-L6-v2"
encode_kwargs = dict(normalize_embeddings=True)
embedding_model = SentenceTransformerEmbeddings(encode_kwargs=encode_kwargs, model_name=model_name)

## Vectorstore

In [35]:
# Embed every chunk with embedding_model and index the results in a Chroma vector store.
vectorstore = Chroma.from_documents(documents=splits, embedding=embedding_model)

## Retriever

In [36]:
# Expose the vector store as a retriever, using as_retriever()'s default settings.
retriever = vectorstore.as_retriever()

## Prompt Template

In [37]:
# Prompt text for the RAG chain; {context} and {question} are placeholders
# filled in when the prompt is formatted.
template = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Use three sentences maximum and keep the answer concise.\n"
    "Context: {context} \n"
    "\n"
    "Question: {question}\n"
    "\n"
    "Answer:"
)

In [38]:
# Wrap the template text in a PromptTemplate that expects "context" and "question".
prompt = PromptTemplate(
    template=template,
    input_variables=['context', 'question'],
    output_parser=StrOutputParser(),
)

## Q & A

In [39]:
def format_docs(docs):
    """Join the ``page_content`` of every document, separated by a blank line."""
    page_texts = [document.page_content for document in docs]
    separator = "\n\n"
    return separator.join(page_texts)

In [40]:
# Compose the RAG pipeline. The input question is sent to the retriever, whose
# results format_docs joins into one context string, and is also passed through
# unchanged as "question". Both fill the prompt, the prompt goes to the LLM, and
# the LLM output is parsed to a plain string.
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | instructLab_llm
    | StrOutputParser()
)

In [41]:
# Ask the RAG chain a question; the notebook displays the returned string.
rag_chain.invoke("What Do the Boxes Contain?")

'The boxes contain:\n\n• Control Box with Teach Pendant\n• Mounting bracket for the Control Box'

In [42]:
# Ask a second question through the same chain; the notebook displays the returned string.
rag_chain.invoke("How long does the workspace of the UR10e robot extend?")

'The workspace of the UR10e robot extends up to 1300 mm from the base joint.'