In [1]:
# Load variables from a local `.env` file into the process environment (python-dotenv IPython
# extension). NOTE(review): presumably holds W&B / LLM provider credentials — confirm.
%load_ext dotenv
%dotenv
# Automatically re-import edited local modules (utils, retriever, pipeline, ...) before each cell runs.
%load_ext autoreload
%autoreload 2


In [None]:
import json
import wandb
import weave
from copy import deepcopy
import nest_asyncio
# The IPython kernel already runs an asyncio event loop (this notebook uses top-level `await`).
# Patching it to be re-entrant lets library code that starts its own loop run inside it instead of
# failing with "event loop is already running".
# NOTE(review): presumably needed by weave / project helpers — confirm.
nest_asyncio.apply()

In [2]:
from utils import convert_contents_to_text, make_id, render_doc, printmd, chunk_simple, chunk_markdown, load_dataset, chunk_dataset
from retrieval_metrics import RetrievalScorer
from response_metrics import ResponseScorer
from retriever import TfidfSearchEngine, BM25SearchEngine, DenseSearchEngine, Retriever, RetrieverWithReranker, HybridRetrieverWithReranker, VectorStoreSearchEngine
from generation import SimpleResponseGenerator, QueryEnhancedResponseGenerator
from pipeline import SimpleRAGPipeline, QueryEnhancedRAGPipeline
from query_enhancer import QueryEnhancer

* 'fields' has been removed
Device set to use mps
Device set to use mps
Device set to use mps
Device set to use mps


In [3]:
# Download the W&B documentation corpus (a versioned W&B artifact) and parse it into a list of dicts.
wandb_api = wandb.Api()
docs_root = wandb_api.artifact("parambharat/rag-workshop/documentation:latest", type="dataset").download(root="data/docs")
docs_file = f"{docs_root}/wandb_docs.jsonl"
# JSONL: one JSON object per line.
# - Read explicitly as UTF-8; the platform default encoding is not guaranteed to be UTF-8 (e.g. Windows).
# - Skip blank lines (e.g. a trailing newline), which `json.loads` would otherwise reject.
with open(docs_file, "r", encoding="utf-8") as f:
    docs = [json.loads(line) for line in f if line.strip()]


