Before running this notebook, run `ollama serve &` in a terminal. This starts the Ollama server so that the notebook can interact with it.

In [None]:
from typing import Dict, List, Optional, Union

import dspy
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from utils import json_to_dataframe, json_to_string_list

In [None]:
# Path to the JSON dataset of imaging reports (relative to this notebook).
filepath = '../../data/vector_veterinary_imaging_2.json'

# Load the same file twice via the project helpers: once as a DataFrame and once
# as a list of strings. NOTE(review): `rad_strings` is not referenced by any later
# cell visible in this notebook.
df = json_to_dataframe(filepath) 
rad_strings = json_to_string_list(filepath)

In [None]:
df

In [None]:
# Parallel lists: findings[i] and conclusions[i] come from the same DataFrame row,
# so a retrieval hit on a finding can be mapped back to its conclusion by index.
findings = list(df['findings'])
conclusions = list(df['conclusions_and_recommendations'])

## Retrieval

In [None]:
class SentenceTransformerRetriever(dspy.Retrieve):
    """Dense retriever over radiology findings using a SentenceTransformer model.

    Every finding is embedded once at construction time. A query is embedded at
    call time and compared to the stored embeddings by cosine similarity; the
    best matches are returned together with their paired conclusions.

    Args:
        model: Either a model name/path understood by ``SentenceTransformer`` or
            an already-loaded ``SentenceTransformer`` instance.
        findings: Findings texts to index.
        conclusions: Conclusions paired index-by-index with ``findings``.
        k: Default number of results returned by ``forward``.
    """

    def __init__(self, model: str, findings: List[str], conclusions: List[str], k: int):
        # Let the dspy.Retrieve base class initialise its own state.
        super().__init__(k=k)
        self.model = model if isinstance(model, SentenceTransformer) else SentenceTransformer(model, trust_remote_code=True)
        self.findings = findings
        self.conclusions = conclusions
        self.k = k
        self.embeddings = None
        self.init_embeddings()

    def init_embeddings(self):
        """Embed all indexed findings and cache the result on the instance."""
        self.embeddings = self.model.encode(self.findings)

    def forward(self, query: str, k: Optional[int] = None) -> List[Dict[str, Union[str, float]]]:
        """Return the ``k`` indexed findings most similar to ``query``.

        Args:
            query: Free-text findings to search for.
            k: Number of results; defaults to the ``k`` given at construction.

        Returns:
            A list of dicts with keys ``'finding'``, ``'conclusion'`` and
            ``'score'`` (cosine similarity), ordered from most to least similar.
            Empty when ``k`` is not positive or nothing is indexed.
        """
        k = self.k if k is None else k
        # `[-k:]` with k == 0 would select the whole array, so guard explicitly.
        if k <= 0 or len(self.findings) == 0:
            return []

        query_embedding = self.model.encode([query])
        similarities = cosine_similarity(query_embedding, self.embeddings)[0]
        # Descending order of similarity, truncated to the k best.
        top_k_indices = np.argsort(similarities)[::-1][:k]

        return [
            {
                'finding': self.findings[idx],
                'conclusion': self.conclusions[idx],
                'score': float(similarities[idx]),
            }
            for idx in top_k_indices
        ]

In [None]:
# Name of the SentenceTransformer model used to embed findings for retrieval.
vectorizer = "sentence-transformers/all-MiniLM-L6-v2"
# Alternative embedding model (disabled):
# vectorizer = "dunzhang/stella_en_400M_v5"

In [None]:
# Retriever instance that is later registered with dspy.settings as the retrieval
# model. Constructing it embeds every entry of `findings` up front.
retriever_model = SentenceTransformerRetriever(model=vectorizer, findings=findings, conclusions=conclusions, k=10)

In [None]:
findings[1]

In [None]:
retriever_model.forward(query=findings[1], k=2)

# Subtask breakdown

The idea here is to break the task of generating the `conclusions and recommendations` section from the `findings` section into smaller, more manageable subtasks and then recombine their outputs. This also lets us investigate each component in isolation and address its specific issues.

In [None]:
import dspy
from typing import List, Dict
import re
from dataclasses import dataclass

In [None]:
@dataclass
class Finding:
    """A single parsed finding: where it is, what was seen, and its qualifiers."""

    # Anatomical site the observation refers to.
    anatomical_location: str
    # The observation itself, as text.
    observation: str
    # Free-form name -> value qualifiers attached to the observation.
    attributes: Dict[str, str]

# DSPy signature for step 1 of the pipeline: split a findings section into
# abnormal findings and relevant normal findings.
# NOTE: DSPy uses a signature's docstring and the `desc` strings as prompt text,
# so treat them as runtime behaviour rather than plain documentation.
class ExtractKeyFindings(dspy.Signature):
    """Extract and categorize key findings that require addressing in conclusions."""
    
    # Inputs
    finding = dspy.InputField(desc="Complete findings section of radiology report")
    similar_examples = dspy.InputField(desc="Similar example reports for reference")
    
    # Outputs
    abnormal_findings = dspy.OutputField(desc="List of abnormal findings that need to be addressed")
    normal_relevant = dspy.OutputField(desc="List of relevant normal findings that provide important context")
    
