In [None]:
import os
import asyncio
from typing import List
from redis import Redis
from redisvl.utils.vectorize import HFTextVectorizer
from redisvl.index import SearchIndex, AsyncSearchIndex
from redisvl.query import VectorQuery
from redisvl.redis.utils import array_to_buffer
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
import openai

# ============================================
# CONFIGURATION
# ============================================

# Redis connection settings
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "")
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"

# OpenAI settings
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY

# Document processing settings
CHUNK_SIZE = 2500
CHUNK_OVERLAP = 0
NUM_RESULTS = 5

# ============================================
# DOCUMENT PROCESSING
# ============================================

def load_and_chunk_document(file_path: str) -> List:
    """
    Load a PDF and split it into chunks
    
    Args:
        file_path: Path to the PDF file
        
    Returns:
        List of document chunks
    """
    print(f"Loading document: {file_path}")
    
    # Initialize the PDF loader
    loader = PyPDFLoader(file_path)
    
    # Create text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    
    # Load and split the document
    chunks = loader.load_and_split(text_splitter)
    
    print(f"Created {len(chunks)} chunks")
    return chunks


# ============================================
# EMBEDDING GENERATION
# ============================================

def create_vectorizer():
    """Initialize the HuggingFace vectorizer"""
    return HFTextVectorizer(
        model="sentence-transformers/all-MiniLM-L6-v2",
        dims=384
    )


def generate_embeddings(chunks: List, vectorizer: HFTextVectorizer) -> List:
    """
    Generate vector embeddings for document chunks
    
    Args:
        chunks: List of document chunks
        vectorizer: HFTextVectorizer instance
        
    Returns:
        List of embeddings
    """
    print("Generating embeddings...")
    
    # Extract text content from chunks
    texts = [chunk.page_content for chunk in chunks]
    
    # Generate embeddings
    embeddings = vectorizer.embed_many(texts, as_buffer=False)
    
    print(f"Generated {len(embeddings)} embeddings")
    return embeddings


# ============================================
# REDIS SETUP
# ============================================

def create_redis_schema():
    """Define the Redis schema for document storage"""
    return {
        "index": {
            "name": "redisvl_rag",
            "prefix": "doc"
        },
        "fields": [
            {"name": "chunk_id", "type": "numeric"},
            {"name": "content", "type": "text"},
            {
                "name": "text_embedding",
                "type": "vector",
                "attrs": {
                    "dims": 384,
                    "algorithm": "flat",
                    "distance_metric": "cosine"
                }
            }
        ]
    }


def setup_redis_index(schema: dict, redis_url: str) -> SearchIndex:
    """
    Create and initialize Redis index
    
    Args:
        schema: Redis schema definition
        redis_url: Redis connection URL
        
    Returns:
        SearchIndex instance
    """
    print("Creating Redis index...")
    
    index = SearchIndex.from_dict(schema, redis_url=redis_url)
    index.create(overwrite=True, drop=True)
    
    print("Index created successfully")
    return index


def load_data_to_redis(chunks: List, embeddings: List, index: SearchIndex):
    """
    Load document chunks and embeddings into Redis
    
    Args:
        chunks: List of document chunks
        embeddings: List of embeddings
        index: SearchIndex instance
    """
    print("Loading data into Redis...")
    
    # Prepare data for Redis
    data = [
        {
            'chunk_id': i,
            'content': chunk.page_content,
            'text_embedding': array_to_buffer(embeddings[i], dtype='float32')
        } for i, chunk in enumerate(chunks)
    ]
    
    # Load into Redis
    index.load(data)
    
    print(f"Loaded {len(data)} documents into Redis")


# ============================================
# RAG PIPELINE FUNCTIONS
# ============================================

def embed_query(query: str, vectorizer: HFTextVectorizer):
    """
    Convert user query to vector representation
    
    Args:
        query: User's question
        vectorizer: HFTextVectorizer instance
        
    Returns:
        Query embedding vector
    """
    return vectorizer.embed(query)


