# DSPy 3.0.3 Medical Data Extraction Pipeline

This notebook demonstrates a comprehensive data extraction pipeline using DSPy 3.0.3 to extract structured dichotomous outcomes from medical research papers in markdown format.

## Objective
Extract structured data from medical research markdown  into the target format matching `dichotomous_outcomes.json`.


## 1. Setup and Dependencies


In [80]:
import dspy
import json
import pandas as pd
import numpy as np
import re
import os
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.metrics.pairwise import cosine_similarity
import warnings
import asyncio
from datetime import datetime
import aiofiles
import traceback

import tiktoken
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv
import hashlib
from scipy.optimize import linear_sum_assignment
import diskcache as dc

# Suppress warnings
warnings.filterwarnings('ignore')

# Load environment variables
load_dotenv()

# Print DSPy version
print(f"DSPy version: {dspy.__version__}")

DSPy version: 3.0.3


## 2. Configure DSPy Language Model


In [101]:
# Set your API key (uncomment and add your key)
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# Configure DSPy with OpenAI GPT-4o-mini for cost efficiency
#lm = dspy.LM('gemini/gemini-2.5-pro', max_tokens=20000, temperature=1.0)
#lm = dspy.LM('openai/gpt-5-mini-2025-08-07', max_tokens=20000, temperature=1.0)
lm = dspy.LM("anthropic/claude-sonnet-4-20250514")
dspy.configure(lm=lm)

print("Language model configured successfully")


Language model configured successfully


In [None]:
response = lm("")
print(response)

In [104]:
lm.history

[]

### DSpy History Details


In [102]:
# Global variables to track processed calls
_processed_hashes = set()
_csv_path = "dspy_history.csv"

def set_log_file(csv_path: str):
    """Set the CSV file path for logging."""
    global _csv_path, _processed_hashes
    _csv_path = csv_path
    
    # Load existing hashes from CSV if it exists
    if Path(csv_path).exists():
        try:
            df = pd.read_csv(csv_path)
            if 'call_hash' in df.columns:
                _processed_hashes = set(df['call_hash'].tolist())
                print(f"Loaded {len(_processed_hashes)} existing records from {csv_path}")
        except Exception as e:
            print(f"Warning: Could not load existing CSV: {e}")
    else:
        print(f"New log file will be created: {csv_path}")

def log_history():
    """Log current DSPy history to CSV. Call this after running DSPy operations."""
    global _processed_hashes, _csv_path
    
    # Get LM from dspy settings
    try:
        lm = dspy.settings.lm
    except:
        print("Error: No LM found in dspy.settings")
        return 0
    
    if not hasattr(lm, 'history') or not lm.history:
        print("No history found in language model")
        return 0
    
    new_records = []
    
    for call_data in lm.history:
        # Generate unique hash
        hash_content = {
            'messages': call_data.get('messages', []),
            'timestamp': call_data.get('timestamp', ''),
            'uuid': call_data.get('uuid', ''),
        }
        call_hash = hashlib.md5(json.dumps(hash_content, sort_keys=True, default=str).encode()).hexdigest()
        
        # Skip if already processed
        if call_hash in _processed_hashes:
            continue
        
        # Extract call info
        messages = call_data.get('messages', [])
        system_msg = next((m.get('content', '') for m in messages if m.get('role') == 'system'), '')
        user_msg = next((m.get('content', '') for m in messages if m.get('role') == 'user'), '')
        
        # Extract response
        response_obj = call_data.get('response', {})
        assistant_response = ""
        if hasattr(response_obj, 'choices') and response_obj.choices:
            assistant_response = response_obj.choices[0].message.content
        
        # Extract usage
        usage = call_data.get('usage', {})
        if isinstance(usage, dict):
            prompt_tokens = usage.get('prompt_tokens', 0)
            completion_tokens = usage.get('completion_tokens', 0) 
            total_tokens = usage.get('total_tokens', 0)
        else:
            prompt_tokens = completion_tokens = total_tokens = 0
        
        record = {
            'call_hash': call_hash,
            'timestamp': call_data.get('timestamp', datetime.now().isoformat()),
            'uuid': call_data.get('uuid', ''),
            'model': call_data.get('model', ''),
            'cost': call_data.get('cost', 0.0),
            'prompt_tokens': prompt_tokens,
            'completion_tokens': completion_tokens,
            'total_tokens': total_tokens,
            'system_msg_length': len(system_msg),
            'user_msg_preview': user_msg[:200] if user_msg else '',
            'response_preview': assistant_response[:200] if assistant_response else '',
            'cache_hit': getattr(response_obj, 'cache_hit', False) if response_obj else False,
            'logged_at': datetime.now().isoformat()
        }
        
        new_records.append(record)
        _processed_hashes.add(call_hash)
    
    if not new_records:
        print("No new records to add")
        return 0
    
    # Save to CSV
    new_df = pd.DataFrame(new_records)
    
    if Path(_csv_path).exists():
        new_df.to_csv(_csv_path, mode='a', header=False, index=False)
    else:
        Path(_csv_path).parent.mkdir(parents=True, exist_ok=True)
        new_df.to_csv(_csv_path, index=False)
    
    print(f"Added {len(new_records)} new records to {_csv_path}")
    return len(new_records)

def show_stats():
    """Show statistics from the logged history."""
    global _csv_path
    
    if not Path(_csv_path).exists():
        print("No history file found")
        return
    
    try:
        df = pd.read_csv(_csv_path)
        
        print(f"\nDSPy History Stats from {_csv_path}:")
        print("=" * 50)
        print(f"Total calls: {len(df)}")
        
        if 'model' in df.columns:
            print(f"Unique models: {df['model'].nunique()}")
            print("Model breakdown:")
            for model, count in df['model'].value_counts().head().items():
                print(f"  {model}: {count} calls")
        
        if 'cost' in df.columns:
            # If prompt_tokens == 0, set cost = 0 for those rows
            df.loc[df['prompt_tokens'] == 0, 'cost'] = 0

            # Recompute totals
            total_cost = df['cost'].sum()
            avg_cost = df['cost'].mean()

            print(f"Total cost: ${total_cost:.4f}")
            print(f"Average cost per call: ${avg_cost:.4f}")

        
        if 'total_tokens' in df.columns:
            total_tokens = df['total_tokens'].sum()
            avg_tokens = df['total_tokens'].mean()
            print(f"Total tokens: {total_tokens:,}")
            print(f"Average tokens per call: {avg_tokens:.1f}")
        
        if 'cache_hit' in df.columns:
            cache_rate = df['cache_hit'].mean() * 100
            print(f"Cache hit rate: {cache_rate:.1f}%")
        
        if 'timestamp' in df.columns:
            print(f"Date range: {df['timestamp'].min()} to {df['timestamp'].max()}")
        
    except Exception as e:
        print(f"Error reading history: {e}")

def view_recent(n=5):
    """View the most recent n logged calls."""
    global _csv_path
    
    if not Path(_csv_path).exists():
        print("No history file found")
        return
    
    try:
        df = pd.read_csv(_csv_path)
        recent = df.tail(n)
        
        print(f"\nMost Recent {n} DSPy Calls:")
        print("=" * 60)
        
        for _, row in recent.iterrows():
            print(f"Time: {row['timestamp']}")
            print(f"Model: {row['model']}")
            print(f"Tokens: {row['total_tokens']} | Cost: ${row['cost']:.4f}")
            print(f"User: {row['user_msg_preview'][:100]}...")
            print(f"Response: {row['response_preview'][:100]}...")
            print("-" * 40)
            
    except Exception as e:
        print(f"Error reading history: {e}")

def clear_cache():
    """Clear the processed hashes cache (will reprocess all history next time)."""
    global _processed_hashes
    _processed_hashes.clear()
    print("Cleared processed hashes cache")

def export_full_history(output_file: str = "full_dspy_history.json"):
    """Export complete DSPy history with full messages to JSON."""
    try:
        lm = dspy.settings.lm
        if hasattr(lm, 'history') and lm.history:
            with open(output_file, 'w') as f:
                json.dump(lm.history, f, indent=2, default=str)
            print(f"Exported full history to {output_file}")
        else:
            print("No history found to export")
    except Exception as e:
        print(f"Error exporting history: {e}")

# Initialize with default file
set_log_file("dspy_history.csv")

print("DSPy History Logger for Notebooks Ready!")
print("Usage:")
print("  set_log_file('my_file.csv')  # Set custom log file")
print("  log_history()                # Log current DSPy history") 
print("  show_stats()                 # Show summary statistics")
print("  view_recent(5)               # View recent 5 calls")
print("  clear_cache()                # Clear processed cache")
print("  export_full_history()        # Export complete history to JSON")

# Cell 1: Set your log file
set_log_file("/nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_history.csv")

New log file will be created: dspy_history.csv
DSPy History Logger for Notebooks Ready!
Usage:
  set_log_file('my_file.csv')  # Set custom log file
  log_history()                # Log current DSPy history
  show_stats()                 # Show summary statistics
  view_recent(5)               # View recent 5 calls
  clear_cache()                # Clear processed cache
  export_full_history()        # Export complete history to JSON
Loaded 4104 existing records from /nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_history.csv


In [103]:
#Log the history  
log_history()

#Check stats anytime
show_stats()

No history found in language model

DSPy History Stats from /nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_history.csv:
Total calls: 4104
Unique models: 3
Model breakdown:
  gemini/gemini-2.5-pro: 4016 calls
  openai/gpt-5-mini-2025-08-07: 58 calls
  openai/gpt-4o-mini: 30 calls
Total cost: $16.5535
Average cost per call: $0.0040
Total tokens: 3,020,487
Average tokens per call: 736.0
Cache hit rate: 75.3%
Date range: 2025-09-16T10:33:45.131530 to 2025-09-18T18:49:54.231071


### Safe Json Parser

In [84]:
def safe_json_parse(json_string, fallback=None):
    """Robust JSON parser with multiple recovery strategies."""
    if fallback is None:
        fallback = {}
    
    if not json_string or not isinstance(json_string, str):
        return fallback
    
    # Clean markdown fences first
    import re
    json_string = re.sub(r"```[a-zA-Z]*\n?", "", json_string).replace("```", "")
    json_string = json_string.strip()
    
    # Strategy 1: Direct parsing
    try:
        result = json.loads(json_string)
        if isinstance(result, str) and result.strip().startswith(("{", "[")):
            return safe_json_parse(result, fallback)
        return result
    except json.JSONDecodeError:
        pass
    
    # Strategy 2: Clean common issues and handle nested single quotes
    try:
        cleaned = json_string.strip()
        cleaned = cleaned.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
        cleaned = re.sub(r',(\s*[}\]])', r'\1', cleaned)
        
        if cleaned.startswith("'") or "': '" in cleaned or "': {'" in cleaned:
            cleaned = cleaned.replace("'", '"')
            cleaned = cleaned.replace('""', '"')
        else:
            cleaned = re.sub(r"'([^']*)':", r'"\1":', cleaned)
            cleaned = re.sub(r":\s*'([^']*)'", r': "\1"', cleaned)
        
        result = safe_json_parse(cleaned)
        if isinstance(result, str) and result.strip().startswith(("{", "[")):
            return safe_json_parse(result, fallback)
        return result
    except (json.JSONDecodeError, AttributeError):
        pass
    
    # Strategy 3: Extract key-value pairs manually
    try:
        data = {}
        for match in re.finditer(r'"([^"]+)":\s*(\d+(?:\.\d+)?)', json_string):
            key, value = match.groups()
            data[key] = float(value) if '.' in value else int(value)
        
        for match in re.finditer(r'"([^"]+)":\s*"([^"]*)"', json_string):
            key, value = match.groups()
            data[key] = value
        
        for match in re.finditer(r'"([^"]+)":\s*(true|false)', json_string):
            key, value = match.groups()
            data[key] = value == 'true'
        
        if data:
            return data
    except:
        pass
    
    return fallback


