# Boilerplate

In [3]:
from src.adapters import db
from sqlalchemy import select
from src.db.models.document import Chunk

from typing import List
import logging
import json
import asyncio
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = instructor.from_openai(AsyncOpenAI())

In [4]:
class QuestionAnswer(BaseModel):
    """One question/answer pair.

    Passed as the `response_model` of the LLM call in `generate_evals`, so these
    two fields are what each generated item is parsed into.
    """

    question: str
    answer: str


class ChunkEval(QuestionAnswer):
    """A question/answer pair tagged with the id of the chunk it was generated from."""

    # Stored as a string; compared against `str(Chunk.id)` when scoring retrieval.
    chunk_id: str


class TextChunk(BaseModel):
    """Minimal view of a document chunk: its id (as a string) and its text content."""

    id: str
    content: str


class ChunkProcessingError(Exception):
    """Raised by `process_chunk` when eval generation for a chunk fails unexpectedly."""

async def generate_evals(
    chunk: TextChunk, n_questions: int, example_questions: List[str]
) -> List[ChunkEval]:
    """Ask the LLM for question-answer pairs grounded in a single chunk.

    Args:
        chunk: The chunk whose `content` is embedded in the prompt; its `id` is
            attached to every returned eval.
        n_questions: Number of pairs requested in the prompt (the model is asked
            for this many; the count actually returned is not enforced here).
        example_questions: Sample questions listed in the prompt as inspiration.

    Returns:
        One `ChunkEval` per pair streamed back by the model, or an empty list if
        the request or parsing raised. This function is best-effort: it logs
        the failure and never raises.
    """

    # chr(10) is "\n"; it is spelled this way because f-string expressions could
    # not contain backslashes before Python 3.12.
    prompt = f"""
        Generate `{n_questions}` question-answer pairs about specific public benefit programs. The answers should be derived from information in this content:

        <content>
        {chunk.content}
        </content>

        Example questions:
        {chr(10).join(f'- {q}' for q in example_questions)}

        Provide a concise and specific answer for each question.
        Do not use the exact example questions. Use them only as inspiration for the types of more specific questions to generate.
        Do not include answers that are not in the content.
        Questions should ask about specific (e.g., SNAP, WIC, etc.) public benefit programs (e.g., eligibility, completing SNAP forms) and answers should refer to the public benefit programs' policy and requirements.

        """

    try:
        # With the async client, `create_iterable` yields parsed `QuestionAnswer`
        # objects that are consumed with `async for`.
        pairs = client.chat.completions.create_iterable(
            model="gpt-4o-mini",
            response_model=QuestionAnswer,
            messages=[{"role": "user", "content": prompt}],
        )
        return [
            ChunkEval(question=pair.question, answer=pair.answer, chunk_id=chunk.id)
            async for pair in pairs
        ]
    except Exception as e:
        # The error is swallowed here, so this log line is the only record of
        # the failure: include the chunk id so it can be traced and retried.
        logger.error(f"Error generating evals for chunk {chunk.id}: {str(e)}")
        return []


async def process_chunk(
    chunk: TextChunk,
    n_questions: int,
    example_questions: List[str],
    semaphore: asyncio.Semaphore,
) -> List[ChunkEval]:
    """Generate evals for one chunk while holding a slot on `semaphore`.

    Args:
        chunk: Chunk to generate question/answer pairs for.
        n_questions: Number of pairs to request from the model.
        example_questions: Sample questions forwarded to the prompt.
        semaphore: Limits how many chunks are processed concurrently.

    Returns:
        Whatever `generate_evals` returns for this chunk.

    Raises:
        ChunkProcessingError: If `generate_evals` raises; the original
            exception is chained as the cause.
    """
    async with semaphore:
        try:
            evals = await generate_evals(chunk, n_questions, example_questions)
        except Exception as exc:
            logger.error(f"Unexpected error processing chunk {chunk.id}: {str(exc)}")
            raise ChunkProcessingError(f"Failed to process chunk {chunk.id}") from exc
        return evals


