In [3]:
%pip install groq


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
from openai import OpenAI
import textgrad as tg
from textgrad.engine.local_model_openai_api import ChatExternalClient
from groq import Groq
import dspy
from dspy.primitives.program import Module
import logging
from datetime import datetime, timedelta
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer
import yake
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm import tqdm
import time
import itertools
from pydantic import BaseModel, Field
import threading
from dspy.predict import ChainOfThought

# Serialises the one-time DSPy configuration done in initialize_dspy().
dspy_config_lock = threading.Lock()
# Set by initialize_dspy() after a successful configuration so later calls skip it.
dspy_configured = threading.Event()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Rate limiting constants
RATE_LIMIT = 30  # requests per minute per API key
DELAY_BETWEEN_REQUESTS = 60 / RATE_LIMIT  # seconds between requests
BATCH_SIZE = 25  # Slightly less than rate limit to account for overhead


def load_api_keys(env_var="GROQ_API_KEYS"):
    """Read API keys from a comma-separated environment variable.

    Credentials must not live in the notebook source: anything committed here
    is visible to every reader of the file and of its history. Any key that
    was ever embedded in source should be treated as leaked and rotated.

    Args:
        env_var: Name of the environment variable holding the keys,
            e.g. ``GROQ_API_KEYS="key_one,key_two"``.

    Returns:
        List of non-empty keys with surrounding whitespace removed; an empty
        list when the variable is unset or blank.
    """
    import os  # local import keeps this block self-contained

    raw_value = os.environ.get(env_var, "")
    return [key.strip() for key in raw_value.split(",") if key.strip()]


API_KEYS = load_api_keys()
if not API_KEYS:
    # Do not raise at definition time; report clearly instead. With no keys,
    # next(api_key_cycle) raises StopIteration at the first request.
    logging.error("No API keys found: set the GROQ_API_KEYS environment variable "
                  "(comma-separated) before running.")

# Round-robin iterator over the keys; callers take key_lock around next().
api_key_cycle = itertools.cycle(API_KEYS)
key_lock = Lock()

def ensure_nltk_data():
    """Ensure all required NLTK data is downloaded.

    ``calculate_metrics`` tokenizes with ``nltk.word_tokenize``, which needs
    the Punkt tokenizer data. Older NLTK releases look it up as ``punkt``;
    NLTK 3.9+ looks it up as ``punkt_tab``. Both resources are checked so the
    tokenizer works on either version. Each missing resource is downloaded
    with ``nltk.download``.
    """
    for package in ('punkt', 'punkt_tab'):
        try:
            nltk.data.find(f'tokenizers/{package}')
        except LookupError:
            # Not present in any NLTK data directory: fetch it.
            nltk.download(package)


def initialize_dspy(api_key):
    """Configure DSPy's language model, skipping the work once it has succeeded.

    The check and the configuration both run while holding
    ``dspy_config_lock``. ``dspy_configured`` is set only after
    ``dspy.settings.configure`` returns, so a failed attempt leaves the flag
    clear and a later call tries again.

    Args:
        api_key: API key passed to the ``dspy.OpenAI`` client.

    Raises:
        Exception: Whatever the configuration raised, re-raised after logging.
    """
    with dspy_config_lock:
        if dspy_configured.is_set():
            # Already configured by an earlier call; nothing to do.
            return

        try:
            groq_lm = dspy.OpenAI(
                model="llama-3.1-70b-versatile",
                api_key=api_key,
                api_base="https://api.groq.com/openai/v1"
            )
            dspy.settings.configure(lm=groq_lm)
            dspy_configured.set()
            logging.info("DSPy configured successfully")
        except Exception as e:
            logging.error(f"Failed to configure DSPy: {str(e)}")
            raise

class Input(BaseModel):
    # Structured request: background context plus the question asked about it.
    # Documented with comments on purpose: a class docstring would become part
    # of the pydantic model's schema description.
    context: str = Field(
        description="The context for the question",
    )
    query: str = Field(
        description="The question to be answered",
    )

class Output(BaseModel):
    # Structured reply: the answer text and a confidence constrained to [0, 1].
    # Documented with comments on purpose: a class docstring would become part
    # of the pydantic model's schema description.
    answer: str = Field(
        description="The answer for the question",
    )
    confidence: float = Field(
        ge=0,
        le=1,
        description="The confidence score for the answer",
    )