# DSPy signature for step 2: take the two finding lists produced by
# ExtractKeyFindings and produce a significance statement plus a risk assessment.
# NOTE: DSPy uses the docstring and `desc` strings as prompt text.
class AssessClinicalSignificance(dspy.Signature):
    """Assess the clinical significance of the identified findings."""
    
    # Inputs
    abnormal_findings = dspy.InputField(desc="List of abnormal findings")
    normal_relevant = dspy.InputField(desc="List of relevant normal findings")
    similar_examples = dspy.InputField(desc="Similar example reports for reference")
    
    # Outputs
    significance = dspy.OutputField(desc="Clinical significance of findings")
    risk_assessment = dspy.OutputField(desc="Assessment of any risks or concerns")

# DSPy signature for step 3: turn abnormal findings, their significance and the
# risk assessment into recommendations.
# NOTE: DSPy uses the docstring and `desc` strings as prompt text; the input
# fields below carry no `desc`.
class GenerateRecommendations(dspy.Signature):
    """Generate specific recommendations based on findings and their significance."""
    
    # Inputs
    abnormal_findings = dspy.InputField()
    significance = dspy.InputField()
    risk_assessment = dspy.InputField()
    similar_examples = dspy.InputField()
    
    # Output
    recommendations = dspy.OutputField(desc="Specific, actionable recommendations")

# DSPy signature for step 4: assemble the outputs of the earlier steps into the
# final conclusions text.
# NOTE: DSPy uses the docstring and `desc` strings as prompt text.
class ValidateAndFormatConclusions(dspy.Signature):
    """Validate and format the final conclusions section."""
    
    # Inputs
    original_finding = dspy.InputField()
    abnormal_findings = dspy.InputField()
    significance = dspy.InputField()
    recommendations = dspy.InputField()
    
    # Output
    final_conclusions = dspy.OutputField(desc="Complete, formatted conclusions section")

class RadiologyReportGenerator(dspy.Module):
    """Complete pipeline for generating radiology report conclusions.

    Chains four LM steps (extract -> assess -> recommend -> format) and then
    runs rule-based checks on the final text. Any failed check raises
    ``ValidationError``, so callers should be prepared to catch it.
    """

    def __init__(self):
        super().__init__()

        # A Signature only declares inputs/outputs; it has to be wrapped in a
        # predictor (as the other modules in this notebook do) to be callable.
        self.extract = dspy.Predict(ExtractKeyFindings)
        self.assess = dspy.Predict(AssessClinicalSignificance)
        self.recommend = dspy.Predict(GenerateRecommendations)
        self.validate = dspy.Predict(ValidateAndFormatConclusions)

        # Validation rules, applied in order by run_validations().
        self.validation_rules = [
            self.validate_findings_addressed,
            self.validate_recommendation_format,
            self.validate_medical_terminology,
            self.validate_length,
            self.validate_structure
        ]

    def forward(self, finding: str, similar_examples: List[dict]) -> str:
        """Generate and validate the conclusions text for one findings section.

        Args:
            finding: Findings section of the report.
            similar_examples: Retrieved reference examples, passed unchanged to
                every step that accepts them.

        Returns:
            The final conclusions text.

        Raises:
            ValidationError: If any validation rule rejects the generated text.
        """
        # Extract key findings
        findings_output = self.extract(
            finding=finding,
            similar_examples=similar_examples
        )

        # Assess clinical significance
        significance_output = self.assess(
            abnormal_findings=findings_output.abnormal_findings,
            normal_relevant=findings_output.normal_relevant,
            similar_examples=similar_examples
        )

        # Generate recommendations
        recommendations = self.recommend(
            abnormal_findings=findings_output.abnormal_findings,
            significance=significance_output.significance,
            risk_assessment=significance_output.risk_assessment,
            similar_examples=similar_examples
        )

        # Validate and format final conclusions
        final_output = self.validate(
            original_finding=finding,
            abnormal_findings=findings_output.abnormal_findings,
            significance=significance_output.significance,
            recommendations=recommendations.recommendations
        )

        # Run validation checks
        self.run_validations(
            original_finding=finding,
            final_output=final_output.final_conclusions
        )

        return final_output.final_conclusions

    def validate_findings_addressed(self, finding: str, conclusion: str) -> bool:
        """Check that every term extracted from the finding appears in the conclusion.

        With the current stub ``extract_medical_terms`` (which returns an empty
        list) this check always passes.

        Raises:
            ValidationError: If a term from the finding is missing.
        """
        finding_terms = self.extract_medical_terms(finding)
        # Set membership keeps the comparison linear in the number of terms.
        conclusion_terms = set(self.extract_medical_terms(conclusion))

        missing_terms = [term for term in finding_terms if term not in conclusion_terms]
        if missing_terms:
            raise ValidationError(f"Findings not addressed: {missing_terms}")

        return True

    def validate_recommendation_format(self, conclusion: str) -> bool:
        """Check that the conclusion contains at least one recommendation keyword.

        Raises:
            ValidationError: If none of the keywords is present.
        """
        # `(?:ing)?` makes the whole suffix optional (a `[ing]` character class
        # would match a single letter), and `\b` accepts a keyword followed by
        # punctuation or the end of the text, not only whitespace.
        recommendation_patterns = [
            r"recommend[s]?\b",
            r"consider(?:ing)?\b",
            r"warrant[s]?\b",
            r"suggest[s]?\b"
        ]

        has_recommendations = any(re.search(pattern, conclusion, re.IGNORECASE)
                                for pattern in recommendation_patterns)

        if not has_recommendations:
            raise ValidationError("No clear recommendations found")

        return True

    def validate_medical_terminology(self, conclusion: str) -> bool:
        """Check that a fixed list of expected words occurs in the conclusion.

        This is a placeholder for a real terminology lookup; the comparison is
        case-insensitive substring matching.

        Raises:
            ValidationError: If any expected word is absent.
        """
        required_terms = ["examination", "findings", "indicates", "suggests"]

        lowered = conclusion.lower()
        missing_terms = [term for term in required_terms
                        if term.lower() not in lowered]

        if missing_terms:
            raise ValidationError(f"Missing professional medical terms: {missing_terms}")

        return True

    def validate_length(self, conclusion: str) -> bool:
        """Check that the conclusion has between 50 and 300 whitespace-separated words.

        Raises:
            ValidationError: If the word count is outside that range.
        """
        words = conclusion.split()
        if len(words) < 50 or len(words) > 300:
            raise ValidationError("Conclusion length outside acceptable range")

        return True

    def validate_structure(self, conclusion: str) -> bool:
        """Check that the conclusion has findings, significance and recommendations headers.

        Raises:
            ValidationError: If any of the three section headers is missing.
        """
        required_sections = [
            "findings",
            "significance",
            "recommendations"
        ]

        missing_sections = [section for section in required_sections
                          if not self._has_section(conclusion, section)]

        if missing_sections:
            raise ValidationError(f"Missing required sections: {missing_sections}")

        return True

    def _has_section(self, text: str, section: str) -> bool:
        """Return True if ``section`` followed by a colon occurs in ``text``.

        Three casings are accepted: as given, capitalized, and upper-case.
        """
        section_patterns = [
            f"{section}:",
            f"{section.capitalize()}:",
            f"{section.upper()}:"
        ]

        return any(pattern in text for pattern in section_patterns)

    def extract_medical_terms(self, text: str) -> List[str]:
        """Extract medical terms from text.

        Stub: currently returns an empty list regardless of input. A real
        implementation would use a medical NLP library or terminology database.
        """
        terms = []
        # Add actual medical term extraction logic
        return terms

    def run_validations(self, original_finding: str, final_output: str):
        """Run every rule in ``validation_rules`` against the generated text.

        ``validate_findings_addressed`` is the only rule that also needs the
        original finding, so it is dispatched by name.

        Raises:
            ValidationError: Propagated from the first rule that fails.
        """
        for validation_rule in self.validation_rules:
            if validation_rule.__name__ == 'validate_findings_addressed':
                validation_rule(original_finding, final_output)
            else:
                validation_rule(final_output)

