In [2]:
import os
from dotenv import load_dotenv

load_dotenv("../.env")


os.environ["HF_HOME"] = "/home/t/.cache/huggingface"

In [3]:
from nest_asyncio import apply

apply()

In [4]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.schema import NodeWithScore, Node

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# rerank = SentenceTransformerRerank(
#     model="cross-encoder/ms-marco-MiniLM-L-2-v2", top_n=3
# )

In [6]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore, Node


class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]


class SearchEvent(Event):
    query: str

In [7]:
# from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# # loads BAAI/bge-small-en
# # embed_model = HuggingFaceEmbedding()

# # loads BAAI/bge-small-en-v1.5
# embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

In [8]:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.node_parser import (
    SentenceSplitter,
    TextSplitter,
    HierarchicalNodeParser,
)
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client
from llama_index.core import VectorStoreIndex
from llama_index.core.workflow import Workflow, step, StopEvent, StartEvent, Context
from llama_index.core.node_parser import get_leaf_nodes, get_root_nodes
from llama_index.core import StorageContext
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import AutoMergingRetriever

from llama_index.core.indices import vector_store


# from llama_index.llms.groq import Groq
from llama_index.llms.gemini import Gemini
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core import Settings
from llama_index.core.data_structs import Node
from llama_index.core.response_synthesizers import ResponseMode
from llama_index.core import get_response_synthesizer
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pprint import pprint
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.core.storage.docstore import SimpleDocumentStore

## Model setting


In [9]:
def print_node(node: Node | NodeWithScore, metadata: bool = True):
    char = 300
    print("id: ", node.id_)
    print("=" * 10, "METADATA", "=" * 10)
    pprint(node.metadata, indent=4)
    print("=" * 10, "TEXT", "=" * 10)
    print(node.text[:char] + f"...({len(node.text[char:])} chars truncated.)")
    print()

In [93]:
from llama_index.llms.groq import Groq

In [94]:
groq_llm = Groq(model='llama-3.1-8b-instant')

In [10]:
# llm = Gemini(model="models/gemini-1.5-pro")
ollama_llm = Ollama(model="qwen2:0.5b")

# embedding_model = GeminiEmbedding()
embedding_model2 = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5", cache_folder=os.environ["HF_HOME"] + "/hub"
)
embedding_model3 = OllamaEmbedding(model_name="all-minilm")

Settings.llm = ollama_llm
Settings.embed_model = embedding_model3

In [11]:
# Settings.llm = llm

In [12]:
def get_retriever(dirname: str):
    client = qdrant_client.QdrantClient(location=":memory:")
    vector_store = QdrantVectorStore(client=client, collection_name="test_store")
    # vector_store = VectorStore()

    transformations = [
        # SentenceSplitter(separator="\n", chunk_overlap=30, chunk_size=150),
        HierarchicalNodeParser.from_defaults()
    ]

    embed_model = OllamaEmbedding(model_name="all-minilm")

    transformations = transformations + [embed_model]

    ingestion_pipeline = IngestionPipeline(transformations=transformations)

    documents = SimpleDirectoryReader(dirname).load_data()
    nodes = ingestion_pipeline.run(documents=documents)
    leaf_nodes = get_leaf_nodes(nodes)
    docstore = SimpleDocumentStore()

    # insert nodes into docstore
    docstore.add_documents(nodes)

    # define storage context (will include vector store by default too)
    storage_context = StorageContext.from_defaults(docstore=docstore)
    # vector_store.add(nodes)
    # storage_context = StorageContext.from_defaults(vector_store=vector_store)
    base_index = VectorStoreIndex(leaf_nodes, storage_context=storage_context)
    base_retriever = base_index.as_retriever(similarity_top_k=3)
    retriever = AutoMergingRetriever(base_retriever, storage_context, verbose=True)

    return base_retriever, retriever

In [13]:
# base_retriever, retriever = get_retriever("data")
# print(retriever)

In [14]:
# nodes = retriever.retrieve("spatial")
# base_nodes = base_retriever.retrieve("spatial")

In [15]:
# print("nodes:")
# print(nodes)
# print()
# print("Base Nodes: ")
# print(base_nodes)

