In [43]:
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Milvus
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain import hub
from langchain_community.llms import Ollama
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any, Optional

In [54]:
from pymilvus import MilvusClient

# Connect to the local Milvus server. MilvusClient is configured through a
# single `uri` argument, so spell the endpoint out instead of host/port kwargs.
client = MilvusClient(uri="http://localhost:19530")
client.list_collections()

['LangChainCollection']

In [45]:
from langchain_core.embeddings import Embeddings
from typing import Optional

def load_embeddings_model_hf(model_name: Optional[str] = None) -> Embeddings:
    """
    Load a HuggingFace embeddings model.

    Args:
        model_name (Optional[str]): Model name. When None, no name is passed
            to HuggingFaceEmbeddings, leaving it to use whatever default it
            defines.

    Returns:
        Embeddings: Embeddings model
    """
    # Only forward the name when one was given, so None never reaches the constructor
    init_args = {} if model_name is None else {"model_name": model_name}

    return HuggingFaceEmbeddings(**init_args)

def generate_document_embeddings(documents: List[str], embeddings_model: Embeddings) -> List[List[float]]:
    """
    Compute embeddings for a batch of documents.

    Args:
        documents (List[str]): Texts to embed
        embeddings_model (Embeddings): Model whose ``embed_documents`` method is called

    Returns:
        List[List[float]]: Whatever ``embed_documents`` returns for the given texts
    """
    return embeddings_model.embed_documents(documents)

def generate_query_embeddings(query: str, embeddings_model: Embeddings) -> List[float]:
    """
    Compute the embedding of a single query string.

    Args:
        query (str): Query text to embed
        embeddings_model (Embeddings): Model whose ``embed_query`` method is called

    Returns:
        List[float]: Whatever ``embed_query`` returns for the query
    """
    return embeddings_model.embed_query(query)

model_name = "all-mpnet-base-v2"

# Load embeddings model (the loader defined in this cell is load_embeddings_model_hf;
# the previous call used a name that is not defined anywhere in the notebook)
embeddings_model = load_embeddings_model_hf(model_name)




In [12]:
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_core.documents import Document
from typing import List, Dict, Any

def load_markdown_document(file_path: str, **kwargs: Any) -> List[Document]:
    """
    Loads markdown document.

    Args:
        file_path (str): File path
        **kwargs (Any): Extra keyword arguments forwarded to UnstructuredMarkdownLoader

    Returns:
        List[Document]: List of documents

    Raises:
        Exception: If failed to load markdown document; the underlying error
            is attached as ``__cause__``
    """
    try:
        # Instantiate loader
        loader = UnstructuredMarkdownLoader(file_path=file_path, **kwargs)
        # Load document
        return loader.load()
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper
        raise Exception(f"Failed to load markdown document: {e}") from e

def load_pdf_document(file_path: str, **kwargs: Any) -> List[Document]:
    """
    Loads PDF document.

    Args:
        file_path (str): File path
        **kwargs (Any): Extra keyword arguments forwarded to PyPDFLoader

    Returns:
        List[Document]: List of documents

    Raises:
        Exception: If failed to load PDF document; the underlying error
            is attached as ``__cause__``
    """
    try:
        # Instantiate loader
        loader = PyPDFLoader(file_path=file_path, **kwargs)
        # Load document
        return loader.load()
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper
        raise Exception(f"Failed to load PDF document: {e}") from e

MARKDOWN_PATH = "./data/README.md"

# Read the markdown file into a list of documents
document = load_markdown_document(file_path=MARKDOWN_PATH)

# Show the first document's text followed by its metadata, one per line
print(document[0].page_content, document[0].metadata, sep="\n")

trabalho-fp-2024-01

Trabalho Fábrica de Projetos 02 - 2024/01

Description

This is a full scale Retrieval Augmented Generation LLM server built using LangChain, Milvus, and HugginhFace.

API

Instantiate Milvus vectorstore, connect to milvus client. create a collection.

Generate

Loads an LLM model locally and generates a response based on a prompt

request body example
{
  "model": "llama3",
  "prompt": "Hello World",
  "stream": false,
  "options": {}
}

response
{
  "model": "llama3",
  "created_at": "2024-06-13 15:28:24",
  "response": "Hello World! Nice to meet you! What brings you here today?",
  "total duration": 5.958862066268921
}

Embeddings

@load_documents
- load document
- split document based on markdown
- store with HF embeddings model

@retrieve_documents
- instantiate milvus langchain client
- as retriever + embedding
- query

https://python.langchain.com/v0.2/docs/integrations/vectorstores/milvus/

@rag
https://github.com/stephen37/Milvus_demo/blob/main/milvus_rag/

In [23]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_text_splitters import MarkdownHeaderTextSplitter
from typing import List, Tuple

def split_markdown_text(text: str, headers_to_split_on: List[Tuple[str, str]], **kwargs: Any) -> List[Document]:
    """
    Split markdown text into sections.

    Args:
        text (str): Markdown text to split
        headers_to_split_on (List[Tuple[str, str]]): Headers to split on
        **kwargs (Any): Extra keyword arguments forwarded to MarkdownHeaderTextSplitter

    Returns:
        List[Document]: Sections returned by the splitter's ``split_text``

    Raises:
        Exception: If failed to split markdown document; the underlying error
            is attached as ``__cause__``
    """
    try:
        # Instantiate markdown splitter
        markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=headers_to_split_on, **kwargs
        )
        # Split text
        return markdown_splitter.split_text(text)
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper
        raise Exception(f"Failed to split markdown document: {e}") from e


