In [3]:
%pip install -qq "fastrepl==0.0.17" "langfuse>=1.0.35" langchain chromadb datasets

Note: you may need to restart the kernel to use updated packages.


In [1]:
import fastrepl

In [2]:
from datasets import load_dataset

# Draw a reproducible 30-question sample from the "processed" split, drop the
# "model" column, and wrap the result as a fastrepl Dataset.
hf_questions = (
    load_dataset("repllabs/questions_how_to_do_great_work", split="processed")
    .remove_columns(["model"])
    .shuffle(seed=12)
    .select(range(30))
)
ds = fastrepl.Dataset.from_hf(hf_questions)
ds

fastrepl.Dataset({
    features: ['question'],
    num_rows: 30
})

In [None]:
import os

from langfuse import Langfuse

# Credentials are read from the environment rather than typed into the
# notebook, so they never end up in a committed .ipynb or its outputs.
# Export LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / OPENAI_API_KEY (and
# optionally LANGFUSE_HOST) before starting the kernel.
ENV_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
ENV_SECRET_KEY = os.environ.get("LANGFUSE_SECRET_KEY", "")
ENV_PUBLIC_KEY = os.environ.get("LANGFUSE_PUBLIC_KEY", "")

# Only read OPENAI_API_KEY; never overwrite it, so a key exported in the shell
# stays visible to libraries that look it up from the environment themselves.
ENV_OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

langfuse = Langfuse(ENV_PUBLIC_KEY, ENV_SECRET_KEY, ENV_HOST)

In [4]:
# Name of the Langfuse dataset that the upload and reload cells operate on.
DS_NAME: str = "how-to-do-great-work4"

In [None]:
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest

# Upload every sampled question as an item of the Langfuse dataset DS_NAME.
# NOTE(review): this cell does not look idempotent — re-running it presumably
# adds a second copy of every item, so bump DS_NAME (or skip this cell) when
# re-executing the notebook.
langfuse.create_dataset(CreateDatasetRequest(name=DS_NAME))

item_requests = [
    CreateDatasetItemRequest(datasetName=DS_NAME, input=row["question"])
    for row in ds
]
for item_request in item_requests:
    langfuse.create_dataset_item(item_request)

# Flush before the dataset is read back from Langfuse.
langfuse.flush()

In [6]:
# Reload the dataset from Langfuse so every row carries its `_lf_item_index`,
# which is what later links each run back to its dataset item.
langfuse_ds = langfuse.get_dataset(name=DS_NAME)
ds = (
    fastrepl.Dataset.from_langfuse(langfuse_ds)
    .rename_column("inputs", "question")
    .remove_column("expected_outputs")
)
ds

fastrepl.Dataset({
    features: ['_lf_item_index', 'question'],
    num_rows: 30
})

In [7]:
def create_docs(
    url: str,
    chunk_size: int = 500,
    chunk_overlap: int = 100,
    skip_chunks: int = 1,
) -> list[str]:
    """Load a web page and split it into overlapping text chunks.

    Args:
        url: Page to load with LangChain's ``WebBaseLoader``.
        chunk_size: ``chunk_size`` passed to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: ``chunk_overlap`` passed to the splitter.
        skip_chunks: Number of leading chunks to discard. Defaults to 1, as
            before (presumably the page header/navigation — TODO confirm
            for other URLs).

    Returns:
        The ``page_content`` of every remaining chunk, in splitter order.

    Raises:
        ValueError: If ``skip_chunks`` is negative (a negative slice start
            would silently keep only the trailing chunks).
    """
    if skip_chunks < 0:
        raise ValueError(f"skip_chunks must be >= 0, got {skip_chunks}")

    from langchain.document_loaders import WebBaseLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    pages = WebBaseLoader(url).load()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    chunks = splitter.split_documents(pages)
    return [chunk.page_content for chunk in chunks[skip_chunks:]]


def create_collection(name: str, docs: list[str]):
    """Embed ``docs`` into an in-memory Chroma collection and return it.

    Documents are stored under the ids ``"0"`` .. ``str(len(docs) - 1)`` and
    embedded with OpenAI embeddings using ``ENV_OPENAI_API_KEY``.

    ``get_or_create=True`` may hand back an already-populated collection when
    the cell is re-run in the same kernel, so the documents are written with
    ``upsert``. That overwrites existing ids instead of colliding with them.

    Args:
        name: Collection name.
        docs: Text chunks to index.

    Returns:
        The Chroma collection.
    """
    import chromadb
    from chromadb.utils import embedding_functions

    client = chromadb.EphemeralClient()
    collection = client.create_collection(
        name=name,
        get_or_create=True,
        embedding_function=embedding_functions.OpenAIEmbeddingFunction(
            api_key=ENV_OPENAI_API_KEY
        ),
    )

    # Chroma's id validation rejects an empty id list, so only write when
    # there is something to store.
    if docs:
        collection.upsert(documents=docs, ids=[str(i) for i in range(len(docs))])
    return collection