In [16]:
# len(nodes), len(base_nodes)

In [17]:
# from llama_index.core.response.notebook_utils import display_source_node

# for node in nodes:
#     display_source_node(node, source_length=10000)

In [18]:
# for node in base_nodes:
#     display_source_node(node, source_length=10000)

In [19]:
# print_node(nodes[0])

In [20]:
def node_text(node):
    print("id: ", node.id_)
    print("Similarity: ", node.score)
    text = (node.text).strip()
    char = 300
    rest_char_len = len(text[char:])
    print(
        "Text: ",
        (
            text[:char] + f"...({rest_char_len} chars truncated.)"
            if rest_char_len
            else " "
        ),
    )
    print()

In [21]:
# for node in nodes:
#     node_text(node)

In [22]:
# nodes[0].text

In [23]:
# for node in base_nodes:
#     node_text(node)

## Rag development


In [24]:
from dataclasses import dataclass


class ReflectionEvent(Event):
    """Result of Reflection on the model's generated answer"""

    answer: str


@dataclass
class ReflectionOutput:
    requires_regeneration: bool
    reason: str


class ReDoEvent(Event):
    """Result after Reflection to again generate the answer"""

    query: str
    old_answer: str
    reason: str

In [25]:
import json

In [26]:
aa = json.loads('{"requires_regeneration":true, "reason":"no reason"}')
aa

{'requires_regeneration': True, 'reason': 'no reason'}

In [27]:
# aa = ReflectionOutput('{"requires_regeneration":"True", "reason":"no reason"}')

In [28]:
# aa

In [29]:
from llama_index.core import PromptTemplate
import time
import re
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.core.base.llms.types import ChatMessage

In [30]:
def time_it(func):
    """
    This decorator measures the execution time of a function.
    """
    import time

    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(
            f"Function '{func.__name__}' took {end_time - start_time:.4f} seconds to execute."
        )
        return result

    return wrapper

In [31]:
def extract_info_from_json_string(json_string):
    # Define a regex pattern to extract "requires_regeneration" and "reason"
    pattern = re.compile(
        r'\{\s*"requires_regeneration":\s*(True|False)\s*,\s*"reason":\s*"([^"]*)"\s*\}',
        re.DOTALL,
    )

    # Search for the pattern in the JSON string
    match = pattern.search(json_string)

    if match:
        requires_regeneration = match.group(1)
        reason = match.group(2)
        return {
            "requires_regeneration": requires_regeneration == "True",
            "reason": reason,
        }
    else:
        # Handle cases where the regex does not match the expected format
        return {
            "requires_regeneration": False,
            "reason": "Unable to parse the provided JSON string.",
        }

In [80]:
# llm = ollama_llm
llm = Gemini(model="models/gemini-1.5-pro")

In [115]:
from json import JSONDecodeError
import timeit

from llama_index.core.prompts import ChatMessage
from llama_index.core.storage import docstore