async def create_synthetic_dataset(
    chunks: List[TextChunk],
    n_questions: int,
    example_questions: List[str],
    max_concurrency: int = 10,
) -> List[ChunkEval]:
    """Generate evals for every chunk concurrently and flatten them into one list.

    Args:
        chunks: Chunks to generate question/answer pairs for.
        n_questions: Number of pairs requested per chunk.
        example_questions: Sample questions forwarded to each prompt.
        max_concurrency: Upper bound on chunks processed at the same time.

    Returns:
        All evals from the chunks that succeeded, in chunk order. Chunks that
        failed are logged and contribute nothing.
    """
    limiter = asyncio.Semaphore(max_concurrency)

    # return_exceptions=True: one failing chunk must not cancel the others.
    outcomes = await asyncio.gather(
        *(
            process_chunk(chunk, n_questions, example_questions, limiter)
            for chunk in chunks
        ),
        return_exceptions=True,
    )

    dataset = []
    for outcome in outcomes:
        if isinstance(outcome, list):
            dataset.extend(outcome)
            continue

        # Anything that is not a list is a failure of some kind.
        if isinstance(outcome, ChunkProcessingError):
            logger.error(str(outcome))
        else:
            logger.error(f"Unexpected result type: {type(outcome)}")

    return dataset


def save_dataset(dataset: List[ChunkEval], filename: str):
    """Write the evals to `filename` as an indented JSON array of objects.

    Args:
        dataset: Evals to serialise; each becomes one JSON object.
        filename: Destination path; an existing file is overwritten.
    """
    # Serialise before opening the file: opening with "w" truncates it, so a
    # failure during serialisation would otherwise leave an empty file behind.
    # Pydantic v2 renamed `.dict()` to `.model_dump()` and deprecated the old
    # name; fall back to `.dict()` so Pydantic v1 models keep working.
    records = [
        chunk_eval.model_dump() if hasattr(chunk_eval, "model_dump") else chunk_eval.dict()
        for chunk_eval in dataset
    ]
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)

# Generate the synthetic data set

In [56]:
# Load every Chunk row from the database.
with db.PostgresDBClient().get_session() as db_session:
    db_chunks = db_session.execute(select(Chunk)).scalars().all()
# Copy id/content into plain TextChunk objects (ids are stringified).
# NOTE(review): this reads ORM attributes after the session block has exited —
# presumably fine because they are already loaded; confirm if get_session() changes.
chunks = [TextChunk(id=str(db_chunk.id), content=db_chunk.content) for db_chunk in db_chunks]

# Uncomment to limit to subset of data
#chunks = chunks[:10]

n_questions = 2  # number of questions to get in each LLM call
# Shown to the model as inspiration only; the prompt tells it not to reuse them verbatim.
example_questions = [
    "Am I eligible for SNAP if I'm enrolled as a full-time student?",
    "Can I enroll in SNAP concurrently with unemployment insurance?",
]

try:
    # Generate the dataset
    # (top-level `await` works here because this runs as a notebook cell)
    synthetic_dataset = await create_synthetic_dataset(
        chunks, n_questions, example_questions
    )

    # Save the dataset
    save_dataset(synthetic_dataset, "synthetic_eval_questions.json")

    logger.info(f"Generated {len(synthetic_dataset)} ChunkEvals.")
    logger.info("Dataset saved as 'synthetic_eval_questions.json'")
except Exception as e:
    # Any failure (generation or saving) is logged rather than raised.
    logger.error(f"An error occurred during dataset creation: {str(e)}")

INFO:src.adapters.db.clients.postgres_config:Constructed database configuration
INFO:src.adapters.db.clients.postgres_client:connected to postgres db
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST h

# Inspect the dataset

In [11]:
# Reload the saved evals from disk as a list of plain dicts.
with open("synthetic_eval_questions.json", 'r') as file:
    synthetic_questions = json.load(file)