In [8]:
from typing import Optional, List

from langfuse.model import CreateTrace, InitialSpan, InitialGeneration, Usage

import litellm
from datetime import datetime
from chromadb import Collection


class QA:
    """Retrieve-then-generate QA over a Chroma collection, optionally traced in Langfuse.

    Attributes:
        collection: Chroma collection queried for supporting contexts.
        n_results: Number of contexts retrieved per question.
        model: LiteLLM model name used to generate answers.
        trace_ids: One entry per successful ``run`` call (``None`` when the
            call was made with ``trace=False``).
        generation_ids: Index-aligned with ``trace_ids`` for ``run`` calls.
            Direct ``generate`` calls with a ``traceId`` also append here,
            which breaks that alignment.
    """

    def __init__(
        self,
        collection: Collection,
        n_results: int = 1,
        model: str = "gpt-3.5-turbo",
    ) -> None:
        import threading

        self.collection = collection
        self.n_results = n_results
        self.model = model
        self.trace_ids = []
        self.generation_ids = []
        # Makes the paired trace/generation append in `run` atomic in case a
        # runner calls `run` from several threads.
        self._ids_lock = threading.Lock()

    def retrieve_docs(self, question: str, traceId: Optional[str] = None) -> List[str]:
        """Return up to ``self.n_results`` documents matching ``question``.

        If ``traceId`` is given, the lookup is logged as a
        ``contexts-retrieval`` span on that Langfuse trace.
        """
        startRetrieval = datetime.now()
        result = self.collection.query(
            query_texts=[question], n_results=self.n_results
        )
        # A single query text was sent, so the first result list is ours.
        contexts = result["documents"][0]
        endRetrieval = datetime.now()

        if traceId is not None:
            langfuse.span(
                InitialSpan(
                    traceId=traceId,
                    name="contexts-retrieval",
                    startTime=startRetrieval,
                    endTime=endRetrieval,
                    input=question,
                    output=contexts,
                )
            )
        return contexts

    def _generate(
        self, question: str, contexts: List[str], traceId: Optional[str] = None
    ) -> tuple[str, Optional[str]]:
        """Ask the LLM and return ``(answer, generation_id)``.

        ``generation_id`` is ``None`` when ``traceId`` is ``None`` (nothing is
        logged to Langfuse). Never touches ``self.generation_ids``.
        """
        messages = [
            {
                "role": "system",
                "content": f"""Answer the question using the contexts if needed. Contexts: {contexts}
Do not make things up. Use less than 30 words.""",
            },
            {
                "role": "user",
                "content": f"Question: {question}",
            },
        ]

        startGeneration = datetime.now()
        res = litellm.completion(
            model=self.model,
            messages=messages,
            api_key=ENV_OPENAI_API_KEY,
        )
        endGeneration = datetime.now()
        answer = res["choices"][0]["message"]["content"]

        if traceId is None:
            return answer, None

        generation = langfuse.generation(
            InitialGeneration(
                traceId=traceId,
                name="answer-generation",
                startTime=startGeneration,
                endTime=endGeneration,
                model=self.model,
                prompt=messages,
                completion=answer,
                usage=Usage(
                    prompt_tokens=res["usage"]["prompt_tokens"],
                    completion_tokens=res["usage"]["completion_tokens"],
                ),
            )
        )
        return answer, generation.id

    def generate(
        self, question: str, contexts: List[str], traceId: Optional[str] = None
    ) -> str:
        """Answer ``question`` using ``contexts`` and return the answer text.

        When ``traceId`` is given, the LLM call is logged as an
        ``answer-generation`` observation. Its id is appended to
        ``self.generation_ids`` without a matching ``trace_ids`` entry, so
        prefer ``run`` when the two lists must stay aligned.
        """
        answer, generation_id = self._generate(question, contexts, traceId)
        if traceId is not None:
            self.generation_ids.append(generation_id)
        return answer

    def run(self, question: str, trace=True):
        """Retrieve contexts for ``question``, answer it, and return the answer text.

        Each successful call appends exactly one entry to ``trace_ids`` and
        one to ``generation_ids`` (both ``None`` when ``trace`` is false).
        The appends happen together under a lock, so index ``i`` of both
        lists always describes the same call.

        NOTE(review): if a runner calls ``run`` concurrently, entries follow
        completion order rather than input order. Confirm before zipping the
        lists with dataset rows.
        """
        traceId = (
            langfuse.trace(CreateTrace(name="qa-pg-how-to-do-great-work")).id
            if trace
            else None
        )

        contexts = self.retrieve_docs(question, traceId)
        answer, generation_id = self._generate(question, contexts, traceId)

        with self._ids_lock:
            self.trace_ids.append(traceId)
            self.generation_ids.append(generation_id)
        return answer

