# Knowledge retrieval eval using the Kubernetes website data

In [5]:
from opsmate.libs.knowledge import (
    Runbook,
    get_runbooks_table,
    DatabaseConnection,
    DocumentIngester,
)
from opsmate.libs.core.types import DocumentIngestion, DocumentIngestionSpec, Metadata
from opsmate.libs.config import config
import pandas as pd
from pydantic import BaseModel
# Override the opsmate config for this notebook: a local path for the embeddings
# store plus the embedding registry/model names.
# NOTE(review): presumably read by get_runbooks_table() / DocumentIngester below — confirm.
config.embeddings_db_path = "./opsmate-embedding"
config.embedding_registry_name = "openai"
config.embedding_model_name = "text-embedding-3-small"


In [2]:
%%bash

# Shallow-clone the Kubernetes website sources once; skipped when ./website exists.
# HTTPS is used instead of the SSH remote so the clone works without a GitHub SSH key.
if [ ! -d "website" ]; then
    git clone --depth=1 https://github.com/kubernetes/website.git
fi

Now let's import the Kubernetes website data into the embedding database.

In [6]:
# Start from an empty table: the predicate "1 = 1" is true for every row, so this
# delete is meant to clear whatever a previous run ingested.
runbook_table = get_runbooks_table()
runbook_table.delete("1 = 1")

# Describe what to ingest. Note the glob only covers the markdown files under
# concepts/workloads/pods, even though the ingestion is named "k8s-concepts".
ingestion = DocumentIngestion(
    metadata=Metadata(
        name="k8s-concepts",
        description="Kubernetes Concepts",
    ),
    spec=DocumentIngestionSpec(local_path="./website/content/en/docs/concepts/workloads/pods/*.md"),
)

ingester = DocumentIngester()

# Runs the ingestion described above (the log output below reports the batch size).
ingester.document_ingestion(ingestion)


