In [None]:
%%script echo skipping....
# Environment setup, kept for reference only: the %%script magic above hands this
# cell body to `echo` as stdin, so the install lines below are not executed.
# NOTE(review): pip's VCS syntax selects a branch with `@main`; the `#main` URL
# fragment below is presumably not interpreted as a ref -- confirm before re-enabling.
%conda install jupyter pytorch
%pip install 'git+https://github.com/deepset-ai/haystack.git#main' sentence-transformers 'txtai[pipeline-data]' qdrant-haystack gradio

In [None]:
%%script echo skipping.....
# Backing services, kept for reference only: the %%script magic above hands this
# cell body to `echo` as stdin, so neither container is started from here.
# Qdrant: ports 6333 and 6334 published, ./qdrant_storage bind-mounted at
# /qdrant/storage, detached (-d), container removed when it stops (--rm).
!docker run --rm -p 6333:6333 -p 6334:6334 -v $(pwd)/qdrant_storage:/qdrant/storage:z -d qdrant/qdrant
# Apache Tika server, detached, published on port 9998.
# NOTE(review): presumably the extraction backend for txtai's Textractor -- confirm.
!docker run -d -p 9998:9998 apache/tika:latest

In [None]:
from haystack import Pipeline, Document
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.writers import DocumentWriter
from haystack.components.writers.document_writer import DuplicatePolicy
from haystack.dataclasses import ChatMessage
from qdrant_haystack import QdrantDocumentStore
from qdrant_haystack.retriever import QdrantRetriever
from txtai.pipeline import Textractor

# Document store backed by Qdrant. ":memory:" is the location in use; the
# alternative previously noted here was "http://127.0.0.1".
document_store = QdrantDocumentStore(
    ":memory:",
    index='zarathustra-rag2a',
    recreate_index=True,
    return_embedding=True,
    wait_result_from_api=True,
)

# Embedder applied to documents at indexing time; every document is prefixed
# with the retrieval instruction below before being embedded.
document_embedder = SentenceTransformersDocumentEmbedder(
    prefix="Represent this document for retrieval: ",
    model_name_or_path="BAAI/llm-embedder",
)
document_embedder.warm_up()

In [None]:
# %%script echo skipping....
# Split each source file into paragraphs and wrap every paragraph whose len()
# exceeds 32 in a Document tagged with the title of the work it came from.
textract = Textractor(paragraphs=True)
documents = []
for source_path, title in (
    ("zarathustra-critical-guide.html", "Nietzsche's 'Thus Spoke Zarathustra': A Critical Guide"),
    ("zarathustra.md", "Thus Spoke Zarathustra"),
):
    documents.extend(
        Document(meta={'name': title}, content=paragraph)
        for paragraph in textract(source_path)
        if len(paragraph) > 32
    )

In [None]:
# Indexing pipeline: cleaner -> embedder -> writer.
indexing_pipeline = Pipeline()
document_writer = DocumentWriter(document_store=document_store)

# Register the stages in processing order.
for stage_name, stage in (
    ("cleaner", DocumentCleaner()),
    ("embedder", document_embedder),
    ("writer", document_writer),
):
    indexing_pipeline.add_component(instance=stage, name=stage_name)

# Feed each stage's output into the next one.
for upstream, downstream in (("cleaner", "embedder"), ("embedder", "writer")):
    indexing_pipeline.connect(upstream, downstream)

# Save a diagram of the graph, then warm the pipeline up.
indexing_pipeline.draw("indexing_pipeline.png")
indexing_pipeline.warm_up()


In [None]:
# %%script echo skipping.......
# Per-component inputs: the collected documents go to the cleaner, and the
# writer is told to overwrite duplicates.
indexing_inputs = {
    "cleaner": {"documents": documents},
    "writer": {"policy": DuplicatePolicy.OVERWRITE},
}
indexing_pipeline.run(indexing_inputs)

In [None]:
# Query-side embedder: same model as the document embedder, with the
# query-specific instruction prefix.
text_embedder = SentenceTransformersTextEmbedder(
    prefix="Represent this query for retrieving relevant documents: ",
    model_name_or_path="BAAI/llm-embedder",
)

# Retriever over the shared document store, configured with top_k=10.
retriever = QdrantRetriever(top_k=10, document_store=document_store)

# Prompt template for the single user message sent to the LLM. It renders the
# `content` of every entry in `documents` under "Context:", followed by the
# user's `question` as the instruction.
template = """
Given the following information, follow my instruction.

Context: 
{% for document in documents %}
    {{ document.content }}
{% endfor %}

My Instruction: {{ question }}
"""

# `documents` is declared as a runtime variable: it is wired from the retriever
# when the RAG pipeline is connected, whereas `question` is passed through
# `template_variables` by send().
prompt_builder = DynamicChatPromptBuilder(runtime_variables=["documents"])


In [None]:
from collections import deque
from time import sleep