class ValidationError(Exception):
    """Raised when generated conclusions fail one of the validation rules."""

# Example prompt templates for each component
# One template per pipeline step (extract / assess / recommend / validate).
# NOTE(review): none of these constants is referenced by any code visible in this
# notebook; the DSPy signatures above carry their own instructions.
EXTRACT_PROMPT = """
Analyze the radiological findings and identify:
1. All abnormal findings that require attention
2. Normal findings that provide important context

Format your response as two separate lists.
Use precise medical terminology and maintain professional tone.
"""

ASSESS_PROMPT = """
Based on the identified findings, provide:
1. Clinical significance of each abnormal finding
2. Overall risk assessment considering the combination of findings

Consider both immediate and potential long-term implications.
"""

RECOMMEND_PROMPT = """
Generate specific, actionable recommendations that:
1. Address each significant finding
2. Prioritize based on clinical urgency
3. Include clear next steps for follow-up

Recommendations should be concrete and specific.
"""

VALIDATE_PROMPT = """
Create a final conclusions section that:
1. Integrates all components coherently
2. Maintains professional medical tone
3. Follows standard radiology report format
4. Is concise and focused

Ensure all significant findings are addressed and recommendations are clear.
"""

## Language model

In [None]:
# language_model = "qwen2.5"
language_model = "gemma2"

In [None]:
# Client for the locally served Ollama model selected in `language_model`.
# NOTE(review): num_ctx presumably caps the context window at 1024 tokens, while
# the prompts built below embed three retrieved reports plus the findings text.
# Confirm this is large enough; otherwise prompts may be truncated.
ollama_model = dspy.OllamaLocal(
    base_url='http://127.0.0.1:11434',
    timeout_s=500,
    model=language_model,
    model_type='text',
    max_tokens=1024,
    num_ctx=1024,
    temperature=0.7,
    top_p=0.8
)

In [None]:
# Configure DSPy to use Ollama
# Also registers `retriever_model` as the retrieval model (`rm`). The modules
# below receive their retriever explicitly through their constructors instead.
dspy.settings.configure(lm=ollama_model, rm=retriever_model)