[2m2024-11-15 15:02:59[0m [[32m[1minfo     [0m] [1mbatch ingest runbooks         [0m [36mbatch_size[0m=[35m97[0m


In [7]:
# Lightweight view of one row of the runbooks table, used as the unit for
# synthetic question generation.
class RunbookChunk(BaseModel):
    # Row identifier; later compared against search results to score retrieval.
    uuid: str
    # Section heading of the chunk (can be an empty string, see the sample output).
    heading: str
    # Raw text of the chunk.
    content: str

# Re-open the table and pull every ingested row into a DataFrame.
runbook_table = get_runbooks_table()
sample_runbooks = runbook_table.to_pandas()

# Wrap each row in a RunbookChunk; the DataFrame index is not needed.
sample_chunks = [
    RunbookChunk(uuid=record["uuid"], heading=record["heading"], content=record["content"])
    for _, record in sample_runbooks.iterrows()
]

# Display the first chunk as a sanity check.
sample_chunks[0]


RunbookChunk(uuid='109e520d-5bc7-4bb5-9ea0-9bca5e39e52c', heading='', content='---\nreviewers:\n- verb\n- yujuhong\ntitle: Ephemeral Containers\ncontent_type: concept\nweight: 60\n---  \n<!-- overview -->  \n{{< feature-state state="stable" for_k8s_version="v1.25" >}}  \nThis page provides an overview of ephemeral containers: a special type of container\nthat runs temporarily in an existing {{< glossary_tooltip term_id="pod" >}} to\naccomplish user-initiated actions such as troubleshooting. You use ephemeral\ncontainers to inspect services rather than to build applications.  \n<!-- body -->')

Let's generate some synthetic questions

In [8]:
from typing import List
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel

# Async OpenAI client wrapped by instructor so completions can be parsed into
# pydantic models (see `response_model` in generate_evals).
# NOTE(review): no API key is passed here — presumably picked up from the environment; confirm.
client = instructor.from_openai(AsyncOpenAI())


# Style examples shown to the model as inspiration; the prompt asks it not to
# reuse them verbatim.
example_questions = [
    "How to create a pod?",
    "How to scale a deployment from 1 replica to 3 replicas?",
]

# One generated question/answer pair. This model is passed as `response_model`
# to the LLM call, so it is documented with comments rather than a docstring
# to keep the generated schema unchanged.
class QuestionAnswer(BaseModel):
    question: str
    answer: str

# A generated question/answer pair tied back to the chunk it was generated from.
class ChunkEval(QuestionAnswer):
    # uuid of the source RunbookChunk; the ground truth when scoring retrieval.
    uuid: str
    # The question followed by the chunk's heading and content (built by
    # make_context in generate_evals); used as the search query in the eval.
    question_with_context: str

async def generate_evals(runbook: RunbookChunk, n_questions: int, example_questions: List[str]) -> List[ChunkEval]:
    """Ask the LLM for synthetic question/answer pairs about one runbook chunk.

    Args:
        runbook: Chunk whose heading and content are embedded in the prompt.
        n_questions: Number of pairs requested in the prompt. The count is only
            asked for, it is not enforced on the response.
        example_questions: Sample questions listed in the prompt as inspiration.

    Returns:
        One ChunkEval per generated pair, each carrying the chunk's uuid and a
        context-augmented version of the question. If anything raises while
        requesting or consuming the completion, the error is printed and an
        empty list is returned.
    """
    # Render the bullet list outside the prompt f-string. The previous inline
    # `'f - {q}'` was a plain string, so the prompt contained the literal text
    # "f - {q}" instead of the example questions; a backslash inside an f-string
    # expression is also a syntax error before Python 3.12.
    example_block = "\n".join(f"- {q}" for q in example_questions)

    prompt = f"""
Generate `{n_questions}` question-answer pairs about {runbook.heading}. The answer should primarily derived from the information in the runbook content.

<content>
{runbook.content}
</content>

Example questions:
{example_block}

Provide a concise and specific answer for each question.
Do not use the exact example questions. Use them only as inspiration for the types of more specific questions to generate.
Do not include answers that are not in the content.
Questions should ask about how to do certain things and the answer should refer to how to do certain things based on the technical knowledge in the runbook.
Stylistically, the questions should resemble what people would ask a RAG-based answer bot on a technical documentation website. So they can be a little informal, messy or scattered.
"""


    def make_context(question: str) -> str:
        """Prefix the question to the chunk's heading and content."""
        return f"""A user asked the following question:
Question: {question}
This is about the following runbook:
Runbook Title: {runbook.heading}
Runbook Content: {runbook.content}
"""

    try:
        pairs = client.chat.completions.create_iterable(
            model="gpt-4o-mini",
            response_model=QuestionAnswer,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
        )

        # ChunkEval declares no `heading` field, so it is not passed here.
        return [
            ChunkEval(
                uuid=runbook.uuid,
                question=pair.question,
                answer=pair.answer,
                question_with_context=make_context(pair.question),
            )
            async for pair in pairs
        ]
    except Exception as e:
        # Best effort: one failing chunk should not abort the whole dataset build.
        print(f"Error generating evals: {str(e)}")
        return []

# Try the generator on the first chunk only (3 questions) before running it on
# every chunk; top-level `await` relies on the notebook's running event loop.
first_chunk_res = await generate_evals(sample_chunks[0], 3, example_questions)
first_chunk_res


[ChunkEval(question='What are ephemeral containers used for in Kubernetes?', answer='Ephemeral containers are used to run temporarily in an existing pod to accomplish user-initiated actions such as troubleshooting.', uuid='109e520d-5bc7-4bb5-9ea0-9bca5e39e52c', question_with_context='A user asked the following question:\nQuestion: What are ephemeral containers used for in Kubernetes?\nThis is about the following runbook:\nRunbook Title: \nRunbook Content: ---\nreviewers:\n- verb\n- yujuhong\ntitle: Ephemeral Containers\ncontent_type: concept\nweight: 60\n---  \n<!-- overview -->  \n{{< feature-state state="stable" for_k8s_version="v1.25" >}}  \nThis page provides an overview of ephemeral containers: a special type of container\nthat runs temporarily in an existing {{< glossary_tooltip term_id="pod" >}} to\naccomplish user-initiated actions such as troubleshooting. You use ephemeral\ncontainers to inspect services rather than to build applications.  \n<!-- body -->\n'),
 ChunkEval(quest

In [9]:
import asyncio

class ChunkProcessingError(Exception):
    """Raised when generating evals for a single chunk fails."""

async def process_chunk(chunk: RunbookChunk, n_questions: int, example_questions: List[str], semaphore: asyncio.Semaphore) -> List[ChunkEval]:
    """Generate evals for one chunk while holding a slot of the shared semaphore.

    Args:
        chunk: Chunk to generate question/answer pairs for.
        n_questions: Number of pairs to request.
        example_questions: Sample questions forwarded to generate_evals.
        semaphore: Bounds how many chunks are processed concurrently.

    Returns:
        The list produced by generate_evals for this chunk.

    Raises:
        ChunkProcessingError: If generate_evals raises; the original exception
            is chained as the cause.
    """
    async with semaphore:
        try:
            return await generate_evals(chunk, n_questions, example_questions)
        except Exception as e:
            # RunbookChunk exposes `uuid`, not `id`; reading `chunk.id` here raised
            # AttributeError and hid the real failure.
            raise ChunkProcessingError(f"Error processing chunk {chunk.uuid}: {str(e)}") from e


# Same single-chunk check as before, this time through the semaphore-guarded wrapper.
await process_chunk(sample_chunks[0], 3, example_questions, asyncio.Semaphore(1))


[ChunkEval(question='What are ephemeral containers used for in Kubernetes?', answer='Ephemeral containers are used to run temporarily in an existing pod to accomplish user-initiated actions such as troubleshooting.', uuid='109e520d-5bc7-4bb5-9ea0-9bca5e39e52c', question_with_context='A user asked the following question:\nQuestion: What are ephemeral containers used for in Kubernetes?\nThis is about the following runbook:\nRunbook Title: \nRunbook Content: ---\nreviewers:\n- verb\n- yujuhong\ntitle: Ephemeral Containers\ncontent_type: concept\nweight: 60\n---  \n<!-- overview -->  \n{{< feature-state state="stable" for_k8s_version="v1.25" >}}  \nThis page provides an overview of ephemeral containers: a special type of container\nthat runs temporarily in an existing {{< glossary_tooltip term_id="pod" >}} to\naccomplish user-initiated actions such as troubleshooting. You use ephemeral\ncontainers to inspect services rather than to build applications.  \n<!-- body -->\n'),
 ChunkEval(quest

Now let's call `process_chunk` with all chunks to build the full dataset

In [10]:
import json
import structlog
import random

# Notebook-wide structured logger, used by later cells for the save summary and scoring.
logger = structlog.get_logger()

async def create_synthetic_dataset(
    chunks: List[RunbookChunk],
    n_questions: int,
    example_questions: List[str],
    max_workers: int = 10,
) -> List[ChunkEval]:
    """Generate evals for every chunk concurrently and flatten the results.

    Args:
        chunks: Chunks to generate question/answer pairs for.
        n_questions: Number of pairs requested per chunk.
        example_questions: Sample questions forwarded to each worker.
        max_workers: Maximum number of chunks processed at the same time.

    Returns:
        All ChunkEval items from the chunks that succeeded, in chunk order.
        Failures are printed and skipped rather than raised.
    """
    limiter = asyncio.Semaphore(max_workers)

    # return_exceptions=True turns a failed chunk into an exception object in
    # the result list instead of cancelling the whole gather.
    outcomes = await asyncio.gather(
        *(process_chunk(chunk, n_questions, example_questions, limiter) for chunk in chunks),
        return_exceptions=True,
    )

    collected = []
    for outcome in outcomes:
        if isinstance(outcome, list):
            collected += outcome
            continue
        if isinstance(outcome, ChunkProcessingError):
            print(f"Error processing chunk: {outcome}")
        else:
            print(f"Unknown result type: {type(outcome)}")
    return collected

def save_eval_data(dataset: List[ChunkEval], filename: str):
    """Dump the dataset to `filename` as one indented JSON array.

    Each item is serialised through its `model_dump()` dictionary.
    """
    with open(filename, "w") as out_file:
        records = [item.model_dump() for item in dataset]
        json.dump(records, out_file, indent=2)

def save_tf_data(dataset: List[ChunkEval], filename: str):
    """Write the dataset to `filename` as JSON Lines, one record per item.

    Each line is an object with two keys:
      - "query": the item's `question_with_context`
      - "relevant_passages": a one-element list holding the item's `question`

    The function no longer loads the runbooks table into pandas: that frame was
    never used, so it only added a full table read and a dependency on a
    notebook-level global.
    """
    # NOTE(review): "relevant_passages" holds the question text, not the runbook
    # content, while "query" already embeds that content — confirm this is the
    # record layout the fine-tuning step expects.
    with open(filename, "w") as f:
        for chunk_eval in dataset:
            record = {
                "query": chunk_eval.question_with_context,
                "relevant_passages": [chunk_eval.question],
            }
            f.write(json.dumps(record) + "\n")

synthetic_dataset = await create_synthetic_dataset(sample_chunks, 3, example_questions)
random.shuffle(synthetic_dataset)


In [11]:
# Split the shuffled dataset in half: the first half is kept for evaluation and
# the second half (one item larger when the total is odd) for fine-tuning.
split_idx = len(synthetic_dataset) // 2
eval_dataset = synthetic_dataset[:split_idx]
ft_dataset = synthetic_dataset[split_idx:]

print(len(synthetic_dataset), len(eval_dataset), len(ft_dataset))
# The eval split is stored as a JSON array, the fine-tuning split as JSON Lines.
save_eval_data(eval_dataset, "synthetic_eval_dataset.json")
save_tf_data(ft_dataset, "synthetic_ft_dataset.jsonl")

logger.info("Synthetic eval dataset saved",
    dataset_len=len(synthetic_dataset),
    eval_len=len(eval_dataset),
    ft_len=len(ft_dataset),
)


291 145 146
[2m2024-11-15 15:04:25[0m [[32m[1minfo     [0m] [1mSynthetic eval dataset saved  [0m [36mdataset_len[0m=[35m291[0m [36meval_len[0m=[35m145[0m [36mft_len[0m=[35m146[0m


In [33]:
# Reload the eval split from disk. After this, `eval_dataset` holds plain dicts
# parsed from JSON rather than the ChunkEval objects it held in the split cell.
with open("synthetic_eval_dataset.json", "r") as f:
    eval_dataset = json.load(f)

# Only the first 10 entries are scored below.
eval_dataset_sample = eval_dataset[:10]

# Rebuild ChunkEval models from the stored dictionaries.
eval_questions = [ChunkEval(**e) for e in eval_dataset_sample]

eval_questions


[ChunkEval(question='Can I delay the scheduling of a Pod, and if so, how?', answer='Yes, you can use Pod Scheduling Readiness to delay scheduling for a Pod until all its scheduling gates are removed.', uuid='e655dc1b-3e0a-48ef-a6d3-0cebf97f6b18', question_with_context="A user asked the following question:\nQuestion: Can I delay the scheduling of a Pod, and if so, how?\nThis is about the following runbook:\nRunbook Title: Pod lifetime\nRunbook Content: Pod lifetimeWhilst a Pod is running, the kubelet is able to restart containers to handle some\nkind of faults. Within a Pod, Kubernetes tracks different container\n[states](#container-states) and determines what action to take to make the Pod\nhealthy again.  \nIn the Kubernetes API, Pods have both a specification and an actual status. The\nstatus for a Pod object consists of a set of [Pod conditions](#pod-conditions).\nYou can also inject [custom readiness information](#pod-readiness-gate) into the\ncondition data for a Pod, if that is u

In [34]:
def run_simple_request(q: ChunkEval, n_return_vals=5):
    """Search the runbooks table with the eval's context-augmented question.

    Returns one boolean per retrieved row, in result order: True when the row's
    uuid equals the uuid of the chunk the question was generated from.
    """
    search = runbook_table.search(q.question_with_context)
    matches = search.select(["uuid"]).limit(n_return_vals).to_list()

    hit_flags = []
    for match in matches:
        hit_flags.append(str(q.uuid) == str(match["uuid"]))
    return hit_flags


In [35]:
def score(hits):
    """Compute precision and recall from per-request hit flags.

    Args:
        hits: One list of booleans per retrieval request, one flag per
            retrieved row (True when the row is the expected chunk).

    Returns:
        Dict with "precision" (true positives over all retrieved rows) and
        "recall" (true positives over the number of requests); each is 0 when
        its denominator is 0.
    """
    n_retrieval_requests = len(hits)

    total_retrievals = 0
    true_positives = 0
    for request_flags in hits:
        total_retrievals += len(request_flags)
        true_positives += sum(request_flags)

    logger.info("Score", n_retrieval_requests=n_retrieval_requests, total_retrievals=total_retrievals, true_positives=true_positives)

    if total_retrievals > 0:
        precision = true_positives / total_retrievals
    else:
        precision = 0

    if n_retrieval_requests > 0:
        recall = true_positives / n_retrieval_requests
    else:
        recall = 0

    return {"precision": precision, "recall": recall}


In [37]:
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict

def score_simple_search(n_to_retrieve: int) -> Dict[str, float]:
    """Run every eval question against the table and score the combined hits.

    Args:
        n_to_retrieve: Number of rows requested per search.

    Returns:
        The precision/recall dict produced by `score`.
    """
    def _lookup(question):
        return run_simple_request(question, n_to_retrieve)

    # parallelize to speed this up 5-10X
    with ThreadPoolExecutor() as pool:
        per_question_hits = list(pool.map(_lookup, eval_questions))
    return score(per_question_hits)

# Score retrieval at several cut-offs and tabulate the results, one row per cut-off.
k_to_retrieve = [5, 10, 20, 100]
scores = pd.DataFrame([score_simple_search(n) for n in k_to_retrieve])
# Label each row with the cut-off it was computed for.
scores["n_retrieved"] = k_to_retrieve
scores

[2m2024-11-15 15:26:47[0m [[32m[1minfo     [0m] [1mScore                         [0m [36mn_retrieval_requests[0m=[35m10[0m [36mtotal_retrievals[0m=[35m50[0m [36mtrue_positives[0m=[35m10[0m
[2m2024-11-15 15:26:47[0m [[32m[1minfo     [0m] [1mScore                         [0m [36mn_retrieval_requests[0m=[35m10[0m [36mtotal_retrievals[0m=[35m100[0m [36mtrue_positives[0m=[35m10[0m
[2m2024-11-15 15:26:48[0m [[32m[1minfo     [0m] [1mScore                         [0m [36mn_retrieval_requests[0m=[35m10[0m [36mtotal_retrievals[0m=[35m200[0m [36mtrue_positives[0m=[35m10[0m
[2m2024-11-15 15:26:51[0m [[32m[1minfo     [0m] [1mScore                         [0m [36mn_retrieval_requests[0m=[35m10[0m [36mtotal_retrievals[0m=[35m970[0m [36mtrue_positives[0m=[35m10[0m


Unnamed: 0,precision,recall,n_retrieved
0,0.2,1.0,5
1,0.1,1.0,10
2,0.05,1.0,20
3,0.010309,1.0,100