## 3. Data Loading and Exploration


In [85]:
# Load source file
source_file = "/nlp/data/karthik9/Sprint1/Dental/Data/acute_pain_mds/2467_Malmstrom_md/2467_Malmstrom_md.json"
target_file = "/nlp/data/karthik9/Sprint1/Dental/Data/jsons/dichotomous_outcomes.json"

with open(source_file, 'r') as f:
    source_data = json.load(f)

with open(target_file, 'r') as f:
    target_data = json.load(f)

print(source_data.keys())

# Extract markdown content
markdown_content = source_data['marker']['markdown']

# Use OpenAI tokenizer (cl100k_base is the same one GPT-4/4o/5 use)
encoding = tiktoken.get_encoding("cl100k_base")
tokens = encoding.encode(markdown_content)

print(f"Markdown content length: {len(markdown_content)} characters")
print(f"Token count: {len(tokens)}")
print(f"Target data contains {len(target_data)} records")

dict_keys(['id', 'pdf_path', 'unique_filename', 'marker', 'status', 'processing_timestamp'])
Markdown content length: 32729 characters
Token count: 8122
Target data contains 1691 records


In [86]:
# Filter target data for  study to understand expected output
one_study_records = [record for record in target_data if record.get('filename') == '2467_Malmstrom']
print(f"Found {len(one_study_records)} records in target data")

# Show example record structure
if one_study_records:
    print("\nExample target record structure:")
    for key, value in one_study_records[0].items():
        print(f"{key}: {value}")


Found 6 records in target data

Example target record structure:
Ref_ID: 2467
First_Author: Malmstrom
Trial_Name: NR
Population: 2
Intervention_Code: 6
Intervention_Description: Ibuprofen 400 mg
Outcome_Type: 5
Outcome_Other_Specify: 
Follow_Up_Time: 24 hours
N_Analyzed: 46
Adverse_Effect_Specify: Nausea
Adverse_Effects_All_Study: 
N_Events_Number: 8
N_Events_Percentage: 17.4
Comments: single dose, 0-24 hours
filename: 2467_Malmstrom


## 4. DSPy Signature Definitions

We'll define specialized signatures for each extraction task:


In [87]:
class ExtractStudyMetadata(dspy.Signature):
    """Extract basic study metadata from medical research paper markdown.
    
    This extracts core identifying information about the dental pain management study.
    """
    
    markdown_content: str = dspy.InputField(desc="Full markdown content of the medical research paper")

    first_author: str = dspy.OutputField(
        desc="Last name of the first author (e.g., 'Cooper'). Extract only the surname."
    )
    
    population_code: str = dspy.OutputField(
        desc="Numeric code representing the study population type. Codes: 1=simple tooth extraction, 2=surgical tooth extraction (third molar/wisdom teeth), 3=surgical tooth extraction (other teeth), 4=pulpitis or its complications. Can be multiple codes separated by commas (e.g., '2, 3')"
    )
    


class ExtractInterventions(dspy.Signature):
    """Extract intervention details from medical research paper markdown.
    
    This extracts information about pain management interventions used in dental studies.
    Focus on medication types, dosages, and participant counts.
    """
    
    markdown_content: str = dspy.InputField(desc="Full markdown content of the medical research paper")
    
    interventions_json: str = dspy.OutputField(
        desc="""JSON string containing list of interventions. Each intervention object must have:
        - intervention_code (integer): Numeric code 1-11 where:
          1=Ibuprofen 200-400mg + Acetaminophen 500-1000mg
          2=Oxycodone 5mg or Codeine 60mg  
          3=Acetaminophen 650mg + Oxycodone 10mg
          4=Ibuprofen 200mg + Hydrocodone 5mg
          5=Hydrocodone 5mg + Acetaminophen 300-325mg
          6=Ibuprofen 400mg (fast acting or acid)
          7=Tramadol 37.5mg + Acetaminophen 325mg
          8=Acetaminophen 500-1000mg
          9=Acetaminophen 600-650mg + Codeine 60mg
          10=Naproxen 400-440mg
          11=Placebo/NA (If its not mentioned as a placebo, then it is NA)
          #12=OTHER
        - intervention_description (string): Full description with medication name and exact dose (e.g., "Ibuprofen 400mg", "Naproxen sodium 440mg")
        - n_analyzed (integer): Number of participants analyzed for this intervention group
        
        Example: [{"intervention_code": 6, "intervention_description": "Ibuprofen 400mg", "n_analyzed": 40}]"""
    )


class ExtractAllOutcomes(dspy.Signature):
    """Extract ALL outcomes from medical research paper for systematic review.
    
    This implements COMPLETE DATA CAPTURE methodology - extract every data point
    including rescue analgesia, adverse events, and other outcomes at all time points.
    Focus on dichotomous outcomes from ALL data sources: main text, figures, tables,
    and supplementary materials. Include zero-event outcomes (0/N patients).
    """
    
    markdown_content: str = dspy.InputField(desc="Full markdown content of the medical research paper including supplementary materials")
    intervention_description: str = dspy.InputField(desc="Specific intervention to extract outcomes for (e.g., 'Ibuprofen 400mg', 'Placebo')")
    
    all_outcomes_json: str = dspy.OutputField(
        desc="""JSON string containing list of ALL outcomes for the specified intervention. Each outcome object must have:
        
        MANDATORY FIELDS FOR ALL OUTCOMES:
        - outcome_type (integer): Outcome type code where:
          1=Rescue analgesia at 6 hours
          2=Rescue analgesia at 4 hours  
          4=Rescue analgesia for pulpitis population
          5=Adverse effects (nausea, vomiting, drowsiness, dizziness, headache, etc.)
          6=Other outcomes (pain relief, time to onset, etc.)
        - follow_up_time (string): Exact time point when outcome was measured (e.g., "6 hours", "24hrs", "4 hours", "7 days")
        - n_analyzed (integer): Number of participants analyzed for this specific outcome
        - n_events_number (integer): Number of patients who experienced this outcome
        - n_events_percentage (float): Percentage of patients who experienced this outcome (e.g., 17.5, 0.6, 2.4, 0.0)
        
        CONDITIONAL FIELDS:
        - adverse_effect_specify (string): Specific adverse effect name if outcome_type=5 (e.g., "Drowsiness (sleepy, tired)", "Paraesthesia oral", "Vomiting"). Use "NA" if outcome_type≠5
        - other_outcome_specify (string): Detailed description if outcome_type=6 (e.g., "Time to meaningful pain relief", "Pain intensity difference"). Use "NA" if outcome_type≠6
        - adverse_effects_all_study (string): List of all adverse effects if not reported per study arm, or "NA" if reported per arm
        
        DOCUMENTATION FIELDS:
        - extraction_notes (string): Technical documentation including data source ("From Table 2", "From Figure 5", "From Supplementary Table 3"), extraction method ("Direct from table", "Interpreted from Kaplan-Meier curve"), and population used ("Per-protocol population", "Safety population", "ITT population")
        - comments (string): Study-specific information including single vs multiple dose design, surgical techniques mentioned, methodological features, dropout rates, calculation details
        
        EXTRACTION REQUIREMENTS:
        - Extract EVERY outcome reported, including zero-event outcomes (0/N)
        - Create separate entries for each time point assessment
        - Include outcomes from ALL data sources (main text, figures, supplements)
        - Use appropriate analysis populations (efficacy vs safety)
        - Document any calculations or interpretations performed
        
        Example: [
          {"outcome_type": 1, "follow_up_time": "6 hours", "n_analyzed": 40, "n_events_number": 15, "n_events_percentage": 37.5, "adverse_effect_specify": "NA", "other_outcome_specify": "NA", "adverse_effects_all_study": "NA", "extraction_notes": "From Table 3, per-protocol population", "comments": "single dose study with overnight monitoring"},
          {"outcome_type": 5, "follow_up_time": "24 hours", "n_analyzed": 40, "n_events_number": 7, "n_events_percentage": 17.5, "adverse_effect_specify": "Drowsiness (sleepy, tired)", "other_outcome_specify": "NA", "adverse_effects_all_study": "NA", "extraction_notes": "From safety table, safety population", "comments": "mild to moderate severity"}
        ]"""
    )


class StructureComprehensiveOutcome(dspy.Signature):
    """Structure extracted data into the final comprehensive dichotomous outcome format.
    
    This combines study metadata, intervention details, and any outcome data (rescue analgesia,
    adverse events, or other outcomes) into the standardized format used for systematic review
    and meta-analysis. Each record represents one outcome for one intervention in one study.
    """
    
    study_metadata_json: str = dspy.InputField(desc="Study metadata as JSON string with first_author,  population_code")
    intervention_json: str = dspy.InputField(desc="Single intervention details as JSON string with intervention_code, intervention_description, n_analyzed")
    outcome_json: str = dspy.InputField(desc="Single outcome details as JSON string with all outcome fields including outcome_type, follow_up_time, n_events_number, etc.")
    
    structured_record_json: str = dspy.OutputField(
        desc="""Complete structured record as JSON string with exactly these fields:
        - First_Author (string): First author last name (e.g., "Cooper")
        - Population (integer): Population code (1-4)
        - Intervention_Code (integer): Intervention code (1-11)
        - Intervention_Description (string): Full intervention description with dose
        - Outcome_Type (integer): Outcome type (1=rescue analgesia 6h, 2=rescue analgesia 4h, 4=rescue analgesia pulpitis, 5=adverse effects, 6=other)
        - Outcome_Other_Specify (string): Detailed outcome description for type 6, or empty string for other types
        - Follow_Up_Time (string): Time point (e.g., "24hrs", "6 hours")
        - N_Analyzed (integer): Number of participants analyzed
        - Adverse_Effect_Specify (string): Specific adverse effect name for type 5, or empty string for other types
        - Adverse_Effects_All_Study (string): All study adverse effects if not reported per arm, or empty string
        - N_Events_Number (integer): Number of patients with this outcome
        - N_Events_Percentage (float): Percentage of patients with this outcome
        - Comments (string): Study-specific methodology, design notes, and extraction details
        
        FIELD MAPPING RULES:
        - For outcome_type 1,2,4 (rescue analgesia): Adverse_Effect_Specify="" and Outcome_Other_Specify=""
        - For outcome_type 5 (adverse effects): Outcome_Other_Specify="" and Adverse_Effect_Specify=specific adverse event name
        - For outcome_type 6 (other outcomes): Adverse_Effect_Specify="" and Outcome_Other_Specify=detailed outcome description
        - Always include extraction methodology and data source information in Comments
        - Ensure mathematical validation: (N_Events_Number/N_Analyzed)*100 = N_Events_Percentage
        - Use appropriate analysis populations (efficacy vs safety) based on outcome type
        
        Example: {"First_Author": "Cooper",  "Population": 2, "Intervention_Code": 10, "Intervention_Description": "Naproxen sodium 440mg", "Outcome_Type": 5, "Outcome_Other_Specify": "", "Follow_Up_Time": "24hrs", "N_Analyzed": 166, "Adverse_Effect_Specify": "Paraesthesia oral", "Adverse_Effects_All_Study": "", "N_Events_Number": 1, "N_Events_Percentage": 0.6, "Comments": "extracted from supplementary table 3, safety population, single dose study"}"""
    )

