In [None]:
!pip install langchain langchain_fireworks langchain_community beautifulsoup4 google-search-results chromadb langchainhub sentence-transformers langchain-chroma gradio aiolimiter lxml faiss-cpu flashrank

In [2]:
import asyncio
import aiohttp
import os
from langchain import hub
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_fireworks import FireworksEmbeddings, ChatFireworks
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import requests
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import sent_tokenize
import re
import io
import time
import sys
import gradio as gr
import asyncio
from typing import List, Tuple, Any
from langchain_community.vectorstores import FAISS
from langchain.schema import Document
import numpy as np
from functools import lru_cache
import faiss
import httpx
from urllib.parse import urlparse
from langchain_core.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.retrievers import ContextualCompressionRetriever
from flashrank import Ranker, RerankRequest
import math

In [3]:
# Set up API clients
os.environ['FIREWORKS_API_KEY'] = 'API_KEY'
os.environ["SERPER_API_KEY"] = 'API_KEY'


# Download NLTK data for sentence tokenization
nltk.download('punkt', quiet=True)

True

In [4]:
# Initialize components
search = GoogleSerperAPIWrapper(k=3)
embeddings = FireworksEmbeddings(model="nomic-ai/nomic-embed-text-v1.5")
llm = ChatFireworks(model="accounts/fireworks/models/llama-v3p1-8b-instruct", temperature=0)
llm_8b = ChatFireworks(model="accounts/fireworks/models/llama-v3p1-8b-instruct", temperature=0)
llm_70b = ChatFireworks(model="accounts/fireworks/models/llama-v3p1-70b-instruct", temperature=0)

# Initialize Flashrank Rankers
ranker_nano = Ranker()

async def scrape_webpage(client, url):
    try:
        response = await client.get(url, timeout=3.0)
        response.raise_for_status()
        text = response.text
        soup = BeautifulSoup(text, 'lxml')
        content = ' '.join(soup.stripped_strings)
        return content[:5000], len(content[:5000])
    except (httpx.RequestError, httpx.TimeoutException) as exc:
        print(f"An error occurred while requesting {url}: {exc}")
    except httpx.HTTPStatusError as exc:
        print(f"Error response {exc.response.status_code} while requesting {url}")
    except Exception as e:
        print(f"Error scraping {url}: {e}")
    return "", 0

async def search_and_scrape(query, num_urls):
    search_results = search.results(query)
    scraped_urls = set()
    full_texts = []

    async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=3.0)) as client:
        tasks = []
        if 'organic' in search_results:
            for result in search_results['organic']:
                url = result.get('link')
                domain = urlparse(url).netloc if url else None
                if url and domain not in scraped_urls and len(tasks) < num_urls:
                    tasks.append(scrape_webpage(client, url))
                    scraped_urls.add(domain)

        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, tuple) and result[1] > 0:
                full_texts.append(result[0])

    return " ".join(full_texts)

def query_expansion(query, num_expansions):
    expansion_prompt = f"""
    Given the following search query, generate {num_expansions} additional related queries that could help find more comprehensive information on the topic. The queries should be different from each other and explore various aspects of the main query. Provide only the additional queries, numbered 1-{num_expansions}.

    Main query: {query}

    Additional queries:
    """

    response = llm.invoke(expansion_prompt)
    response_text = response.content if hasattr(response, 'content') else str(response)

    expanded_queries = [query]
    for line in response_text.split('\n'):
        if line.strip() and line[0].isdigit():
            expanded_queries.append(line.split('. ', 1)[1].strip())

    return expanded_queries[:num_expansions + 1]

def create_sentence_windows(text, window_size=3):
    sentences = sent_tokenize(text)
    windows = []
    for i in range(len(sentences)):
        window = " ".join(sentences[max(0, i-window_size):min(len(sentences), i+window_size+1)])
        windows.append(window)
    return windows

def generate_hypothetical_document(query):
    hyde_prompt = f"""
    Given the search query below, generate a hypothetical document that would be a perfect match for this query. The document should be concise, containing only 3 sentences of relevant information that directly addresses the query.

    Query: {query}

    Hypothetical Document (3 sentences):
    """

    response = llm.invoke(hyde_prompt)
    return response.content if hasattr(response, 'content') else str(response)

