# Knowledge eval using the Kubernetes website data

In [5]:
from opsmate.libs.knowledge import (
    Runbook,
    get_runbooks_table,
    DatabaseConnection,
    DocumentIngester,
)
from opsmate.libs.core.types import DocumentIngestion, DocumentIngestionSpec, Metadata
from opsmate.libs.config import config
import pandas as pd
from pydantic import BaseModel
# Override the opsmate embedding settings for this notebook:
# a local path for the embeddings DB, plus the registry and model names.
config.embeddings_db_path = "./opsmate-embedding"
config.embedding_registry_name = "openai"
config.embedding_model_name = "text-embedding-3-small"


In [2]:
%%bash

# Fetch a shallow copy of the Kubernetes website repository once; an existing
# "website" directory is reused so re-running the cell is cheap.
# The public HTTPS remote is used so the clone does not depend on GitHub SSH
# keys being configured on the machine running the notebook.
if [ ! -d "website" ]; then
    git clone --depth=1 https://github.com/kubernetes/website.git
fi

Now let's import the Kubernetes website data into the embedding database.

In [38]:
# Delete with an always-true predicate, i.e. clear the runbooks table
# (presumably so that re-running this cell starts from a clean state).
runbook_table = get_runbooks_table()
runbook_table.delete("1 = 1")

# Describe what to ingest: every markdown file under the workloads section of
# the cloned website's concepts docs.
# NOTE(review): the name/description say "Kubernetes Concepts" but the glob only
# covers concepts/workloads — confirm that this narrower scope is intended.
ingestion = DocumentIngestion(
    metadata=Metadata(
        name="k8s-concepts",
        description="Kubernetes Concepts",
    ),
    spec=DocumentIngestionSpec(local_path="./website/content/en/docs/concepts/workloads/**/*.md"),
)

ingester = DocumentIngester()

# Run the ingestion; the log output below reports the batches written.
ingester.document_ingestion(ingestion)