class MedicalResponse(dspy.Signature):
    """Generate accurate medical responses with reasoning."""
    # NOTE(review): get_dspy_response() and main() each define their own local
    # class named MedicalResponse, so within those functions this module-level
    # signature is shadowed.
    # Input side of the signature, annotated with the Input model (context + query).
    input: Input = dspy.InputField()
    # Output side of the signature, annotated with the Output model (answer + confidence).
    output: Output = dspy.OutputField()

class GroqPredictor(dspy.TypedPredictor):
    """Predictor that sends a prompt straight to the Groq chat-completions API.

    NOTE(review): ``__init__`` does not call ``super().__init__()``, so any
    state the ``dspy.TypedPredictor`` base normally sets up is absent. Confirm
    this is intended before relying on inherited behaviour.
    """

    def __init__(self, api_key):
        # One Groq client per predictor instance.
        self.client = Groq(api_key=api_key)

    def forward(self, prompt):
        """Return the model's reply to ``prompt``, or ``None`` if the call fails."""
        user_turn = {"role": "user", "content": prompt}
        try:
            completion = self.client.chat.completions.create(
                messages=[user_turn],
                model="llama-3.1-70b-versatile",
                temperature=0.7,
                max_tokens=400
            )
            # Kept inside the try so a malformed reply is also reported as a failure.
            return completion.choices[0].message.content
        except Exception as e:
            logging.error(f"Groq API call failed: {e}")
            return None

def get_gpt_response(client, prompt, retry_count=3):
    """Ask the chat model for a medical answer, throttling and retrying on errors.

    Args:
        client: Object exposing ``chat.completions.create`` (OpenAI-style client).
        prompt: Text sent as the ``user`` message.
        retry_count: Maximum number of attempts.

    Returns:
        The content of the first choice, or ``None`` when every attempt raised
        (or when ``retry_count`` allows no attempts).
    """
    system_prompt = """You are a medical expert assistant. Provide accurate, clear, and well-structured medical advice.
Focus on: accuracy, clear explanation, practical advice, and professional yet accessible language.
Please limit your response to a maximum of 1500 characters."""

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]

    for attempt_number in range(1, retry_count + 1):
        try:
            # Pause before every request (retries included) to respect the rate limit.
            time.sleep(DELAY_BETWEEN_REQUESTS)
            completion = client.chat.completions.create(
                model="llama-3.1-70b-versatile",
                messages=conversation,
                temperature=0.7,
                max_tokens=400
            )
            return completion.choices[0].message.content
        except Exception as e:
            logging.warning(f"GPT attempt {attempt_number} failed: {e}")
            # Back off only when another attempt will follow.
            if attempt_number < retry_count:
                time.sleep(5)

    return None

def _textgrad_refine_once(client, reference):
    """Run a single TextGrad loss/backward/step cycle on ``reference`` and return the updated text."""
    engine = ChatExternalClient(client=client, model_string='llama-3.1-70b-versatile')
    tg.set_backward_engine(engine, override=True)

    # The indentation inside this literal is part of the prompt text and is kept as-is.
    loss_system_prompt = tg.Variable(
        """Evaluate the medical response based on the following criteria:
            - Accuracy: Ensure all information is factually correct and evidence-based.
            - Clarity: The response should be clear, concise, and easy to understand.
            - Completeness: Address all aspects of the medical query comprehensively.
            - Practicality: Provide actionable and practical advice that can be implemented.
            - Professionalism: Maintain a professional tone and uphold medical ethical standards.
            - Relevance: Ensure all information provided is directly related to the query.
            - Consistency: Maintain consistency in terminology and presentation throughout the response.
            """,
        requires_grad=False,
        role_description="medical evaluation system"
    )

    solution = tg.Variable(reference, requires_grad=True, role_description="medical response")
    loss_fn = tg.TextLoss(loss_system_prompt)
    optimizer = tg.TGD([solution])
    loss = loss_fn(solution)
    loss.backward()
    optimizer.step()

    return solution.value


