## Agentic RAG System with LangChain & SingleStore Database

#### Overview

This tutorial guides you through building an intelligent agentic system that:

- Fetches Wikipedia content
- Stores it as vectors in SingleStore
- Uses LangChain agents to provide contextually rich responses
- Implements retrieval-augmented generation (RAG)

#### Prerequisites
- Python 3.8+
- SingleStore account with a database already created
- OpenAI API key

### Step 1: Install Required Packages

In [9]:
!pip install langchain-core langchain-openai langchain-community langchain --quiet
!pip install singlestoredb --quiet
!pip install wikipedia-api --quiet
!pip install tiktoken --quiet
!pip install numpy pandas --quiet

### Step 2: Set Up Credentials and Imports

In [10]:
import os
import warnings
warnings.filterwarnings('ignore')

In [12]:
os.environ['OPENAI_API_KEY'] = 'Add Your OpenAI API Key'
SINGLESTORE_HOST = 'Add Your SingleStore Host'
SINGLESTORE_PORT = 3306
SINGLESTORE_USER = 'admin'
SINGLESTORE_PASSWORD = 'Add Your Password'
SINGLESTORE_DATABASE = 'Leema'
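
Hard-coding secrets in a notebook cell makes them easy to leak when the notebook is shared. A minimal sketch of one alternative, assuming the same settings are exported as environment variables. The variable names and the `load_singlestore_config` helper are hypothetical and not part of the original tutorial:

```python
import os


def load_singlestore_config(env=os.environ):
    """Collect connection settings from environment variables.

    The variable names and fallback values are assumptions for illustration.
    """
    return {
        "host": env.get("SINGLESTORE_HOST", "localhost"),
        "port": int(env.get("SINGLESTORE_PORT", "3306")),
        "user": env.get("SINGLESTORE_USER", "admin"),
        "password": env.get("SINGLESTORE_PASSWORD", ""),
        "database": env.get("SINGLESTORE_DATABASE", ""),
    }


# A plain dict stands in for the real environment so the example is repeatable.
config = load_singlestore_config(
    {"SINGLESTORE_HOST": "example-host", "SINGLESTORE_PORT": "3306"}
)
print(config["host"], config["port"])
```

The resulting dictionary could then be unpacked into the connection call in Step 3 in place of the module-level constants.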

In [13]:
import singlestoredb as s2
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.agents import initialize_agent, Tool, AgentType
from langchain.memory import ConversationBufferMemory
import wikipediaapi
import numpy as np
from typing import List

### Step 3: Set Up SingleStore Connection

In [14]:
def create_singlestore_connection():
    try:
        connection = s2.connect(
            host=SINGLESTORE_HOST,
            port=SINGLESTORE_PORT,
            user=SINGLESTORE_USER,
            password=SINGLESTORE_PASSWORD,
            database=SINGLESTORE_DATABASE
        )
        print("✅ Successfully connected to SingleStore!")
        return connection
    except Exception as e:
        print(f"❌ Error connecting to SingleStore: {e}")
        return None

# Test connection
conn = create_singlestore_connection()

✅ Successfully connected to SingleStore!


### Step 4: Create Vector Table in SingleStore

In [15]:
def create_vector_table(connection):
    """Create a table for storing vectors and metadata"""
    
    create_table_query = """
    CREATE TABLE IF NOT EXISTS wikipedia_vectors (
        id INT AUTO_INCREMENT PRIMARY KEY,
        content TEXT,
        title VARCHAR(500),
        url VARCHAR(1000),
        chunk_index INT,
        embedding BLOB,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        INDEX(title),
        INDEX(chunk_index)
    );
    """
    
    try:
        with connection.cursor() as cursor:
            cursor.execute(create_table_query)
            connection.commit()
        print("✅ Vector table created successfully!")
    except Exception as e:
        print(f"❌ Error creating table: {e}")

# Create the table
if conn:
    create_vector_table(conn)

✅ Vector table created successfully!


### Step 5: Wikipedia Content Fetcher

In [16]:
class WikipediaFetcher:
    def __init__(self):
        self.wiki = wikipediaapi.Wikipedia(
            language='en',
            extract_format=wikipediaapi.ExtractFormat.WIKI,
            user_agent='LangChain-Tutorial/1.0'
        )
    
    def fetch_page_content(self, page_title: str) -> dict:
        """Fetch content from a Wikipedia page"""
        try:
            page = self.wiki.page(page_title)
            
            if not page.exists():
                print(f"❌ Page '{page_title}' does not exist")
                return None
            
            return {
                'title': page.title,
                'content': page.text,
                'url': page.fullurl,
                'summary': page.summary[:500] + "..." if len(page.summary) > 500 else page.summary
            }
        except Exception as e:
            print(f"❌ Error fetching page '{page_title}': {e}")
            return None
    
    def fetch_multiple_pages(self, page_titles: List[str]) -> List[dict]:
        """Fetch multiple Wikipedia pages"""
        pages = []
        for title in page_titles:
            print(f"📖 Fetching: {title}")
            page_data = self.fetch_page_content(title)
            if page_data:
                pages.append(page_data)
        return pages

In [19]:
wiki_fetcher = WikipediaFetcher()

# Define some interesting topics to fetch
topics = [
    "Retrieval-augmented generation",
    "Machine Learning",
    "Natural Language Processing",
    "Deep Learning",
    "Computer Vision",
    "database"
]

In [20]:
print("🔄 Fetching Wikipedia content...")
wikipedia_pages = wiki_fetcher.fetch_multiple_pages(topics)
print(f"✅ Fetched {len(wikipedia_pages)} pages successfully!")

🔄 Fetching Wikipedia content...
📖 Fetching: Retrieval-augmented generation
📖 Fetching: Machine Learning
📖 Fetching: Natural Language Processing
📖 Fetching: Deep Learning
📖 Fetching: Computer Vision
📖 Fetching: database
✅ Fetched 6 pages successfully!


### Step 6: Text Chunking and Embedding

In [21]:
class DocumentProcessor:
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        self.embeddings = OpenAIEmbeddings()
    
    def process_documents(self, pages: List[dict]) -> List[Document]:
        """Convert Wikipedia pages to LangChain documents and chunk them"""
        documents = []
        
        for page in pages:
            # Create LangChain Document
            doc = Document(
                page_content=page['content'],
                metadata={
                    'title': page['title'],
                    'url': page['url'],
                    'summary': page.get('summary', '')
                }
            )
            
            # Split into chunks
            chunks = self.text_splitter.split_documents([doc])
            
            # Add chunk index to metadata
            for i, chunk in enumerate(chunks):
                chunk.metadata['chunk_index'] = i
                documents.append(chunk)
        
        return documents
    
    def create_embeddings(self, documents: List[Document]) -> List[List[float]]:
        """Create embeddings for all document chunks"""
        texts = [doc.page_content for doc in documents]
        embeddings = self.embeddings.embed_documents(texts)
        return embeddings

# Process documents
processor = DocumentProcessor()
print("🔄 Processing and chunking documents...")
documents = processor.process_documents(wikipedia_pages)
print(f"✅ Created {len(documents)} document chunks")

print("🔄 Creating embeddings...")
embeddings = processor.create_embeddings(documents)
print(f"✅ Created {len(embeddings)} embeddings")

🔄 Processing and chunking documents...
✅ Created 406 document chunks
🔄 Creating embeddings...
✅ Created 406 embeddings
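
To build intuition for `chunk_size` and `chunk_overlap`, here is a simplified sliding-window chunker in plain Python. It is only an illustration under stated assumptions: `RecursiveCharacterTextSplitter` first splits on separators such as paragraphs, lines, and spaces and then merges the pieces, so its real chunk boundaries will differ. The `sliding_window_chunks` helper is hypothetical.

```python
def sliding_window_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Cut text into fixed-size windows where neighbours share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks


# 2,500 characters of varied content so the overlap is visible.
sample_text = "".join(chr(97 + i % 26) for i in range(2500))
sample_chunks = sliding_window_chunks(sample_text)
print([len(chunk) for chunk in sample_chunks])
```

The overlap means the end of one chunk reappears at the start of the next, which helps a retrieved chunk keep context for sentences that straddle a boundary.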


### Step 7: Store Vectors in SingleStore

In [22]:
def store_vectors_in_singlestore(connection, documents: List[Document], embeddings: List[List[float]]):
    """Store document chunks and their embeddings in SingleStore"""
    
    insert_query = """
    INSERT INTO wikipedia_vectors 
    (content, title, url, chunk_index, embedding) 
    VALUES (%s, %s, %s, %s, %s)
    """
    
    try:
        with connection.cursor() as cursor:
            for doc, embedding in zip(documents, embeddings):
                # Convert embedding to bytes for storage
                embedding_bytes = np.array(embedding, dtype=np.float32).tobytes()
                
                cursor.execute(insert_query, (
                    doc.page_content,
                    doc.metadata['title'],
                    doc.metadata['url'],
                    doc.metadata['chunk_index'],
                    embedding_bytes
                ))
            
            connection.commit()
        
        print(f"✅ Successfully stored {len(documents)} vectors in SingleStore!")
        
    except Exception as e:
        print(f"❌ Error storing vectors: {e}")

# Store vectors
if conn:
    store_vectors_in_singlestore(conn, documents, embeddings)

✅ Successfully stored 406 vectors in SingleStore!
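
The `tobytes()` call above stores each embedding as a flat sequence of 4-byte floats. The sketch below shows the same layout using only the standard library `struct` module, assuming little-endian byte order (which is what NumPy produces on typical x86 and ARM machines). The helper names are hypothetical.

```python
import struct


def pack_float32(vector):
    """Pack a list of floats into little-endian float32 bytes."""
    return struct.pack(f"<{len(vector)}f", *vector)


def unpack_float32(blob):
    """Recover the list of floats from packed float32 bytes."""
    count = len(blob) // 4  # every float32 occupies 4 bytes
    return list(struct.unpack(f"<{count}f", blob))


vector = [0.5, -1.25, 3.0]  # values chosen to be exactly representable in float32
blob = pack_float32(vector)
print(len(blob), unpack_float32(blob))
```

Because each dimension takes 4 bytes, a 1536-dimensional vector, for example, occupies 6,144 bytes in the `embedding` column.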


### Step 8: Create Vector Retriever

In [23]:
class SingleStoreRetriever:
    def __init__(self, connection, embeddings_model):
        self.connection = connection
        self.embeddings = embeddings_model
    
    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        """Perform similarity search using cosine similarity"""
        
        # Get query embedding
        query_embedding = self.embeddings.embed_query(query)
        query_bytes = np.array(query_embedding, dtype=np.float32).tobytes()
        
        # SQL query for cosine similarity
        search_query = """
        SELECT content, title, url, chunk_index, embedding,
               DOT_PRODUCT(embedding, %s) / 
               (SQRT(DOT_PRODUCT(embedding, embedding)) * SQRT(DOT_PRODUCT(%s, %s))) as similarity
        FROM wikipedia_vectors
        ORDER BY similarity DESC
        LIMIT %s
        """
        
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(search_query, (query_bytes, query_bytes, query_bytes, k))
                results = cursor.fetchall()
                
                documents = []
                for row in results:
                    doc = Document(
                        page_content=row[0],
                        metadata={
                            'title': row[1],
                            'url': row[2],
                            'chunk_index': row[3],
                            'similarity': float(row[5])
                        }
                    )
                    documents.append(doc)
                
                return documents
        
        except Exception as e:
            print(f"❌ Error in similarity search: {e}")
            return []

# Create retriever
retriever = SingleStoreRetriever(conn, processor.embeddings)

# Test retrieval
test_query = "What is retrieval augmented generation?"
test_results = retriever.similarity_search(test_query, k=3)
print(f"\n🔍 Test query: '{test_query}'")
print(f"📊 Found {len(test_results)} relevant chunks:")
for i, doc in enumerate(test_results):
    print(f"  {i+1}. {doc.metadata['title']} (similarity: {doc.metadata['similarity']:.3f})")
    print(f"     {doc.page_content[:100]}...")


🔍 Test query: 'What is retrieval augmented generation?'
📊 Found 3 relevant chunks:
  1. Retrieval-augmented generation (similarity: 0.889)
     Retrieval-augmented generation (RAG) is a technique that enables large language models (LLMs) to ret...
  2. Retrieval-augmented generation (similarity: 0.871)
     Retrieval-Augmented Generation (RAG) enhances large language models (LLMs) by incorporating an infor...
  3. Retrieval-augmented generation (similarity: 0.842)
     the model’s external knowledge base with the updated information" (augmentation). By dynamically int...
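
The SQL expression in `similarity_search` divides the dot product of two vectors by the product of their lengths, which is cosine similarity. A small pure-Python sketch of the same calculation and ranking, using toy two-dimensional vectors rather than real embeddings; the `top_k` helper and the sample rows are hypothetical:

```python
import math


def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    """dot(a, b) / (|a| * |b|), mirroring the SQL expression in the retriever."""
    denominator = math.sqrt(dot(a, a)) * math.sqrt(dot(b, b))
    if denominator == 0:
        return 0.0  # a zero vector has no direction; treat it as not similar
    return dot(a, b) / denominator


def top_k(query_vector, rows, k=5):
    """Rank (title, vector) rows by similarity to the query, highest first."""
    scored = [(title, cosine_similarity(vector, query_vector)) for title, vector in rows]
    return sorted(scored, key=lambda item: item[1], reverse=True)[:k]


rows = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [1.0, 1.0])]
ranked = top_k([1.0, 0.0], rows, k=2)
print(ranked)
```

A score near 1 means the vectors point in almost the same direction, which is why the three chunks returned above all come from the RAG article.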


### Step 9: Create LangChain Agent with Tools

In [24]:
from langchain.agents import initialize_agent, Tool, AgentType
from langchain.memory import ConversationBufferMemory
from typing import Any
from langchain.schema import BaseRetriever

class CustomRetriever(BaseRetriever):
    """Custom retriever that works with our SingleStore implementation"""
    
    # BaseRetriever is a pydantic model, so the wrapped retriever is declared
    # as a field and passed by keyword:
    # CustomRetriever(singlestore_retriever=retriever)
    singlestore_retriever: Any
    
    def _get_relevant_documents(self, query: str, *, run_manager=None) -> List[Document]:
        """Retrieve relevant documents"""
        return self.singlestore_retriever.similarity_search(query, k=5)
    
    async def _aget_relevant_documents(self, query: str, *, run_manager=None) -> List[Document]:
        """Async version of document retrieval"""
        return self._get_relevant_documents(query)

class WikipediaRAGAgent:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.memory = ConversationBufferMemory(
            memory_key="chat_history", 
            return_messages=True
        )
    
    def search_knowledge_base(self, query: str) -> str:
        """Search the Wikipedia knowledge base"""
        docs = self.retriever.similarity_search(query, k=5)
        
        if not docs:
            return "No relevant information found in the knowledge base."
        
        # Combine the most relevant chunks
        context = "\n\n".join([
            f"From '{doc.metadata['title']}':\n{doc.page_content}"
            for doc in docs[:3]  # Use top 3 results
        ])
        
        return context
    
    def answer_with_context(self, query: str) -> str:
        """Answer questions using retrieved context"""
        # Get relevant context
        context = self.search_knowledge_base(query)
        
        if "No relevant information found" in context:
            return "I don't have enough information in my knowledge base to answer that question."
        
        # Create a prompt with context
        prompt = f"""
        Based on the following context from Wikipedia, please answer the question.
        
        Context:
        {context}
        
        Question: {query}
        
        Answer based on the context provided:
        """
        
        try:
            response = self.llm.invoke(prompt)
            return response.content if hasattr(response, 'content') else str(response)
        except Exception as e:
            return f"Error generating response: {e}"
    
    def create_agent(self):
        """Create the LangChain agent with tools"""
        
        # Define tools
        tools = [
            Tool(
                name="Wikipedia_Knowledge_Base",
                func=self.search_knowledge_base,
                description="Search the Wikipedia knowledge base for information about AI, retrieval-augmented generation, ML, NLP, computer vision, deep learning, and databases. Use this when you need factual information about these topics."
            ),
            Tool(
                name="Answer_With_Context",
                func=self.answer_with_context,
                description="Answer questions using the Wikipedia knowledge base with proper context. Use this for detailed explanations and comprehensive answers."
            )
        ]
        
        # Create agent
        agent = initialize_agent(
            tools=tools,
            llm=self.llm,
            agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=3,
            early_stopping_method="generate"
        )
        
        return agent

# Initialize the LLM and agent
llm = ChatOpenAI(temperature=0.7, model="gpt-3.5-turbo")
rag_agent = WikipediaRAGAgent(retriever, llm)
agent = rag_agent.create_agent()

print("✅ RAG Agent created successfully!")

✅ RAG Agent created successfully!
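
Conceptually, each `Tool` pairs a name and a description with a callable, and the agent runs whichever tool the LLM names in its reasoning step. The sketch below shows only that dispatch idea with stub functions. It is not LangChain's implementation, and `SimpleTool` and `run_tool` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class SimpleTool:
    """A stripped-down stand-in for a tool: a name, a callable, and a description."""
    name: str
    func: Callable[[str], str]
    description: str


def run_tool(tools: Dict[str, SimpleTool], tool_name: str, tool_input: str) -> str:
    """Look up the requested tool by name and call it with the given input."""
    tool = tools.get(tool_name)
    if tool is None:
        return f"{tool_name} is not a valid tool, try one of {sorted(tools)}."
    return tool.func(tool_input)


# Stub functions stand in for search_knowledge_base and answer_with_context.
tools = {
    tool.name: tool
    for tool in [
        SimpleTool("Wikipedia_Knowledge_Base", lambda q: f"context for: {q}", "Return raw context chunks."),
        SimpleTool("Answer_With_Context", lambda q: f"answer to: {q}", "Return a composed answer."),
    ]
}

print(run_tool(tools, "Wikipedia_Knowledge_Base", "RAG"))
```

In the real agent the LLM chooses the tool name from the descriptions, which is why the descriptions in `create_agent` spell out when each tool should be used.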



### Step 10: Interactive Question-Answering System

In [25]:
def ask_agent(question: str):
    """Ask a question to the RAG agent"""
    try:
        response = agent.invoke({"input": question})
        return response.get("output", "No response generated")
    except Exception as e:
        print(f"Agent error: {e}")
        # Fallback to direct answer
        return rag_agent.answer_with_context(question)

def ask_direct_question(question: str):
    """Ask a question directly without agent (simpler approach)"""
    return rag_agent.answer_with_context(question)

# Test the agent with various questions
test_questions = [
    "What is the difference between machine learning and retrieval augmented generation?",
    "How does natural language processing work?",
    "What are the main applications of a database?",
    "Can you explain what artificial intelligence is?",
    "What role does retrieval augmented generation play in AI?"
]

print("🤖 Testing the RAG System:")
print("=" * 50)

for question in test_questions:
    print(f"\n❓ Question: {question}")
    print("🤖 Direct Answer:")
    response = ask_direct_question(question)
    print(response)
    print("-" * 50)

🤖 Testing the RAG System:

❓ Question: What is the difference between machine learning and retrieval augmented generation?
🤖 Direct Answer:
The main difference between machine learning and retrieval-augmented generation is that retrieval-augmented generation enhances large language models (LLMs) by incorporating an information-retrieval mechanism that allows models to access and utilize additional data beyond their original training set. This means that retrieval-augmented generation not only relies on the training data but also retrieves and incorporates new information from external sources to generate more accurate and contextually relevant responses. In contrast, traditional machine learning models typically rely solely on the data they were trained on and do not have the capability to dynamically integrate new information from external sources.
--------------------------------------------------

❓ Question: How does natural language processing work?
🤖 Direct Answer:
Natural langua