def batch_rerank(ranker, query, documents, batch_size=32):
    all_reranked = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        passages = [{"id": j, "text": doc.page_content} for j, doc in enumerate(batch, start=i)]
        rerank_request = RerankRequest(query=query, passages=passages)
        reranked_batch = ranker.rerank(rerank_request)
        all_reranked.extend(reranked_batch)
    return all_reranked

def get_hyde_retriever(vectorstores, hyde_embedding, num_docs_to_rerank, num_docs_to_use, rerank_option):
    def retriever(query):
        all_docs = []
        for vectorstore in vectorstores:
            docs = vectorstore.similarity_search_by_vector(hyde_embedding, k=num_docs_to_rerank)
            all_docs.extend(docs)

        unique_docs = []
        seen_content = set()
        for doc in all_docs:
            content = doc.page_content
            if content not in seen_content:
                unique_docs.append(Document(page_content=content))
                seen_content.add(content)

        if rerank_option != "No Rerank":
            # Choose the appropriate ranker based on the rerank_option
            if rerank_option == "Nano":
                ranker = ranker_nano
            else:
                raise ValueError(f"Invalid rerank option: {rerank_option}")

            # Use batch reranking
            reranked_results = batch_rerank(ranker, query, unique_docs)

            # Sort the results by score in descending order
            reranked_results.sort(key=lambda x: x["score"], reverse=True)

            # Create new Document objects with reranked content, using only the top num_docs_to_use
            reranked_docs = [Document(page_content=result["text"]) for result in reranked_results[:num_docs_to_use]]
            return reranked_docs
        else:
            # If not using reranking, just return the top num_docs_to_use documents
            return unique_docs[:num_docs_to_use]
    return retriever

def batch_embed_documents(documents, batch_size=128):
    batched_embeddings = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        texts = [doc.page_content for doc in batch]
        embeddings_batch = embeddings.embed_documents(texts)
        batched_embeddings.extend(embeddings_batch)
    return batched_embeddings

async def process_query(query, num_expansions, num_urls, num_docs_to_rerank, num_docs_to_use, rerank_option, use_70b_model):
    try:
        start_time = time.time()

        hyde_start = time.time()
        hypothetical_doc = generate_hypothetical_document(query)
        hyde_time = time.time() - hyde_start
        print(f"hypothetical_doc length: {len(hypothetical_doc)}")
        print(f"-----HyDE generation time: {hyde_time:.2f} seconds")

        embed_start = time.time()
        hyde_embedding = embeddings.embed_query(hypothetical_doc)
        embed_time = time.time() - embed_start
        print(f"-----Embedding time: {embed_time:.2f} seconds")

        ext_start = time.time()
        expanded_queries = query_expansion(query, num_expansions)
        ext_time = time.time() - embed_start
        print(f"-----Query expansion time: {embed_time:.2f} seconds")

        scrape_start = time.time()
        all_texts = await asyncio.gather(*[search_and_scrape(eq, num_urls) for eq in expanded_queries])
        scrape_time = time.time() - scrape_start
        print(f"-----Web scraping time: {scrape_time:.2f} seconds")

        combined_text = " ".join(all_texts)
        print(f"Combined text length: {len(combined_text)} characters")

        sentence_windows = create_sentence_windows(combined_text)
        print(f"Number of sentence windows: {len(sentence_windows)}")

        index_documents = [Document(page_content=window) for window in sentence_windows]

        vectorstore_start = time.time()
        vectorstores = []
        for i in range(0, len(index_documents), 256):
            batch = index_documents[i:i + 256]

            # Use batch embedding here
            batch_embeddings = batch_embed_documents(batch)

            texts = [doc.page_content for doc in batch]

            vectorstore = FAISS.from_embeddings(
                embedding=embeddings,
                text_embeddings=list(zip(texts, batch_embeddings))
            )
            vectorstores.append(vectorstore)

        vectorstore_time = time.time() - vectorstore_start
        print(f"-----Vectorstore creation time: {vectorstore_time:.2f} seconds")

        retrieval_start = time.time()
        retriever = get_hyde_retriever(vectorstores, hyde_embedding, num_docs_to_rerank, num_docs_to_use, rerank_option)
        retrieved_docs = retriever(query)
        retrieval_time = time.time() - retrieval_start
        print(f"-----Retrieval{' and reranking' if rerank_option != 'No Rerank' else ''} time: {retrieval_time:.2f} seconds")

        print(f"Number of retrieved{' and reranked' if rerank_option != 'No Rerank' else ''} documents: {len(retrieved_docs)}")

        context_docs = [doc.page_content for doc in retrieved_docs]
        context = "\n\n".join(context_docs)

        total_processing_time = hyde_time + embed_time + scrape_time + vectorstore_time + retrieval_time
        print(f"-----Total processing time before answer generation: {total_processing_time:.2f} seconds")

        answer_start = time.time()
        prompt_template = """
        Use the following context to answer the question. Before answering the question generate a reasoning step. then answer.
        If you cannot answer based on the context, say "I don't have enough information to answer that question."

        Context:
        {context}

        Question: {question}

        Answer:
        """
        prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])

        # Choose the model based on the use_70b_model parameter
        chosen_llm = llm_70b if use_70b_model else llm_8b

        rag_chain = prompt | chosen_llm | StrOutputParser()
        answer = rag_chain.invoke({"context": context, "question": query})
        answer_time = time.time() - answer_start
        print(f"-----Answer generation time: {answer_time:.2f} seconds")

        print("\n")
        print("-"*120)
        print("Final Answer:\n", answer)
        print("-"*120)

        return answer, context_docs

    except Exception as e:
        print(f"An error occurred: {e}")
        import traceback
        traceback.print_exc()
        return "I'm sorry, but I encountered an error while processing your query. Please try again.", []

