RAG with LM Studio

In [None]:
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os

def read_pdf(path, chunk_size=2000, chunk_overlap=450):
    """Load every PDF directly inside *path* and split it into text chunks.

    Args:
        path: Directory to scan. Only its immediate entries are considered.
        chunk_size: Forwarded to ``RecursiveCharacterTextSplitter`` as
            ``chunk_size``.
        chunk_overlap: Forwarded to ``RecursiveCharacterTextSplitter`` as
            ``chunk_overlap``.

    Returns:
        The list produced by ``split_documents`` over everything the PDF
        loaders returned, or an empty list when the directory contains no
        PDF files.

    Raises:
        OSError: Propagated from ``os.listdir`` when *path* is missing or is
            not a directory.
    """
    # os.listdir returns entries in arbitrary order; sorting keeps the chunk
    # order (and therefore any positional indexing of the result) stable
    # between runs. The extension check is case-insensitive so "X.PDF" counts.
    pdf_paths = [
        os.path.join(path, filename)
        for filename in sorted(os.listdir(path))
        if filename.lower().endswith(".pdf")
    ]
    if not pdf_paths:
        # Nothing to load, so there is nothing to split either.
        return []

    docs = []
    for file_path in pdf_paths:
        docs.extend(PyPDFLoader(file_path).load())

    r_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ".", " ", ""],
    )
    return r_splitter.split_documents(docs)

# Folder holding the PDFs to index; a relative path is resolved against the
# current working directory by os.listdir inside read_pdf.
path = r"sample"

r_splits = read_pdf(path)
print(len(r_splits))

# Preview the first chunk. Guarded because indexing an empty result would
# raise IndexError when the folder yields no chunks.
if r_splits:
    page = r_splits[0]
    print(page.metadata)
    print(page.page_content[0:100])
else:
    print(f"No PDF chunks were produced from: {path}")

In [None]:
# Inspect the metadata of a later chunk; small corpora may not have one at
# this index, so check before indexing instead of raising IndexError.
chunk_index = 5
if chunk_index < len(r_splits):
    page = r_splits[chunk_index]
    print(page.metadata)
else:
    print(f"Only {len(r_splits)} chunks available; no chunk at index {chunk_index}.")


In [None]:
from langchain.vectorstores import Chroma
from langchain.docstore.document import Document
from openai import OpenAI
from typing import List
import os
import shutil
import tempfile
import time



# OpenAI-compatible client pointed at a server on localhost:1238 (presumably
# LM Studio, given the "lm-studio" key; confirm the port matches the server).
# NOTE(review): a later cell re-imports the name `OpenAI` from langchain.llms;
# re-running this cell after that one would construct that class instead.
client = OpenAI(
    api_key="lm-studio",
    base_url="http://localhost:1238/v1",
)



class CustomEmbedding2:
    """Embedding adapter that delegates to ``get_embedding``.

    The vectors produced by the most recent ``embed_documents`` or
    ``embed_query`` call are kept on ``self.embeddings``; each call replaces
    whatever was stored before.
    """

    def __init__(self):
        # Result of the latest embed_* call; empty until one has run.
        self.embeddings = []

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed each text in order and return the vectors as a list."""
        vectors = []
        for text in texts:
            vectors.append(get_embedding(text))
        # Only overwrite the stored result once every text has been embedded.
        self.embeddings = vectors
        return vectors

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string and return its vector."""
        vector = get_embedding(text)
        # Stored as a one-element list so the attribute keeps the same shape
        # as after embed_documents.
        self.embeddings = [vector]
        return vector

    def get_embeddings(self) -> List[List[float]]:
        """Return the vectors stored by the most recent embed_* call."""
        return self.embeddings

def get_embedding(text, model="TheBloke/nomic-embed-text"):
    """Return the embedding vector for *text* from the module-level ``client``.

    Newlines in *text* are replaced with spaces before the request is sent.

    Args:
        text: The string to embed.
        model: Model identifier passed to the embeddings endpoint.

    Returns:
        The ``embedding`` field of the first item in the response data.
    """
    single_line = " ".join(text.split("\n"))
    response = client.embeddings.create(input=[single_line], model=model)
    return response.data[0].embedding


