In [None]:
# | default_exp eval

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# |export
from cat_tools.client import SuperCatClient, LLMSettings, LLMSetting
from cheshire_cat_api.config import Config
import weave
from typing import Optional, Any
from pyprojroot import here
import pandas as pd
from weave.scorers import HallucinationFreeScorer, EmbeddingSimilarityScorer
from tqdm.auto import tqdm
from datetime import datetime

In [None]:
# |export
settings = LLMSettings()

In [None]:
# |export
conf_variants = {
    "openai_smallest": settings.openai.model_copy(update={"model_name": "gpt-5-nano"}),
    "openai_best": settings.openai.model_copy(update={"model_name": "gpt-5"}),
    "gemini_smallest": settings.gemini.model_copy(
        update={"model": "gemini-2.5-flash-lite"}
    ),
    "gemini_best": settings.gemini.model_copy(update={"model": "gemini-2.5-pro"}),
    "gemma_smallest": settings.ollama.model_copy(update={"model": "gemma3:1b"}),
    "gemma_best": settings.ollama.model_copy(update={"model": "gemma3:27b"}),
    "qwen_smallest": settings.ollama.model_copy(update={"model": "qwen3:0.6b"}),
    "qwen_best": settings.ollama.model_copy(update={"model": "qwen3:32b"}),
    "deepseek_smallest": settings.ollama.model_copy(
        update={"model": "deepseek-r1:1.5b"}
    ),
    "deepseek_best": settings.ollama.model_copy(update={"model": "deepseek-r1:32b"}),
}

In [None]:
#| export
import re

def remove_think(text):
    # Removes everything between <think> and </think>, including the tags
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)

In [None]:
# |export
class CatModel(weave.Model):
    client: SuperCatClient
    has_declarative_memory: bool = True

    def __init__(
        self, model_name: str, llm_setting, has_declarative_memory:bool = True, client_config: Optional[Config] = None
    ):
        super().__init__(
            name=model_name, client=SuperCatClient(config=client_config or Config()), has_declarative_memory=has_declarative_memory
        )
        self.client.udpate_llm_setting(llm_setting.name, llm_setting.model_dump())

    @weave.op()
    async def predict(self, prompt: str) -> dict:
        response = self.client.send(prompt)
        self.client.wipe_episodic_memory()
        response["model_name"] = self.name
        response["text_clean"] = remove_think(response.get("text", ""))
        response['has_declarative_memory'] = self.has_declarative_memory
        return response

In [None]:
model = CatModel("openai_smallest", conf_variants["openai_smallest"], has_declarative_memory=False)

In [None]:
await model.predict("What is the capital of France?")

 (subsequent messages of this type will be suppressed)