[2m2024-11-15 21:40:15[0m [[32m[1minfo     [0m] [1mbatch ingest runbooks         [0m [36mbatch_size[0m=[35m100[0m


[2m2024-11-15 21:40:17[0m [[32m[1minfo     [0m] [1mbatch ingest runbooks         [0m [36mbatch_size[0m=[35m100[0m
[2m2024-11-15 21:40:19[0m [[32m[1minfo     [0m] [1mbatch ingest runbooks         [0m [36mbatch_size[0m=[35m61[0m


In [40]:
class RunbookChunk(BaseModel):
    """The subset of a runbooks-table row used for eval generation."""

    # Identifier of the chunk; later compared against search results.
    uuid: str
    # Section heading of the chunk (may be an empty string).
    heading: str
    # Text of the chunk.
    content: str

# Load every row of the runbooks table into a DataFrame and wrap each row in a
# RunbookChunk, keeping only the columns the eval generation needs.
runbook_table = get_runbooks_table()
sample_runbooks = runbook_table.to_pandas()

sample_chunks = [
    RunbookChunk(**{field: row[field] for field in ("uuid", "heading", "content")})
    for _idx, row in sample_runbooks.iterrows()
]

# Display the first chunk as a quick sanity check.
sample_chunks[0]


RunbookChunk(uuid='67620104-81bb-4054-a69a-5df93a8635f0', heading='', content='---\nreviewers:\n- enisoc\n- erictune\n- foxish\n- janetkuo\n- kow3ns\ntitle: DaemonSet\napi_metadata:\n- apiVersion: "apps/v1"\nkind: "DaemonSet"\ndescription: >-\nA DaemonSet defines Pods that provide node-local facilities. These might be fundamental to the operation of your cluster, such as a networking helper tool, or be part of an add-on.\ncontent_type: concept\nweight: 40\nhide_summary: true # Listed separately in section index\n---  \n<!-- overview -->  \nA _DaemonSet_ ensures that all (or some) Nodes run a copy of a Pod.  As nodes are added to the\ncluster, Pods are added to them.  As nodes are removed from the cluster, those Pods are garbage\ncollected.  Deleting a DaemonSet will clean up the Pods it created.  \nSome typical uses of a DaemonSet are:  \n- running a cluster storage daemon on every node\n- running a logs collection daemon on every node\n- running a node monitoring daemon on every node 

Let's generate some synthetic questions

In [41]:
from typing import List
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel

# Wrap the async OpenAI client with instructor; it is used below with a
# pydantic `response_model` to get structured question/answer pairs.
client = instructor.from_openai(AsyncOpenAI())


# Seed questions that are embedded in the generation prompt as style examples.
example_questions = [
    "How to create a ephemeral pod?",
    "How can I make sure at least 50% replicas are always running for a deployment?",
]

class QuestionAnswer(BaseModel):
    """One generated question/answer pair; used as the response model for the LLM call."""

    question: str
    answer: str

class ChunkEval(QuestionAnswer):
    """A question/answer pair tied back to the runbook chunk it was generated from."""

    # uuid of the source chunk; the expected retrieval target.
    uuid: str
    # The question wrapped together with the source chunk's heading and content.
    question_with_context: str

async def generate_evals(runbook: RunbookChunk, n_questions: int, example_questions: List[str]) -> List[ChunkEval]:
    """Generate synthetic question/answer evals for a single runbook chunk.

    Args:
        runbook: The chunk whose heading and content the questions are based on.
        n_questions: Number of question/answer pairs requested in the prompt.
        example_questions: Sample questions included in the prompt as inspiration.

    Returns:
        One ChunkEval per pair streamed back by the model. If the request or
        the parsing raises, the error is printed and an empty list is returned,
        so callers never see an exception from this function.
    """
    # Render the examples as a bullet list up front. Doing this inside the
    # f-string needs a backslash in the expression, which only parses on
    # Python 3.12+, and the previous inline version emitted the literal text
    # "f - {q}" instead of the actual questions.
    examples = "\n".join(f"- {q}" for q in example_questions)

    prompt = f"""
Generate `{n_questions}` question-answer pairs about {runbook.heading}. The answer should primarily derived from the information in the runbook content.

<content>
{runbook.content}
</content>

Example questions:
{examples}

Provide a concise and specific answer for each question.
Do not use the exact example questions. Use them only as inspiration for the types of more specific questions to generate.
Do not include answers that are not in the content.
Questions should ask about how to do certain things and the answer should refer to how to do certain things based on the technical knowledge in the runbook.
Answers should be based on the content.
Stylistically, the questions should resemble what people would ask a RAG-based answer bot on a technical documentation website. So they can be a little informal, messy or scattered.
"""


    def make_context(question: str) -> str:
        # Wrap the question with the heading and full content of the source chunk.
        return f"""A user asked the following question:
Question: {question}
This is about the following runbook:
Runbook Title: {runbook.heading}
Runbook Content: {runbook.content}
"""

    try:
        # With the async client this yields an async iterator of QuestionAnswer objects.
        pairs = client.chat.completions.create_iterable(
            model="gpt-4o-mini",
            response_model=QuestionAnswer,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.0,
        )

        # ChunkEval declares no `heading` field, so only declared fields are passed.
        return [
            ChunkEval(
                uuid=runbook.uuid,
                question=pair.question,
                answer=pair.answer,
                question_with_context=make_context(pair.question),
            )
            async for pair in pairs
        ]
    except Exception as e:
        # Best effort: a failing chunk must not abort the whole dataset build.
        print(f"Error generating evals: {str(e)}")
        return []

# Try the generator on a single chunk and display the result before scaling up.
first_chunk_res = await generate_evals(sample_chunks[0], 3, example_questions)
first_chunk_res


[ChunkEval(question='What does a DaemonSet do in a Kubernetes cluster?', answer='A DaemonSet ensures that all (or some) Nodes run a copy of a Pod. As nodes are added to the cluster, Pods are added to them, and as nodes are removed, those Pods are garbage collected.', uuid='67620104-81bb-4054-a69a-5df93a8635f0', question_with_context='A user asked the following question:\nQuestion: What does a DaemonSet do in a Kubernetes cluster?\nThis is about the following runbook:\nRunbook Title: \nRunbook Content: ---\nreviewers:\n- enisoc\n- erictune\n- foxish\n- janetkuo\n- kow3ns\ntitle: DaemonSet\napi_metadata:\n- apiVersion: "apps/v1"\nkind: "DaemonSet"\ndescription: >-\nA DaemonSet defines Pods that provide node-local facilities. These might be fundamental to the operation of your cluster, such as a networking helper tool, or be part of an add-on.\ncontent_type: concept\nweight: 40\nhide_summary: true # Listed separately in section index\n---  \n<!-- overview -->  \nA _DaemonSet_ ensures that

In [43]:
import asyncio

class ChunkProcessingError(Exception):
    """Raised by `process_chunk` when eval generation for a chunk raises."""
    pass

async def process_chunk(chunk: RunbookChunk, n_questions: int, example_questions: List[str], semaphore: asyncio.Semaphore) -> List[ChunkEval]:
    """Generate evals for one chunk while holding a slot of `semaphore`.

    Args:
        chunk: The runbook chunk to generate questions for.
        n_questions: Number of question/answer pairs to request.
        example_questions: Sample questions forwarded to `generate_evals`.
        semaphore: Limits how many chunks are processed at the same time.

    Returns:
        The list returned by `generate_evals` for this chunk.

    Raises:
        ChunkProcessingError: If `generate_evals` raises; the original
            exception is chained as the cause.
    """
    async with semaphore:
        try:
            evals = await generate_evals(chunk, n_questions, example_questions)
        except Exception as exc:
            message = f"Error processing chunk {chunk.uuid}: {str(exc)}"
            raise ChunkProcessingError(message) from exc
        return evals


# Smoke-test the wrapper on a single chunk with a one-slot semaphore.
await process_chunk(sample_chunks[0], 3, example_questions, asyncio.Semaphore(1))


[ChunkEval(question='What does a DaemonSet do in a Kubernetes cluster?', answer='A DaemonSet ensures that all (or some) Nodes run a copy of a Pod, adding Pods as nodes are added to the cluster and garbage collecting them as nodes are removed.', uuid='67620104-81bb-4054-a69a-5df93a8635f0', question_with_context='A user asked the following question:\nQuestion: What does a DaemonSet do in a Kubernetes cluster?\nThis is about the following runbook:\nRunbook Title: \nRunbook Content: ---\nreviewers:\n- enisoc\n- erictune\n- foxish\n- janetkuo\n- kow3ns\ntitle: DaemonSet\napi_metadata:\n- apiVersion: "apps/v1"\nkind: "DaemonSet"\ndescription: >-\nA DaemonSet defines Pods that provide node-local facilities. These might be fundamental to the operation of your cluster, such as a networking helper tool, or be part of an add-on.\ncontent_type: concept\nweight: 40\nhide_summary: true # Listed separately in section index\n---  \n<!-- overview -->  \nA _DaemonSet_ ensures that all (or some) Nodes ru

Now let's call `process_chunk` with all chunks to build the full dataset

In [44]:
import json
import structlog
import random

# Structured logger shared by the cells below.
logger = structlog.get_logger()

async def create_synthetic_dataset(
    chunks: List[RunbookChunk],
    n_questions: int,
    example_questions: List[str],
    max_workers: int = 10,
) -> List[ChunkEval]:
    """Generate evals for every chunk concurrently and flatten them into one list.

    Args:
        chunks: Runbook chunks to generate questions for.
        n_questions: Number of question/answer pairs requested per chunk.
        example_questions: Sample questions forwarded to the generator.
        max_workers: Maximum number of chunks processed at the same time.

    Returns:
        All ChunkEval items from the chunks that succeeded. Chunks that ended
        in a ChunkProcessingError, or in any other non-list result, are
        reported with `print` and skipped.
    """
    limiter = asyncio.Semaphore(max_workers)
    # return_exceptions=True makes gather hand back exceptions as results
    # instead of raising on the first failure.
    outcomes = await asyncio.gather(
        *[process_chunk(item, n_questions, example_questions, limiter) for item in chunks],
        return_exceptions=True,
    )

    collected = []
    for outcome in outcomes:
        if isinstance(outcome, ChunkProcessingError):
            print(f"Error processing chunk: {outcome}")
            continue
        if isinstance(outcome, list):
            collected.extend(outcome)
            continue
        print(f"Unknown result type: {type(outcome)}")
    return collected

def save_eval_data(dataset: List[ChunkEval], filename: str):
    """Dump the evals to `filename` as a single JSON array, indented by 2 spaces."""
    with open(filename, "w") as handle:
        records = [item.model_dump() for item in dataset]
        json.dump(records, handle, indent=2)

def save_tf_data(dataset: List[ChunkEval], filename: str):
    """Write fine-tuning examples to `filename` in JSON Lines format.

    Each line is an object with a "query" (the question with its context) and
    "relevant_passages" (a one-element list holding the content of the chunk
    the question was generated from, looked up by uuid in the runbooks table).

    Args:
        dataset: Evals to export.
        filename: Destination path; overwritten if it exists.

    Evals whose uuid is not present in the runbooks table are reported with
    `print` and skipped.
    """
    df = runbook_table.to_pandas()
    # uuid -> chunk text. The passage has to be the source chunk's content;
    # previously the question itself was written as its own relevant passage
    # and the loaded table was never consulted.
    content_by_uuid = dict(zip(df["uuid"].astype(str), df["content"]))

    with open(filename, "w") as f:
        for chunk_eval in dataset:
            content = content_by_uuid.get(str(chunk_eval.uuid))
            if content is None:
                print(f"Skipping eval with unknown chunk uuid: {chunk_eval.uuid}")
                continue
            f.write(json.dumps({
                "query": chunk_eval.question_with_context,
                "relevant_passages": [content]
            }) + "\n")

synthetic_dataset = await create_synthetic_dataset(sample_chunks, 3, example_questions)
random.shuffle(synthetic_dataset)


In [45]:
# Split the shuffled dataset in half: the first half is kept for evaluation,
# the second half is exported as fine-tuning data.
split_idx = len(synthetic_dataset) // 2
eval_dataset = synthetic_dataset[:split_idx]
ft_dataset = synthetic_dataset[split_idx:]

print(len(synthetic_dataset), len(eval_dataset), len(ft_dataset))
save_eval_data(eval_dataset, "synthetic_eval_dataset.json")
save_tf_data(ft_dataset, "synthetic_ft_dataset.jsonl")

# Log the total and per-split sizes (covers both splits, despite the message).
logger.info("Synthetic eval dataset saved",
    dataset_len=len(synthetic_dataset),
    eval_len=len(eval_dataset),
    ft_len=len(ft_dataset),
)


783 391 392
[2m2024-11-15 21:46:11[0m [[32m[1minfo     [0m] [1mSynthetic eval dataset saved  [0m [36mdataset_len[0m=[35m783[0m [36meval_len[0m=[35m391[0m [36mft_len[0m=[35m392[0m


In [46]:
# Reload the saved eval split from disk.
# NOTE: this rebinds `eval_dataset` from a list of ChunkEval objects to a list
# of plain dicts as returned by json.load.
with open("synthetic_eval_dataset.json", "r") as f:
    eval_dataset = json.load(f)

# Only the first 10 entries are used for the retrieval eval below.
eval_dataset_sample = eval_dataset[:10]

# Rehydrate the dicts into ChunkEval models.
eval_questions = [ChunkEval(**e) for e in eval_dataset_sample]

eval_questions


[ChunkEval(question='When does the Job controller add terminal conditions in Kubernetes v1.31 and later?', answer='In Kubernetes v1.31 and later, the Job controller adds the terminal conditions `Failed` or `Complete` only after all of the Job Pods are terminated.', uuid='7c22487c-7361-4574-b313-371d70083d04', question_with_context='A user asked the following question:\nQuestion: When does the Job controller add terminal conditions in Kubernetes v1.31 and later?\nThis is about the following runbook:\nRunbook Title: Terminal Job conditions\nRunbook Content: Job termination and cleanupTerminal Job conditionsA Job has two possible terminal states, each of which has a corresponding Job\ncondition:\n* Succeeded:  Job condition `Complete`\n* Failed: Job condition `Failed`  \nJobs fail for the following reasons:\n- The number of Pod failures exceeded the specified `.spec.backoffLimit` in the Job\nspecification. For details, see [Pod backoff failure policy](#pod-backoff-failure-policy).\n- The 

In [47]:
def run_simple_request(q: ChunkEval, n_return_vals=5):
    """Search the runbooks table for one eval question and mark which results hit.

    Args:
        q: The eval whose `question_with_context` is used as the search query.
        n_return_vals: Maximum number of results requested from the search.

    Returns:
        A list of booleans, one per returned row in result order, that is True
        where the row's uuid equals the uuid of the eval's source chunk.

    NOTE(review): `question_with_context` embeds the source chunk's own
    heading and content, so the query contains the text being searched for.
    That likely explains every hit landing at rank 1 in the recorded output;
    consider searching on `q.question` instead — confirm the intended design.
    """
    search = runbook_table.search(q.question_with_context)
    rows = search.select(["uuid"]).limit(n_return_vals).to_list()
    expected_uuid = str(q.uuid)
    return [expected_uuid == str(row["uuid"]) for row in rows]


In [48]:
def score(hits):
    """Compute precision and recall from per-query hit lists.

    Args:
        hits: One list of booleans per retrieval request, True where the
            retrieved item matched the expected one.

    Returns:
        A dict with "precision" (true positives over all retrieved items) and
        "recall" (true positives over the number of requests). Either value
        is 0 when its denominator is zero.
    """
    n_retrieval_requests = len(hits)
    total_retrievals = 0
    true_positives = 0
    for per_query in hits:
        total_retrievals += len(per_query)
        true_positives += sum(per_query)

    logger.info(
        "Score",
        n_retrieval_requests=n_retrieval_requests,
        total_retrievals=total_retrievals,
        true_positives=true_positives,
    )

    if total_retrievals > 0:
        precision = true_positives / total_retrievals
    else:
        precision = 0

    if n_retrieval_requests > 0:
        recall = true_positives / n_retrieval_requests
    else:
        recall = 0

    return {"precision": precision, "recall": recall}


In [50]:
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict

def score_simple_search(n_to_retrieve: int) -> Dict[str, float]:
    """Run the search for every eval question and score the combined hits.

    Args:
        n_to_retrieve: Number of results requested per question.

    Returns:
        The precision/recall dict produced by `score`.
    """
    def _lookup(question):
        return run_simple_request(question, n_to_retrieve)

    # Issue the lookups from a thread pool so they run concurrently;
    # `map` keeps the results in the order of `eval_questions`.
    with ThreadPoolExecutor() as pool:
        hits = list(pool.map(_lookup, eval_questions))
    return score(hits)

# Evaluate retrieval at several cut-offs and tabulate precision/recall per cut-off.
k_to_retrieve = [5, 10, 20, 100]
scores = pd.DataFrame(
    [score_simple_search(n) for n in k_to_retrieve]
).assign(n_retrieved=k_to_retrieve)
scores

[[True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False], [True, False, False, False, False]]
[2m2024-11-15 21:47:31[0m [[32m[1minfo     [0m] [1mScore                         [0m [36mn_retrieval_requests[0m=[35m10[0m [36mtotal_retrievals[0m=[35m50[0m [36mtrue_positives[0m=[35m10[0m
[[True, False, False, False, False, False, False, False, False, False], [True, False, False, False, False, False, False, False, False, False], [True, False, False, False, False, False, False, False, False, False], [True, False, False, False, False, False, False, False, False, False], [True, False, False, False, False, False, False, False, False, False], [True, False, False, False, False, False, False, False, False, False], [Tru

Unnamed: 0,precision,recall,n_retrieved
0,0.2,1.0,5
1,0.1,1.0,10
2,0.05,1.0,20
3,0.01,1.0,100