def get_textgrad_response(client, prompt, reference, retry_count=3):
    """Get TextGrad optimized response, retrying on failure.

    One TextGrad step (loss, backward, optimizer step) is applied to
    ``reference`` and the updated text is returned.

    Args:
        client: OpenAI-style client handed to ``ChatExternalClient``.
        prompt: Accepted for signature compatibility; not used by the optimisation.
        reference: Starting text that TextGrad refines.
        retry_count: Maximum number of attempts. Previously this argument was
            ignored, so one transient API error dropped the sample. Values
            below 1 still result in a single attempt, as before.

    Returns:
        The refined text, or ``None`` if every attempt raised.

    NOTE(review): the optimised variable starts from ``reference`` rather than
    from a model answer to ``prompt``. If the result is later scored against
    the same reference, similarity metrics may be inflated. Confirm this is
    the intended experimental design.
    """
    attempts = max(1, retry_count)
    for attempt in range(attempts):
        try:
            return _textgrad_refine_once(client, reference)
        except Exception as e:
            logging.error(f"TextGrad optimization failed (attempt {attempt + 1}/{attempts}): {e}")
            # Same fixed back-off as get_gpt_response, only when retrying.
            if attempt < attempts - 1:
                time.sleep(5)
    return None

def get_dspy_response(api_key, input_text):
    """Produce a medical answer with a DSPy chain-of-thought program.

    Args:
        api_key: Key forwarded to ``initialize_dspy``.
        input_text: Query text; converted with ``str()`` before use.

    Returns:
        ``{'response': <stripped text>}`` on success, otherwise
        ``{'response': None}``. Every failure is logged and never raised.
    """
    try:
        # The global DSPy LM must be configured before a program is built.
        initialize_dspy(api_key)

        # Local signature; inside this function it shadows the module-level
        # MedicalResponse. No docstring is added here on purpose.
        class MedicalResponse(dspy.Signature):
            input = dspy.InputField(desc="medical query or symptom description")
            response = dspy.OutputField(desc="detailed medical advice")

            def __init__(self):
                super().__init__()

        # Stage 1: build the program.
        try:
            cot_program = ChainOfThought(MedicalResponse)
        except Exception as e:
            logging.error(f"Failed to create ChainOfThought program: {str(e)}")
            return {'response': None}

        # Stage 2: run it (no timeout is applied here).
        try:
            prediction = cot_program(input=str(input_text))

            raw_response = getattr(prediction, 'response', None)
            if raw_response:
                cleaned = str(raw_response).strip()
                if cleaned:
                    logging.info("Successfully generated DSPy response")
                    return {'response': cleaned}

            # Missing, falsy, or whitespace-only response.
            logging.warning("DSPy generated empty response")
            return {'response': None}

        except Exception as e:
            logging.error(f"Failed to generate DSPy response: {str(e)}")
            return {'response': None}

    except Exception as e:
        logging.error(f"DSPy processing failed: {str(e)}")
        logging.error(f"Input text was: {str(input_text)[:100]}...")
        return {'response': None}


def calculate_metrics(reference, candidate):
    """Score how closely ``candidate`` matches ``reference`` with seven text metrics.

    Metrics: TF-IDF cosine similarity, share of reference words found in the
    candidate, smoothed sentence BLEU, ROUGE-1/2/L F-measures, and share of
    YAKE reference keywords also extracted from the candidate.

    Args:
        reference: Ground-truth text.
        candidate: Generated text to evaluate.

    Returns:
        Dict with keys ``content_similarity``, ``word_overlap``, ``bleu_score``,
        ``rouge1``, ``rouge2``, ``rougeL`` and ``keyword_overlap``. Every value
        is 0 when either text is empty/None or when any computation raises.
    """
    metric_names = (
        'content_similarity',
        'word_overlap',
        'bleu_score',
        'rouge1',
        'rouge2',
        'rougeL',
        'keyword_overlap',
    )

    # Nothing to compare when either side is missing.
    if not (reference and candidate):
        return dict.fromkeys(metric_names, 0)

    try:
        scores = {}

        # TF-IDF cosine similarity between the two documents.
        tfidf = TfidfVectorizer().fit_transform([reference, candidate])
        scores['content_similarity'] = cosine_similarity(tfidf[0:1], tfidf[1:2])[0][0]

        # Fraction of distinct reference words that appear in the candidate.
        reference_vocab = set(reference.lower().split())
        candidate_vocab = set(candidate.lower().split())
        if reference_vocab:
            scores['word_overlap'] = len(reference_vocab & candidate_vocab) / len(reference_vocab)
        else:
            scores['word_overlap'] = 0

        # Sentence-level BLEU with method4 smoothing.
        scores['bleu_score'] = sentence_bleu(
            [nltk.word_tokenize(reference.lower())],
            nltk.word_tokenize(candidate.lower()),
            smoothing_function=SmoothingFunction().method4,
        )

        # ROUGE F-measures (stemming enabled).
        rouge_variants = ['rouge1', 'rouge2', 'rougeL']
        rouge_result = rouge_scorer.RougeScorer(rouge_variants, use_stemmer=True).score(reference, candidate)
        for variant in rouge_variants:
            scores[variant] = rouge_result[variant].fmeasure

        # Fraction of reference keywords that were also extracted from the candidate.
        extractor = yake.KeywordExtractor()
        reference_keywords = {item[0] for item in extractor.extract_keywords(reference)}
        candidate_keywords = {item[0] for item in extractor.extract_keywords(candidate)}
        if reference_keywords:
            scores['keyword_overlap'] = len(reference_keywords & candidate_keywords) / len(reference_keywords)
        else:
            scores['keyword_overlap'] = 0

        return scores

    except Exception as e:
        logging.error(f"Metrics calculation failed: {e}")
        return dict.fromkeys(metric_names, 0)