## 5. DSPy Module Implementation

Now we'll create DSPy modules that use these signatures with reasoning patterns:


In [88]:
class AsyncStudyMetadataExtractor(dspy.Module):
    """Async module to extract study metadata using chain of thought reasoning."""
    
    def __init__(self):
        super().__init__()
        self.extract_metadata = dspy.ChainOfThought(ExtractStudyMetadata)
    
    async def __call__(self, markdown_content: str) -> Dict[str, Any]:
        """Async call method that runs DSPy call in thread pool."""
        loop = asyncio.get_running_loop()
        
        def _extract():
            return self.extract_metadata(markdown_content=markdown_content)
        
        # Run DSPy call in thread pool to avoid blocking event loop
        result = await loop.run_in_executor(None, _extract)
        
        return {
            "first_author": result.first_author,
            "population_code": result.population_code
        }
    
    def forward_sync(self, markdown_content: str) -> Dict[str, Any]:
        """Fallback sync method for backwards compatibility."""
        result = self.extract_metadata(markdown_content=markdown_content)
        return {
            "first_author": result.first_author,
            "population_code": result.population_code
        }


class AsyncInterventionExtractor(dspy.Module):
    """Async module to extract intervention details."""
    
    def __init__(self):
        super().__init__()
        self.extract_interventions = dspy.ChainOfThought(ExtractInterventions)
    
    async def __call__(self, markdown_content: str) -> List[Dict[str, Any]]:
        """Async call method with error handling."""
        loop = asyncio.get_running_loop()
        
        def _extract():
            return self.extract_interventions(markdown_content=markdown_content)
        
        try:
            result = await loop.run_in_executor(None, _extract)
            return safe_json_parse(result.interventions_json)
        except json.JSONDecodeError as e:
            print(f"Error parsing interventions JSON: {e}, returning empty list")
            return []
        except Exception as e:
            print(f"Error in intervention extraction: {e}, returning empty list")
            return []
    
    def forward_sync(self, markdown_content: str) -> List[Dict[str, Any]]:
        """Fallback sync method."""
        result = self.extract_interventions(markdown_content=markdown_content)
        try:
            return safe_json_parse(result.interventions_json)
        except json.JSONDecodeError:
            print("Error parsing interventions JSON, returning empty list")
            return []


class AsyncOutcomeExtractor(dspy.Module):
    """Async module to extract all outcomes for a specific intervention."""
    
    def __init__(self):
        super().__init__()
        self.extract_all_outcomes = dspy.ChainOfThought(ExtractAllOutcomes)
    
    async def __call__(self, markdown_content: str, intervention_description: str) -> List[Dict[str, Any]]:
        """Async call method with robust error handling."""
        loop = asyncio.get_running_loop()
        
        def _extract():
            return self.extract_all_outcomes(
                markdown_content=markdown_content,
                intervention_description=intervention_description
            )
        
        try:
            result = await loop.run_in_executor(None, _extract)
            return safe_json_parse(result.all_outcomes_json)
        except json.JSONDecodeError as e:
            print(f"Error parsing outcomes JSON for '{intervention_description}': {e}")
            return []
        except Exception as e:
            print(f"Error in outcome extraction for '{intervention_description}': {e}")
            return []
    
    def forward_sync(self, markdown_content: str, intervention_description: str) -> List[Dict[str, Any]]:
        """Fallback sync method."""
        result = self.extract_all_outcomes(
            markdown_content=markdown_content,
            intervention_description=intervention_description
        )
        try:
            return safe_json_parse(result.all_outcomes_json)
        except json.JSONDecodeError:
            print("Error parsing outcomes JSON, returning empty list")
            return []


class AsyncDataStructurer(dspy.Module):
    """Async module to structure data into final format."""
    
    def __init__(self):
        super().__init__()
        self.structure_data = dspy.ChainOfThought(StructureComprehensiveOutcome)
    
    async def __call__(self, study_metadata: Dict, intervention: Dict, outcome: Dict) -> Dict[str, Any]:
        """Async call method with comprehensive error handling."""
        loop = asyncio.get_running_loop()
        
        def _structure():
            return self.structure_data(
                study_metadata_json=json.dumps(study_metadata),
                intervention_json=json.dumps(intervention),
                outcome_json=json.dumps(outcome)
            )
        
        try:
            result = await loop.run_in_executor(None, _structure)
            return safe_json_parse(result.structured_record_json)
        except json.JSONDecodeError as e:
            print(f"Error parsing structured record JSON: {e}, using fallback")
            return self._create_fallback_record(study_metadata, intervention, outcome)
        except Exception as e:
            print(f"Error in data structuring: {e}, using fallback")
            return self._create_fallback_record(study_metadata, intervention, outcome)
    
    def _create_fallback_record(self, study_metadata: Dict, intervention: Dict, outcome: Dict) -> Dict[str, Any]:
        """Create fallback structured record when DSPy fails."""
        return {
            "First_Author": study_metadata.get("first_author", ""),
            "Population": study_metadata.get("population_code", ""),
            "Intervention_Code": intervention.get("intervention_code", ""),
            "Intervention_Description": intervention.get("intervention_description", ""),
            "Outcome_Type": outcome.get("outcome_type", 5),
            "Outcome_Other_Specify": outcome.get("other_outcome_specify", ""),
            "Follow_Up_Time": outcome.get("follow_up_time", ""),
            "N_Analyzed": intervention.get("n_analyzed", ""),
            "Adverse_Effect_Specify": outcome.get("adverse_effect_specify", ""),
            "Adverse_Effects_All_Study": outcome.get("adverse_effects_all_study", ""),
            "N_Events_Number": outcome.get("n_events_number", ""),
            "N_Events_Percentage": outcome.get("n_events_percentage", ""),
            "Comments": f"{outcome.get('extraction_notes', '')} {outcome.get('comments', '')}".strip(),
        }
    
    def forward_sync(self, study_metadata: Dict, intervention: Dict, outcome: Dict) -> Dict[str, Any]:
        """Fallback sync method."""
        result = self.structure_data(
            study_metadata_json=json.dumps(study_metadata),
            intervention_json=json.dumps(intervention),
            outcome_json=json.dumps(outcome)
        )
        try:
            return safe_json_parse(result.structured_record_json)
        except json.JSONDecodeError:
            print("Error parsing structured record JSON, returning basic structure")
            return self._create_fallback_record(study_metadata, intervention, outcome)


print("Async DSPy modules defined successfully")

Async DSPy modules defined successfully


## 6. Complete Extraction Pipeline


In [89]:
class AsyncMedicalDataExtractionPipeline(dspy.Module):
    """Complete async pipeline for extracting structured data from medical research papers."""
    
    def __init__(self, max_concurrent: int = 5):
        super().__init__()
        self.metadata_extractor = AsyncStudyMetadataExtractor()
        self.intervention_extractor = AsyncInterventionExtractor()
        self.outcome_extractor = AsyncOutcomeExtractor()
        self.data_structurer = AsyncDataStructurer()
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def forward(self, markdown_content: str):
        """Extract all structured records from markdown content asynchronously."""
        
        # Step 1 & 2: Extract metadata and interventions concurrently
        print("Extracting metadata and interventions concurrently...")
        
        metadata_task = self.metadata_extractor(markdown_content)
        interventions_task = self.intervention_extractor(markdown_content)
        
        study_metadata, interventions = await asyncio.gather(metadata_task, interventions_task)
        
        print(f"Study metadata: {study_metadata}")
        print(f"Found {len(interventions)} interventions")
        
        # Step 3: Process all interventions concurrently
        all_records = []
        
        if interventions:
            outcome_tasks = []
            for intervention in interventions:
                task = self._process_intervention_outcomes(
                    markdown_content, study_metadata, intervention
                )
                outcome_tasks.append(task)
            
            # Gather all intervention results
            intervention_results = await asyncio.gather(*outcome_tasks)
            
            # Flatten results
            for records in intervention_results:
                all_records.extend(records)
        
        print(f"Total records extracted: {len(all_records)}")
        return dspy.Prediction(extracted_records=all_records)
        #return all_records
    
    async def _process_intervention_outcomes(self, markdown_content: str, 
                                           study_metadata: Dict, intervention: Dict) -> List[Dict]:
        """Process outcomes for a single intervention with semaphore control."""
        async with self.semaphore:
            intervention_desc = intervention.get('intervention_description', '')
            print(f"Processing intervention: {intervention_desc}")
            
            outcomes = await self.outcome_extractor(markdown_content, intervention_desc)
            print(f"Found {len(outcomes)} outcomes for {intervention_desc}")
            
            # Structure all outcomes for this intervention concurrently
            if outcomes:
                structure_tasks = [
                    self.data_structurer(study_metadata, intervention, outcome)
                    for outcome in outcomes
                ]
                structured_records = await asyncio.gather(*structure_tasks)
                return structured_records
            
            return []
    
   

print("Async extraction pipeline defined successfully")

Async extraction pipeline defined successfully


In [90]:
class SyncPipelineWrapper(dspy.Module):
    """Synchronous wrapper for async pipeline to work with DSPy optimizers."""
    
    def __init__(self):
        super().__init__()
        self.async_pipeline = None
    
    def forward(self, markdown_content: str):
        """Synchronous forward method that runs async pipeline."""
        if self.async_pipeline is None:
            # Create your async pipeline instance
            self.async_pipeline = AsyncMedicalDataExtractionPipeline()
        
        # Handle event loop situations
        try:
            # Check if there's already an event loop running
            loop = asyncio.get_running_loop()
            if loop.is_running():
                # If loop is running, we need nest_asyncio
                try:
                    import nest_asyncio
                    nest_asyncio.apply()
                except ImportError:
                    raise ImportError("Please install nest_asyncio: pip install nest_asyncio")
        except RuntimeError:
            # No event loop exists, which is fine
            pass
        
        # Run the async pipeline synchronously
        result = asyncio.run(self.async_pipeline.forward(markdown_content))
        return result