{'user_id': 'user',
 'when': 1756901486.2709503,
 'who': 'AI',
 'text': 'Paris.',
 'image': None,
 'audio': None,
 'type': 'chat',
 'why': {'input': 'What is the capital of France?',
  'intermediate_steps': [],
  'memory': {'episodic': [{'id': 'c443baae-dfc5-48d2-b3d2-a0e959a63079',
     'metadata': {'source': 'user', 'when': 1756730808.924713},
     'page_content': 'What is the capital of France?',
     'type': 'Document',
     'score': 1.0000002}],
   'declarative': [{'id': '315768f2-055b-4cc7-8889-4901227b6978',
     'metadata': {'source': 'user', 'when': 1756801540.6302874},
     'page_content': "La resa del processo (o rapporto di conversione) viene utilizzata come un indicatore chiave di prestazione (KPI) per calcolare la quantità di materia prima necessaria per soddisfare un ordine di prodotto finito, per determinare i costi di produzione e per monitorare l'efficienza complessiva del processo di disidratazione.",
     'type': 'Document',
     'score': 0.77990127}],
   'procedura

In [None]:
# |export
def load_eval_dataset():
    path = here("eval/declarative_memory.csv")
    df = pd.read_csv(path)
    return [
        {
            "id": i,
            "prompt": row["domanda"],
            "input": row["domanda"],
            "context": row["risposta"],
            "target": row["risposta"],
        }
        for i, row in df.iterrows()
    ]


In [None]:
# |export
def repeat_dataset(dataset, n):
    id = 0
    for d in dataset:
        for _ in range(n):
            yield {**d, "id": id, "question_id": d["id"]}
            id += 1
            

In [None]:
list(repeat_dataset(load_eval_dataset(), 3))

[{'id': 0,
  'prompt': "Che cos'è la disidratazione a flusso di aria calda applicata agli alimenti?",
  'input': "Che cos'è la disidratazione a flusso di aria calda applicata agli alimenti?",
  'context': "La disidratazione a flusso di aria calda è un processo tecnologico che consiste nell'applicare calore a un alimento in condizioni controllate. Lo scopo è rimuovere gran parte dell'acqua contenuta al suo interno tramite evaporazione, trasformandola in vapore acqueo che viene poi allontanato.",
  'target': "La disidratazione a flusso di aria calda è un processo tecnologico che consiste nell'applicare calore a un alimento in condizioni controllate. Lo scopo è rimuovere gran parte dell'acqua contenuta al suo interno tramite evaporazione, trasformandola in vapore acqueo che viene poi allontanato.",
  'question_id': 0},
 {'id': 1,
  'prompt': "Che cos'è la disidratazione a flusso di aria calda applicata agli alimenti?",
  'input': "Che cos'è la disidratazione a flusso di aria calda applica

In [None]:
dataset = load_eval_dataset()

In [None]:
# |export
hallucination_scorer = HallucinationFreeScorer(
    model_id="vertex_ai/gemini-2.5-pro",
)


In [None]:
# |export
class CatEmbeddingSimilarityScorer(EmbeddingSimilarityScorer):
    @weave.op
    async def score(self, *, output: str, target: str, **kwargs: Any) -> Any:
        # Ensure the threshold is within the valid range for cosine similarity.
        assert -1 <= self.threshold <= 1, "`threshold` should be between -1 and 1"

        model_embedding, target_embedding = await self._compute_embeddings(
            output["text_clean"], target
        )
        return self._cosine_similarity(model_embedding, target_embedding)


similarity_scorer = CatEmbeddingSimilarityScorer(
    model_id="vertex_ai/gemini-embedding-001",
    threshold=0.8,
)

In [None]:
#| export
def read_sentences():
    path = here("eval/declarative_memory.csv")
    df = pd.read_csv(path)
    return df['risposta'].tolist()

In [None]:
#| export
def prepare_declarative_memory(client: SuperCatClient):
    client.wipe_declarative_memory()
    sentences = read_sentences()
    client.put_sentences(sentences)
    print(f"Added {len(sentences)} sentences to declarative memory")

In [None]:
#|export
async def eval_configs(dataset, n_rep=1, model_confs=conf_variants):
    client = SuperCatClient()
    time = datetime.now().strftime("%m-%d %H:%M")
    eval_name = f"{time} Eval"
    prepare_declarative_memory(client)
    evaluation = weave.Evaluation(
        dataset=list(repeat_dataset(dataset, n_rep)),
        scorers=[hallucination_scorer, similarity_scorer],
        name=eval_name,
    )
    for name, conf in tqdm(model_confs.items(), total=len(model_confs)):
        print(f"Evaluating {name} with memory")
        model = CatModel(name, conf)
        await evaluation.evaluate(model, __weave={"display_name": f"{eval_name} - {name} memory"})
    client.wipe_declarative_memory()
    for name, conf in tqdm(model_confs.items(), total=len(model_confs)):
        print(f"Evaluating {name} without memory")
        model = CatModel(name, conf, has_declarative_memory=False)
        await evaluation.evaluate(model, __weave={"display_name": f"{eval_name} - {name} NO memory"})

In [None]:
weave.init("dev-test")

<weave.trace.weave_client.WeaveClient>

In [None]:
await eval_configs(dataset[:2])