def limit_response(response, limit=1500):
    """Cut ``response`` down to ``limit`` characters when it is longer.

    Falsy values (``None``, ``""``) and responses already within the limit
    are returned unchanged.
    """
    if not response or len(response) <= limit:
        return response
    return response[:limit]


def process_sample(row, start_time):
    """Generate the GPT and TextGrad responses for one dataset row.

    The DSPy response is not produced here; its slot is returned as ``None``
    for the caller to fill in.

    Args:
        row: pandas Series with ``input`` and ``output`` entries. ``row.name``
            is used as the sample index.
        start_time: Accepted for signature compatibility; not used.

    Returns:
        Dict with ``index``, ``input``, ``reference_output`` and ``responses``
        (keys ``gpt``, ``textgrad``, ``dspy``), or ``None`` if processing failed.
    """
    import numbers  # local import: only needed for the index-label check below

    try:
        # Take the next key round-robin; the lock keeps concurrent workers
        # from advancing the shared iterator at the same time.
        with key_lock:
            api_key = next(api_key_cycle)

        # Initialize clients
        groq_client = OpenAI(base_url="https://api.groq.com/openai/v1", api_key=api_key)

        # Missing (NaN/None) cells become empty strings.
        input_text = str(row['input']) if pd.notna(row['input']) else ""
        output_text = str(row['output']) if pd.notna(row['output']) else ""

        gpt_response = None
        textgrad_response = None

        # Each generator is isolated so one failing does not lose the other.
        try:
            gpt_response = get_gpt_response(groq_client, input_text)
        except Exception as e:
            logging.warning(f"GPT response failed: {str(e)}")

        try:
            textgrad_response = get_textgrad_response(groq_client, input_text, output_text)
        except Exception as e:
            logging.warning(f"TextGrad response failed: {str(e)}")

        # numbers.Real also matches NumPy integer/float labels, which are not
        # instances of the built-in int and previously all collapsed to 0.
        sample_index = int(row.name) if isinstance(row.name, numbers.Real) else 0

        result = {
            'index': sample_index,
            'input': input_text,
            'reference_output': output_text,
            'responses': {
                'gpt': str(gpt_response) if gpt_response is not None else None,
                'textgrad': str(textgrad_response) if textgrad_response is not None else None,
                'dspy': None  # We'll fill this in later
            }
        }

        logging.info(f"Successfully processed GPT and TextGrad for sample {row.name}")
        return result

    except Exception as e:
        # A single handler: the second identical `except Exception` clause
        # that used to follow could never be reached.
        logging.error(f"Error processing sample {row.name}: {str(e)}")
        return None
    
def estimate_completion_time(total_samples):
    """Estimate the run time for ``total_samples`` rows under the request rate limit.

    Assumes three API calls per sample (GPT, TextGrad, and DSPy), divides by
    ``RATE_LIMIT`` requests per minute, and pads the result by 20% for
    overhead and retries.

    Returns:
        ``datetime.timedelta`` covering the estimated duration.
    """
    calls_per_sample = 3   # GPT, TextGrad, and DSPy
    safety_factor = 1.2    # 20% buffer for overhead and potential retries

    minutes_at_rate_limit = (total_samples * calls_per_sample) / RATE_LIMIT
    return timedelta(minutes=minutes_at_rate_limit * safety_factor)

