## Automate prompt engineering with DSPy and Haystack
---

In this example, we will take a look at how we can optimize and automate the prompt engineering process using `Haystack` (to create our generative AI pipeline) and `DSPy` (to optimize our prompts and LM weights).

In [None]:
# For each component, it is essential to know the names of the input and the output. There are several components for various steps
# of the generative AI pipeline that are provided out of the box. This includes components for document stores, embedders, prompt
# builders and generators. You can also build your own component.
!pip install -Uq pip
!pip install -Uq haystack
!pip install -Uq sentence-transformers
!pip install -Uq amazon-bedrock-haystack

In [None]:
# import libraries
import os
import json
import boto3
import pickle
import logging
import tempfile
import pandas as pd
import globals as g
from typing import List

In [None]:
# Configure root logging once (timestamp, pid, file:line, level, message) and
# create the module-level logger used by the cells below.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

#### Load the existing haystack pipeline
---

In [None]:
# S3 client shared by the download cells
s3_client = boto3.client('s3')

# Fetch the serialized pipeline into a temporary file. delete=False keeps the
# file on disk after the `with` block so a later cell can re-open it by path.
with tempfile.NamedTemporaryFile(mode='wb', suffix='.yml', delete=False) as tmp_file:
    # Bucket and key come from the `globals` module (imported as `g`)
    s3_client.download_fileobj(
        Bucket=g.HAYSTACK_PIPELINE_BUCKET,
        Key=g.HAYSTACK_PIPELINE_KEY,
        Fileobj=tmp_file,
    )
    logger.info(f"Downloaded the haystack pipeline from {g.HAYSTACK_PIPELINE_BUCKET}/{g.HAYSTACK_PIPELINE_KEY} to {tmp_file.name}")
    # Remember the path; the file object itself is closed when the block exits
    tmp_file_path = tmp_file.name

In [None]:
# Download the serialized documents of the document store from S3
doc_store_key = "pipelines/document_store/documents.json"
# The object is a JSON file (a later cell reads it back with `json.load`), so the
# temporary file gets a matching `.json` suffix. delete=False keeps it on disk
# after the `with` block so it can be re-opened by path.
with tempfile.NamedTemporaryFile(mode='wb', suffix='.json', delete=False) as tmp_file:
    s3_client.download_fileobj(
        g.HAYSTACK_PIPELINE_BUCKET,
        doc_store_key,
        tmp_file
    )
    # Report through the notebook logger, like the pipeline download cell does
    logger.info(f"Downloaded the document store from {g.HAYSTACK_PIPELINE_BUCKET}/{doc_store_key} to {tmp_file.name}")
    docstore_tmp_path = tmp_file.name

In [None]:
from haystack import Pipeline

# Deserialize the pipeline definition from the temporary file downloaded above
with open(tmp_file_path, 'r') as pipeline_fp:
    loaded_pipeline = Pipeline.load(pipeline_fp)
    print(f"Loaded the haystack pipeline from {tmp_file_path}")

In [None]:
# Now we will clean up the temporary file path and then see the contents of the pipeline
# that we had saved to s3
# The pipeline temp file was created with delete=False, so it has to be removed explicitly
os.remove(tmp_file_path)
logger.info("Loaded Pipeline Structure:")
# Render the loaded pipeline's graph in the notebook output
loaded_pipeline.show()

In [None]:
# Rebuild the documents saved earlier and write them into a document store
import json

import numpy as np
from haystack import Document
from haystack_integrations.document_stores.chroma import ChromaDocumentStore

# Load the serialized documents (a JSON list of document dicts) from the downloaded file
with open(docstore_tmp_path, 'r', encoding='utf-8') as f:
    documents_dicts = json.load(f)

# The temp file was created with delete=False; remove it now that its content is in memory
os.remove(docstore_tmp_path)

# Convert embeddings back to NumPy arrays
# NOTE(review): Haystack 2.x documents `Document.embedding` as a list of floats - confirm this
# conversion is still wanted before relying on it.
for doc_dict in documents_dicts:
    embedding = doc_dict.get('embedding')
    if isinstance(embedding, list):
        doc_dict['embedding'] = np.array(embedding)

# Reconstruct Document objects
documents = [Document.from_dict(doc_dict) for doc_dict in documents_dicts]
document_store = ChromaDocumentStore()
# Write the documents to the document store
document_store.write_documents(documents)


In [None]:
question: str = "What is a neurodegenerative disease? Give me examples"

# `get_component('retriever')` returns the pipeline's retriever component, not a
# document store, so bind it to a name that says so.
retriever = loaded_pipeline.get_component('retriever')
# Other cells in this notebook reach the retriever through the name
# `document_store`; keep that name bound to the same object so they keep working.
document_store = retriever

# Run the retriever on its own to inspect what it returns for the question
retrieved_docs = retriever.run(query=question)

# Print the retrieved documents' content or metadata
print(f"Retrieved Documents: {retrieved_docs}")

# Now proceed with the full pipeline run to generate the final response
response = loaded_pipeline.run({
    "retriever": {"query": question},
    "prompt_builder": {"question": question}
})
print("LLM Response:")
print(response["llm"]["replies"][0])

Now we have a simple RAG application working. Next, we want to optimize the prompts so that we get more concise responses. We will achieve this using `DSPy`.

`DSPy` is a framework designed to algorithmically optimize prompts for Language Models.

### Use DSPy to get concise answers

In [None]:
import dspy
from dspy.primitives.prediction import Prediction


# Create the language-model client DSPy will call; the model id comes from the `globals` module (`g`)
lm = dspy.LM(g.BEDROCK_HAIKU_MODELID)
# Set `lm` as the LM in DSPy's settings so the DSPy modules below use it
dspy.settings.configure(lm=lm)

### Create the dspy signature for question answering
---

We can make changes in the signature to create shorter outputs to questions as we desire

In [None]:
# Now we will create a signature to generate answers that are more concise
# and capture the key components of the question instead of giving lengthy
# responses
# Enhanced signature for generating concise responses
class GenerateConciseResponses(dspy.Signature):
    """Generate extremely concise, information-dense responses to questions along with examples."""

    # NOTE: the docstring above and the `desc` strings below are runtime text handed to DSPy
    # (presumably rendered into the LM prompt - confirm against the DSPy signature docs),
    # so edit them only when a prompt change is intended.

    # Input: the passages the answer should be based on
    context = dspy.InputField(desc="Relevant information to answer the question")
    # Input: the user question (no extra description)
    question = dspy.InputField()
    # Output: the generated answer; read by callers as `.response`
    response = dspy.OutputField(desc="Provide a concise answer in 15 words or less. Focus on key information only.")

### Create a RAG module to make predictions

In [None]:
class ConciseRAG(dspy.Module):
    """RAG module that answers a question from passages fetched by the Haystack retriever.

    Retrieval is delegated to the ``retriever`` component of ``loaded_pipeline``;
    generation uses a chain-of-thought predictor over ``GenerateConciseResponses``.
    """

    def __init__(self):
        super().__init__()
        self.generate_answer = dspy.ChainOfThought(GenerateConciseResponses)

    def retrieve(self, question):
        """Fetch passages for ``question`` by re-using the already-loaded Haystack retriever.

        The retriever is looked up on ``loaded_pipeline`` at call time instead of
        being read from the notebook-level name ``document_store``: that name is
        assigned a Chroma document store in one cell and the retriever in another,
        so running cells out of order would silently break retrieval.

        Args:
            question: the query string passed to the retriever.

        Returns:
            A ``dspy.Prediction`` whose ``passages`` field is the list of
            retrieved document contents.
        """
        retriever = loaded_pipeline.get_component('retriever')
        results = retriever.run(query=question)
        # Skip documents without text so the context only contains usable strings
        passages = [doc.content for doc in results['documents'] if doc.content]
        return dspy.Prediction(passages=passages)

    def forward(self, question):
        """Retrieve context for ``question`` and generate a concise answer.

        Returns:
            A ``dspy.Prediction`` with ``context`` (the retrieved passages) and
            ``answer`` (the ``response`` field produced by the predictor).
        """
        context = self.retrieve(question).passages
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer=prediction.response)

### Prepare training and test data
---

Now we will refer to the previous HF dataset we loaded, and use it to split into training and test examples. We will use these examples in our prompt optimization process

In [None]:
from datasets import load_dataset
# Load the PubMedQA instruction dataset by its Hugging Face id
dataset = load_dataset("vblagoje/PubMedQA_instruction")
# Shuffle and select 1000 samples from the 'train' split
# (the fixed seed makes the shuffle reproducible)
train_data = dataset['train'].shuffle(seed=42).select(range(1000))
# Bare expression: the notebook shows the dataset summary as the cell output
dataset

In [None]:
# Bare expression: display the sampled training subset as the cell output
train_data

In [None]:
from itertools import islice
from typing import List, Optional

# Split sizes: the first NUM_TRAIN_EXAMPLES sampled rows are used for prompt
# optimization, the following NUM_TEST_EXAMPLES rows for evaluation.
NUM_TRAIN_EXAMPLES = 20
NUM_TEST_EXAMPLES = 50

training_examples: List = []
test_examples: List = []

# Iterate the shuffled, seeded sample `train_data` prepared in the dataset-loading
# cell. It is deliberately NOT rebound to `dataset['train']` here: doing so would
# discard the shuffle and always take the first rows of the split in file order.
for i, example in enumerate(islice(train_data, NUM_TRAIN_EXAMPLES + NUM_TEST_EXAMPLES)):
    try:
        # craft the dspy example; only `question` is an input, `answer` is the label
        dspy_example = dspy.Example(
            question=example['instruction'],
            answer=example['response'],
        ).with_inputs('question')
    except (KeyError, TypeError) as e:
        # Best effort: report a malformed row and move on to the next one
        print(f"Error processing example {i}: {e}")
        continue

    if i < NUM_TRAIN_EXAMPLES:
        training_examples.append(dspy_example)
    else:
        test_examples.append(dspy_example)

In [None]:
# Sanity check: show the first training example that was built
print("\nSample training example:")
_first_example = training_examples[0]
print(f"Question: {_first_example.question}")
print(f"Answer: {_first_example.answer}")

### Define a metric to optimize for
---

In this section, we will add a metric. Defining and tracking a metric is an essential part of the prompt optimization process. We will create an example metric that uses the Haystack SAS (Semantic Answer Similarity) evaluator for the semantic match.

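1. `concise_metric` uses the SAS evaluator to score the semantic similarity between the predicted answer and the reference answer.
1. It applies a custom tiering system that scores the length of the output (shorter answers get a higher multiplier).
1. The final score is the semantic similarity multiplied by the length multiplier, minus a penalty for answers much shorter than the reference, clamped to the range 0 to 1.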

In [None]:
from haystack.components.evaluators import SASEvaluator

# Initialize the sas evaluator and warm it up
# `sas_evaluator` is read as a module-level name by `concise_metric`
sas_evaluator = SASEvaluator()
# presumably loads the underlying similarity model before first use - confirm in the Haystack docs
sas_evaluator.warm_up()

In [None]:
def concise_metric(example, pred, trace=None) -> float:
    """Score a prediction by semantic similarity to the reference, favoring short answers.

    The score is ``similarity * length_multiplier - completeness_penalty``,
    clamped to ``[0.0, 1.0]``:

    * ``similarity`` is the ``"score"`` returned by the module-level ``sas_evaluator``.
    * ``length_multiplier`` depends on the predicted word count
      (<=20: 1.2, <=25: 1.0, <=30: 0.7, <=35: 0.4, otherwise 0.1).
    * ``completeness_penalty`` is 0.3 when the prediction has fewer than half
      as many words as the reference answer, else 0.0.

    Args:
        example: object whose ``answer`` attribute is the reference answer.
        pred: object whose ``answer`` attribute is the predicted answer.
        trace: not used by this metric; accepted so the signature stays
            compatible with callers that pass it.

    Returns:
        A float in ``[0.0, 1.0]``; ``0.0`` when the prediction has no usable
        text answer.
    """
    predicted = getattr(pred, "answer", None)
    # A missing, non-string or blank answer cannot be scored; treat it as the
    # worst score instead of failing on `.split()`.
    if not isinstance(predicted, str) or not predicted.strip():
        return 0.0

    semantic_similarity = sas_evaluator.run(
        ground_truth_answers=[example.answer],
        predicted_answers=[predicted],
    )["score"]

    pred_words = len(predicted.split())
    target_words = len(example.answer.split())

    # (inclusive word limit, multiplier) tiers, checked from shortest to longest
    length_tiers = ((20, 1.2), (25, 1.0), (30, 0.7), (35, 0.4))
    length_multiplier = next(
        (multiplier for limit, multiplier in length_tiers if pred_words <= limit),
        0.1,
    )

    # Penalize responses that are much shorter than target (might be incomplete)
    completeness_penalty = 0.3 if pred_words < target_words * 0.5 else 0.0

    final_score = (semantic_similarity * length_multiplier) - completeness_penalty
    # Clamp to [0, 1] and return a plain Python float, as annotated
    return float(max(min(final_score, 1.0), 0.0))

In [None]:
from dspy.evaluate.evaluate import Evaluate

# Baseline: score the uncompiled ConciseRAG module on the test examples
# before any optimization is applied
uncompiled_rag = ConciseRAG()

evaluate = Evaluate(
    devset=test_examples,
    metric=concise_metric,
    num_threads=1,
    display_progress=True,
    display_table=5,
)
evaluate(uncompiled_rag)

Now we can see that the metric was evaluated as 42.35, which is pretty low. Now that we have an idea of what the overall dataset is like, we can compile the `ConciseRAG` module using one of several optimizers.

Here, we will use the `BootstrapFewShot` teleprompter/optimizer. 

### Begin module compilation with the `BootstrapFewShot` Optimizer
---

In this example, we will compile our `ConciseRAG()` module with `BootstrapFewShot` optimizer from DSPy (~3 minutes)

View more about this optimizer here: https://dspy.ai/deep-dive/optimizers/bootstrap-fewshot/

In [None]:
from dspy.teleprompt import BootstrapFewShot

# Build the optimizer, using `concise_metric` as its metric
optimizer = BootstrapFewShot(metric=concise_metric)

# Now we will compile our RAG module
# A fresh ConciseRAG instance is compiled against the training examples; the result is kept as `compiled_rag`
compiled_rag = optimizer.compile(ConciseRAG(), trainset=training_examples)

In [None]:
# Score the compiled module on the test examples, with the same evaluator
# settings that were used for the uncompiled baseline
evaluate = Evaluate(
    devset=test_examples,
    metric=concise_metric,
    num_threads=1,
    display_progress=True,
    display_table=5,
)
evaluate(compiled_rag)

Now we can see a significant jump in the metric from `26.82` to `47.96`! (These values come from a recorded run; exact scores will vary between runs.)

In [None]:
import contextlib
import io
import re

# Step 1: Capture `lm.inspect_history()` output to a string.
# The call prints to stdout, so stdout is redirected into an in-memory buffer.
output_buffer = io.StringIO()
with contextlib.redirect_stdout(output_buffer):
    lm.inspect_history(n=1)

# The printed history may be colorized with ANSI escape sequences; strip them so
# the captured text is plain when it is reused later (a no-op if none are present).
captured_output = re.sub(r"\x1b\[[0-9;]*m", "", output_buffer.getvalue())

In [None]:
from haystack import Pipeline
from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.generators.amazon_bedrock import AmazonBedrockGenerator

# Initialize the amazon bedrock generator that is used to generate responses to questions
# at the inference step of the RAG pipeline.
# The model id lives in the `globals` module (imported as `g`); the bare name
# `BEDROCK_HAIKU_MODELID` is not defined in this notebook.
bedrock_model_id = g.BEDROCK_HAIKU_MODELID
# NOTE(review): the same constant is passed to `dspy.LM`, which may use a provider-prefixed id
# ("bedrock/..."). The Bedrock generator expects the plain model id, so drop that prefix if
# present - confirm against the value in `globals.py`.
if bedrock_model_id.startswith("bedrock/"):
    bedrock_model_id = bedrock_model_id[len("bedrock/"):]
new_generator = AmazonBedrockGenerator(model=bedrock_model_id)


# The captured DSPy prompt is arbitrary text; wrap it in a Jinja raw block so any
# `{{ ... }}` / `{% ... %}` sequences inside it are emitted literally instead of
# being interpreted as template syntax by the PromptBuilder.
template = "{% raw %}" + captured_output + "{% endraw %}" + """
---

Context:
{% for document in documents %}
    «{{ document.content }}»
{% endfor %}

Question: {{question}}
Reasoning: Let's think step by step in order to
"""

new_prompt_builder = PromptBuilder(template=template)

# InMemoryBM25Retriever needs an InMemoryDocumentStore. The notebook-level name
# `document_store` is assigned a Chroma store in one cell and the loaded retriever
# component in another, so build a dedicated in-memory store from the `documents`
# list reconstructed earlier.
bm25_document_store = InMemoryDocumentStore()
bm25_document_store.write_documents(documents, policy=DuplicatePolicy.SKIP)
new_retriever = InMemoryBM25Retriever(bm25_document_store, top_k=3)

# NOTE(review): this pattern expects the reply to contain "Answer: ..."; the DSPy signature's
# output field is named `response`, so confirm the pattern matches what the model returns.
answer_builder = AnswerBuilder(pattern="Answer: (.*)")


optimized_rag_pipeline = Pipeline()
optimized_rag_pipeline.add_component("retriever", new_retriever)
optimized_rag_pipeline.add_component("prompt_builder", new_prompt_builder)
optimized_rag_pipeline.add_component("llm", new_generator)
optimized_rag_pipeline.add_component("answer_builder", answer_builder)

# retriever -> prompt builder -> LLM -> answer builder
optimized_rag_pipeline.connect("retriever", "prompt_builder.documents")
optimized_rag_pipeline.connect("prompt_builder", "llm")
optimized_rag_pipeline.connect("llm.replies", "answer_builder.replies")


In [None]:
# Render the graph of the rebuilt pipeline to check its components and connections
optimized_rag_pipeline.show()

In [None]:
question = "What is a neurodegenerative disease? Give me examples"

# Each component that needs the raw question receives it under its own input name
pipeline_inputs = {
    "retriever": {"query": question},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}
response = optimized_rag_pipeline.run(pipeline_inputs)

# TODO: left open by the original author ("to do")
print(response["answer_builder"]["answers"][0].data)