def create_vectordb(embedding, r_splits, new_vectordb=True, user_path=None, temp_dir=None):
    """Build a new Chroma vector store or reopen a persisted one.

    Args:
        embedding: Embedding object handed to Chroma (as ``embedding`` when
            creating, as ``embedding_function`` when loading).
        r_splits: Documents to index; only used when ``new_vectordb`` is true.
        new_vectordb: When true, create a fresh store, deleting any existing
            ``chroma_db`` directory at the target location first. When false,
            load the store found under ``user_path``.
        user_path: Base directory that contains (or will contain)
            ``chroma_db``. Required when loading.
        temp_dir: Base directory used when creating and ``user_path`` is
            None. If both are None a new temporary directory is created.

    Returns:
        A ``(vectordb, persist_directory)`` tuple. When loading is requested
        but ``user_path`` is None or has no ``chroma_db`` directory, a message
        is printed and ``(None, None)`` is returned, so callers can always
        unpack two values.
    """
    cwd = os.getcwd()  # Relative base paths are resolved against this.

    if not new_vectordb:
        if user_path is None:
            print("Please provide a valid user path to load the existing vector database.")
            return None, None

        persist_directory = os.path.join(cwd, user_path, 'chroma_db')
        if not os.path.exists(persist_directory):
            print(f"Vector database not found in the specified path: {persist_directory}")
            return None, None

        print("Loading existing vector database...")
        start_time = time.time()
        vectordb = Chroma(persist_directory=persist_directory, embedding_function=embedding)
        end_time = time.time()
        print(f"Existing vector database loaded in {end_time - start_time:.2f} seconds. Directory: {persist_directory}")
        return vectordb, persist_directory

    # Pick the base directory: user_path wins, then temp_dir, then a fresh
    # temporary directory. os.path.join discards cwd when the base is absolute.
    base_dir = user_path
    if base_dir is None:
        base_dir = temp_dir if temp_dir is not None else tempfile.mkdtemp()
    persist_directory = os.path.join(cwd, base_dir, 'chroma_db')

    # Start from a clean directory so the new store does not mix with old data.
    if os.path.exists(persist_directory):
        shutil.rmtree(persist_directory)

    print("Creating a new vector database...")
    start_time = time.time()
    vectordb = Chroma.from_documents(
        documents=r_splits,
        embedding=embedding,
        persist_directory=persist_directory
    )
    end_time = time.time()
    print(f"New vector database created in {end_time - start_time:.2f} seconds. Directory: {persist_directory}")

    return vectordb, persist_directory

# Configuration. To reopen an existing store instead of rebuilding it, set
# new_vectordb = False and point user_path at the directory that holds it,
# e.g. user_path = "/path/to/your/directory".
docs_root = os.path.join(os.getcwd(), "docs")
new_vectordb = True
user_path = os.path.join(docs_root, "sample")
temp_dir = os.path.join(docs_root, "temp")

# Embedding adapter used for indexing, plus the raw text of every chunk.
embedding = CustomEmbedding2()
texts = [chunk.page_content for chunk in r_splits]

# The second value returned is the store's persist directory; it replaces the
# temp_dir configured above.
vectordb, temp_dir = create_vectordb(
    embedding=embedding,
    r_splits=r_splits,
    new_vectordb=new_vectordb,
    user_path=user_path,
    temp_dir=temp_dir,
)
print("\nfinished", temp_dir, len(vectordb))
print(vectordb._collection.count())

In [None]:
# Number of entries currently stored in the underlying Chroma collection.
print(vectordb._collection.count())

query = "Raise your hand if the text on the small screens is legible"
# To restrict the search to one source file, additionally pass e.g.
# filter={"source": "docs/cs229_lectures/MachineLearning-Lecture03.pdf"}.
most_similar = vectordb.similarity_search(query, k=5)
print(most_similar)

# Compare what the store holds with how many chunks the splitter produced.
print(f"Vector database size: {len(vectordb)}")
print(f"Number of chunks: {len(r_splits)}")
for doc in most_similar:
    print(doc.metadata)

In [None]:
from langchain.vectorstores import Chroma
from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
import textwrap

# Relies on `vectordb` built in an earlier cell.

# Conversation so far; the chat loop appends {"role": ..., "content": ...}
# dicts to it.
history = []

# Prompt with three placeholders: history, context and query.
template = """
Use the following context from the vector database and the previous conversation to answer the query. Incorporate your own knowledge and reasoning as an AI assistant:

Previous Conversation: {history}

Context: {context}

Query: {query}

Answer:
"""
prompt_template = PromptTemplate(
    template=template,
    input_variables=["history", "context", "query"],
)

# LLM reached through the local OpenAI-compatible endpoint, and a retriever
# over the vector store.
llm = OpenAI(api_key="lm-studio", base_url="http://localhost:1238/v1")
retriever = vectordb.as_retriever()

# Chain that fills the prompt and sends it to the LLM.
qa_chain = LLMChain(prompt=prompt_template, llm=llm)


In [None]:
# Interactive chat loop: retrieve context for each query, run the chain,
# print the answer and record the exchange in `history`.
# Enter "exit" or "quit" (or send EOF / interrupt) to leave the loop.
while True:
    try:
        query = input("> ").strip()
    except (EOFError, KeyboardInterrupt):
        # End the session cleanly instead of surfacing a traceback.
        print()
        break

    if not query:
        # Nothing to ask; prompt again rather than embedding an empty string.
        continue
    if query.lower() in {"exit", "quit"}:
        break

    context = retriever.get_relevant_documents(query)
    result_str = qa_chain.run(
        history="\n".join(f"{msg['role']}: {msg['content']}" for msg in history),
        query=query,
        # NOTE(review): this is passed as a chain input, and the prompt has no
        # {max_tokens} variable, so it presumably does not limit generation.
        # Confirm; the limit likely belongs on the llm object instead.
        max_tokens=4096,
        # Blank line between chunks so the end of one does not fuse with the
        # start of the next.
        context="\n\n".join(doc.page_content for doc in context),
    )

    # Show only the text after a "Final Answer:" marker when the model emits
    # one; otherwise show the whole completion.
    _, marker_found, tail = result_str.partition("Final Answer:")
    if marker_found:
        final_answer = tail.strip()
        print("\nFinal Answer:")
        print(textwrap.fill(final_answer, width=80))
    else:
        print("\nAnswer:")
        print(textwrap.fill(result_str, width=80))

    # Record the exchange so the next prompt includes it.
    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": result_str})