def main() -> None:
    """Run the GPT / TextGrad / DSPy comparison and write ``final_results.json``.

    Steps, in order:
      1. Load the first 40 rows of the parquet dataset (return early on failure).
      2. Produce GPT and TextGrad responses per row on a 4-worker thread pool
         and score each against the row's reference output.
      3. Configure DSPy, then produce and score DSPy responses one row at a time.
      4. Write run metadata plus all per-row results to ``final_results.json``.

    Errors in stages 2-4 are logged rather than raised.
    """
    print("Loading dataset...")
    try:
        # NOTE(review): absolute, machine-specific path; .head(40) caps the run at 40 rows.
        df = pd.read_parquet('/workspaces/codespaces-jupyter/data/train-00000-of-00001-5e7cb295b9cff0bf.parquet').head(40)
    except Exception as e:
        logging.error(f"Failed to load dataset: {e}")
        return

    total_samples = len(df)
    start_time = datetime.now()

    # Process GPT and TextGrad responses with threading
    results = []
    try:
        with ThreadPoolExecutor(max_workers=4) as executor:
            # One task per row. The dict values (row indices) are not read
            # below; only the future keys are iterated.
            future_to_index = {executor.submit(process_sample, row, start_time): idx 
                             for idx, row in df.iterrows()}

            # Results are appended in completion order, which need not match
            # dataset order.
            for future in tqdm(as_completed(future_to_index), total=total_samples):
                result = future.result()
                # process_sample returns None on failure; such rows are skipped.
                if result:
                    # Initialize metrics dict for each result
                    result['metrics'] = {
                        'gpt': calculate_metrics(result['reference_output'], result['responses']['gpt']),
                        'textgrad': calculate_metrics(result['reference_output'], result['responses']['textgrad']),
                        'dspy': None
                    }
                    results.append(result)
    except Exception as e:
        logging.error(f"Error in threaded processing: {str(e)}")

    # Now process DSPy responses sequentially
    if results:
        try:
            # Configure DSPy with the new LM client
            with key_lock:
                api_key = next(api_key_cycle)

            # Use the new DSPy LM client
            lm = dspy.LM(
                model="llama-3.1-70b-versatile",
                api_key=api_key,
                api_base="https://api.groq.com/openai/v1/",
                temperature=0.7,
                max_tokens=2000,
                top_p=0.95,
                presence_penalty=0.1,
                frequency_penalty=0.1
            )
            dspy.settings.configure(lm=lm)

            # Define the DSPy program
            # Local signature; inside main() it shadows the module-level MedicalResponse.
            class MedicalResponse(dspy.Signature):
                """Generate comprehensive medical advice for given symptoms."""
                input = dspy.InputField(desc="Patient's symptoms and concerns")
                response = dspy.OutputField(desc="""
                Provide a concise yet comprehensive medical response that includes:
                1. Direct assessment of the condition
                2. Expected duration and course
                3. Clear guidance on when to seek immediate medical attention
                4. Specific treatment recommendations
                5. Home care instructions
                6. Preventive measures
                
                Response should:
                - Start with a greeting and acknowledgment
                - Use clear, authoritative medical language
                - Provide specific durations (e.g., "5-7 days")
                - Include medication recommendations when appropriate
                - Mention specific symptoms that warrant urgent care
                - End with a clear conclusion
                Must maintain professional medical tone and be at least 1500 characters.""")

            # Wraps a ChainOfThought over the signature above and retries once
            # with an extended prompt when the first answer is short.
            class MedicalAdvisor(dspy.Module):
                def __init__(self):
                    super().__init__()
                    self.chain = dspy.ChainOfThought(MedicalResponse)

                def forward(self, input_text):
                    # The patient text is embedded in a fixed instruction
                    # template; the whole template is passed as the
                    # signature's `input` field.
                    # NOTE(review): the template hard-codes a pediatrician
                    # persona and a fixed greeting for every sample.
                    enhanced_prompt = f"""As an experienced pediatrician, provide a professional medical response.

                    Patient Description: {input_text}

                    Format your response as follows:

                    1. Greeting
                    "Thank you for consulting Chat Doctor."

                    2. Initial Assessment
                    - State the likely condition
                    - Mention expected duration
                    - Specify if immediate medical attention is needed

                    3. Key Points
                    - Mention specific symptoms that would require immediate medical attention
                    - List what parents should monitor
                    - Provide clear guidance on medication use/avoiding unnecessary medications

                    4. Treatment Recommendations
                    - Specific medications or supplements if needed
                    - Dosage guidelines if applicable
                    - Home care instructions

                    5. Follow-up Care
                    - When to seek medical attention
                    - What symptoms would warrant immediate care
                    - Prevention measures

                    Remember to:
                    - Be direct and authoritative
                    - Use medical terminology with explanations
                    - Provide specific timeframes
                    - Include clear yes/no guidance on antibiotics
                    - Mention specific supplements if appropriate
                    - End with a clear conclusion

                    Keep response professional and detailed while maintaining an accessible tone."""


                    prediction = self.chain(input=enhanced_prompt)
                    response = prediction.response

                    # One retry with extra bullet points appended when the
                    # first answer is under 1500 characters. A None response
                    # makes len() raise; the per-sample handler in the loop
                    # below catches that.
                    if len(response) < 1500:
                        enhanced_prompt += "\n\nPlease provide more detailed information, including:\n"
                        enhanced_prompt += "- Specific symptom monitoring guidelines\n"
                        enhanced_prompt += "- Detailed home care instructions\n"
                        enhanced_prompt += "- Common complications to watch for\n"
                        enhanced_prompt += "- Long-term management strategies\n"
                        prediction = self.chain(input=enhanced_prompt)
                        response = prediction.response

                    return response

            # Process each sample sequentially
            for result in results:
                try:
                    # A new advisor is constructed for every sample.
                    advisor = MedicalAdvisor()
                    response = advisor(result['input'])
                    # Only answers of at least 1500 characters are kept and scored.
                    if response and len(response) >= 1500:
                        result['responses']['dspy'] = str(response)
                        result['metrics']['dspy'] = calculate_metrics(
                            result['reference_output'], 
                            result['responses']['dspy']
                        )
                        logging.info(f"Successfully added DSPy response ({len(response)} chars) for sample {result['index']}")
                    else:
                        logging.warning(f"DSPy response too short ({len(response) if response else 0} chars) for sample {result['index']}")
                        result['responses']['dspy'] = None
                        # calculate_metrics returns its all-zero dict when either argument is empty/None.
                        result['metrics']['dspy'] = calculate_metrics("", None)
                except Exception as e:
                    logging.error(f"Failed to get DSPy response for sample {result['index']}: {str(e)}")
                    result['responses']['dspy'] = None
                    result['metrics']['dspy'] = calculate_metrics("", None)

        except Exception as e:
            # Reached for failures in key selection, LM setup, or the class
            # definitions above; per-sample errors are handled inside the loop.
            logging.error(f"Failed to configure DSPy: {str(e)}")

    # Save results
    try:
        final_results = {
            'metadata': {
                'total_samples_processed': len(results),
                'start_time': start_time.isoformat(),
                'end_time': datetime.now().isoformat(),
                'actual_duration': str(datetime.now() - start_time),
            },
            'results': results
        }

        # default=str stringifies any value json cannot serialise natively.
        with open('final_results.json', 'w', encoding='utf-8') as f:
            json.dump(final_results, f, indent=2, ensure_ascii=False, default=str)
        logging.info("Results saved to 'final_results.json'")
    except Exception as e:
        logging.error(f"Failed to save results: {str(e)}")

