# Sample Questions Demo - Decoding Biology Hackathon

This notebook demonstrates how to process the sample questions from the hackathon and generate answers in the required format.

## Prerequisites

1. Start the vLLM Docker container:
   ```bash
   ./start_vllm_docker.sh
   ```

2. Make sure you have the sample questions file (`hackathon-train.jsonl`) in the current directory


In [None]:
import json
import logging
import re
from pathlib import Path
from typing import List, Dict, Any
from openai import OpenAI
from transformers import AutoTokenizer
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration for batch processing
BATCH_SIZE = 256  # Number of questions per batch. Increasing this will speed up the processing at the cost of using more memory
MAX_TOKENS_PER_QUESTION = 2_000  # Max tokens generated per individual question (sent as `max_tokens`)
TEMPERATURE = 0.7  # Sampling temperature sent with every completion request

## Initialize the OpenAI Client

Connect to the vLLM server running in Docker.


In [None]:
# Initialize the OpenAI client to connect to vLLM server
client = OpenAI(
    api_key="EMPTY",  # vLLM doesn't require authentication
    base_url="http://localhost:8000/v1", #vLLM server URL, make sure you have the correct port
)

# Test the connection by listing the models the server exposes
try:
    models = list(client.models.list())
    if models:
        print("vLLM server is up and running!")
        print(f"Available models: {[model.id for model in models]}")
        # The first listed model is the one passed to generate_completions in later cells
        model_name = models[0].id
    else:
        raise Exception("No models available")
except Exception as e:
    print(f"Error connecting to vLLM server: {e}")
    print("Make sure the server is running by executing `./start_vllm_docker.sh`")
    # Re-raise so execution stops here rather than continuing without `model_name`
    raise e 


## Helper Functions

Define utility functions for loading and processing questions.


In [None]:
# Tokenizers already loaded in this session, keyed by model identifier
_tokenizer_cache = {}

def get_tokenizer(model: str) -> AutoTokenizer:
    """Return the tokenizer for `model`, loading it once and reusing it afterwards."""
    if model in _tokenizer_cache:
        return _tokenizer_cache[model]

    # First request for this model: load it and remember it for later calls
    tokenizer = AutoTokenizer.from_pretrained(model)
    _tokenizer_cache[model] = tokenizer
    return tokenizer

def load_questions(filename) -> List[Dict[str, Any]]:
    """Load questions from a JSON (.json) or JSON Lines (.jsonl) file.

    Args:
        filename: Path to the file, given either as a string or as a
            path-like object such as `pathlib.Path`. A name ending in
            `.jsonl` is read as one JSON document per non-blank line;
            anything else is read as a single JSON document.

    Returns:
        The decoded questions (a list of dicts for the expected file layout).
    """
    # str() lets Path objects through; a Path has no .endswith method
    is_jsonl = str(filename).endswith('.jsonl')

    # Read as UTF-8 explicitly so the result does not depend on the platform's locale
    with open(filename, 'r', encoding='utf-8') as f:
        if is_jsonl:
            # Blank lines are skipped so a trailing newline does not break parsing
            questions = [json.loads(line) for line in f if line.strip()]
        else:
            questions = json.load(f)

    print(f"Loaded {len(questions)} questions from {filename}")
    return questions