# Create the sync wrapper
sync_pipeline = SyncPipelineWrapper()
# Run the pipeline (this calls your async pipeline synchronously)
print("Initializing AsyncMedicalDataExtractionPipeline through SyncPipelineWrapper...")

#     result = sync_pipeline.forward(markdown_content) 
#     print(f"Pipeline completed successfully!")
#     print(f"Number of records extracted: {len(result.extracted_records)}")
#     # Print first few records
#     for i, record in enumerate(result.extracted_records[:1]):
#         print(f"\nRecord {i+1}:")
#         for key, value in record.items():
#             print(f"  {key}: {value}") 
#     print(f"\nTotal records: {len(result.extracted_records)}")

Initializing AsyncMedicalDataExtractionPipeline through SyncPipelineWrapper...


## 7. Evaluation Setup


#### The 4 Core Outcomes in Classification

When evaluating a system’s predictions against the **ground truth**, each result falls into one of four categories:

#### 1. **True Positive (TP)**
- System says **“Yes”**
- Ground truth is **“Yes”**
- ✅ Correct detection  
- **Example:** System extracts `First_Author = Cooper`, and ground truth really has `Cooper`.



### 2. **False Positive (FP)**
- System says **“Yes”**
- Ground truth is **“No”**
- ❌ Wrong detection (system “hallucinated”)  
- **Example:** System extracts `First_Author = Jones`, but ground truth has `Smith`.



#### 3. **True Negative (TN)**
- System says **“No”**
- Ground truth is **“No”**
- ✅ Correct rejection  
- **Example:** Ground truth has no `Adverse_Effect_Specify`, and system also leaves it empty.



#### 4. **False Negative (FN)**
- System says **“No”**
- Ground truth is **“Yes”**
- ❌ Missed detection (system failed to extract)  
- **Example:** Ground truth has `Trial_Name = MOLAR`, but system extracts nothing (or extracts wrong value).



#### In `MedicalExtractionEvaluator` Context

- **TP (True Positive)** → A field value was extracted **and** it matched the ground truth.  
- **FP (False Positive)** → A field value was extracted, but it was **wrong** (mismatch) or **extra** (system filled something that shouldn’t exist).  
- **FN (False Negative)** → A ground-truth field existed, but the system didn’t produce it (missing record or missing field).  
- **TN (True Negative)** → Neither system nor ground truth had a value for a field.  

**Note:** TNs are **not explicitly tracked** in the evaluator, because in information extraction tasks the number of “true negatives” is usually very large and not informative. This is common in IR/NLP evaluation — most focus only on TP, FP, FN.


In [91]:
# class AsyncMedicalExtractionEvaluator:
#     """Async evaluator for medical data extraction with semantic matching and caching."""
    
#     def __init__(self, use_semantic=True, semantic_threshold=0.8, max_concurrent=10):
#         self.required_fields = [
#             'First_Author', 'Population', 'Intervention_Code', 'Intervention_Description', 
#             'Outcome_Type', 'Follow_Up_Time', 'N_Analyzed', 'Adverse_Effect_Specify',
#             'N_Events_Number', 'N_Events_Percentage', 'Comments'
#         ]
        
#         self.use_semantic = use_semantic
#         self.semantic_threshold = semantic_threshold
#         self.semaphore = asyncio.Semaphore(max_concurrent)
        
#         # Caches
#         self._matching_cache = {}
#         self._embedding_cache = {}
        
#         if self.use_semantic:
#             self.semantic_model = SentenceTransformer('all-MiniLM-L6-v2')
#             self.semantic_fields = ['Intervention_Description', 'Adverse_Effect_Specify', 'Follow_Up_Time']
#             self.exact_fields = ['First_Author', 'Population', 'Intervention_Code', 'Outcome_Type', 
#                                'N_Analyzed', 'N_Events_Number', 'N_Events_Percentage']
    
#     async def _get_embedding(self, text: str):
#         """Get embedding for text with async caching."""
#         if text in self._embedding_cache:
#             return self._embedding_cache[text]
        
#         # Run embedding generation in thread pool
#         loop = asyncio.get_running_loop()
#         embedding = await loop.run_in_executor(None, self.semantic_model.encode, text)
        
#         self._embedding_cache[text] = embedding
#         return embedding
    
#     async def semantic_similarity(self, text1: str, text2: str) -> float:
#         """Calculate semantic similarity asynchronously."""
#         if not self.use_semantic or not text1.strip() or not text2.strip():
#             return 1.0 if text1.strip() == text2.strip() else 0.0
        
#         # Get embeddings concurrently
#         embedding1_task = self._get_embedding(text1)
#         embedding2_task = self._get_embedding(text2)
        
#         embedding1, embedding2 = await asyncio.gather(embedding1_task, embedding2_task)
        
#         # Calculate similarity in thread pool
#         loop = asyncio.get_running_loop()
#         similarity = await loop.run_in_executor(
#             None, lambda: cosine_similarity([embedding1], [embedding2])[0][0]
#         )
        
#         return float(similarity)
    
#     async def field_match_score(self, extracted_value: Any, ground_truth_value: Any, field_name: str) -> float:
#         """Calculate field-level match score asynchronously."""
#         ext_val = str(extracted_value).strip() if extracted_value is not None else ""
#         gt_val = str(ground_truth_value).strip() if ground_truth_value is not None else ""
        
#         if not self.use_semantic or field_name in self.exact_fields:
#             return 1.0 if ext_val.lower() == gt_val.lower() else 0.0
        
#         if field_name in self.semantic_fields:
#             if not ext_val and not gt_val:
#                 return 1.0
#             if not ext_val or not gt_val:
#                 return 0.0
            
#             similarity = await self.semantic_similarity(ext_val, gt_val)
#             return 1.0 if similarity >= self.semantic_threshold else 0.0
        
#         return 1.0 if ext_val.lower() == gt_val.lower() else 0.0
    
#     async def calculate_record_similarity(self, extracted_record: Dict, ground_truth_record: Dict) -> float:
#         """Calculate similarity between two records asynchronously."""
#         async with self.semaphore:
#             # Gather all field comparisons concurrently
#             field_tasks = []
#             for field in self.required_fields:
#                 if field in extracted_record and field in ground_truth_record:
#                     task = self.field_match_score(
#                         extracted_record[field], 
#                         ground_truth_record[field], 
#                         field
#                     )
#                     field_tasks.append(task)
            
#             if not field_tasks:
#                 return 0.0
            
#             match_scores = await asyncio.gather(*field_tasks)
#             return sum(match_scores) / len(match_scores)
    
#     async def _compute_similarity_matrix(self, extracted_records: List[Dict], ground_truth_records: List[Dict]):
#         """Compute similarity matrix asynchronously."""
#         n_extracted = len(extracted_records)
#         n_ground_truth = len(ground_truth_records)
        
#         # Create all similarity computation tasks
#         tasks = []
#         for i in range(n_extracted):
#             for j in range(n_ground_truth):
#                 task = self.calculate_record_similarity(extracted_records[i], ground_truth_records[j])
#                 tasks.append((i, j, task))
        
#         # Execute all similarity computations concurrently
#         results = await asyncio.gather(*[task for _, _, task in tasks])
        
#         # Build similarity matrix
#         similarity_matrix = np.zeros((n_extracted, n_ground_truth))
#         for idx, (i, j, _) in enumerate(tasks):
#             similarity_matrix[i][j] = results[idx]
        
#         return similarity_matrix
    
#     async def hungarian_matching(self, extracted_records: List[Dict], ground_truth_records: List[Dict]) -> List[Tuple[int, int, float]]:
#         """Compute Hungarian matching asynchronously with caching."""
#         cache_key = f"{len(extracted_records)}_{len(ground_truth_records)}_{hash(str(extracted_records) + str(ground_truth_records))}"
        
#         if cache_key in self._matching_cache:
#             return self._matching_cache[cache_key]
        
#         if not extracted_records or not ground_truth_records:
#             return []
        
#         # Compute similarity matrix asynchronously
#         similarity_matrix = await self._compute_similarity_matrix(extracted_records, ground_truth_records)
        
#         # Apply Hungarian algorithm in thread pool
#         loop = asyncio.get_running_loop()
#         cost_matrix = 1.0 - similarity_matrix
        
#         # Pad matrix to square if needed
#         n_extracted, n_ground_truth = similarity_matrix.shape
#         max_size = max(n_extracted, n_ground_truth)
#         if max_size > max(n_extracted, n_ground_truth):
#             padded_cost = np.ones((max_size, max_size))
#             padded_cost[:n_extracted, :n_ground_truth] = cost_matrix
#             cost_matrix = padded_cost
        
#         row_indices, col_indices = await loop.run_in_executor(None, linear_sum_assignment, cost_matrix)
        
#         # Extract valid matches
#         matches = []
#         for i, j in zip(row_indices, col_indices):
#             if i < n_extracted and j < n_ground_truth:
#                 similarity = similarity_matrix[i][j]
#                 if similarity > 0.0:
#                     matches.append((i, j, similarity))
        
#         # Cache results
#         self._matching_cache[cache_key] = matches
#         return matches
    
#     async def evaluate_record_metrics(self, extracted_records: List[Dict], ground_truth: List[Dict]) -> Dict[str, float]:
#         """Calculate record-level metrics asynchronously."""
#         matches = await self.hungarian_matching(extracted_records, ground_truth)
#         valid_matches = [match for match in matches if match[2] >= 0.5]
        
#         true_positives = len(valid_matches)
#         false_positives = len(extracted_records) - true_positives
#         false_negatives = len(ground_truth) - true_positives
        
#         precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0.0
#         recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0.0
#         f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        
#         return {
#             'precision': precision, 'recall': recall, 'f1': f1,
#             'TP': true_positives, 'FP': false_positives, 'FN': false_negatives
#         }
    
#     def evaluate_completeness(self, extracted_records: List[Dict]) -> float:
#         """Evaluate field completeness (synchronous)."""
#         if not extracted_records:
#             return 0.0
        
#         total_fields = len(self.required_fields) * len(extracted_records)
#         filled_fields = sum(
#             1 for record in extracted_records 
#             for field in self.required_fields 
#             if field in record and record[field] is not None and str(record[field]).strip()
#         )
        
#         return filled_fields / total_fields if total_fields > 0 else 0.0
    
#     async def evaluate_accuracy(self, extracted_records: List[Dict], ground_truth: List[Dict]) -> Dict[str, float]:
#         """Evaluate extraction accuracy asynchronously."""
#         if not extracted_records or not ground_truth:
#             return {"precision": 0.0, "recall": 0.0, "f1": 0.0}
        