In [9]:
# Index Paul Graham's "How to Do Great Work" essay and wrap it in the QA pipeline.
ESSAY_URL = "http://paulgraham.com/greatwork.html"

docs = create_docs(ESSAY_URL)
collection = create_collection(name="how-to-do-great-work", docs=docs)
qa = QA(collection=collection)

In [10]:
# Answer every question through the traced pipeline. Each `qa.run` call also
# records its Langfuse trace/generation ids on `qa`.
question_args = [(question,) for question in ds["question"]]
answer_runner = fastrepl.local_runner(fn=qa.run, output_feature="answer")
result = answer_runner.run(args_list=question_args)

ds = ds.add_column("answer", result["answer"])
ds

Output()

fastrepl.Dataset({
    features: ['_lf_item_index', 'question', 'answer'],
    num_rows: 30
})

In [11]:
# Attach the Langfuse ids recorded by `qa.run` so each row can later be linked
# to its dataset item and scored. The id lists only match the rows when `qa`
# was freshly built and the answering cell ran exactly once, so check first.
n_rows = len(ds["question"])
assert len(qa.trace_ids) == n_rows and len(qa.generation_ids) == n_rows, (
    f"expected {n_rows} trace/generation ids, got "
    f"{len(qa.trace_ids)}/{len(qa.generation_ids)}; re-create `qa` and re-run "
    "the answering cell"
)
# NOTE(review): assumes qa.run was called in dataset-row order — confirm that
# fastrepl.local_runner does not reorder or parallelize calls.
ds = ds.add_column("_lf_trace_id", qa.trace_ids)
ds = ds.add_column("_lf_generation_id", qa.generation_ids)
ds

fastrepl.Dataset({
    features: ['_lf_item_index', 'question', 'answer', '_lf_trace_id', '_lf_generation_id'],
    num_rows: 30
})

In [12]:
# Re-run retrieval (untraced: no traceId is passed) for every question so the
# dataset gets the `contexts` column the faithfulness evaluator reads.
# NOTE(review): this repeats the lookups done during answering; the contexts
# should match as long as the collection has not changed.
context_args = [(question,) for question in ds["question"]]
result = fastrepl.local_runner(fn=qa.retrieve_docs, output_feature="contexts").run(
    args_list=context_args
)

# Keep the returned dataset, as every other add_column call here does;
# discarding it relies on in-place mutation that is not guaranteed.
ds = ds.add_column("contexts", result["contexts"])
ds

Output()

fastrepl.Dataset({
    features: ['_lf_item_index', 'question', 'answer', '_lf_trace_id', '_lf_generation_id', 'contexts'],
    num_rows: 30
})

In [13]:
# Score each (question, answer, contexts) row with the RAGAS faithfulness metric.
faithfulness_node = fastrepl.RAGAS(metric="Faithfulness")
evaluator = fastrepl.RAGEvaluator(node=faithfulness_node)

eval_runner = fastrepl.local_runner(evaluator=evaluator, dataset=ds)
result = eval_runner.run()

Output()

In [14]:
# Inspect the evaluated dataset; each row's RAGAS score is in the new "result" column.
result

fastrepl.Dataset({
    features: ['_lf_item_index', 'question', 'answer', '_lf_trace_id', '_lf_generation_id', 'contexts', 'result'],
    num_rows: 30
})

In [15]:
from langfuse.model import InitialScore


def run_experiment(experiment_name: str, metric_name: str, dataset=None) -> None:
    """Link each evaluated row's generation to its Langfuse item and record its score.

    Args:
        experiment_name: Run name passed to ``DatasetItem.link`` for every row.
        metric_name: Name of the Langfuse score to create (e.g. "ragas-faithfulness").
        dataset: Evaluated rows with ``_lf_item_index``, ``_lf_trace_id``,
            ``_lf_generation_id`` and ``result`` columns. Defaults to the
            notebook-level ``result`` so existing two-argument calls still work.

    Raises:
        ValueError: If a row has no trace or generation id (e.g. it was
            answered with tracing disabled).
    """
    rows = result if dataset is None else dataset
    for row in rows:
        index: int = row["_lf_item_index"]
        traceId = row["_lf_trace_id"]
        generationId = row["_lf_generation_id"]
        score: float = row["result"]

        # str(None) would silently send the literal id "None" to Langfuse.
        if traceId is None or generationId is None:
            raise ValueError(
                f"dataset item {index} has no Langfuse trace/generation id; "
                "answers must be produced with tracing enabled"
            )

        langfuse_ds.items[index].link(generationId, experiment_name)
        langfuse.score(
            InitialScore(
                name=metric_name,
                traceId=str(traceId),
                observationId=generationId,
                value=score,
            )
        )


run_experiment("test_experiment-2", "ragas-faithfulness", dataset=result)
langfuse.flush()