def create_prompts(questions: List[Dict[str, Any]], model_name:str) -> List[str]:
    """
    Create chat-formatted prompts for a batch of questions using AutoTokenizer.
    
    Args:
        questions: List of question data. Each item must have a 'question'
            key and may have an 'options' key holding either a dict of
            option label -> option text or a JSON string encoding such a dict.
        model_name: Model identifier used to look up the tokenizer whose
            chat template formats the prompt.
    
    Returns:
        List of formatted prompts, one per question, in input order
    """
    tokenizer = get_tokenizer(model_name)
    prompts = []
    
    for question_data in questions:
        question = question_data['question']
        options = question_data.get('options', {})
        
        # Parse options if they're a string
        if isinstance(options, str):
            try:
                options = json.loads(options)
            except ValueError:
                # json.JSONDecodeError is a ValueError; malformed options mean "no options"
                options = {}
        
        # Anything that is not a label -> text mapping cannot be rendered as options
        if not isinstance(options, dict):
            options = {}
        
        # One "label: text" line per option
        options_text = "".join(f"{key}: {value}\n" for key, value in options.items())
        
        # Comma-separated labels shown to the model as the allowed answers
        options_list = ", ".join(str(key) for key in options)
        
        messages = [
            {"role": "system", "content": f"You are a biology expert. Answer the following multiple choice questions by selecting the correct option ({options_list}) and providing a brief explanation. Always format your answer as <answer>[letter]</answer>."},
            {"role": "user", "content": f"""Question: {question}

Options:
{options_text}
Please provide your answer as a single letter ({options_list}) followed by a brief explanation.
Format your answer as: <answer>[letter]</answer>

Answer:"""}
        ]
        
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=True # Switches between thinking and non-thinking modes. Default is True.
        )
        
        prompts.append(text)
    
    return prompts

def extract_answer_from_response(response_text: str, question_data: Dict[str, Any]) -> str:
    """
    Extract the answer letter (A, B, C, D, etc.) from the model response.
    
    Matching is attempted in this order:
      1. The required `<answer>X</answer>` tag (optionally `<answer>[X]</answer>`);
         if several tags are present the last one wins, so a final answer
         overrides tags written earlier in the response.
      2. Phrases such as "Answer: X", "answer is X", "option X", "choice X".
      3. Any stand-alone option label (last-resort heuristic).
      4. The text of an option appearing verbatim in the response.
    
    Args:
        response_text: The raw response from the model
        question_data: The question data; its 'options' entry may be a dict
            of label -> text or a JSON string encoding such a dict
    
    Returns:
        The upper-cased answer label, or 'X' if no answer could be found
    """
    if not response_text:
        return 'X'
    text = response_text.strip()
    
    # Get available options from the question data
    options = question_data.get('options', {})
    if isinstance(options, str):
        try:
            options = json.loads(options)
        except ValueError:
            # json.JSONDecodeError is a ValueError; treat malformed options as absent
            options = {}
    if not isinstance(options, dict):
        options = {}
    
    # Fall back to A-E when the question carries no usable option labels
    valid_options = [str(key) for key in options] or ['A', 'B', 'C', 'D', 'E']
    
    # Build an escaped alternation group rather than a character class: joining
    # labels with '|' inside [...] would accept a literal '|' and break on
    # multi-character labels. Longest labels first so a prefix cannot shadow them.
    ordered = sorted(valid_options, key=len, reverse=True)
    label = '(' + '|'.join(re.escape(option) for option in ordered) + ')'
    
    # 1. Required format; take the last tag so the final answer wins
    tagged = re.findall(rf'<answer>\s*\[?{label}\]?\s*</answer>', text, re.IGNORECASE)
    if tagged:
        return tagged[-1].upper()
    
    # 2-3. Fallback phrasings. The trailing \b stops "answer as" from being
    # read as "A"; the bare-label pattern is last because it is the weakest.
    fallback_patterns = [
        rf'\banswer(?:\s+is)?[\s:]*{label}\b',
        rf'\boption[\s:]*{label}\b',
        rf'\bchoice[\s:]*{label}\b',
        rf'\b{label}\b',
    ]
    for pattern in fallback_patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1).upper()
    
    # 4. If no clear answer found, try to match against the actual option texts
    lowered = text.lower()
    for key, value in options.items():
        needle = str(value).strip().lower()
        # An empty needle is a substring of every string, so it must be skipped
        if needle and needle in lowered:
            return str(key).upper()
    
    # Default to 'X' if no answer found
    return 'X'