async def retrieve_context(
    async_index: AsyncSearchIndex, 
    query_vector, 
    num_results: int = NUM_RESULTS
) -> str:
    """
    Fetch relevant context from Redis using vector search
    
    Args:
        async_index: AsyncSearchIndex instance
        query_vector: Embedded query vector
        num_results: Number of chunks to retrieve
        
    Returns:
        Combined context string
    """
    results = await async_index.query(
        VectorQuery(
            vector=query_vector,
            vector_field_name="text_embedding",
            num_results=num_results,
            return_fields=["content"],
            return_score=True
        )
    )
    
    # Combine retrieved chunks into context
    context = "\n\n".join([result['content'] for result in results])
    
    return context


def promptify(query: str, context: str) -> str:
    """
    Create a prompt for the LLM with context and query
    
    Args:
        query: User's question
        context: Retrieved context from Redis
        
    Returns:
        Formatted prompt string
    """
    return f'''Use the provided context below derived from documents to answer 
    the user's question. If you can't answer the user's question based on the 
    context, do not guess. Respond with "I don't have enough information to 
    answer that question."
    
    Context:
    {context}
    
    Question: {query}
    
    Answer:'''


async def generate_llm_response(query: str, context: str) -> str:
    """
    Generate answer using OpenAI's LLM
    
    Args:
        query: User's question
        context: Retrieved context
        
    Returns:
        LLM-generated answer
    """
    prompt = promptify(query, context)
    
    response = await openai.AsyncClient().chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        seed=42
    )
    
    return response.choices[0].message.content


async def answer_question(
    async_index: AsyncSearchIndex, 
    query: str, 
    vectorizer: HFTextVectorizer
) -> str:
    """
    Complete RAG pipeline: embed query, retrieve context, generate answer
    
    Args:
        async_index: AsyncSearchIndex instance
        query: User's question
        vectorizer: HFTextVectorizer instance
        
    Returns:
        Final answer from the LLM
    """
    print(f"\nQuery: {query}")
    
    # Step 1: Embed the query
    query_vector = embed_query(query, vectorizer)
    
    # Step 2: Retrieve matching context from Redis
    context = await retrieve_context(async_index, query_vector)
    
    # Step 3: Generate answer using LLM
    answer = await generate_llm_response(query, context)
    
    return answer


# ============================================
# MAIN EXECUTION
# ============================================

async def main():
    """
    Main execution function demonstrating the complete RAG pipeline
    """
    print("=" * 60)
    print("RAG System with Redis and OpenAI")
    print("=" * 60)
    
    # Step 1: Load and chunk the document
    pdf_path = "resources/nke-10k-2023.pdf"  # Update with your PDF path
    chunks = load_and_chunk_document(pdf_path)
    
    # Step 2: Initialize vectorizer and generate embeddings
    vectorizer = create_vectorizer()
    embeddings = generate_embeddings(chunks, vectorizer)
    
    # Step 3: Setup Redis
    schema = create_redis_schema()
    sync_index = setup_redis_index(schema, REDIS_URL)
    
    # Step 4: Load data into Redis
    load_data_to_redis(chunks, embeddings, sync_index)
    
    # Step 5: Create async index for querying
    async_index = AsyncSearchIndex.from_dict(schema, redis_url=REDIS_URL)
    
    # Step 6: Test the RAG system with sample queries
    print("\n" + "=" * 60)
    print("Testing RAG System")
    print("=" * 60)
    
    sample_queries = [
        "What were Nike's profit margins and company performance?",
        "What are the main risks facing Nike's business?",
        "What is Nike's strategy for digital transformation?"
    ]
    
    for query in sample_queries:
        answer = await answer_question(async_index, query, vectorizer)
        print(f"\nAnswer: {answer}")
        print("-" * 60)
    
    print("\nRAG System test complete!")


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())