# Convert the JSON data to a Pandas DataFrame
df = pd.DataFrame(synthetic_questions)

# Last expression of the cell: displays the first five evals.
synthetic_questions[:5]


[{'question': 'What form should a client fill out if they were a homeowner and occupied their home for less than a full year?',
  'answer': 'The client should fill out Form A.',
  'chunk_id': '71ee78c1-1bcd-41e5-b161-a34d5d62ac39'},
 {'question': 'Which form is required for tenants who received public cash assistance last year?',
  'answer': 'Tenants who received public cash assistance last year should fill out Form D.',
  'chunk_id': '71ee78c1-1bcd-41e5-b161-a34d5d62ac39'},
 {'question': 'Are vaccinations covered under Medicare Part D?',
  'answer': 'Some vaccinations are covered under Part D, but some are covered by Part B and would not be included.',
  'chunk_id': '4291b488-9e5a-4978-b38d-16db1600de1a'},
 {'question': 'Who should I contact for questions regarding PACE coverage?',
  'answer': 'Clients should contact PACE Cardholder Services for specific questions about coverage.',
  'chunk_id': '4291b488-9e5a-4978-b38d-16db1600de1a'},
 {'question': 'What are the eligibility requireme

# Evaluate precision and recall

In [76]:
from typing import Dict
from concurrent.futures import ThreadPoolExecutor

from sentence_transformers import SentenceTransformer
# Load the sentence-embedding model from a local path; it is used to embed each eval question.
embedding_model = SentenceTransformer("/app/models/multi-qa-mpnet-base-dot-v1")

class EvalQuestion(BaseModel):
    """One saved eval: a question, its answer, and the id of the chunk it came from."""

    question: str
    answer: str
    chunk_id: str


# Parse the dicts loaded from synthetic_eval_questions.json into EvalQuestion objects.
eval_questions = [EvalQuestion(**question) for question in synthetic_questions]

def run_simple_request(db_session, q: EvalQuestion, n_return_vals=5):
    """Retrieve the top chunks for a question and flag which one is the source chunk.

    Args:
        db_session: Database session used to run the similarity query.
        q: Eval question; `q.chunk_id` identifies the chunk it was generated from.
        n_return_vals: Number of chunks to retrieve.

    Returns:
        A list of booleans, one per retrieved chunk in ranked order, True where
        the chunk's id equals `q.chunk_id`.
    """
    question_vector = embedding_model.encode(q.question, show_progress_bar=False)

    # NOTE(review): `max_inner_product` is presumably pgvector's negative
    # inner-product distance, so ascending order puts the best match first — confirm.
    nearest_chunks = (
        select(Chunk)
        .order_by(Chunk.mpnet_embedding.max_inner_product(question_vector))
        .limit(n_return_vals)
    )
    retrieved = db_session.execute(nearest_chunks).scalars().all()

    return [str(q.chunk_id) == str(chunk.id) for chunk in retrieved]

def score(hits):
    """Compute aggregate precision and recall from per-request hit flags.

    `hits` holds one list per retrieval request; each inner list has one
    boolean per retrieved item, True when that item was the expected one.

    Precision is true positives over all retrieved items. Recall is true
    positives over the number of requests, which is only a true recall when
    every request has exactly one relevant item.

    Returns a dict with keys "precision" and "recall"; a metric is 0 when its
    denominator is 0.
    """
    request_count = len(hits)
    retrieved_count = 0
    hit_count = 0
    for flags in hits:
        retrieved_count += len(flags)
        hit_count += sum(flags)

    if retrieved_count > 0:
        precision = hit_count / retrieved_count
    else:
        precision = 0

    if request_count > 0:
        recall = hit_count / request_count
    else:
        recall = 0

    return {"precision": precision, "recall": recall}


def score_simple_search(n_to_retrieve: int) -> Dict[str, float]:
    """Score plain embedding retrieval over all `eval_questions`.

    Args:
        n_to_retrieve: Number of chunks to retrieve per question (a single
            int; it is forwarded as the query's row limit).

    Returns:
        The precision/recall dict produced by `score`.
    """
    db_client = db.PostgresDBClient()

    def hits_for(q):
        # SQLAlchemy sessions are not safe to share between threads, so each
        # task opens its own session from the shared client.
        with db_client.get_session() as db_session:
            return run_simple_request(db_session, q, n_to_retrieve)

    # parallelize to speed this up 5-10X
    with ThreadPoolExecutor() as executor:
        hits = list(executor.map(hits_for, eval_questions))
    return score(hits)


# Retrieval depths to evaluate; one scoring run per value.
k_to_retrieve = [5, 10, 20]
# One row of precision/recall per depth, in the same order as k_to_retrieve.
scores = pd.DataFrame([score_simple_search(n) for n in k_to_retrieve])
scores["n_retrieved"] = k_to_retrieve
# Last expression of the cell: displays the table.
scores

INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: /app/models/multi-qa-mpnet-base-dot-v1
INFO:src.adapters.db.clients.postgres_config:Constructed database configuration
INFO:src.adapters.db.clients.postgres_client:connected to postgres db
INFO:src.adapters.db.clients.postgres_config:Constructed database configuration
INFO:src.adapters.db.clients.postgres_client:connected to postgres db
INFO:src.adapters.db.clients.postgres_config:Constructed database configuration
INFO:src.adapters.db.clients.postgres_client:connected to postgres db


Unnamed: 0,precision,recall,n_retrieved
0,0.174359,0.871795,5
1,0.091142,0.911422,10
2,0.046445,0.928904,20


# Evaluate re-ranked precision and recall

In [9]:
from concurrent.futures import ThreadPoolExecutor

from sentence_transformers import SentenceTransformer
from typing import Dict
# Load the sentence-embedding model from a local path; it is used to embed each eval question.
embedding_model = SentenceTransformer("/app/models/multi-qa-mpnet-base-dot-v1")


class EvalQuestion(BaseModel):
    """One saved eval: a question, its answer, and the id of the chunk it came from."""

    question: str
    answer: str
    chunk_id: str


# Parse the dicts loaded from synthetic_eval_questions.json into EvalQuestion objects.
eval_questions = [EvalQuestion(**question) for question in synthetic_questions]

def score(hits):
    """Compute aggregate precision and recall from per-request hit flags.

    `hits` holds one list per retrieval request; each inner list has one
    boolean per retrieved item, True when that item was the expected one.

    Precision is true positives over all retrieved items. Recall is true
    positives over the number of requests, which is only a true recall when
    every request has exactly one relevant item.

    Returns a dict with keys "precision" and "recall"; a metric is 0 when its
    denominator is 0.
    """
    request_count = len(hits)
    retrieved_count = 0
    hit_count = 0
    for flags in hits:
        retrieved_count += len(flags)
        hit_count += sum(flags)

    precision = 0
    if retrieved_count > 0:
        precision = hit_count / retrieved_count

    recall = 0
    if request_count > 0:
        recall = hit_count / request_count

    return {"precision": precision, "recall": recall}
    
try:
    import os
    import threading

    import cohere
    from diskcache import Cache

    # Prefer the COHERE_API_KEY environment variable (which the help text in the
    # except-branch tells users to set); otherwise use the in-notebook placeholder.
    cohere_api_key = os.environ.get("COHERE_API_KEY", "API_KEY_HERE")

    # Use diskcache to reduce re-running in case of error (or addition of new data)
    cache = Cache("./cohere_cache")

    # One SQLAlchemy session is shared by all worker threads below, and sessions
    # are not thread-safe, so database access is serialised with this lock. The
    # slow part (the Cohere call) still runs in parallel.
    db_session_lock = threading.Lock()

    def run_reranked_request(db_session, q: EvalQuestion, n_return_vals=5, n_to_rerank=40) -> List[bool]:
        """Retrieve candidates by embedding, rerank them with Cohere, and flag the source chunk.

        Args:
            db_session: Database session used for the initial similarity query.
            q: Eval question; `q.chunk_id` identifies the chunk it was generated from.
            n_return_vals: Number of reranked results to keep.
            n_to_rerank: Number of candidates fetched from the database and sent to the reranker.

        Returns:
            One boolean per reranked result, in reranked order, True where the
            result's chunk id equals `q.chunk_id`.
        """
        # The cached value depends on the expected chunk and on both sizes, so
        # all of them are part of the key. A JSON string is used so the key is
        # stored as plain text.
        cache_key = json.dumps([q.question, str(q.chunk_id), n_return_vals, n_to_rerank])

        # Check the cache before doing any work: a hit skips the embedding,
        # the database query, and the Cohere call.
        cached_result = cache.get(cache_key)
        if cached_result is not None:
            return cached_result

        # First, get more results than we need
        query_embedding = embedding_model.encode(q.question, show_progress_bar=False)
        with db_session_lock:
            initial_results = db_session.execute(
                select(Chunk)
                .order_by(Chunk.mpnet_embedding.max_inner_product(query_embedding))
                .limit(n_to_rerank)
            ).scalars().all()
            # Read the ORM attributes while still holding the lock.
            candidate_ids = [str(r.id) for r in initial_results]
            texts = [r.content for r in initial_results]

        # Rerank using Cohere
        co = cohere.Client(cohere_api_key)
        reranked = co.rerank(
            query=q.question,
            documents=texts,
            top_n=n_return_vals
        )

        # Map reranked results back to original IDs
        expected_id = str(q.chunk_id)
        result = [expected_id == candidate_ids[r.index] for r in reranked.results]
        cache.set(cache_key, result)
        return result

    def score_reranked_search(db_session, n_to_retrieve: int, n_to_rerank: int = 40) -> Dict[str, float]:
        """Score reranked retrieval over all `eval_questions`.

        Args:
            db_session: Session passed through to every `run_reranked_request` call.
            n_to_retrieve: Number of reranked results kept per question (a single int).
            n_to_rerank: Number of candidates sent to the reranker per question.

        Returns:
            The precision/recall dict produced by `score`.
        """
        with ThreadPoolExecutor() as executor:
            hits = list(executor.map(
                lambda q: run_reranked_request(db_session, q, n_to_retrieve, n_to_rerank),
                eval_questions
            ))
        return score(hits)

    k_to_retrieve = [5, 10, 20]
    with db.PostgresDBClient().get_session() as db_session:
        reranked_scores = pd.DataFrame([score_reranked_search(db_session, n) for n in k_to_retrieve])
    reranked_scores["n_retrieved"] = k_to_retrieve
    print(reranked_scores)
except Exception as e:
    # Covers missing optional packages as well as API/network failures.
    print(f"Could not run reranker.\n{e}")
    print("Ensure COHERE_API_KEY env is set... and cohere library diskcache are installed.")
    print("Connection reset by peer is likely rate limiting from Cohere")

INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: /app/models/multi-qa-mpnet-base-dot-v1
INFO:src.adapters.db.clients.postgres_config:Constructed database configuration
INFO:src.adapters.db.clients.postgres_client:connected to postgres db
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.cohere.com/v1/rerank "HTTP/1.1 200 OK"
INF

   precision    recall  n_retrieved
0   0.170629  0.853147            5
1   0.089744  0.897436           10
2   0.046329  0.926573           20


   precision    recall  n_retrieved
0   0.170629  0.853147            5
1   0.089744  0.897436           10
2   0.046329  0.926573           20

No reranking:
0	0.174359  0.871795	          5
1	0.091142  0.911422	         10
2	0.046445  0.928904	         20