[34m[1mwandb[0m:   6 of 6 files downloaded.  


In [None]:
# Start a Weave session for this project; later traced calls and evaluations are logged there.
WEAVE_PROJECT = "rag-workshop"
weave_client = weave.init(WEAVE_PROJECT)

In [5]:
## A Simple Information Retrieval Pipeline

In [6]:
# Fixed-size chunking of every document. Each chunk record is a deep copy of the source document's
# fields plus: the raw chunk, its plain-text rendering, a chunk id, and the id of the parent document.
document_chunks = []
for source_doc in docs:
    pieces = chunk_simple(source_doc["content"], chunk_size=500)
    parent_id = make_id(source_doc["content"])
    document_chunks.extend(
        {
            **deepcopy(source_doc),
            "chunk": piece,
            "text": convert_contents_to_text(piece),
            "chunk_id": make_id(piece),
            "doc_id": parent_id,
        }
        for piece in pieces
    )

In [7]:
tfidf_search_engine = await TfidfSearchEngine().fit(document_chunks)

class TFIDFRetriever(Retriever):
    pass

tfidf_retriever = TFIDFRetriever(search_engine=tfidf_search_engine)
# retrieved_docs = await tfidf_search_engine.search(query="What are Artifacts?", top_k=5)
# for doc in retrieved_docs:
#     render_doc(doc)

In [8]:
# Response generator shared by the SimpleRAGPipeline variants in this notebook.
# The commented-out lines are a manual demo that feeds the texts retrieved in the previous cell
# to the generator.
# docs_data = [{"document": item["text"]} for item in retrieved_docs]
simple_response_generator = SimpleResponseGenerator()
# response = await simple_response_generator.invoke(
#     query="What are Artifacts?", documents=docs_data
# )
# printmd(response["choices"][0]["message"]["content"])

In [9]:
# TF-IDF retriever + shared response generator, as a named RAG pipeline subclass.
class TFIDFRAGPipeline(SimpleRAGPipeline):
    ...


tfidf_rag_pipeline = TFIDFRAGPipeline(retriever=tfidf_retriever, generator=simple_response_generator)

# Manual check:
# response = await tfidf_rag_pipeline.invoke(query="What are Artifacts?")
# printmd(response["answer"])

In [10]:
## Evaluating the RAG Pipeline



In [None]:
### Evaluating the Retrieval

In [None]:
# with open("data/eval_dataset.jsonl", "r") as f:
#     eval_dataset = [json.loads(line) for line in f.readlines()]
# eval_dataset = weave.Dataset(name="eval_dataset", description="Evaluation dataset for RAG Pipeline", rows=eval_dataset)
# weave.publish(eval_dataset, name="eval_dataset")
# eval_dataset = weave.ref("weave:///parambharat/rag-workshop/object/eval_dataset:DkS8t0EnHiHQAmdmd9M9UzeJSyiOTNIr924v62abT4Q").get()

In [None]:
# Load the published evaluation dataset (pinned Weave object version) that every evaluation below uses.
eval_dataset = weave.ref("weave:///parambharat/rag-workshop/object/eval_dataset:9aTli8u1UJBYQD6aBWnYMUpokMjNUrXjQBspuAWBbYE").get()

n_eval_samples = len(eval_dataset.rows)
# Fail fast: otherwise the evaluations below would run (and be logged) on an empty dataset.
assert n_eval_samples > 0, "evaluation dataset is empty"
# f-string avoids the double space that print's separator added after the trailing-space literal.
print(f"Number of evaluation samples: {n_eval_samples}")

In [13]:
# Retrieval-only evaluation: each row's "question" becomes the retriever query, with top_k=5.
retrieval_scorer = RetrievalScorer(name="retrieval_scorer", description="Retrieval metrics")
retrieval_evaluation = weave.Evaluation(
    name="Retrieval_Evaluation",
    dataset=eval_dataset,
    scorers=[retrieval_scorer],
    preprocess_model_input=lambda x: {"query": x["question"], "top_k": 5},
)

In [None]:
tfidf_retrieval_scores = await retrieval_evaluation.evaluate(
    model=tfidf_retriever,
    __weave={"display_name": "TFIDF Retrieval"}
)


In [None]:
### Evaluating the Response Generation

In [15]:


# End-to-end evaluation: each row's "question" is sent to the whole RAG pipeline as its query,
# and the pipeline output is scored by ResponseScorer.
response_scorer = ResponseScorer(name="response_scorer", description="Response metrics")
response_evaluation = weave.Evaluation(
    name="Response_Evaluation",
    dataset=eval_dataset,
    scorers=[response_scorer],
    preprocess_model_input=lambda x: {"query": x["question"]},
)


In [None]:
tfidf_response_scores = await response_evaluation.evaluate(
    model=tfidf_rag_pipeline,
    __weave={"display_name": "TFIDF RAG Pipeline"}
)

In [None]:
bm25_search_engine = await BM25SearchEngine().fit(document_chunks)

class BM25Retriever(Retriever):
    pass

bm25_retriever = BM25Retriever(search_engine=bm25_search_engine)

# retrieved_docs = await bm25_retriever.invoke(query="What are Artifacts?", top_k=5)

# for doc in retrieved_docs:
#     render_doc(doc)


In [None]:
retrieval_scores = await retrieval_evaluation.evaluate(
    model=bm25_retriever,
    __weave={"display_name": "BM25 Retrieval"}
)

In [20]:

# BM25 retriever + shared response generator, as a named RAG pipeline subclass.
class BM25RAGPipeline(SimpleRAGPipeline):
    ...


bm25_rag_pipeline = BM25RAGPipeline(retriever=bm25_retriever, generator=simple_response_generator)

# Manual check:
# response = await bm25_rag_pipeline.invoke(query="What are Artifacts?")
# printmd(response["answer"])

In [None]:
response_scores = await response_evaluation.evaluate(
    model=bm25_rag_pipeline,
    __weave={"display_name": "BM25 RAG Pipeline"}
)

In [22]:
dense_search_engine = DenseSearchEngine()
dense_search_engine = await dense_search_engine.fit(document_chunks)

class DenseRetriever(Retriever):
    pass

dense_retriever = DenseRetriever(search_engine=dense_search_engine)
# retrieved_docs = await dense_retriever.invoke(query="What are Artifacts?", top_k=5)
# for doc in retrieved_docs:
#     render_doc(doc)


In [None]:
retrieval_scores = await retrieval_evaluation.evaluate(
    model=dense_retriever,
    __weave={"display_name": "Dense Retrieval"}
)


In [24]:
# Dense retriever + shared response generator, as a named RAG pipeline subclass.
class DenseRAGPipeline(SimpleRAGPipeline):
    ...


dense_rag_pipeline = DenseRAGPipeline(retriever=dense_retriever, generator=simple_response_generator)

# Manual check:
# response = await dense_rag_pipeline.invoke(query="What are Artifacts?")
# printmd(response["answer"])

In [None]:
response_scores = await response_evaluation.evaluate(
    model=dense_rag_pipeline,
    __weave={"display_name": "Dense RAG Pipeline"}
)

In [26]:

# Dense retrieval followed by a reranking step, reusing the dense index fitted on the simple chunks.
class DenseRerankedRetriever(RetrieverWithReranker):
    ...


dense_reranked_retriever = DenseRerankedRetriever(search_engine=dense_search_engine)

# Manual check (presumably top_k candidates are reranked down to top_n):
# retrieved_docs = await dense_reranked_retriever.invoke(query="What are Artifacts?", top_k=10, top_n=5)
# for doc in retrieved_docs:
#     render_doc(doc)


In [None]:
retrieval_scores = await retrieval_evaluation.evaluate(
    model=dense_reranked_retriever,
    __weave={"display_name": "Dense Reranked Retrieval"}
)

In [None]:
class DenseRerankedRAGPipeline(SimpleRAGPipeline):
    pass

dense_reranked_rag_pipeline = DenseRerankedRAGPipeline(
    retriever=dense_reranked_retriever,
    generator=simple_response_generator
)

# response = await dense_reranked_rag_pipeline.invoke(query="What are Artifacts?",)

# printmd(response["answer"])
response_scores = await response_evaluation.evaluate(
    model=dense_reranked_rag_pipeline,
    __weave={"display_name": "Dense Reranked RAG Pipeline"}
)


In [29]:
# Hybrid retriever with reranking, built from the TF-IDF (sparse) and dense indexes over the simple chunks.
hybrid_retriever = HybridRetrieverWithReranker(
    dense_search_engine=dense_search_engine,
    sparse_search_engine=tfidf_search_engine,
)

# Manual check:
# retrieved_docs = await hybrid_retriever.invoke(query="What are Artifacts?", top_k=10, top_n=5)
# for doc in retrieved_docs:
#     render_doc(doc)

In [None]:
hybrid_retrieval_scores = await retrieval_evaluation.evaluate(
    model=hybrid_retriever,
    __weave={"display_name": "Hybrid Retrieval"}
)


In [None]:
class HybridRAGPipeline(SimpleRAGPipeline):
    pass

hybrid_rag_pipeline = HybridRAGPipeline(
    retriever=hybrid_retriever,
    generator=simple_response_generator
)

# response = await hybrid_rag_pipeline.invoke(query="What are Artifacts?",)

# printmd(response["answer"])
hybrid_response_scores = await response_evaluation.evaluate(
    model=hybrid_rag_pipeline,
    __weave={"display_name": "Hybrid RAG Pipeline"}
)


In [None]:
## Structured Chunking

In [None]:
# Preview structure-aware (Markdown) chunking on a single document before applying it to the corpus.
# chunk_markdown's return type isn't visible here; materialize it so len() and slicing work.
chunked_docs = list(chunk_markdown(docs[0]["content"], chunk_size=500))
print(f"{len(chunked_docs)} chunks from the first document")
# Show only the first few chunks rather than dumping the whole list into the notebook.
chunked_docs[:3]

In [33]:
# Re-chunk the corpus with the Markdown-aware chunker. Each record has the same fields as the
# simple-chunking records: a deep copy of the source document plus chunk, text, chunk_id and doc_id.
document_chunks = []
for source_doc in docs:
    md_chunks = chunk_markdown(source_doc["content"], chunk_size=500)
    parent_id = make_id(source_doc["content"])
    for md_chunk in md_chunks:
        record = deepcopy(source_doc)
        record.update(
            chunk=md_chunk,
            text=convert_contents_to_text(md_chunk),
            chunk_id=make_id(md_chunk),
            doc_id=parent_id,
        )
        document_chunks.append(record)

In [34]:
bm25_search_engine = await BM25SearchEngine().fit(document_chunks)
dense_search_engine = await DenseSearchEngine().fit(document_chunks)
hybrid_retriever = HybridRetrieverWithReranker(
    sparse_search_engine=bm25_search_engine,
    dense_search_engine=dense_search_engine
)

# retrieved_docs = await hybrid_retriever.invoke(query="What are Artifacts?", top_k=10, top_n=5)

# for doc in retrieved_docs:
#     render_doc(doc)


In [None]:
retrieval_evaluation = weave.Evaluation(
    name="Retrieval_Evaluation",
    dataset=eval_dataset,
    scorers=[RetrievalScorer(name="retrieval_scorer", description="Retrieval metrics")],
    preprocess_model_input=lambda x: {"query": x["question"], "top_k": 10, "top_n": 5},
)
hybrid_retrieval_scores = await retrieval_evaluation.evaluate(
    model=hybrid_retriever,
    __weave={"display_name": "Hybrid Retrieval With Structured Chunking"}
)

In [36]:

# Separate pipeline class for the structured-chunking variant. Re-declaring `HybridRAGPipeline` here
# would shadow the earlier class (a duplicate definition). Both variants would then presumably be
# logged in Weave under the same model name.
class StructuredHybridRAGPipeline(SimpleRAGPipeline):
    pass

# Keep the `hybrid_rag_pipeline` name: the next cell evaluates it.
hybrid_rag_pipeline = StructuredHybridRAGPipeline(
    retriever=hybrid_retriever,
    generator=simple_response_generator,
)

# response = await hybrid_rag_pipeline.invoke(query="What are Artifacts?",)
# printmd(response["answer"])

In [None]:
hybrid_response_scores = await response_evaluation.evaluate(
    model=hybrid_rag_pipeline,
    __weave={"display_name": "Hybrid RAG Pipeline With Structured Chunking"}
)
hybrid_response_scores


In [None]:
## Scaling Up: More Data and a Vector Store

In [4]:
# Load the full dataset from the downloaded artifact directory and chunk it (chunk_size=500).
# NOTE(review): the record schema comes from utils.load_dataset / chunk_dataset — not visible here.
full_dataset = load_dataset(docs_root)
chunked_dataset = chunk_dataset(full_dataset, chunk_size=500)

  validate(nb)


In [5]:
vectorstore_search_engine = VectorStoreSearchEngine()
vectorstore_search_engine = await vectorstore_search_engine.fit(chunked_dataset)



In [None]:
# results = await vectorstore_search_engine.search(
#     query="What are Artifacts?",
#     top_k=5,
#     filters="file_type in ('notebook', 'markdown')")
# for doc in results:
#     render_doc(doc)

In [6]:
# Reranking retriever on top of the vector-store engine.
vectorstore_retriever = RetrieverWithReranker(search_engine=vectorstore_search_engine)

# Manual check with a metadata filter:
# results = await vectorstore_retriever.invoke(query="What are Artifacts?", top_k=10, top_n=5, filters="file_type in ('notebook', 'markdown')")
# for doc in results:
#     render_doc(doc)

In [7]:
## Query Enhancement



In [8]:
query_enhancer = QueryEnhancer()
results = await query_enhancer.invoke("What are W&B Artifacts?")

In [15]:
query_enhanced_rag_pipeline = QueryEnhancedRAGPipeline(
    query_enhancer=query_enhancer,
    retriever=vectorstore_retriever,
    response_generator=QueryEnhancedResponseGenerator()
)
response = await query_enhanced_rag_pipeline.invoke("What are Artifacts?")