def make_batches(questions: List[Dict[str, Any]], batch_size: int = 8) -> List[List[Dict[str, Any]]]:
    """
    Chop the question list into consecutive chunks.
    
    Args:
        questions: List of question data
        batch_size: Maximum number of questions in each chunk
    
    Returns:
        The chunks in original order; only the final chunk may be shorter
        than `batch_size`
    """
    # Each chunk starts batch_size items after the previous one
    chunk_starts = range(0, len(questions), batch_size)
    return [questions[start:start + batch_size] for start in chunk_starts]

def generate_completions(questions, model_name, output_filename="answers.jsonl") -> List[Dict[str, Any]]:
    """Generate completions for a list of questions and save them as JSON Lines.

    Questions are sent to the server in batches of BATCH_SIZE prompts. Every
    input question yields exactly one result, in input order; a question whose
    batch failed, or for which no completion came back, gets answer 'X'.

    Args:
        questions: List of question dicts (see create_prompts for the layout).
        model_name: Model identifier used for both the tokenizer and the request.
        output_filename: Path of the JSON Lines file the results are written to.

    Returns:
        One dict per question: the original question fields plus
        'raw_response' and 'answer_letter'.
    """
    batches = make_batches(questions, BATCH_SIZE)
    print(f"Processing {len(questions)} questions in {len(batches)} batches...")
    
    results = []
    for batch_idx, batch in tqdm(enumerate(batches), total=len(batches)):
        print(f"\nProcessing batch {batch_idx + 1}/{len(batches)} ({len(batch)} questions)...")
        
        try:
            prompts = create_prompts(batch, model_name)
            response = client.completions.create(
                model=model_name,
                prompt=prompts,
                max_tokens=MAX_TOKENS_PER_QUESTION,
                temperature=TEMPERATURE
            )
            
            # Pair completions with prompts through choice.index, not list
            # position: the order of `choices` is not a documented guarantee.
            text_by_index = {choice.index: choice.text for choice in response.choices}
            
            # Collect into a local list so a failure part-way through the batch
            # cannot leave partial entries that the error path then duplicates.
            batch_results = []
            for position, question_data in enumerate(batch):
                response_text = text_by_index.get(position)
                if response_text is None:
                    # The server returned no completion for this prompt
                    batch_results.append({
                        **question_data,
                        'raw_response': 'Error: no completion returned',
                        'answer_letter': 'X'
                    })
                    continue
                
                batch_results.append({
                    **question_data,
                    'raw_response': response_text,
                    'answer_letter': extract_answer_from_response(response_text, question_data)
                })
            
            results.extend(batch_results)
                
        except Exception as e:
            # Best effort: record the failure for every question in the batch and move on
            print(f"Error processing batch {batch_idx + 1}: {e}")
            for question_data in batch:
                results.append({
                    **question_data,
                    'raw_response': f'Error: {str(e)}',
                    'answer_letter': 'X'
                })
    
    # Save results, one JSON document per line
    with open(output_filename, 'w', encoding='utf-8') as f:
        for result in results:
            f.write(json.dumps(result) + '\n')
    
    print(f"Results saved to {output_filename}")
    return results


## Load and Analyze Questions


In [None]:
# Load sample questions using the function
questions = load_questions("hackathon-train.jsonl")

# Display sample question structure
print(f"First question: {questions[0]['question'][:100]}...")
print("\nSample question structure:")
print(json.dumps(questions[0], indent=2))

# Analyze question types and option counts
question_types = {}  # question_type -> number of questions
option_counts = {}   # number of options -> number of questions
for q in questions:
    q_type = q.get('question_type', 'unknown')
    question_types[q_type] = question_types.get(q_type, 0) + 1
    
    # Parse options to count them
    options = q.get('options', {})
    if isinstance(options, str):
        try:
            options = json.loads(options)
        except ValueError:
            # json.JSONDecodeError is a ValueError; a bare except would also
            # swallow KeyboardInterrupt and hide unrelated bugs
            options = {}
    
    # Options that are not a dict are left out of the histogram
    if isinstance(options, dict):
        option_count = len(options)
        option_counts[option_count] = option_counts.get(option_count, 0) + 1