## Extract key findings

In [None]:
class KeyFindingsModule(dspy.Module):
    """Step 1 on its own: retrieve similar reports, then extract key findings."""

    def __init__(self, retriever):
        super().__init__()
        self.extract_key_findings = dspy.Predict(ExtractKeyFindings)
        self.retriever = retriever

    def forward(self, finding: str) -> dict:
        """Return the abnormal and relevant-normal findings for ``finding``.

        The result dict also echoes the input finding and the raw retrieval hits.
        """
        # Three nearest reports serve as in-prompt references.
        neighbours = self.retriever(finding, k=3)

        # Render the hits as one numbered text block for the prompt.
        reference_block = "".join(
            f"Example {number}:\n"
            f"Finding: {hit['finding']}\n"
            f"Conclusion: {hit['conclusion']}\n\n"
            for number, hit in enumerate(neighbours, 1)
        )

        extracted = self.extract_key_findings(
            finding=finding,
            similar_examples=reference_block,
        )

        output = {'finding': finding}
        output['abnormal_findings'] = extracted.abnormal_findings
        output['normal_relevant'] = extracted.normal_relevant
        output['similar_examples'] = neighbours
        return output

In [None]:
 # Load data
# Reload the dataset and rebuild the parallel findings/conclusions lists so this
# section can be run independently of the earlier cells.
df = json_to_dataframe(filepath)
findings = list(df['findings'])
conclusions = list(df['conclusions_and_recommendations'])

# Initialize retriever
# This is a second retriever instance, separate from `retriever_model`; it is the
# one handed to the step modules below.
retriever = SentenceTransformerRetriever(
    model=vectorizer,
    findings=findings,
    conclusions=conclusions,
    k=3
)

In [None]:
key_findings_extractor = KeyFindingsModule(retriever)

In [None]:
findings[0]

In [None]:
key_findings_example = key_findings_extractor(findings[0])

In [None]:
print(key_findings_example['abnormal_findings'])

In [None]:
print(key_findings_example['normal_relevant'])

## Assess clinical significance

In [None]:
class AssessClinicalSignificanceModule(dspy.Module):
    """Steps 1-2: retrieve, extract key findings, then assess their significance."""

    def __init__(self, retriever):
        super().__init__()
        self.extract_key_findings = dspy.Predict(ExtractKeyFindings)
        self.assess_clinical_findings = dspy.Predict(AssessClinicalSignificance)
        self.retriever = retriever

    def forward(self, finding: str) -> dict:
        """Return extracted findings plus significance and risk assessment.

        The result dict also echoes the input finding and the raw retrieval hits.
        """
        # Three nearest reports serve as in-prompt references.
        neighbours = self.retriever(finding, k=3)

        # Render the hits as one numbered text block, shared by both LM calls.
        reference_block = "".join(
            f"Example {number}:\n"
            f"Finding: {hit['finding']}\n"
            f"Conclusion: {hit['conclusion']}\n\n"
            for number, hit in enumerate(neighbours, 1)
        )

        # Step 1: extraction.
        extracted = self.extract_key_findings(
            finding=finding,
            similar_examples=reference_block,
        )

        # Step 2: assessment, fed by the extraction outputs.
        assessed = self.assess_clinical_findings(
            abnormal_findings=extracted.abnormal_findings,
            normal_relevant=extracted.normal_relevant,
            similar_examples=reference_block,
        )

        output = {'finding': finding}
        output['abnormal_findings'] = extracted.abnormal_findings
        output['normal_relevant'] = extracted.normal_relevant
        output['significance'] = assessed.significance
        output['risk_assessment'] = assessed.risk_assessment
        output['similar_examples'] = neighbours
        return output

In [None]:
assess_clinical_significance = AssessClinicalSignificanceModule(retriever)

In [None]:
assess_significance_example = assess_clinical_significance(findings[0])

In [None]:
print(assess_significance_example['significance'])

In [None]:
print(assess_significance_example['risk_assessment'])

## Generate recommendations

In [None]:
class GenerateRecommendationsModule(dspy.Module):
    """Steps 1-3: retrieve, extract, assess, then generate recommendations."""

    def __init__(self, retriever):
        super().__init__()
        self.extract_key_findings = dspy.Predict(ExtractKeyFindings)
        self.assess_clinical_findings = dspy.Predict(AssessClinicalSignificance)
        self.generate_recommendations = dspy.Predict(GenerateRecommendations)
        self.retriever = retriever

    def forward(self, finding: str) -> dict:
        """Return every intermediate output up to and including recommendations.

        The result dict also echoes the input finding and the raw retrieval hits.
        """
        # Three nearest reports serve as in-prompt references.
        neighbours = self.retriever(finding, k=3)

        # Render the hits as one numbered text block, shared by all LM calls.
        reference_block = "".join(
            f"Example {number}:\n"
            f"Finding: {hit['finding']}\n"
            f"Conclusion: {hit['conclusion']}\n\n"
            for number, hit in enumerate(neighbours, 1)
        )

        # Step 1: extraction.
        extracted = self.extract_key_findings(
            finding=finding,
            similar_examples=reference_block,
        )

        # Step 2: assessment, fed by the extraction outputs.
        assessed = self.assess_clinical_findings(
            abnormal_findings=extracted.abnormal_findings,
            normal_relevant=extracted.normal_relevant,
            similar_examples=reference_block,
        )

        # Step 3: recommendations, fed by both earlier steps.
        advised = self.generate_recommendations(
            abnormal_findings=extracted.abnormal_findings,
            significance=assessed.significance,
            risk_assessment=assessed.risk_assessment,
            similar_examples=reference_block,
        )

        output = {'finding': finding}
        output['abnormal_findings'] = extracted.abnormal_findings
        output['normal_relevant'] = extracted.normal_relevant
        output['significance'] = assessed.significance
        output['risk_assessment'] = assessed.risk_assessment
        output['recommendations'] = advised.recommendations
        output['similar_examples'] = neighbours
        return output