#         record_metrics = await self.evaluate_record_metrics(extracted_records, ground_truth)
        
#         return {
#             **record_metrics,
#             "completeness": self.evaluate_completeness(extracted_records)
#         }
    
#     async def evaluate(self, extracted_records: List[Dict], ground_truth: List[Dict] = None) -> Dict[str, Any]:
#         """Complete evaluation asynchronously."""
#         results = {
#             "num_extracted": len(extracted_records),
#             "completeness": self.evaluate_completeness(extracted_records),
#             "semantic_enabled": self.use_semantic,
#             "semantic_threshold": self.semantic_threshold if self.use_semantic else None
#         }
        
#         if ground_truth:
#             accuracy_results = await self.evaluate_accuracy(extracted_records, ground_truth)
#             results.update(accuracy_results)
#             results["num_ground_truth"] = len(ground_truth)
        
#         return results
        
# print("Async medical extraction evaluator defined successfully")

In [92]:
import asyncio
import numpy as np
from typing import Dict, List, Any, Tuple
from scipy.optimize import linear_sum_assignment
import dspy


class SemanticMatcher(dspy.Signature):
    """Determine if two medical texts are semantically equivalent for data extraction purposes."""
    text1: str = dspy.InputField(desc="First medical text to compare")
    text2: str = dspy.InputField(desc="Second medical text to compare")
    field_context: str = dspy.InputField(desc="Medical field being compared (e.g., Intervention_Description)")
    is_equivalent: str = dspy.OutputField(desc="Yes/No/True/False if texts are equivalent")


class AsyncMedicalExtractionEvaluator:
    """Async evaluator for medical data extraction with DSPy-based semantic matching and caching."""

    def __init__(self, use_semantic=True, max_concurrent=10):
        self.required_fields = [
            'First_Author', 'Population', 'Intervention_Code', 'Intervention_Description',
            'Outcome_Type', 'Follow_Up_Time', 'N_Analyzed', 'Adverse_Effect_Specify',
            'N_Events_Number', 'N_Events_Percentage', 'Comments'
        ]

        self.use_semantic = use_semantic
        self.semaphore = asyncio.Semaphore(max_concurrent)

        # Caches
        self._matching_cache = {}
        self._semantic_cache = {}

        # Field types
        self.semantic_fields = ['Intervention_Description', 'Adverse_Effect_Specify', 'Follow_Up_Time']
        self.exact_fields = [
            'First_Author', 'Population', 'Intervention_Code', 'Outcome_Type',
            'N_Analyzed', 'N_Events_Number', 'N_Events_Percentage'
        ]

    # ----------------------------
    # Core Semantic Matching
    # ----------------------------
    async def _run_dspy_semantic_match(self, text1: str, text2: str, field_name: str) -> bool:
        """Run DSPy semantic matching using GPT-4o-mini, normalized to boolean."""
        try:
            with dspy.context(lm=dspy.LM("openai/gpt-4o-mini")):
                matcher = dspy.Predict(SemanticMatcher)
                result = matcher(text1=text1, text2=text2, field_context=field_name)
                log_history()

            raw = getattr(result, "is_equivalent", "")
            normalized = str(raw).strip().lower()
            is_equiv = normalized in ["true", "yes", "1"]

            print(f"[SemanticMatch] {field_name}: '{text1}' vs '{text2}' → {raw} → {is_equiv}")
            return is_equiv

        except Exception as e:
            print(f"[SemanticMatch ERROR] {field_name}: {e}")
            # Fallback exact match
            return text1.strip().lower() == text2.strip().lower()

    async def semantic_similarity(self, text1: str, text2: str, field_name: str) -> float:
        """Calculate semantic similarity (0/1) with caching."""
        if not self.use_semantic or not text1.strip() or not text2.strip():
            return 1.0 if text1.strip() == text2.strip() else 0.0

        cache_key = f"{text1.strip()}||{text2.strip()}||{field_name}"
        if cache_key in self._semantic_cache:
            return self._semantic_cache[cache_key]

        is_equiv = await self._run_dspy_semantic_match(text1, text2, field_name)
        score = 1.0 if is_equiv else 0.0

        self._semantic_cache[cache_key] = score
        return score

    # ----------------------------
    # Field-Level Matching
    # ----------------------------
    async def field_match_score(self, extracted_value: Any, ground_truth_value: Any, field_name: str) -> float:
        ext_val = str(extracted_value).strip() if extracted_value is not None else ""
        gt_val = str(ground_truth_value).strip() if ground_truth_value is not None else ""

        # Exact fields
        if not self.use_semantic or field_name in self.exact_fields:
            return 1.0 if ext_val.lower() == gt_val.lower() else 0.0

        # Semantic fields
        if field_name in self.semantic_fields:
            if not ext_val and not gt_val:
                return 1.0
            if not ext_val or not gt_val:
                return 0.0
            return await self.semantic_similarity(ext_val, gt_val, field_name)

        # Default to exact
        return 1.0 if ext_val.lower() == gt_val.lower() else 0.0

    # ----------------------------
    # Record-Level Matching
    # ----------------------------
    async def calculate_record_similarity(self, extracted_record: Dict, ground_truth_record: Dict) -> float:
        async with self.semaphore:
            field_tasks = [
                self.field_match_score(extracted_record[field], ground_truth_record[field], field)
                for field in self.required_fields
                if field in extracted_record and field in ground_truth_record
            ]
            if not field_tasks:
                return 0.0
            scores = await asyncio.gather(*field_tasks)
            return sum(scores) / len(scores)

    async def _compute_similarity_matrix(self, extracted_records: List[Dict], ground_truth_records: List[Dict]):
        n_extracted, n_ground_truth = len(extracted_records), len(ground_truth_records)
        tasks = [(i, j, self.calculate_record_similarity(extracted_records[i], ground_truth_records[j]))
                 for i in range(n_extracted) for j in range(n_ground_truth)]

        results = await asyncio.gather(*[t for _, _, t in tasks])
        matrix = np.zeros((n_extracted, n_ground_truth))
        for idx, (i, j, _) in enumerate(tasks):
            matrix[i][j] = results[idx]
        return matrix

    async def hungarian_matching(self, extracted_records: List[Dict], ground_truth_records: List[Dict]):
        if not extracted_records or not ground_truth_records:
            return []

        cache_key = f"{len(extracted_records)}_{len(ground_truth_records)}_{hash(str(extracted_records)+str(ground_truth_records))}"
        if cache_key in self._matching_cache:
            return self._matching_cache[cache_key]

        sim_matrix = await self._compute_similarity_matrix(extracted_records, ground_truth_records)
        cost_matrix = 1.0 - sim_matrix

        row_idx, col_idx = linear_sum_assignment(cost_matrix)
        matches = [(i, j, sim_matrix[i][j]) for i, j in zip(row_idx, col_idx) if sim_matrix[i][j] > 0.0]

        self._matching_cache[cache_key] = matches
        return matches

    # ----------------------------
    # Metrics
    # ----------------------------
    async def evaluate_record_metrics(self, extracted_records: List[Dict], ground_truth: List[Dict]):
        matches = await self.hungarian_matching(extracted_records, ground_truth)
        valid_matches = [m for m in matches if m[2] >= 0.5]

        TP = len(valid_matches)
        FP = len(extracted_records) - TP
        FN = len(ground_truth) - TP

        precision = TP / (TP + FP) if TP + FP > 0 else 0.0
        recall = TP / (TP + FN) if TP + FN > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

        return {"precision": precision, "recall": recall, "f1": f1, "TP": TP, "FP": FP, "FN": FN}

    def evaluate_completeness(self, extracted_records: List[Dict]) -> float:
        if not extracted_records:
            return 0.0
        total = len(self.required_fields) * len(extracted_records)
        filled = sum(
            1 for r in extracted_records for f in self.required_fields
            if f in r and r[f] is not None and str(r[f]).strip()
        )
        return filled / total if total > 0 else 0.0

    async def evaluate_accuracy(self, extracted_records: List[Dict], ground_truth: List[Dict]):
        if not extracted_records or not ground_truth:
            return {"precision": 0.0, "recall": 0.0, "f1": 0.0}
        rec_metrics = await self.evaluate_record_metrics(extracted_records, ground_truth)
        return {**rec_metrics, "completeness": self.evaluate_completeness(extracted_records)}

    async def evaluate(self, extracted_records: List[Dict], ground_truth: List[Dict] = None) -> Dict[str, Any]:
        results = {
            "num_extracted": len(extracted_records),
            "completeness": self.evaluate_completeness(extracted_records),
            "semantic_enabled": self.use_semantic,
        }
        if ground_truth:
            acc = await self.evaluate_accuracy(extracted_records, ground_truth)
            results.update(acc)
            results["num_ground_truth"] = len(ground_truth)
        return results


In [93]:
# Create an instance of your evaluator
evaluator = AsyncMedicalExtractionEvaluator(use_semantic=True)

def medical_extraction_metric(example, pred, trace=None):
    """Wrapper function for DSPy optimizer."""
    # Extract the data from DSPy format
    extracted_records = pred.extracted_records  # however your prediction is structured
    ground_truth = example.ground_truth  # however your ground truth is stored
    
    # Run the async evaluation synchronously
    import asyncio
    
    # Handle event loop - DSPy might already have one running
    try:
        loop = asyncio.get_running_loop()
        # If we're in an async context, create a new thread
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, evaluator.evaluate_accuracy(extracted_records, ground_truth))
            results = future.result()
    except RuntimeError:
        # No event loop running, safe to use asyncio.run
        results = asyncio.run(evaluator.evaluate_accuracy(extracted_records, ground_truth))
    
    # Return the metric DSPy should optimize (higher is better)
    return results['f1']


## 8. MedicalFileHandler


In [94]:
import asyncio
import json
import aiofiles
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any