def split_documents(documents: List[Document], chunk_size:int=1000, chunk_overlap:int=200) -> List[Document]:
    """
    Break documents into chunks with a RecursiveCharacterTextSplitter.

    Args:
        documents (List[Document]): Documents to split
        chunk_size (int): Chunk size handed to the splitter
        chunk_overlap (int): Chunk overlap handed to the splitter

    Returns:
        List[Document]: Chunks returned by the splitter's ``split_documents``.
    """
    # The splitter is always built with add_start_index=True
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
    ).split_documents(documents)

# Headers to split on
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

kwargs = {
    "strip_headers": True
}

# Split markdown documents: split_markdown_text works on raw text, so feed it
# each loaded document's page_content and flatten the resulting sections
splits = [
    section
    for loaded_doc in document
    for section in split_markdown_text(loaded_doc.page_content, headers_to_split_on, **kwargs)
]

print(len(splits))

1


In [32]:
from langchain_text_splitters import RecursiveCharacterTextSplitter, Language

# Sample Python source used to demonstrate language-aware splitting
PYTHON_CODE = """
def hello_world():
    print("Hello, World!")

# Call the function
hello_world()
"""

# Build a splitter via from_language for Language.PYTHON with small, non-overlapping chunks
python_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=50,
    chunk_overlap=0,
)

# Turn the sample into documents and show them as the cell output
python_docs = python_splitter.create_documents(texts=[PYTHON_CODE])
python_docs

[Document(page_content='def hello_world():\n    print("Hello, World!")'),
 Document(page_content='# Call the function\nhello_world()')]

In [48]:
from langchain_core.vectorstores import VectorStore, VectorStoreRetriever, VST

def load_milvus_vectorstore():
    """Placeholder with no implementation yet; takes no arguments and returns None."""
    return None

def load_vectorstore(vectorstore: VectorStore, documents: List[Document], embeddings_model: Embeddings, **kwargs: Any) -> VST:
    """
    Loads documents into vectorstore.

    Args:
        vectorstore (VectorStore): Vector store whose ``from_documents`` is called
        documents (List[Document]): List of documents
        embeddings_model (Embeddings): Embeddings model, passed as ``embedding``
        **kwargs (Any): Extra keyword arguments forwarded to ``from_documents``

    Returns:
        VST: Vector store returned by ``from_documents``

    Raises:
        Exception: If ``from_documents`` fails; the underlying error is
            attached as ``__cause__``
    """
    try:
        # Load vectorstore
        return vectorstore.from_documents(
            documents=documents, embedding=embeddings_model, **kwargs
        )
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper
        raise Exception(f"Failed to load vectorstore: {e}") from e

def load_retriever(vectorstore: VectorStore) -> VectorStoreRetriever:
    """
    Loads retriever from vectorstore.

    Args:
        vectorstore (VectorStore): Vector store whose ``as_retriever`` is called

    Returns:
        VectorStoreRetriever: Vector store retriever

    Raises:
        Exception: If ``as_retriever`` fails; the underlying error is
            attached as ``__cause__``
    """
    try:
        # Load retriever
        return vectorstore.as_retriever()
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the wrapper
        raise Exception(f"Failed to load retriever: {e}") from e





URI = "http://localhost:19530"

# Load vectorstore through the helper defined above, so a failure is reported
# with the same "Failed to load vectorstore" wrapper as everywhere else
vectorstore = load_vectorstore(
    Milvus, splits, embeddings_model, connection_args={"uri": URI}
)

# Load retriever through the matching helper
retriever = load_retriever(vectorstore)



In [49]:
# Pull the "rlm/rag-prompt" prompt from the hub; rag_chain below pipes its inputs into it
prompt = hub.pull("rlm/rag-prompt")

In [50]:
from langchain_core.language_models.llms import BaseLLM

def load_llm_ollama(model_name: str, base_url: str, pipeline_kwargs: Optional[Dict[str, Any]]=None) -> BaseLLM:
    """
    Load large language model from Ollama.

    Args:
        model_name (str): The name of the model to load, passed as ``model``
        base_url (str): Base URL handed to the Ollama constructor as ``base_url``
        pipeline_kwargs (Optional[Dict[str, Any]]): Extra keyword arguments
            forwarded to the Ollama constructor. None means no extra arguments.

    Returns:
        BaseLLM: The loaded language model.

    Raises:
        ValueError: If there is an error loading the model; the underlying
            error is attached as ``__cause__``
    """
    if pipeline_kwargs is None:
        pipeline_kwargs = {}

    try:
        return Ollama(model=model_name, base_url=base_url, **pipeline_kwargs)
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the ValueError
        raise ValueError(f"Error loading model: {e}") from e
    

model_name = "llama3:instruct"
# NOTE(review): a stock local Ollama server is normally reached over plain
# http on this port; confirm that https is intended here
base_url = "https://localhost:11434"
# `stop` is a list of stop sequences, so wrap the single token in a list
pipeline_kwargs = {"stop": ["<|eot_id|>"]}

# Load LLM
llm = load_llm_ollama(model_name, base_url, pipeline_kwargs)

In [51]:
def format_docs(docs):
    """Join the ``page_content`` of every document in ``docs``, separated by a blank line."""
    contents = [doc.page_content for doc in docs]
    return "\n\n".join(contents)

In [52]:
# Pipe the retriever into format_docs so the prompt's context slot receives the
# joined page text rather than a list of Document objects
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [53]:
# Keep the question under its own name: rebinding `prompt` would overwrite the
# prompt template that the rag_chain cell depends on when it is re-run
question = "What is the purpose of the document?"

for chunk in rag_chain.stream(question):
    print(chunk, end="", flush=True)

I don't know. The provided context does not indicate the purpose of a specific document, but rather appears to be code snippets or instructions for using Milvus and LLM RAG Server.

In [None]:
# https://github.com/stephen37/Milvus_demo/blob/main/milvus_rag/rag_milvus_ollama.ipynb