In [None]:
generate_recommendations = GenerateRecommendationsModule(retriever)

In [None]:
generate_recommendations_example = generate_recommendations(findings[0])

In [None]:
print(generate_recommendations_example['recommendations'])

In [None]:
# Initialize the generator
generator = RadiologyReportGenerator()

# No earlier cell visible in this notebook defines `finding_text` or `examples`,
# so build them here: use the first report and its nearest neighbours from the
# retriever.
finding_text = findings[0]
examples = retriever(finding_text, k=3)

# Generate conclusions
# The result gets its own name so that the `conclusions` list loaded from the
# dataset is not overwritten.
try:
    generated_conclusions = generator(finding=finding_text, similar_examples=examples)
except ValidationError as e:
    print(f"Validation failed: {str(e)}")
else:
    print("Successfully generated conclusions")
    print(generated_conclusions)

In [None]:
finding_text

In [None]:
# Single-step DSPy signature: findings + retrieved examples -> conclusions text.
# NOTE: DSPy uses the docstring and `desc` strings as prompt text, so edits here
# change what the LM is asked to do.
class GenerateConclusions(dspy.Signature):
    """Given a radiology finding and similar examples, generate an appropriate conclusions and recommendations section.
    The response should maintain a professional medical tone and follow the style of the examples."""

    finding = dspy.InputField(desc="Findings section of the radiology report.")
    similar_examples = dspy.InputField(desc="Similar examples of findings and corresponding conclusions and recommendations sections.")
    # "Given ..." (not "Give ..."): the old wording read as an instruction to
    # output the findings section itself.
    conclusions = dspy.OutputField(desc="The conclusions and recommendations section, given the findings section above.")

In [None]:
class RadiologyModule(dspy.Module):
    """Single-step pipeline: retrieve similar reports, then generate conclusions."""

    def __init__(self, retriever):
        super().__init__()
        self.generate_conclusion = dspy.Predict(GenerateConclusions)
        self.retriever = retriever

    def forward(self, finding: str) -> dict:
        """Generate a conclusions section for ``finding``.

        Returns a dict with the input finding, the generated conclusion and the
        raw retrieval hits that were shown to the LM.
        """
        # Three nearest reports serve as in-prompt references.
        neighbours = self.retriever(finding, k=3)

        # Render the hits as one numbered text block for the prompt.
        reference_block = "".join(
            f"Example {number}:\n"
            f"Finding: {hit['finding']}\n"
            f"Conclusion: {hit['conclusion']}\n\n"
            for number, hit in enumerate(neighbours, 1)
        )

        generated = self.generate_conclusion(
            finding=finding,
            similar_examples=reference_block,
        )

        output = {'finding': finding}
        output['generated_conclusion'] = generated.conclusions
        output['similar_examples'] = neighbours
        return output

## Full pipeline

In [None]:
def setup_rad_pipeline(filepath: str, vectorizer: str = "sentence-transformers/all-MiniLM-L6-v2", k: int = 3):
    """Build a ready-to-call RadiologyModule from the reports stored at ``filepath``.

    Args:
        filepath: Path to the JSON dataset.
        vectorizer: SentenceTransformer model name used for retrieval.
        k: ``k`` handed to the retriever's constructor.

    Returns:
        A RadiologyModule wrapping a freshly indexed SentenceTransformerRetriever.
    """
    reports = json_to_dataframe(filepath)

    # The two columns are index-aligned, which is what the retriever relies on.
    indexed_retriever = SentenceTransformerRetriever(
        model=vectorizer,
        findings=list(reports['findings']),
        conclusions=list(reports['conclusions_and_recommendations']),
        k=k,
    )

    return RadiologyModule(indexed_retriever)

## Example

In [None]:
filepath = '../../data/vector_veterinary_imaging_2.json'
rad_pipeline = setup_rad_pipeline(filepath)

In [None]:
# Example finding
test_finding = """
The thoracic cavity demonstrates normal cardiac silhouette size and shape. 
The pulmonary vasculature appears within normal limits. 
There is a mild interstitial pattern noted in the caudodorsal lung fields.
No evidence of pleural effusion is noted.
"""