print(f"\nQuestion types: {question_types}")
print(f"Option counts: {option_counts}")


## Generate Answers


In [None]:
# Report the settings for this run before starting generation
num_batches = (len(questions) + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division
config_lines = [
    "Batch processing configuration:",
    f"  Batch size: {BATCH_SIZE}",
    f"  Max tokens per question: {MAX_TOKENS_PER_QUESTION}",
    f"  Temperature: {TEMPERATURE}",
    f"  Total questions: {len(questions)}",
    f"  Number of batches: {num_batches}",
]
print("\n".join(config_lines))

# Run the model over every loaded question and write the answers to disk
results = generate_completions(questions, model_name, "sample_answers.jsonl")


## Statistics (Optional)

Calculate statistics about the generated answers.


In [None]:
def _percent(part: int, whole: int) -> float:
    """Return part/whole as a percentage, or 0.0 when whole is 0."""
    return part / whole * 100 if whole else 0.0

# Calculate statistics
total_questions = len(results)
# 'X' is the sentinel stored when no answer could be extracted; every other
# value is an option label, whichever letters the question happens to use
valid_answers = sum(1 for r in results if r['answer_letter'] != 'X')
# .get() so that results without a reference 'answer' count as not correct
# instead of raising KeyError
correct_answers = sum(1 for r in results if r['answer_letter'] == r.get('answer'))

print("\n=== SUMMARY STATISTICS ===")
print(f"Total questions processed: {total_questions}")
print(f"Valid answers (not 'X'): {valid_answers} ({_percent(valid_answers, total_questions):.1f}%)")
print(f"Correct answers: {correct_answers} ({_percent(correct_answers, total_questions):.1f}%)")
print(f"Accuracy (of valid answers): {correct_answers/valid_answers*100:.1f}%" if valid_answers > 0 else "No valid answers")

# Count by question type
question_types = {}  # question_type -> {'total': n, 'correct': n}
for result in results:
    q_type = result.get('question_type', 'unknown')
    if q_type not in question_types:
        question_types[q_type] = {'total': 0, 'correct': 0}
    question_types[q_type]['total'] += 1
    if result['answer_letter'] == result.get('answer'):
        question_types[q_type]['correct'] += 1

print("\n=== BY QUESTION TYPE ===")
for q_type, stats in question_types.items():
    accuracy = _percent(stats['correct'], stats['total'])
    print(f"{q_type}: {stats['correct']}/{stats['total']} ({accuracy:.1f}%)")


## Process Test Questions (Optional)

In [None]:
# Example: run the same pipeline on the test set when it is available
test_file = "hackathon-test.jsonl"
if not Path(test_file).exists():
    # Nothing to do without the file; say so instead of failing
    print(f"ℹ️  No {test_file} found in directory")
else:
    test_questions = load_questions(test_file)
    test_results = generate_completions(test_questions, model_name, "test_answers.jsonl")
    print("✅ Test questions processed!")

## Upload Results

Upload your generated answers to the leaderboard:

```bash
# Upload test answers (if generated)
python upload_answers.py test_answers.jsonl --team-name "test_team" --tag "qwen3_8b_no_tooling"
```

## Customize Your Approach

To test your own methods, simply replace the `generate_completions` function with your own logic:

```python
def my_custom_method(questions, output_filename="my_answers.jsonl"):
    """Answer each question with your own method and save the results as JSON Lines.

    Returns the list of result dicts: each original question plus 'answer_letter'.
    """
    # Your custom logic here
    results = []
    for question in questions:
        # Process question with your method
        # (`your_method` is a placeholder: define it before running this)
        answer = your_method(question)
        results.append({
            **question,
            'answer_letter': answer
        })
    
    # Save results, one JSON document per line
    with open(output_filename, 'w') as f:
        for result in results:
            f.write(json.dumps(result) + '\n')
    
    return results
```
