In [36]:
# import stuff
import atexit
import os
import time

import numpy as np
import dspy
import weaviate
from datasets import load_dataset

In [38]:
# dspy setup: GPT-4.1 as the default LM, with response caching turned off so
# repeated identical calls are re-sent, and token-usage tracking enabled so
# predictions expose `get_lm_usage()`.
lm = dspy.LM(
    model="openai/gpt-4.1",
    api_key=os.environ["OPENAI_API_KEY"],
    cache=False,
)
dspy.configure(lm=lm, track_usage=True)

# Smoke test: one round-trip to confirm the key and model are usable.
lm("say hello")

  PydanticSerializationUnexpectedValue(Expected 9 fields but got 6: Expected `Message` - serialized value may not be as expected [input_value=Message(content='Hello! ...: None}, annotations=[]), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [input_value=Choices(finish_reason='st...ider_specific_fields={}), input_type=Choices])
  return self.__pydantic_serializer__.to_python(


['Hello! 😊 How can I help you today?']

In [10]:
# No-retrieval baseline signature: the only input is the question, so the model
# must answer from what it already knows (see the hallucinated HyDE answer below).
# NOTE(review): dspy presumably renders the docstring and field descriptions into
# the prompt, so editing them changes model behavior.
class GenerateAnswerFromParameters(dspy.Signature):
    """Answer the question as well as you can."""

    question: str = dspy.InputField(description="The question to answer.")
    answer: str = dspy.OutputField(description="The answer to the question.")

# Baseline predictor with no retrieval: the answer comes from the LM alone.
qa_system = dspy.Predict(signature=GenerateAnswerFromParameters)
qa_system(question="What is HyDE?")

  PydanticSerializationUnexpectedValue(Expected 9 fields but got 6: Expected `Message` - serialized value may not be as expected [input_value=Message(content='[[ ## an...: None}, annotations=[]), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [input_value=Choices(finish_reason='st...ider_specific_fields={}), input_type=Choices])
  return self.__pydantic_serializer__.to_python(


Prediction(
    answer='HyDE stands for Hydrogen Evolution (HyDE) and is a software tool designed for the automated identification and assessment of hydrogen bonds and other weak interactions in macromolecular structures, particularly those determined by X-ray crystallography or cryo-EM. Originally developed and used in structural biology and bioinformatics, HyDE analyzes protein-ligand interactions, emphasizing hydrogen bonding, hydrophobic contacts, and other intermolecular forces. The results from HyDE can help in drug design and understanding protein function.\n\nNote: In different scientific or technological contexts, "HyDE" could refer to other things. For example, in chemistry and drug discovery, HyDE may refer to a "Hydrogen bond and Dehydration scoring function" used for protein-ligand interaction assessment. Always consider the subject area to interpret "HyDE" correctly.'
)

In [53]:
# define RAG systems
import base64
from io import BytesIO
from PIL import Image
from typing import Any, Literal
from weaviate.classes.query import Filter

# Answer-generation signature used by RAGSystem.
# NOTE(review): dspy presumably turns the docstring and field descriptions into
# prompt text, so edits to them change the prompt sent to the LM.
class GenerateAnswer(dspy.Signature):
    """Assess the context and answer the question."""

    question: str = dspy.InputField(description="The question to answer.")
    # Either a list of text chunks or a list of page images, depending on whether
    # RAGSystem was built with images_or_text="text" or "images".
    context: list[str] | list[dspy.Image] = dspy.InputField(description="The context to use to answer the question.")
    answer: str = dspy.OutputField(description="The answer to the question.")

class RAGSystem(dspy.Module):
    """Retrieval-augmented QA over a Weaviate collection of page images or text chunks.

    For each question the module either retrieves the top-``k`` objects from
    ``collection`` or, when an ``oracle_context_id`` is given, fetches the single
    object whose ``dataset_id`` matches it. It then answers with
    ``dspy.Predict(GenerateAnswer)`` over that context.

    Args:
        collection: Weaviate collection handle to query.
        images_or_text: ``"images"`` means objects store a base64-encoded image
            in ``base64_str`` and are retrieved with ``near_text``. ``"text"``
            means objects store a ``content`` string and are retrieved with
            ``hybrid`` search.
        k: Number of objects to retrieve per question.

    Raises:
        ValueError: If ``images_or_text`` is neither ``"images"`` nor ``"text"``.
    """

    def __init__(self, collection: Any, images_or_text: Literal["images", "text"], k: int = 5):
        # Let dspy.Module set up its own bookkeeping before we add sub-modules.
        super().__init__()
        if images_or_text not in ("images", "text"):
            # Fail fast: an unknown mode would otherwise yield `None` as context.
            raise ValueError(
                f"images_or_text must be 'images' or 'text', got {images_or_text!r}"
            )
        self.generate_answer = dspy.Predict(GenerateAnswer)
        self.collection = collection
        self.images_or_text = images_or_text
        self.k = k

    def _payload_property(self) -> str:
        """Name of the stored property that holds each object's payload in this mode."""
        return "base64_str" if self.images_or_text == "images" else "content"

    def _to_context_item(self, properties: dict) -> str | dspy.Image:
        """Convert one returned object's properties into a context entry (image or text)."""
        if self.images_or_text == "images":
            image_bytes = base64.b64decode(properties["base64_str"])
            return dspy.Image(Image.open(BytesIO(image_bytes)))
        return properties["content"]

    def _get_objects(self, question: str) -> list[str] | list[dspy.Image]:
        """Retrieve the top-``k`` context items for ``question``.

        Images use vector search (``near_text``); text uses ``hybrid`` search.
        """
        search_kwargs = dict(
            query=question,
            return_properties=[self._payload_property()],
            limit=self.k,
        )
        if self.images_or_text == "images":
            response = self.collection.query.near_text(**search_kwargs)
        else:
            response = self.collection.query.hybrid(**search_kwargs)
        return [self._to_context_item(obj.properties) for obj in response.objects]

    def _fetch_oracle_context(
        self,
        oracle_context_id: str,
    ) -> str | dspy.Image:
        """Fetch the single object whose ``dataset_id`` equals ``oracle_context_id``.

        Raises:
            IndexError: If no object in the collection has that ``dataset_id``.
        """
        response = self.collection.query.fetch_objects(
            # Exact match: `like` would treat `*` / `?` in the id as wildcards.
            filters=Filter.by_property("dataset_id").equal(oracle_context_id),
            return_properties=[self._payload_property()],
            limit=1,
        )
        if not response.objects:
            raise IndexError(
                f"No object with dataset_id == {oracle_context_id!r} in the collection"
            )
        return self._to_context_item(response.objects[0].properties)

    def forward(self, question: str, oracle_context_id: str | None = None) -> dspy.Prediction:
        """Answer ``question`` from retrieved context, or from the oracle object if given.

        This is implemented as ``forward`` (the dspy convention). Calling the
        module, e.g. ``rag_system(question=...)``, goes through the base
        ``dspy.Module.__call__``.

        Args:
            question: The question to answer.
            oracle_context_id: If not ``None``, skip retrieval and use only the
                object with this ``dataset_id`` as context.

        Returns:
            The prediction produced by ``generate_answer`` (it has an ``answer`` field).
        """
        if oracle_context_id is None:
            context = self._get_objects(question)
        else:
            # GenerateAnswer.context is declared as a list, so wrap the single item.
            context = [self._fetch_oracle_context(oracle_context_id)]
        return self.generate_answer(question=question, context=context)

In [54]:
# Re-running this cell used to orphan the previous client without closing it
# (presumably the source of the "close the connection" warning), so close it first.
if "weaviate_client" in globals():
    weaviate_client.close()

# Connect to Weaviate Cloud using credentials from the environment.
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.environ["WEAVIATE_URL"],
    auth_credentials=weaviate.auth.AuthApiKey(os.environ["WEAVIATE_API_KEY"]),
)
# Later cells keep using this client, so close it when the kernel exits.
atexit.register(weaviate_client.close)

collection = weaviate_client.collections.get("IRPapersImages_Default")

rag_system = RAGSystem(collection, "images")

rag_system(question="What is HyDE?")


            Please make sure to close the connection using `client.close()`.
  rag_system = RAGSystem(collection, "images")


Prediction(
    answer='HyDE stands for Hypothetical Document Embeddings. It is a novel approach for building effective dense retrievers in information retrieval without needing any labeled data for relevance. Instead of relying on annotated examples of which documents are relevant to which queries, HyDE uses large language models (LLMs) like InstructGPT to generate "hypothetical" answers to a query—i.e., a plausible document that would answer the query—without requiring this generated document to be factually correct or real.\n\nThe dense retriever then encodes these generated hypothetical documents using an unsupervised contrastive encoder (like Contriever or mContriever), and compares them to real documents encoded in the same way. This similarity search retrieves the most relevant real documents in response to the query. \n\nHyDE has two main steps:\n1. For a given query, prompt the LLM to generate a hypothetical document answering the question.\n2. Use an unsupervised retrieve

In [41]:
# llm as judge
# LLM-as-judge signature: a binary check of whether a system answer means the
# same thing as a reference answer.
# NOTE(review): dspy presumably sends the docstring below to the LM as the grading
# instruction, so changing its wording changes the judge's behavior.
class AssessAlignmentScore(dspy.Signature):
    """You are an expert grader assessing if a system's answer is semantically aligned with the correct answer.
    Only return True if the system answer has essentially the same meaning as the correct answer.
    If the system answer misses key aspects or meaning, return False.
    """

    question: str = dspy.InputField(description="The question asked.")
    system_answer: str = dspy.InputField(description="The answer generated by the system.")
    correct_answer: str = dspy.InputField(description="The reference answer containing the correct and complete information.")
    # Parsed as a bool, so the judge's verdict can be used directly in `if`.
    score: bool = dspy.OutputField(description="True if system_answer is equivalent in meaning to correct_answer, otherwise False.")

judge = dspy.Predict(AssessAlignmentScore)

test_question = "What is HyDE?"
correct_answer = "HyDE stands for Hypothetical Document Embeddings, a technique for improving retrieval in AI systems by generating hypothetical answers and using their embeddings."

# Negative case: drops a key aspect of the reference (the embeddings).
incorrect_answer = "HyDE is a technique for improving retrieval in AI systems by generating hypothetical answers."
# Positive case: paraphrased, but every key idea is still present.
acceptable_answer = "Hypothetical Document Embeddings (HyDE) is a method to help AI retrieval by creating hypothetical documents as sample answers and using their vector representations."

# Run the judge on the negative case, then the positive one. `response` is left
# bound to the last verdict so the next cell can inspect its token usage.
for candidate_answer in (incorrect_answer, acceptable_answer):
    response = judge(
        question=test_question,
        system_answer=candidate_answer,
        correct_answer=correct_answer,
    )
    print(response)

Prediction(
    score=False
)


  PydanticSerializationUnexpectedValue(Expected 9 fields but got 6: Expected `Message` - serialized value may not be as expected [input_value=Message(content='[[ ## sc...: None}, annotations=[]), input_type=Message])
  PydanticSerializationUnexpectedValue(Expected `StreamingChoices` - serialized value may not be as expected [input_value=Choices(finish_reason='st...ider_specific_fields={}), input_type=Choices])
  return self.__pydantic_serializer__.to_python(


Prediction(
    score=True
)


In [42]:
# Token usage of `response` (the last judge call above), keyed by LM model name.
response.get_lm_usage()

{'openai/gpt-4.1': {'completion_tokens': 12,
  'prompt_tokens': 349,
  'total_tokens': 361,
  'completion_tokens_details': {'accepted_prediction_tokens': 0,
   'audio_tokens': 0,
   'reasoning_tokens': 0,
   'rejected_prediction_tokens': 0,
   'text_tokens': None},
  'prompt_tokens_details': {'audio_tokens': 0,
   'cached_tokens': 0,
   'text_tokens': None,
   'image_tokens': None}}}

In [43]:
# load data: the IRPapers question/answer set, train split only.
queries = load_dataset("weaviate/irpapers-queries", split="train")

In [None]:

# Evaluate the image RAG system over every query: answer it, record the answer
# call's token usage, and grade the answer with a K-vote LLM-judge ensemble.

K = 3  # independent judge votes per answer
# Set True to answer from each query's gold (oracle) page instead of retrieving.
USE_ORACLE_CONTEXT = False

rag_system = RAGSystem(collection, "images", k=5)

# Accumulate in lists: appending is amortised O(1), whereas `np.append` copies
# the whole array each time and makes the loop quadratic in the number of queries.
alignment_scores = []
input_tokens = []
output_tokens = []

start = time.time()
for idx, query in enumerate(queries):
    test_query = query["question"]
    ground_truth_answer = query["answer"]
    oracle_context_id = str(query["dataset_id"])

    qa_system_response = rag_system(
        question=test_query,
        oracle_context_id=oracle_context_id if USE_ORACLE_CONTEXT else None,
    )
    # Usage is keyed by model name; read it from `lm` rather than repeating the literal.
    usage_dict = qa_system_response.get_lm_usage()[lm.model]
    input_tokens.append(usage_dict["prompt_tokens"])
    output_tokens.append(usage_dict["completion_tokens"])

    # Count how many of K independent judge calls deem the answer aligned.
    ensemble_votes = sum(
        bool(
            judge(
                question=test_query,
                system_answer=qa_system_response.answer,
                correct_answer=ground_truth_answer,
            ).score
        )
        for _ in range(K)
    )
    # Aligned when at least half the votes agree (a strict majority for odd K).
    alignment_scores.append(1.0 if ensemble_votes >= K / 2 else 0.0)

    if idx % 5 == 4:
        print(f"Processed {idx+1} queries in {time.time() - start} seconds...")
        print("Alignment score running mean:", np.mean(alignment_scores))
        print("Input tokens running mean:", np.mean(input_tokens))
        print("Output tokens running mean:", np.mean(output_tokens))

# Convert once the loop finishes so later cells get float NumPy arrays.
alignment_scores = np.asarray(alignment_scores, dtype=np.float64)
input_tokens = np.asarray(input_tokens, dtype=np.float64)
output_tokens = np.asarray(output_tokens, dtype=np.float64)

print(alignment_scores.mean())
print(input_tokens.mean())
print(output_tokens.mean())


In [60]:
# Make sure every collected metric is a NumPy array, then print each mean
# (alignment, input tokens, output tokens) in that order.
alignment_scores, input_tokens, output_tokens = (
    np.array(metric_values)
    for metric_values in (alignment_scores, input_tokens, output_tokens)
)

for metric_array in (alignment_scores, input_tokens, output_tokens):
    print(metric_array.mean())

0.6923076923076923
5199.596153846154
178.1153846153846