In [None]:
# Run the single-step pipeline on the hand-written finding and show both the
# generated text and the retrieved reports that were placed in the prompt.
result = rad_pipeline(test_finding)
    
print("Generated Conclusion:")
print(result['generated_conclusion'])
print("\nSimilar Examples Used:")
# Each hit carries its cosine-similarity score alongside the texts.
for i, example in enumerate(result['similar_examples'], 1):
    print(f"\nExample {i} (Similarity Score: {example['score']:.3f}):")
    print(f"Finding: {example['finding']}")
    print(f"Conclusion: {example['conclusion']}")

## Remove the example in question from retrieval

When we evaluate on examples drawn from the dataset itself, we must make sure that the example being evaluated is excluded from its own retrieval set. All other examples may still be retrieved.

In [None]:
import random
from typing import List, Dict, Union, Optional
import pandas as pd

class SentenceTransformerRetrieverWithExclusion(dspy.Retrieve):
    """Dense retriever over findings that can hide selected documents.

    Works like a plain cosine-similarity retriever, except that any index
    registered through ``set_excluded_indices`` is never returned. This allows
    evaluating on a dataset entry without retrieving that same entry.

    Args:
        model: Either a model name/path understood by ``SentenceTransformer`` or
            an already-loaded ``SentenceTransformer`` instance.
        findings: Findings texts to index.
        conclusions: Conclusions paired index-by-index with ``findings``.
        k: Default number of results returned by ``forward``.
    """

    def __init__(self, model: str, findings: List[str], conclusions: List[str], k: int):
        # Let the dspy.Retrieve base class initialise its own state.
        super().__init__(k=k)
        self.model = model if isinstance(model, SentenceTransformer) else SentenceTransformer(model, trust_remote_code=True)
        self.findings = findings
        self.conclusions = conclusions
        self.k = k
        self.embeddings = None
        self.excluded_indices = set()
        self.init_embeddings()

    def init_embeddings(self):
        """Embed all indexed findings and cache the result on the instance."""
        self.embeddings = self.model.encode(self.findings)

    def set_excluded_indices(self, indices: Optional[List[int]] = None):
        """Set indices to exclude from retrieval (``None`` or empty clears them)."""
        self.excluded_indices = set(indices or [])

    def clear_excluded_indices(self):
        """Clear all excluded indices"""
        self.excluded_indices = set()

    def forward(self, query: str, k: Optional[int] = None) -> List[Dict[str, Union[str, float]]]:
        """Return up to ``k`` non-excluded findings most similar to ``query``.

        Args:
            query: Free-text findings to search for.
            k: Number of results; defaults to the ``k`` given at construction.

        Returns:
            A list of dicts with keys ``'finding'``, ``'conclusion'`` and
            ``'score'`` (cosine similarity), ordered from most to least similar.
            Fewer than ``k`` items are returned when fewer documents remain
            after exclusion. Empty when ``k`` is not positive or nothing is
            indexed.
        """
        k = self.k if k is None else k
        # `[-k:]`-style slicing with k == 0 would select everything; guard it.
        if k <= 0 or len(self.findings) == 0:
            return []

        query_embedding = self.model.encode([query])
        similarities = cosine_similarity(query_embedding, self.embeddings)[0]

        # Boolean mask of documents that may be returned.
        allowed = np.ones(similarities.shape, dtype=bool)
        if self.excluded_indices:
            allowed[list(self.excluded_indices)] = False

        # Rank only the allowed documents. Merely scoring excluded ones as -inf
        # would still return them whenever k exceeds the number of allowed docs.
        candidates = np.flatnonzero(allowed)
        ranking = np.argsort(similarities[candidates])[::-1][:k]
        top_k_indices = candidates[ranking]

        return [
            {
                'finding': self.findings[idx],
                'conclusion': self.conclusions[idx],
                'score': float(similarities[idx]),
            }
            for idx in top_k_indices
        ]

def setup_rad_pipeline_with_exclusion(filepath: str, vectorizer: str = "sentence-transformers/all-MiniLM-L6-v2", k: int = 3):
    """Build a RadiologyModule whose retriever supports per-query exclusions.

    Args:
        filepath: Path to the JSON dataset.
        vectorizer: SentenceTransformer model name used for retrieval.
        k: ``k`` handed to the retriever's constructor.

    Returns:
        A ``(module, dataframe)`` tuple: the pipeline and the loaded dataset.
    """
    reports = json_to_dataframe(filepath)

    # The two columns are index-aligned, which is what the retriever relies on.
    excluding_retriever = SentenceTransformerRetrieverWithExclusion(
        model=vectorizer,
        findings=list(reports['findings']),
        conclusions=list(reports['conclusions_and_recommendations']),
        k=k,
    )

    pipeline = RadiologyModule(excluding_retriever)
    return pipeline, reports