def gradio_interface(query, num_expansions, num_urls, num_docs_to_rerank, num_docs_to_use, rerank_option, use_70b_model):
    old_stdout = sys.stdout
    sys.stdout = buffer = io.StringIO()

    answer, context_docs = asyncio.run(process_query(query, num_expansions, num_urls, num_docs_to_rerank, num_docs_to_use, rerank_option, use_70b_model))

    sys.stdout = old_stdout
    captured_output = buffer.getvalue()

    # Process the context to show only 150 characters per document
    truncated_docs = [f"Document {i+1}: {doc[:150]}..." for i, doc in enumerate(context_docs)]
    truncated_context = "\n\n".join(truncated_docs)

    # Add the truncated context to the captured output
    captured_output += f"\n\nContext used for answer generation (first 150 characters of each document, {len(context_docs)} documents in total):\n" + truncated_context

    return captured_output

# Create Gradio interface
iface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Textbox(label="Enter your query"),
        gr.Slider(minimum=0, maximum=3, value=1, step=1, label="Number of query expansions"),
        gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Number of URLs to scrape per extended query"),
        gr.Slider(minimum=20, maximum=100, value=80, step=1, label="Number of documents to rerank"),
        gr.Slider(minimum=10, maximum=100, value=50, step=1, label="Number of reranked documents to use"),
        gr.Radio(["No Rerank", "Nano"], label="Reranking Option", value="Nano"),
        gr.Radio(["LLaMA3.1 8B", "LLaMA3.1 70B"], label="Question Answering Option", value="LLaMA3.1 8B")
    ],
    outputs="text",
    title="Advanced RAG Query Processing with Flashrank",
    description="Enter a query and adjust parameters to get a detailed answer based on web search and document analysis. Choose from different reranking options."
)

if __name__ == "__main__":
    iface.launch(share=True, debug=True)

ms-marco-TinyBERT-L-2-v2.zip: 100%|██████████| 3.26M/3.26M [00:00<00:00, 31.2MiB/s]


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://dc4880b2e8567a4691.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


Traceback (most recent call last):
  File "<ipython-input-4-304ab3ec0494>", line 164, in process_query
    all_texts = await asyncio.gather(*[search_and_scrape(eq, num_urls) for eq in expanded_queries])
  File "<ipython-input-4-304ab3ec0494>", line 28, in search_and_scrape
    search_results = search.results(query)
  File "/usr/local/lib/python3.10/dist-packages/langchain_community/utilities/google_serper.py", line 62, in results
    return self._google_serper_api_results(
  File "/usr/local/lib/python3.10/dist-packages/langchain_community/utilities/google_serper.py", line 164, in _google_serper_api_results
    response.raise_for_status()
  File "/usr/local/lib/python3.10/dist-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://google.serper.dev/search?q=How+canI+take+are+of+my+dog%3F&gl=us&hl=en&num=3


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://dc4880b2e8567a4691.gradio.live