class AsyncMedicalFileHandler:
    """Async file handler for medical data extraction pipeline."""
    
    def __init__(self, default_output_dir: str = None, default_csv_dir: str = "/nlp/data/karthik9/Sprint1/Dental/Data/csvs", 
                 default_json_path: str = "/nlp/data/karthik9/Sprint1/Dental/Data/ev_jsons/do_evaluation_results.json"):
        self.default_output_dir = default_output_dir
        self.default_csv_dir = default_csv_dir
        self.default_json_path = default_json_path
    
    def _generate_output_filename(self, source_file_path: str) -> str:
        """Generate output filename from source filename."""
        source_path = Path(source_file_path)
        source_name = source_path.stem
        
        if source_name.endswith('_md'):
            output_name = source_name[:-3] + '_do'
        else:
            output_name = source_name + '_do'
        
        return output_name + '.json'
    
    async def save_extracted_results(self, extracted_records: List[Dict], 
                                   source_file_path: str, 
                                   output_dir: str = None, 
                                   override: bool = False) -> str:
        """Save extracted results to JSON file asynchronously."""
        try:
            output_filename = self._generate_output_filename(source_file_path)
            
            if output_dir is None:
                if self.default_output_dir:
                    output_dir = Path(self.default_output_dir)
                else:
                    output_dir = Path(source_file_path).parent
            else:
                output_dir = Path(output_dir)
            
            output_dir.mkdir(parents=True, exist_ok=True)
            output_path = output_dir / output_filename
            
            if output_path.exists() and not override:
                print(f"Output file already exists: {output_path}")
                print("Use override=True to overwrite, or file will be skipped")
                return None
            
            save_data = {
                "metadata": {
                    "source_file": str(source_file_path),
                    "extraction_timestamp": datetime.now().isoformat(),
                    "total_records": len(extracted_records),
                    "pipeline_version": "DSPy_Async_1.0"
                },
                "extracted_records": extracted_records
            }
            
            async with aiofiles.open(output_path, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(save_data, indent=2, ensure_ascii=False))
            
            print(f"Successfully saved {len(extracted_records)} records to: {output_path}")
            return str(output_path)
            
        except Exception as e:
            print(f"Error saving results: {e}")
            return None
    
    async def save_evaluation_to_csv(self, baseline_results: List[Dict], ground_truth: List[Dict], 
                                   source_file: str, matches: List[tuple], csv_dir: str = None, 
                                   override: bool = False):
        """Save evaluation results to CSV asynchronously."""
        csv_dir = csv_dir or self.default_csv_dir
        Path(csv_dir).mkdir(parents=True, exist_ok=True)
        csv_path = Path(csv_dir) / "do_evaluation_results.csv"
        
        # Prepare data rows
        rows = []
        matched_gt_indices = set()
        matched_ext_indices = set()
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # Add matched pairs (TP) - FIXED: Process ground truth and extracted separately but consecutively
        for ext_idx, gt_idx, score in matches:
            if score >= 0.5:
                matched_gt_indices.add(gt_idx)
                matched_ext_indices.add(ext_idx)
                
                # Ground truth row (TP - correctly found)
                gt_row = ground_truth[gt_idx].copy()
                gt_row.update({
                    'data_type': 'ground_truth', 
                    'source_file': source_file,
                    'match_score': score, 
                    'pair_id': f"{source_file}_{gt_idx}",
                    'classification': 'TP', 
                    'timestamp': timestamp
                })
                rows.append(gt_row)
                
                # Extracted row (TP - correct extraction)
                ext_row = baseline_results[ext_idx].copy()
                ext_row.update({
                    'data_type': 'extracted', 
                    'source_file': source_file,
                    'match_score': score, 
                    'pair_id': f"{source_file}_{gt_idx}",
                    'classification': 'TP', 
                    'timestamp': timestamp
                })
                rows.append(ext_row)
        
        # Add unmatched ground truth (FN)
        for gt_idx, gt_record in enumerate(ground_truth):
            if gt_idx not in matched_gt_indices:
                row = gt_record.copy()
                row.update({
                    'data_type': 'ground_truth', 
                    'source_file': source_file,
                    'match_score': 0.0, 
                    'pair_id': f"{source_file}_{gt_idx}_missing",
                    'classification': 'FN', 
                    'timestamp': timestamp
                })
                rows.append(row)
        
        # Add unmatched extractions (FP)
        for ext_idx, ext_record in enumerate(baseline_results):
            if ext_idx not in matched_ext_indices:
                row = ext_record.copy()
                row.update({
                    'data_type': 'extracted', 
                    'source_file': source_file,
                    'match_score': 0.0, 
                    'pair_id': f"{source_file}_fp_{ext_idx}",
                    'classification': 'FP', 
                    'timestamp': timestamp
                })
                rows.append(row)
        
        # Save to CSV asynchronously
        new_df = pd.DataFrame(rows)
        
        if not new_df.empty:
            print("Length of new_df", len(new_df))
            if csv_path.exists() and not override:
                # Load existing data asynchronously
                async with aiofiles.open(csv_path, 'r') as f:
                    content = await f.read()
                existing_df = pd.read_csv(pd.io.common.StringIO(content))
                print("Length of existing_df", len(existing_df))
                
                if override:
                    existing_df = existing_df[existing_df['source_file'] != source_file]
                final_df = pd.concat([existing_df, new_df], ignore_index=True)
                
            else:
                final_df = new_df
            
            # Save asynchronously
            csv_content = final_df.to_csv(index=False)
            async with aiofiles.open(csv_path, 'w') as f:
                await f.write(csv_content)
            
            print(f"Results saved to: {csv_path}")
            print(f"Added {len(new_df)} rows for file: {source_file}")
            print(f"Total rows in CSV: {len(final_df)}")
        
        return str(csv_path)
        
    async def save_evaluation_to_json(self, evaluation_results: Dict, source_file: str, json_path: str = None):
        """Save evaluation results to JSON file asynchronously."""
        json_path = json_path or self.default_json_path
        
        new_entry = {
            "source_file": source_file,
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            **{k: v for k, v in evaluation_results.items() if k != 'field_accuracies'}
        }
        
        # Ensure directory exists
        json_path_obj = Path(json_path)
        json_path_obj.parent.mkdir(parents=True, exist_ok=True)
        
        # Load existing data or create empty list
        data = []
        if json_path_obj.exists():
            async with aiofiles.open(json_path, 'r') as f:
                content = await f.read()
                try:
                    data = json.loads(content) if content.strip() else []
                except json.JSONDecodeError:
                    data = []
        
        # Check if source_file already exists and replace/append
        existing_index = next((i for i, entry in enumerate(data) if entry.get('source_file') == source_file), None)
        
        if existing_index is not None:
            data[existing_index] = new_entry
            print(f"Updated existing results for {source_file}")
        else:
            data.append(new_entry)
            print(f"Added new results for {source_file}")
        
        # Save asynchronously
        async with aiofiles.open(json_path, 'w') as f:
            await f.write(json.dumps(data, indent=2))
        
        print(f"Results saved to: {json_path}")
        return json_path
    
    async def run_and_save(self, pipeline, markdown_content: str, source_file_path: str, 
                          output_dir: str = None, override: bool = False):
        """Run pipeline and save results asynchronously."""
        prediction = await pipeline.forward(markdown_content)
        extracted_records = prediction if isinstance(prediction, list) else prediction.extracted_records
        
        result_path = await self.save_extracted_results(
            extracted_records, source_file_path, output_dir, override
        )
        return result_path


print("Async Medical File Handler defined successfully")

Async Medical File Handler defined successfully


## 9. Initial Baseline Extraction Test


In [95]:
async def run_async_extraction_and_evaluation(markdown_content: str, source_file: str, 
                                             one_study_records: List[Dict], 
                                             override: bool = False):
    """Run the complete async extraction and evaluation pipeline (using AsyncMedicalFileHandler for all I/O)."""
    print("Running async baseline extraction...")
    print("=" * 50)

    try:
        # Initialize pipeline, evaluator, and *file handler*
        async_pipeline = AsyncMedicalDataExtractionPipeline(max_concurrent=5)
        async_evaluator = AsyncMedicalExtractionEvaluator(use_semantic=True, max_concurrent=10)
        file_handler = AsyncMedicalFileHandler()  # uses your default paths/dirs

        # ---- Extraction ----
        baseline_prediction = await async_pipeline(markdown_content)
        baseline_results = (
            baseline_prediction 
            if isinstance(baseline_prediction, list) 
            else getattr(baseline_prediction, "extracted_records", [])
        )

        print(f"\nBaseline extraction completed. Extracted {len(baseline_results)} records.")
        if baseline_results:
            print("\nFirst extracted record:")
            print("-" * 30)
            for key, value in list(baseline_results[0].items())[:5]:
                print(f"{key}: {value}")
            print("...")

        # ---- Evaluation + Saves ----
        print("\n" + "=" * 50)
        print("RUNNING EVALUATION AND SAVING RESULTS...")
        print("=" * 50)

        # Run evaluation first (so we can include metrics + matches in saves)
        baseline_evaluation = await async_evaluator.evaluate(baseline_results, one_study_records)
        

        # Pull matches if evaluator returns them ([(ext_idx, gt_idx, score), ...]); fall back to empty
        #matches = baseline_evaluation.get("matches", [])
        matches = await async_evaluator.hungarian_matching(baseline_results, one_study_records)
        print(f"DEBUG: Got {len(matches)} matches from evaluation")
        # Launch all saves concurrently via the file handler
        pipeline_save_task = file_handler.save_extracted_results(
            extracted_records=baseline_results,
            source_file_path=source_file,
            output_dir=None,      # default to same dir or file_handler.default_output_dir
            override=override
        )

        json_save_task = file_handler.save_evaluation_to_json(
            evaluation_results=baseline_evaluation,
            source_file=source_file,
            json_path=None        # will use file_handler.default_json_path
        )

        csv_save_task = file_handler.save_evaluation_to_csv(
            baseline_results=baseline_results,
            ground_truth=one_study_records,
            source_file=source_file,
            matches=matches,      # needed for TP/FP/FN marking
            csv_dir=None,         # will use file_handler.default_csv_dir
            override=override
        )

        pipeline_path, json_path, csv_path = await asyncio.gather(
            pipeline_save_task, json_save_task, csv_save_task
        )

        # ---- Print Summary ----
        print("\n" + "=" * 50)
        print("BASELINE EVALUATION RESULTS:")
        print("=" * 50)
        for key, value in baseline_evaluation.items():
            if key != "field_accuracies":
                print(f"{key}: {value}")

        print(f"\nResults saved to:")
        print(f"  - Pipeline results: {pipeline_path}")
        print(f"  - Evaluation JSON: {json_path}")
        print(f"  - Evaluation CSV:  {csv_path}")

        return {
            "baseline_results": baseline_results,
            "baseline_evaluation": baseline_evaluation,
            "files_saved": {
                "pipeline": pipeline_path,
                "json": json_path,
                "csv": csv_path
            }
        }

    except Exception as e:
        print(f"Error in async extraction: {e}")
        traceback.print_exc()
        return {
            "baseline_results": [],
            "baseline_evaluation": {"completeness": 0.0, "precision": 0.0, "recall": 0.0, "f1": 0.0},
            "files_saved": None
        }

result = await run_async_extraction_and_evaluation( markdown_content=markdown_content, source_file=source_file, one_study_records=one_study_records, override=False )

Running async baseline extraction...


Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Malmstrom', 'population_code': '2'}
Found 4 interventions
Processing intervention: placebo
Processing intervention: rofecoxib 50 mg
Processing intervention: celecoxib 200 mg
Processing intervention: ibuprofen 400 mg
Found 7 outcomes for placebo
Found 7 outcomes for ibuprofen 400 mg
Found 7 outcomes for rofecoxib 50 mg
Found 7 outcomes for celecoxib 200 mg
Total records extracted: 28

Baseline extraction completed. Extracted 28 records.