def run_evaluation_with_exclusion(filepath: str, num_examples: int = 5, seed: Optional[int] = None):
    """
    Run inference on a specified number of examples, excluding each example from its own retrieval set

    Args:
        filepath: Path to the JSON data file
        num_examples: Number of examples to evaluate (capped at the dataset size)
        seed: Random seed for reproducibility

    Returns:
        DataFrame containing the evaluation results, one row per example with
        columns 'index', 'finding', 'actual_conclusion', 'generated_conclusion'
        and 'similar_examples'
    """
    # A private generator yields the same sample for a given seed as seeding
    # the `random` module would, without disturbing global random state.
    rng = random.Random(seed)

    # Setup pipeline with exclusion capability
    rad_pipeline, df = setup_rad_pipeline_with_exclusion(filepath)

    # Randomly select examples
    total_examples = len(df)
    selected_indices = rng.sample(range(total_examples), min(num_examples, total_examples))

    results = []

    for idx in selected_indices:
        # Get the example
        finding = df['findings'].iloc[idx]
        actual_conclusion = df['conclusions_and_recommendations'].iloc[idx]

        # Hide the current example from retrieval for this query only; the
        # `finally` guarantees the exclusion is lifted even if inference fails.
        rad_pipeline.retriever.set_excluded_indices([idx])
        try:
            result = rad_pipeline(finding)
        finally:
            rad_pipeline.retriever.clear_excluded_indices()

        # Store results
        results.append({
            'index': idx,
            'finding': finding,
            'actual_conclusion': actual_conclusion,
            'generated_conclusion': result['generated_conclusion'],
            'similar_examples': result['similar_examples']
        })

    return pd.DataFrame(results)

## TODO 

Pull out a handful of examples (prompt, actual conclusion, and predicted conclusion), have the model judge how close each predicted conclusion is to the actual one, and have it propose a new set of prompts that might work better.

In [None]:
filepath = '../../data/vector_veterinary_imaging_2.json'

# Run evaluation on 3 random examples
results_df = run_evaluation_with_exclusion(filepath, num_examples=3, seed=42)

# Print results
# `idx` is the row label of results_df (0, 1, 2, ...), not the dataset index;
# the dataset index is stored in the 'index' column.
for idx, row in results_df.iterrows():
    print(f"\nExample {idx + 1}:")
    print("Finding:")
    print(row['finding'])
    print("\nActual Conclusion:")
    print(row['actual_conclusion'])
    print("\nGenerated Conclusion:")
    print(row['generated_conclusion'])
    print("\nSimilar Examples Used:")
    # Retrieved references that were shown to the LM for this example.
    for i, example in enumerate(row['similar_examples'], 1):
        print(f"\nReference {i} (Similarity Score: {example['score']:.3f}):")
        print(f"Finding: {example['finding']}")
        print(f"Conclusion: {example['conclusion']}")
    print("\n" + "="*80)

## TODO 

Incorporate the code below into the evaluation workflow.

In [None]:
def save_evaluation_results(results_df: pd.DataFrame, output_path: str, format: str = 'json'):
    """
    Save evaluation results for LLM analysis

    Args:
        results_df: DataFrame containing evaluation results; each row needs
            'finding', 'actual_conclusion', 'generated_conclusion' and
            'similar_examples' (a list of dicts with 'finding', 'conclusion'
            and 'score')
        output_path: Path to save the results
        format: Either 'json' or 'csv' (case-insensitive)

    Raises:
        ValueError: If ``format`` is neither 'json' nor 'csv'
    """
    # Imported here because the notebook only imports `json` in a later cell.
    import json

    # Reject unknown formats up front instead of silently writing CSV.
    output_format = format.lower()
    if output_format not in ('json', 'csv'):
        raise ValueError(f"Unsupported format: {format!r} (expected 'json' or 'csv')")

    # Prepare data for export
    export_data = [
        {
            'finding': row['finding'],
            'actual_conclusion': row['actual_conclusion'],
            'generated_conclusion': row['generated_conclusion'],
            'reference_examples': [
                {
                    'finding': ex['finding'],
                    'conclusion': ex['conclusion'],
                    # Plain float so numpy scalars stay JSON-serialisable.
                    'similarity_score': float(ex['score'])
                } for ex in row['similar_examples']
            ]
        }
        for _, row in results_df.iterrows()
    ]

    # Save in specified format
    if output_format == 'json':
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2)
        return

    # CSV: flatten the reference examples into ref_<n>_* columns
    flat_data = []
    for item in export_data:
        flat_item = {
            'finding': item['finding'],
            'actual_conclusion': item['actual_conclusion'],
            'generated_conclusion': item['generated_conclusion']
        }
        for i, ref in enumerate(item['reference_examples'], 1):
            flat_item[f'ref_{i}_finding'] = ref['finding']
            flat_item[f'ref_{i}_conclusion'] = ref['conclusion']
            flat_item[f'ref_{i}_similarity'] = ref['similarity_score']
        flat_data.append(flat_item)
    pd.DataFrame(flat_data).to_csv(output_path, index=False)

In [None]:
filepath = '../../data/vector_veterinary_imaging_2.json'
    
# Run evaluation and save results
results_df = run_evaluation_with_exclusion(filepath, num_examples=20, seed=42)
save_evaluation_results(results_df, 'evaluation_results.json')

In [None]:
results_df

In [None]:
import json
from typing import List, Dict, Union
import pandas as pd
import dspy
from dspy.teleprompt import BootstrapFewShot, ValueEstimator
import numpy as np