class QueueIterator:
    """Blocking iterator over a FIFO queue that is filled while it is consumed.

    A producer calls add() to enqueue items; a consumer iterates over the
    instance. When the queue is empty, iteration polls it again after
    `retry_interval` seconds and gives up (StopIteration) once `max_retries`
    consecutive polls have found nothing.

    Args:
        max_retries: Number of consecutive empty polls tolerated by one
            __next__ call before iteration ends. Defaults to 60.
        retry_interval: Seconds to sleep after each empty poll. Defaults to 10.
    """

    def __init__(self, max_retries=60, retry_interval=10):
        self.queue = deque()
        self.max_retries = max_retries
        self.retry_interval = retry_interval

    def add(self, item):
        """Append `item` to the tail of the queue."""
        self.queue.append(item)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the oldest queued item, waiting for one if the queue is empty.

        Raises:
            StopIteration: if the queue stayed empty for `max_retries` polls.
        """
        for _ in range(self.max_retries):
            try:
                # Read the deque directly instead of going through pop(): pop()
                # signals "empty" with False, which cannot be told apart from a
                # falsy item such as an empty-string chunk. Such items used to
                # be dropped and cost a full sleep interval.
                return self.queue.popleft()
            except IndexError:
                # Nothing queued yet; wait before polling again.
                sleep(self.retry_interval)
        raise StopIteration

    def pop(self):
        """Remove and return the oldest item, or False if the queue is empty.

        The False sentinel is kept for existing callers; note that it is
        indistinguishable from a queued falsy item.
        """
        if self.queue:
            return self.queue.popleft()
        return False

In [None]:
import os

# Shared buffer between the LLM and the chat UI: the streaming callback pushes
# each chunk's content in, and the UI handler hands this iterator back out.
qi = QueueIterator()

# Endpoint and key are read from the environment rather than typed into the
# notebook. When a variable is unset, the value falls back to the empty string
# that was hard-coded here before.
llm = OpenAIChatGenerator(
    streaming_callback=lambda chunk: qi.add(chunk.content),
    api_base_url=os.environ.get("OPENAI_BASE_URL", ""),
    api_key=os.environ.get("OPENAI_API_KEY", ""),
)

In [None]:
# Query pipeline: text_embedder -> retriever -> prompt_builder -> llm.
rag_pipeline = Pipeline()

# Register the components under the names used by the connections below.
for component_name, component in (
    ("text_embedder", text_embedder),
    ("retriever", retriever),
    ("prompt_builder", prompt_builder),
    ("llm", llm),
):
    rag_pipeline.add_component(component_name, component)

# Wire sender outputs to receiver inputs.
for sender, receiver in (
    ("text_embedder.embedding", "retriever.query_embedding"),
    ("retriever", "prompt_builder.documents"),
    ("prompt_builder", "llm"),
):
    rag_pipeline.connect(sender, receiver)

# Save a diagram of the graph, then warm the pipeline up.
rag_pipeline.draw("rag_pipeline.png")
rag_pipeline.warm_up()

In [None]:
def send(message, history):
    """Build a zero-argument callable that runs the RAG pipeline for `message`.

    Nothing is executed until the returned callable is invoked.

    Args:
        message: The user's text. It is used both as the text to embed for
            retrieval and as the `question` template variable.
        history: Accepted for the chat-callback signature; not used.

    Returns:
        A function taking no arguments that runs `rag_pipeline` once and
        returns None (the pipeline's result is discarded).
    """
    def run_pipeline():
        # The prompt is a single user message built from the module-level template.
        chat_prompt = [ChatMessage.from_user(template)]
        pipeline_inputs = {
            "text_embedder": {"text": message},
            "prompt_builder": {
                "template_variables": {"question": message},
                "prompt_source": chat_prompt,
            },
        }
        rag_pipeline.run(pipeline_inputs)

    return run_pipeline
import multiprocessing

def send_and_return(message, history):
    """Start the RAG pipeline for `message` in the background and return the chunk iterator.

    The pipeline runs on a thread of this process, not in a separate process.
    The LLM's streaming callback appends to the module-level `qi`; a child
    process would append to its own copy, so the `qi` returned here would
    never receive a chunk. (Under the "spawn" start method the nested
    function returned by send() could not be pickled at all.)

    Args:
        message: The user's text, forwarded to send().
        history: Chat history, forwarded to send().

    Returns:
        The shared `qi` iterator that the streaming callback feeds.
    """
    import threading  # local import so this block does not depend on other cells

    # daemon=True so an unfinished generation does not keep the interpreter alive.
    worker = threading.Thread(target=send(message, history), daemon=True)
    worker.start()
    # NOTE(review): gradio presumably only streams when the handler is a
    # generator function that yields the accumulated text -- confirm whether
    # returning an iterator object is rendered as intended.
    return qi

# print(send("Who is zarathustra?", None))

In [None]:
# %%script echo skipping....
import gradio as gr


# Chat UI titled "RAG2A" whose message handler is send_and_return.
demo = gr.ChatInterface(
    title="RAG2A",
    fn=send_and_return,
)
demo.launch(inline=False)