First extracted record:
------------------------------
First_Author: Malmstrom
Population: 2
Intervention_Code: 11
Intervention_Description: placebo
Outcome_Type: 6
...

RUNNING EVALUATION AND SAVING RESULTS...
Added 1 new records to /nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_history.csv
[SemanticMatch] Intervention_Description: 'placebo' vs 'Ibuprofen 400 mg' → No → False
Added 1 new records to /nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_histo

In [98]:
#Log the history  
log_history()

#Check stats anytime
show_stats()

No new records to add

DSPy History Stats from /nlp/data/karthik9/Sprint1/Dental/Data/csvs/extraction_history.csv:
Total calls: 4104
Unique models: 3
Model breakdown:
  gemini/gemini-2.5-pro: 4016 calls
  openai/gpt-5-mini-2025-08-07: 58 calls
  openai/gpt-4o-mini: 30 calls
Total cost: $88.7986
Average cost per call: $0.0216
Total tokens: 3,020,487
Average tokens per call: 736.0
Cache hit rate: 75.3%
Date range: 2025-09-16T10:33:45.131530 to 2025-09-18T18:49:54.231071


In [20]:
import json
import dspy
from datetime import datetime

def save_complete_history(lm, filename=None):
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"complete_lm_history_{timestamp}.json"
    
    # Convert history to JSON-serializable format
    complete_data = {
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "total_calls": len(lm.history),
            "model_info": str(type(lm).__name__),
        },
        "history": []
    }
    
    for i, entry in enumerate(lm.history):
        call_data = {
            "call_number": i + 1,
            "entry_type": str(type(entry).__name__),
            "raw_entry": {}
        }
        
        # Handle different types of entries
        if hasattr(entry, '__dict__'):
            # If it's an object, get all attributes
            for key, value in entry.__dict__.items():
                try:
                    # Try to serialize the value
                    json.dumps(value)
                    call_data["raw_entry"][key] = value
                except (TypeError, ValueError):
                    # If not serializable, convert to string
                    call_data["raw_entry"][key] = str(value)
        elif isinstance(entry, dict):
            # If it's already a dict, copy all keys
            for key, value in entry.items():
                try:
                    json.dumps(value)
                    call_data["raw_entry"][key] = value
                except (TypeError, ValueError):
                    call_data["raw_entry"][key] = str(value)
        else:
            # If it's something else, convert to string
            call_data["raw_entry"]["content"] = str(entry)
        
        complete_data["history"].append(call_data)
    
    # Save to file
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(complete_data, f, indent=2, ensure_ascii=False, default=str)
    
    print(f"Complete history saved to: {filename}")
    return filename

# Usage
save_complete_history(lm)

Complete history saved to: complete_lm_history_20250918_181204.json


'complete_lm_history_20250918_181204.json'

## 10. Example Generation and Few-Shot Learning Setup


In [63]:
import os
import json
from typing import List, Dict
import dspy

def create_dspy_examples(markdown_content: str, one_study_records: List[Dict]) -> List[dspy.Example]:
    """
    Create one dspy.Example per study, with markdown_content as input
    and all structured records as the expected output.
    """
    if not markdown_content or not one_study_records:
        return []
    
    example = dspy.Example(
        markdown_content=markdown_content,
        extracted_records=one_study_records
    ).with_inputs("markdown_content")
    
    return [example]


def create_examples_for_all_studies(md_dir: str, target_file: str) -> List[dspy.Example]:
    """
    Loop through all study _md folders and build DSPy examples.
    
    Args:
        md_dir: directory containing *_md study folders
        target_file: path to dichotomous_outcomes.json
    
    Returns:
        List of dspy.Example objects (one per study)
    """
    # Load global target data once
    with open(target_file, "r") as f:
        target_data = json.load(f)

    all_examples = []
    missing = []

    # Loop over each study folder
    for folder in os.listdir(md_dir):
        if not folder.endswith("_md"):
            continue
        
        study_id = folder.replace("_md", "")  # e.g., "3477_Dolci"
        source_file = os.path.join(md_dir, folder, f"{folder}.json")
        
        if not os.path.exists(source_file):
            print(f"⚠️ Missing source file for {folder}, skipping...")
            missing.append((study_id, "no_json"))
            continue
        
        # Load source JSON and extract markdown
        with open(source_file, "r") as f:
            source_data = json.load(f)
        markdown_content = source_data.get("marker", {}).get("markdown", "")
        
        # Get ground truth records for this study
        one_study_records = [rec for rec in target_data if rec.get("filename") == study_id]
        
        if not one_study_records:
            print(f"⚠️ No ground truth records found for {study_id}, skipping...")
            missing.append((study_id, "no_records"))
            continue
        
        # Build examples for this study
        examples = create_dspy_examples(markdown_content, one_study_records)
        all_examples.extend(examples)

    # Summary
    total_folders = len([f for f in os.listdir(md_dir) if f.endswith("_md")])

    print(f"Total folders found: {total_folders}")
    print(f"Examples created: {len(all_examples)}")
    print(f"Missed studies: {len(missing)}")
    if missing:
        for sid, reason in missing:
            print(f"  - {sid} ({reason})")

    return all_examples


# Usage
md_dir = "/nlp/data/karthik9/Sprint1/Dental/Data/acute_pain_mds"
target_file = "/nlp/data/karthik9/Sprint1/Dental/Data/jsons/dichotomous_outcomes.json"
all_examples = create_examples_for_all_studies(md_dir, target_file)

# Print the first example
print("Raw Example object:")
print(all_examples[0])

print("\nAccessing fields:")
print("Input (markdown_content):", all_examples[0].markdown_content[:50], "...")
print("Expected output (extracted_records):", all_examples[0].extracted_records[1])


⚠️ No ground truth records found for 3287_Cooper, skipping...
Total folders found: 51
Examples created: 50
Missed studies: 1
  - 3287_Cooper (no_records)