# DSPy signature for an LM-as-judge step: compare a generated conclusion with
# the actual one and return a score plus an explanation.
# NOTE: DSPy uses the docstring and `desc` strings as prompt text. The `score`
# output is whatever the LM writes, so callers must parse it.
class ConclusionScorer(dspy.Signature):
    """Rate the quality and accuracy of generated radiology conclusions compared to actual conclusions."""
    
    # Inputs
    finding = dspy.InputField()
    generated_conclusion = dspy.InputField()
    actual_conclusion = dspy.InputField()
    
    # Outputs
    score = dspy.OutputField(desc="Score from 0-1 indicating similarity of content and style")
    reasoning = dspy.OutputField(desc="Explanation of the score and suggestions for improvement")

class RadiologyValueEstimator(ValueEstimator):
    """Metric that asks the LM to score a generated conclusion against the actual one.

    NOTE(review): the ``ValueEstimator`` base class comes from the
    ``dspy.teleprompt`` import in this cell; confirm it exists in the installed
    DSPy version.
    """

    def __init__(self):
        super().__init__()
        self.scorer = dspy.Predict(ConclusionScorer)

    def forward(self, example, pred, trace=None):
        """Return a similarity score in [0, 1] for ``pred`` versus ``example``.

        Args:
            example: Object exposing ``finding`` and ``actual_conclusion``.
            pred: Object exposing the generated text as ``conclusions``.
            trace: Unused; accepted for signature compatibility.

        Returns:
            The LM's score as a float clamped to [0, 1]; 0.0 when the reply
            contains no number.
        """
        score = self.scorer(
            finding=example.finding,
            generated_conclusion=pred.conclusions,
            actual_conclusion=example.actual_conclusion
        )
        return self._parse_score(score.score)

    @staticmethod
    def _parse_score(raw) -> float:
        """Extract the first number from an LM reply and clamp it to [0, 1]."""
        import re  # local import keeps this cell independent of earlier ones

        # The LM may wrap the number in prose ("Score: 0.8"), which would make
        # a bare float() call raise.
        match = re.search(r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?", str(raw))
        if match is None:
            return 0.0
        return min(1.0, max(0.0, float(match.group())))

def optimize_rad_prompt(filepath: str, num_bootstrap_examples: int = 10):
    """
    Use DSPy's teleprompter to optimize the radiology prompt
    
    Args:
        filepath: Path to the dataset
        num_bootstrap_examples: Number of examples to use for bootstrapping

    Returns:
        Whatever ``teleprompter.compile`` returns (the optimized program).
    """
    # Load data
    df = json_to_dataframe(filepath)
    
    # Prepare training data
    # Only `finding` is marked as an input; `actual_conclusion` is the label.
    # NOTE(review): GenerateConclusions also declares a `similar_examples`
    # input, which these examples do not provide. Confirm this is intended.
    train_data = [
        dspy.Example(
            finding=row['findings'],
            actual_conclusion=row['conclusions_and_recommendations']
        ).with_inputs('finding')
        for _, row in df.iterrows()
    ]
    
    # Initialize teleprompter components
    # NOTE(review): `demo_retriever` and `k` do not look like BootstrapFewShot
    # constructor arguments. Verify against the installed DSPy version.
    bootstrapper = BootstrapFewShot(
        demo_retriever='bm25',  # or 'sbert' for semantic search
        k=num_bootstrap_examples
    )
    
    value_estimator = RadiologyValueEstimator()
    
    # Create compiler configuration
    # NOTE(review): `dspy.TelepromptConfig` could not be confirmed as part of
    # DSPy's public API. Verify before running.
    config = dspy.TelepromptConfig(
        metric=value_estimator,
        max_bootstrapping_iterations=3,
        max_rounds=5
    )
    
    # Initialize and run teleprompter
    # NOTE(review): `dspy.Teleprompter(...)` with these arguments could not be
    # confirmed either; the signature class itself is passed, not a module.
    teleprompter = dspy.Teleprompter(
        GenerateConclusions,
        bootstrapper=bootstrapper,
        config=config
    )
    
    # Optimize the prompt
    # The evaluation subset is the first 100 training examples, so it overlaps
    # the training data.
    optimized_program = teleprompter.compile(
        train_data=train_data,
        eval_data=train_data[:100]  # Use subset for evaluation
    )
    
    return optimized_program

In [None]:
# Example usage:
# End-to-end run: evaluate 20 seeded examples, save them as JSON, then attempt
# prompt optimization.
if __name__ == "__main__":
    filepath = '../../data/vector_veterinary_imaging_2.json'
    
    # Run evaluation and save results
    results_df = run_evaluation_with_exclusion(filepath, num_examples=20, seed=42)
    save_evaluation_results(results_df, 'evaluation_results.json')
    
    # Optimize prompt using teleprompter
    optimized_program = optimize_rad_prompt(filepath)
    
    # Print optimized prompt
    # NOTE(review): assumes the returned program exposes
    # `.signature.instructions`. Confirm for the object actually returned.
    print("Optimized Prompt:")
    print(optimized_program.signature.instructions)