class RAGWorkflow(Workflow):

    def __init__(self, llm, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # self.ranker = SentenceTransformerRerank(
        #     model="cross-encoder/ms-marco-MiniLM-L-2-v2", top_n=3
        # )
        self.client = None
        self.vector_store = None
        self.llm = llm
        self.llm2 = llm
        self.docstore = None
        self.index = None
        self.storage_context = None
        self.nodes = []
        self.setup()
        self.redo = 0
        self.redo_limit = 3
        self.history = [ChatMessage(role='system', content="you are a helpful assistant.")]

    def setup(self):
        self.client = qdrant_client.QdrantClient(location=":memory:")
        self.vector_store = QdrantVectorStore(
            client=self.client, collection_name="test_store"
        )

    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        start_time = time.perf_counter()
        dirname = ev.get("dirname")
        transformations = ev.get("transformations")
        embed_model = ev.get("embed_model")

        if not dirname:
            return None

        # vector_store = VectorStore()
        if not transformations:
            transformations = [
                SentenceSplitter(separator="\n", chunk_overlap=30, chunk_size=150),
                HierarchicalNodeParser.from_defaults(),
            ]

        if not embed_model:
            embed_model = OllamaEmbedding(model_name="all-minilm")

        transformations = transformations + [embed_model]

        ingestion_pipeline = IngestionPipeline(transformations=transformations)

        documents = SimpleDirectoryReader(dirname).load_data()
        nodes = ingestion_pipeline.run(documents=documents)
        leaf_nodes = get_leaf_nodes(nodes)

        # insert nodes into docstore
        if not self.docstore:
            self.docstore = SimpleDocumentStore()

        self.docstore.add_documents(nodes)

        self.storage_context = StorageContext.from_defaults(docstore=self.docstore)

        if not self.index:
            self.index = VectorStoreIndex(
                leaf_nodes, show_progress=True, storage_context=self.storage_context
            )
        else:
            self.index.insert_nodes(leaf_nodes)

        ctx.data["index"] = self.index
        ctx.data["ingestion_pipeline"] = ingestion_pipeline
        ctx.data["vector_store"] = self.vector_store
        ctx.data["storage_context"] = self.storage_context

        print(f"Ingestion Took {time.perf_counter()-start_time} units")
        return StopEvent(result=f"Indexed {len(documents)} documents.")

    def clean_db(self):
        self.setup()
        self.index = None
        self.docstore = None

    @step(pass_context=True)
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RerankEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        start_time = time.perf_counter()

        query = ev.get("query")
        if not query:
            return None

        print(f"Query the database with: {query}")

        # store the query in the global context
        ctx.data["query"] = query

        # get the index from the global context
        index = ctx.data.get("index")
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        base_retriever = index.as_retriever(similarity_top_k=3)
        retriever = AutoMergingRetriever(
            base_retriever, ctx.data["storage_context"], verbose=True
        )

        nodes = retriever.retrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        print(f"Retrieve Took {time.perf_counter()-start_time} units")

        return RerankEvent(nodes=nodes)

    # @step(pass_context=True)
    # async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
    #     start_time = time.perf_counter()

    #     print("Reranking...")
    #     print(ctx.data.get("query"), flush=True)
    #     new_nodes = self.ranker.postprocess_nodes(
    #         ev.nodes, query_str=ctx.data.get("query")
    #     )
    #     print(f"Reranked nodes to {len(new_nodes)}")
    #     print(f"Rerank Took {time.perf_counter()-start_time} units")

    #     return RerankEvent(nodes=new_nodes)

    @step(pass_context=True)
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> ReflectionEvent:
        """Return a streaming response using reranked nodes."""
        start_time = time.perf_counter()

        query = ctx.data.get("query")
        context = "\n\n".join(node.text for node in ev.nodes)
        ctx.data["context"] = context
        qa_prompt_tmpl_str = (
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "Given the context information and not prior knowledge, "
            "follow any given instructions in query."
            "answer the query.\n"
            "Query: {query_str}\n"
            "Answer: "
        )
        qa_prompt_tmpl = PromptTemplate(qa_prompt_tmpl_str)
        prompt = qa_prompt_tmpl.format(query_str=query, context_str=context)
        chatmessage = ChatMessage(role='user', content=prompt)
        response = self.llm.chat(messages=self.history+[chatmessage])
        self.history.append(response.message)
        print("type: ", type(response))
        print(f"Synthesize Took {time.perf_counter()-start_time} units")
        return ReflectionEvent(answer=str(response.message.content))

    @step(pass_context=True)
    async def resynthesize(self, ctx: Context, ev: ReDoEvent) -> ReflectionEvent:
        """Return a streaming response using reranked nodes."""
        start_time = time.perf_counter()

        query = ctx.data.get("query")
        context = ctx.data["context"]

        qa_prompt_tmpl_str = (
            "Context information is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "Given the context information and not prior knowledge, "
            "follow any given instructions in query."
            "answer the query.\n"
            "Query: {query_str}\n"
            "you already have generated these answers \n"
            "Previous answers: {previous_answer}\n"
            "Previous answers was rejected for the reason: {reason}\n"
            "now try to answer it again.\n"
            "Answer: "
        )
        qa_prompt_tmpl = PromptTemplate(qa_prompt_tmpl_str)
        prompt = qa_prompt_tmpl.format(
            query_str=query,
            context_str=context,
            previous_answer=ev.old_answer,
            reason=ev.reason,
        )
        response = await self.llm.acomplete(prompt)

        print(f"RESynthesize Took {time.perf_counter()-start_time} units")
        return ReflectionEvent(answer=response.text)

    @step(pass_context=True)
    async def reflection(
        self, ctx: Context, ev: ReflectionEvent
    ) -> StopEvent | ReDoEvent:
        ans = ev.answer
        query = ctx.data["query"]

        reflection_prompt = """For the given question : \n
        
        question: "{query}" \n  
        generated answer: "{answer}" \n
        
        does the generated answer, answers the question satisfactorily,
        if the generated answer is not answering the question then return a json (with requires_regeneration:bool and reason:str key)
        with keys as requires_regeneration:bool (True if answer is not statisfying to the query and false if it is a good answer)
        another key is reason:str (reason for the regeneration if regeneration is required else empty string)

        so if the answer is not helpful then return this:
        
        "requires_regeneration":True, "reason": " some reason explaining the requirement of regeneration"
        
        or if the answer is helpful then set requires_regeneration to False with empty reason

       "requires_regeneration":False, "reason":" "
       
       it should be parsable dict to class
       @dataclass
       class ReflectionOutput:
            requires_regeneration:bool
            reason:str
        
        output should be a valid json        
        """
        reflection_tmpl = PromptTemplate(reflection_prompt)

        prompt = reflection_tmpl.format(query=query, answer=ans)
        response = await self.llm2.acomplete(prompt)
        try:
            response = json.loads(response.text)
        except JSONDecodeError:
            response = extract_info_from_json_string(response.text)

        if response.get("requires_regeneration") and self.redo <= self.redo_limit:
            return ReDoEvent(query=query, old_answer=ans, reason=response.get("reason"))
        else:

            return StopEvent(result=ans)

In [116]:
w = RAGWorkflow(llm=groq_llm, timeout=60)

In [117]:
# a = groq_llm.chat(messages=[
#     ChatMessage(role='system', content="you are a helpful assistant"),
#     ChatMessage(role='user',content="how old is sun, just give me the number")
#     ])
# a, type(a)

In [118]:
# draw_all_possible_flows(w)

In [119]:
# Ingest the documents
await w.run(dirname="data2")

Generating embeddings: 0it [00:00, ?it/s]

Ingestion Took 5.3032840879996 units





'Indexed 2 documents.'

In [120]:
# w.clean_db()

In [121]:
# await w.run(dirname="data3")

In [123]:
# Run a query
result = await w.run(query="how to contact tikendra to tell him he is selected for the job")
# async for chunk in result.async_response_gen():
#     print(chunk, end="", flush=True)
print(str(result))

Query the database with: how to contact tikendra to tell he is selected for the job
> Merging 2 nodes into parent node.
> Parent node id: 9fedb3be-9b90-43b1-9385-e4fbdf11311f.
> Parent node text: .
Coursera, GOOGLE•2022
   https://coursera.org/verify/829MQ3JFKEAQ •
DeepLearning.AI : Neural Ne...

> Merging 1 nodes into parent node.
> Parent node id: 4a41279e-2b37-4dd6-bc75-2fe3fb24a634.
> Parent node text: .
Coursera, GOOGLE•2022
   https://coursera.org/verify/829MQ3JFKEAQ •
DeepLearning.AI : Neural Ne...

Retrieved 2 nodes.
Retrieve Took 0.1168908729996474 units
type:  <class 'llama_index.core.base.llms.types.ChatResponse'>
Synthesize Took 1.043699363000087 units
Based on the provided context information, I can suggest a way to contact Tikendra Kumar Sahu.

You can contact Tikendra Kumar Sahu through the following methods:

1. Email: tikendraworks@gmail.com
2. Phone: 9926134994
3. LinkedIn: in/tikendraw (Note: You can send him a LinkedIn message or request to connect)

To inform him t

In [None]:
result

CompletionResponse(text="**Related Work in Video Understanding**\n\nThe provided text focuses on **spatiotemporal features for video analysis** as a key aspect of video understanding research. \n\nHere's a breakdown:\n\n* **Spatiotemporal Features:** These features capture information about both **spatial** (location within a frame) and **temporal** (changes over time) aspects of a video. They are crucial for understanding actions, events, and relationships within videos.\n\n* **Traditional Approaches:** While not explicitly mentioned, the text alludes to traditional methods using **3D convolutions**. These methods directly process video data as volumetric data, applying convolutional filters in both spatial and temporal dimensions.\n\n* **Proposed Approach (Implied):** The text emphasizes the advantages of **decomposing** spatiotemporal convolutions. This suggests the authors are introducing a novel method that separates spatial and temporal processing, potentially leading to:\n    * 

In [40]:
draw_most_recent_execution(w)

workflow_recent_execution.html


In [None]:
from llama_index.core.node_parser import SimpleFileNodeParser
from llama_index.readers.file import FlatReader
from pathlib import Path

dirname = "./data"
documents = SimpleDirectoryReader(dirname).load_data()
index = VectorStoreIndex.from_documents(documents=documents)

In [None]:
retriever = index.as_retriever(similarity_top_k=5)
nodes = retriever.retrieve("what is GAtt")

In [None]:
for node in nodes:
    print(node.text)
    print("*" * 33)

In ICCV , 2011. 5, 8
[21] I. Laptev and T. Lindeberg. Space-time interest points. In
ICCV , 2003. 2
[22] Q. V . Le, W. Y . Zou, S. Y . Yeung, and A. Y . Ng. Learn-
ing hierarchical invariant spatio-temporal features for action
recognition with independent subspace analysis. In CVPR ,
2011. 2
[23] P. Molchanov, X. Yang, S. Gupta, K. Kim, S. Tyree, and
J. Kautz. Online detection and classiﬁcation of dynamic hand
gestures with recurrent 3d convolutional neural network. In
CVPR , 2016. 2
[24] Y . Pan, T. Mei, T. Yao, H. Li, and Y . Rui. Jointly modeling
embedding and translation to bridge video and language. In
CVPR , 2016. 2
[25] Z. Qiu, T. Yao, , and T. Mei. Learning spatio-temporal repre-
sentation with pseudo-3d residual networks. In ICCV , 2017.
1, 2, 4, 7, 8
[26] S. Sadanand and J. Corso. Action bank: A high-level repre-
sentation of activity in video. In CVPR , 2012. 2
[27] P. Scovanner, S. Ali, and M. Shah. A 3-dimensional sift de-
scriptor and its application to action recognition

In [None]:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="demo")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        # OpenAIEmbedding(),
        embedding_model3,
    ],
    vector_store=vector_store,
)

# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])

# Create your index
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

Metadata length (9) is close to chunk size (25). Resulting chunks are less than 50 tokens. Consider increasing the chunk size or decreasing the size of your metadata to avoid this.


 20%|██        | 1/5 [00:11<00:47, 11.85s/it]

ResourceExhausted: 429 Resource has been exhausted (e.g. check quota).

In [None]:
a = index.as_retriever()

In [None]:
a.retrieve(str_or_query_bundle="hello")

[NodeWithScore(node=TextNode(id_='a7ce82cc-7615-4661-b8ed-c7e1fbcefcb5', embedding=None, metadata={'filename': 'README.md', 'category': 'codebase', 'document_title': '"Unlocking the Secrets of the Language Models: The Phenomenal Power of LLMs"'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='b5f9fa68-6b4b-42e5-917c-0f37e1428b66', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'filename': 'README.md', 'category': 'codebase'}, hash='3183371414f6a23e9a61e11b45ec45f808b148f9973166cfed62226e3505eb05'), <NodeRelationship.PREVIOUS: '2'>: RelatedNodeInfo(node_id='42339ea9-6616-4871-930f-e4a3f264d732', node_type=<ObjectType.TEXT: '1'>, metadata={'filename': 'README.md', 'category': 'codebase'}, hash='f915a0b2115f78f5ea52d166fb73954e93e7986dd9080c0e1605b194df6a50e4'), <NodeRelationship.NEXT: '3'>: RelatedNodeInfo(node_id='2f75f4ba-53b1-4465-9a6b-0df9055f9039', node_type=<ObjectType.TEXT: '1'>, metadata={},