Raw Example object:
Example({'markdown_content': '# **Dental Research Journal**\n\n# **Original Article**\n\n# **Comparison of anti‑inflammatory and analgesic effects of\u200a Ginger powder and Ibuprofen in postsurgical pain model: A randomized, double‑blind, case–control clinical trial**\n\n#### **Farshid Rayati1 , Fatemeh Hajmanouchehri2 , Elnaz Najafi<sup>3</sup>**\n\n1 Department of Oral and Maxillofacial Surgery, Dental Caries Prevention Research Center, Qazvin University of Medical Sciences, 2 Department of Clinical and Anatomical Pathology, Qazvin University of Medical Sciences, Qazvin, 3 Department of Orthodontics, School of Dentistry, Islamic Azad University of Isfahan, Isfahan, Iran\n\n#### **ABSTRACT**\n\n**Background:** Ginger has been used as an herbal drug for a long time for the treatment of chronic inflammatory conditi

## 11. DSPy Optimizers Setup and Training


In [None]:
# # Use with DSPy optimizer
# optimizer = BootstrapFewShot(metric=medical_extraction_metric)


# 3. Run the optimization
def run_optimization(examples, test_size=0.3):
    """
    Run the complete optimization process.
    
    Args:
        examples: List of dspy.Example objects from your create_dspy_examples()
        test_size: Fraction to use for testing (rest used for training)
    
    Returns:
        compiled_pipeline: Optimized pipeline
    """
    
    # Split examples into train/test
    split_idx = int(len(examples) * (1 - test_size))
    trainset = examples[:split_idx]
    testset = examples[split_idx:]
    
    print(f"Using {len(trainset)} examples for training")
    print(f"Using {len(testset)} examples for testing")
    
    if len(trainset) == 0:
        print("Warning: No training examples! Using all examples for training.")
        trainset = examples
        testset = examples[:1]  # Use first example for testing
    
    # Create optimizer and pipeline
    optimizer = create_optimizer()
    sync_pipeline = SyncPipelineWrapper()
    
    print("Starting optimization...")
    
    try:
        # This is where the magic happens
        compiled_pipeline = optimizer.compile(
            sync_pipeline,
            trainset=trainset
        )
        compiled_pipeline.save("dspy_optimized_pipeline.json")

        
        print("Optimization completed successfully!")
        
        # Test the compiled pipeline on test set
        if testset:
            print(f"\nTesting on {len(testset)} examples...")
            total_score = 0
            
            for i, example in enumerate(testset):
                try:
                    result = compiled_pipeline.forward(example.markdown_content)
                    score = medical_extraction_metric(example, result)
                    total_score += score
                    print(f"Test {i+1}: Score = {score:.2f}")
                except Exception as e:
                    print(f"Test {i+1}: Error = {e}")
            
            avg_score = total_score / len(testset)
            print(f"\nAverage test score: {avg_score:.2f}")
        
        return compiled_pipeline
        
    except Exception as e:
        print(f"Optimization failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# 4. Quick runner function
def quick_optimize(examples):
    """Simple one-line optimization."""
    return run_optimization(examples)



# Now run optimization:
compiled_pipeline = quick_optimize(examples)

# Use the optimized pipeline:
if compiled_pipeline:
    result = compiled_pipeline(markdown_content)
    print(f"Extracted {len(result.extracted_records)} records")


Using 16 examples for training
Using 8 examples for testing
Starting optimization...




Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 14 outcomes for Paracetamol 500 mg
Found 12 outcomes for Piroxicam 20 mg


2025/09/17 17:45:14 ERROR dspy.teleprompt.bootstrap: Failed to run or to evaluate example Example({'markdown_content': '![](_page_0_Picture_1.jpeg)\n\nPatron: 3477-ADP;\n\nJournal Title: International journal of clinical pharmacology research ISSN: 0251-1649\n\nVolume: 14 Issue: 5 Month/Year: 1994Rages: 185-91\n\nArticle Author: Dolci G, Ripari M, Pacifici L, Umile A\n\n**Article Title:** Evaluation of piroxicam-betacyclodextrin, piroxicam, paracetamol and placebo in post-operative oral\n\nImprint:\n\nCall #:\n\nLocation:\n\nOdyssey\n\nCharge Maxcost: \\$15.00\n\n## Shipping Address:\n\nAmerican Dental Association Department of Library Services 211 East Chicago Avenue Chicago, IL 60611-2637\n\nFax: 312-440-2774 Phone: 312-440-2653 email: library@ada.org EMAIL: LIBRARY@ADA.ORG\n\nNOTES:\n\n# EVALUATION OF PIROXICAM- $\\beta$ -CYCLODEXTRIN, PIROXICAM, PARACETAMOL AND PLACEBO IN POST-OPERATIVE ORAL SURGERY PAIN\n\n### DOLCI G., RIPARI M., PACIFICI L., UMILE A.<sup>1</sup>\n\nOdontoiatric 

Total records extracted: 54
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 14 outcomes for Paracetamol 500 mg
Found 12 outcomes for Piroxicam 20 mg




Total records extracted: 54
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg
Found 14 outcomes for Paracetamol 500 mg




Total records extracted: 54
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg
Found 14 outcomes for Paracetamol 500 mg




Total records extracted: 54
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 12 outcomes for Piroxicam 20 mg
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 14 outcomes for Paracetamol 500 mg


 31%|███▏      | 5/16 [00:18<00:39,  3.63s/it]


Total records extracted: 54
Bootstrapped 4 full traces after 5 examples for up to 1 rounds, amounting to 5 attempts.
Optimization completed successfully!

Testing on 8 examples...
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 12 outcomes for Piroxicam 20 mg
Found 14 outcomes for Paracetamol 500 mg
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 18 outcomes for Placebo




Total records extracted: 54
Test 1: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 14 outcomes for Paracetamol 500 mg
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg




Total records extracted: 54
Test 2: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg
Found 14 outcomes for Paracetamol 500 mg




Total records extracted: 54
Test 3: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 18 outcomes for Placebo
Found 14 outcomes for Paracetamol 500 mg
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg




Total records extracted: 54
Test 4: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 14 outcomes for Paracetamol 500 mg
Found 12 outcomes for Piroxicam 20 mg
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam




Total records extracted: 54
Test 5: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg
Found 14 outcomes for Paracetamol 500 mg
Found 18 outcomes for Placebo




Total records extracted: 54
Test 6: Score = 1.00




Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 14 outcomes for Paracetamol 500 mg
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 12 outcomes for Piroxicam 20 mg
Found 18 outcomes for Placebo




Total records extracted: 54
Test 7: Score = 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 12 outcomes for Piroxicam 20 mg
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Found 14 outcomes for Paracetamol 500 mg




Total records extracted: 54
Test 8: Score = 1.00

Average test score: 1.00
Extracting metadata and interventions concurrently...
Study metadata: {'first_author': 'Dolci', 'population_code': '2'}
Found 4 interventions
Processing intervention: Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Processing intervention: Piroxicam 20 mg
Processing intervention: Paracetamol 500 mg
Processing intervention: Placebo
Found 14 outcomes for Paracetamol 500 mg
Found 12 outcomes for Piroxicam 20 mg
Found 18 outcomes for Placebo
Found 10 outcomes for Piroxicam-β-cyclodextrin equivalent to 20 mg of piroxicam
Total records extracted: 54
Extracted 54 records


# Batch Processing


In [None]:
# # Batch processing wrapper for efficient concurrent execution
# class AsyncBatchProcessor:
#     """Handles batch processing of multiple operations with concurrency control."""
    
#     def __init__(self, max_concurrent: int = 5):
#         self.semaphore = asyncio.Semaphore(max_concurrent)
    
#     async def process_interventions_batch(self, intervention_extractor: AsyncInterventionExtractor, 
#                                         markdown_contents: List[str]) -> List[List[Dict[str, Any]]]:
#         """Process multiple markdown contents concurrently for interventions."""
        
#         async def _extract_with_semaphore(content: str):
#             async with self.semaphore:
#                 return await intervention_extractor.forward(content)
        
#         tasks = [_extract_with_semaphore(content) for content in markdown_contents]
#         return await asyncio.gather(*tasks)
    
#     async def process_outcomes_batch(self, outcome_extractor: AsyncOutcomeExtractor,
#                                    markdown_content: str, 
#                                    intervention_descriptions: List[str]) -> List[List[Dict[str, Any]]]:
#         """Process multiple interventions concurrently for outcomes."""
        
#         async def _extract_with_semaphore(intervention_desc: str):
#             async with self.semaphore:
#                 return await outcome_extractor.forward(markdown_content, intervention_desc)
        
#         tasks = [_extract_with_semaphore(desc) for desc in intervention_descriptions]
#         return await asyncio.gather(*tasks)



# class AsyncBatchPipeline:
#     """Batch processor for handling multiple files concurrently."""
    
#     def __init__(self, max_concurrent_files: int = 3, max_concurrent_per_file: int = 5):
#         self.file_semaphore = asyncio.Semaphore(max_concurrent_files)
#         self.max_concurrent_per_file = max_concurrent_per_file
    
#     async def process_single_file(self, file_path: str, markdown_content: str, 
#                                 output_dir: str = None, override: bool = False):
#         """Process a single file with semaphore control."""
#         async with self.file_semaphore:
#             pipeline = AsyncMedicalDataExtractionPipeline(self.max_concurrent_per_file)
#             print(f"Processing file: {Path(file_path).name}")
#             return await pipeline.run_and_save(markdown_content, file_path, output_dir, override)
    
#     async def process_files_batch(self, file_data: List[Dict], output_dir: str = None, 
#                                 override: bool = False):
#         """Process multiple files concurrently.
        
#         Args:
#             file_data: List of dicts with 'file_path' and 'markdown_content' keys
#             output_dir: Output directory for results
#             override: Whether to overwrite existing files
#         """
#         print(f"Starting batch processing of {len(file_data)} files...")
        
#         tasks = [
#             self.process_single_file(
#                 item['file_path'], 
#                 item['markdown_content'], 
#                 output_dir, 
#                 override
#             ) 
#             for item in file_data
#         ]
        
#         results = await asyncio.gather(*tasks, return_exceptions=True)
        
#         # Count successful vs failed
#         successful = sum(1 for r in results if not isinstance(r, Exception))
#         failed = len(results) - successful
        
#         print(f"Batch processing complete: {successful} successful, {failed} failed")
#         return results

    
# class AsyncBatchEvaluator:
#     """Batch evaluator for processing multiple files concurrently."""
    
#     def __init__(self, max_concurrent_files: int = 3, max_concurrent_per_file: int = 10):
#         self.file_semaphore = asyncio.Semaphore(max_concurrent_files)
#         self.evaluator = AsyncMedicalExtractionEvaluator(max_concurrent=max_concurrent_per_file)
    
#     async def evaluate_single_file(self, extracted_records: List[Dict], ground_truth: List[Dict], 
#                                  source_file: str, save_csv: bool = True, csv_dir: str = "evaluation_results"):
#         """Evaluate a single file with semaphore control."""
#         async with self.file_semaphore:
#             print(f"Evaluating file: {source_file}")
            
#             results = await self.evaluator.evaluate(extracted_records, ground_truth)
            
#             if save_csv:
#                 await self.evaluator.save_evaluation_to_csv(
#                     extracted_records, ground_truth, source_file, csv_dir
#                 )
            
#             return {
#                 'source_file': source_file,
#                 'results': results
#             }
    
#     async def evaluate_files_batch(self, file_data: List[Dict], save_csv: bool = True, 
#                                  csv_dir: str = "evaluation_results"):
#         """Evaluate multiple files concurrently.
        
#         Args:
#             file_data: List of dicts with 'extracted_records', 'ground_truth', 'source_file' keys
#         """
#         print(f"Starting batch evaluation of {len(file_data)} files...")
        
#         tasks = [
#             self.evaluate_single_file(
#                 item['extracted_records'],
#                 item['ground_truth'], 
#                 item['source_file'],
#                 save_csv,
#                 csv_dir
#             )
#             for item in file_data
#         ]
        
#         results = await asyncio.gather(*tasks, return_exceptions=True)
        
#         successful = sum(1 for r in results if not isinstance(r, Exception))
#         failed = len(results) - successful
        
#         print(f"Batch evaluation complete: {successful} successful, {failed} failed")
#         return results

# print("Async medical extraction evaluator defined successfully")


# Batch processing function
# async def run_batch_extraction_and_evaluation(file_data: List[Dict], 
#                                              max_concurrent_files: int = 3,
#                                              override: bool = False):
#     """Run batch processing for multiple files.
    
#     Args:
#         file_data: List of dicts with 'markdown_content', 'source_file', 'ground_truth' keys
#         max_concurrent_files: Number of files to process simultaneously
#         override: Whether to overwrite existing files
#     """
    
#     print(f"Starting async batch processing of {len(file_data)} files...")
#     print("=" * 70)
    
#     # Initialize batch processors
#     batch_pipeline = AsyncBatchPipeline(max_concurrent_files, max_concurrent_per_file=5)
#     batch_evaluator = AsyncBatchEvaluator(max_concurrent_files, max_concurrent_per_file=10)
    
#     start_time = datetime.now()
    
#     try:
#         # Prepare pipeline data
#         pipeline_data = [
#             {
#                 'file_path': item['source_file'],
#                 'markdown_content': item['markdown_content']
#             }
#             for item in file_data
#         ]
        
#         # Prepare evaluation data  
#         evaluation_data = [
#             {
#                 'extracted_records': [],  # Will be filled after extraction
#                 'ground_truth': item['ground_truth'],
#                 'source_file': item['source_file']
#             }
#             for item in file_data
#         ]
        
#         # Run extractions first
#         print("Phase 1: Running extractions...")
#         extraction_results = await batch_pipeline.process_files_batch(
#             pipeline_data, output_dir="extraction_results", override=override
#         )
        
#         # Load extracted results for evaluation
#         for i, result in enumerate(extraction_results):
#             if not isinstance(result, Exception) and result:
#                 # Load the extracted records from the result
#                 if hasattr(result, 'extracted_records'):
#                     evaluation_data[i]['extracted_records'] = result.extracted_records
        
#         # Run evaluations
#         print("\nPhase 2: Running evaluations...")
#         evaluation_results = await batch_evaluator.evaluate_files_batch(
#             evaluation_data, save_csv=True, csv_dir="evaluation_results"
#         )
        
#         end_time = datetime.now()
#         processing_time = (end_time - start_time).total_seconds()
        
#         # Summary
#         successful_extractions = sum(1 for r in extraction_results if not isinstance(r, Exception))
#         successful_evaluations = sum(1 for r in evaluation_results if not isinstance(r, Exception))
        
#         print("\n" + "=" * 70)
#         print("BATCH PROCESSING SUMMARY")
#         print("=" * 70)
#         print(f"Total files: {len(file_data)}")
#         print(f"Successful extractions: {successful_extractions}")
#         print(f"Successful evaluations: {successful_evaluations}")
#         print(f"Total processing time: {processing_time:.2f} seconds")
#         print(f"Average time per file: {processing_time/len(file_data):.2f} seconds")
        
#         return {
#             'extraction_results': extraction_results,
#             'evaluation_results': evaluation_results,
#             'summary': {
#                 'total_files': len(file_data),
#                 'successful_extractions': successful_extractions,
#                 'successful_evaluations': successful_evaluations,
#                 'processing_time': processing_time
#             }
#         }
        
#     except Exception as e:
#         print(f"Error in batch processing: {e}")
#         traceback.print_exc()
#         return None



# # Usage example for batch processing:
# """
# batch_data = [
#     {
#         'markdown_content': content1,
#         'source_file': 'study1.json',
#         'ground_truth': gt_records1
#     },
#     {
#         'markdown_content': content2, 
#         'source_file': 'study2.json',
#         'ground_truth': gt_records2
#     }
# ]

# batch_results = await run_batch_extraction_and_evaluation(
#     batch_data, max_concurrent_files=3, override=False
# )
# """