if __name__ == "__main__":
    # calculate_metrics() tokenizes with nltk.word_tokenize. Make sure the
    # tokenizer data exists before the run; otherwise the metric code falls
    # into its error handler and reports all-zero scores.
    ensure_nltk_data()
    main()

Loading dataset...


  0%|          | 0/40 [00:00<?, ?it/s]2024-11-17 15:14:30,955 - INFO - HTTP Request: POST https://api.groq.com/openai/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-17 15:14:30,965 - INFO - LLMCall function forward
2024-11-17 15:14:30,966 - INFO - _backward_through_llm prompt
2024-11-17 15:14:30,967 - INFO - _backward_through_llm gradient
2024-11-17 15:14:30,967 - INFO - TextualGradientDescent prompt for update
2024-11-17 15:14:30,968 - INFO - TextualGradientDescent optimizer response
2024-11-17 15:14:30,969 - INFO - TextualGradientDescent updated text
2024-11-17 15:14:30,969 - INFO - Successfully processed GPT and TextGrad for sample 1
2024-11-17 15:14:30,975 - INFO - Using default tokenizer.
2024-11-17 15:14:31,038 - INFO - Using default tokenizer.
  2%|▎         | 1/40 [00:03<02:13,  3.41s/it]2024-11-17 15:14:31,100 - INFO - HTTP Request: POST https://api.groq.com/openai/v1/chat/completions "HTTP/1.1 200 OK"
2024-11-17 15:14:31,110 - INFO - LLMCall function forward
2024-11-17 15:14:3