# DeltaBench Annotation Correction Tool

This notebook provides an interactive interface for manually correcting human annotations in the DeltaBench dataset. The corrected annotations are saved to a new clean dataset.

## Workflow:
1. Load the original dataset and the existing clean dataset (if any)
2. Sample a random math problem that is not yet in the clean dataset
3. Display question and sections with current annotations
4. Interactively correct annotations section by section
5. Save corrected data to `deltabench_clean.jsonl`
6. Repeat

In [None]:
# Setup
import sys
import os
import json
import random
import pandas as pd
from typing import Dict, List, Optional, Any
from datetime import datetime
import textwrap

sys.path.append('src')
from src import DeltaBenchDataset

# Seed the module-level RNG so that random.choice (used by
# AnnotationCorrector.get_random_problem) draws problems in a repeatable
# order for a given pool of available problems.
random.seed(42)

print("DeltaBench Annotation Correction Tool loaded!")

In [None]:
class AnnotationCorrector:
    def __init__(self, original_dataset_path: str, clean_dataset_path: str = "data/deltabench_clean.jsonl"):
        """Load both datasets and build the pool of math problems still to correct.

        Args:
            original_dataset_path: JSONL file passed to ``DeltaBenchDataset.load_jsonl``.
            clean_dataset_path: JSONL file of already-corrected examples. It is read
                if it exists and is the file ``save_clean_dataset`` writes to.
        """
        self.original_dataset_path = original_dataset_path
        self.clean_dataset_path = clean_dataset_path

        # Original annotations come from the project's dataset helper.
        self.dataset = DeltaBenchDataset()
        self.original_data = self.dataset.load_jsonl(original_dataset_path)

        # Examples corrected in earlier sessions; their ids mark finished work.
        self.clean_data = self.load_clean_dataset()
        self.processed_ids = {finished['id'] for finished in self.clean_data}

        # Only examples whose 'task_l1' is 'math' are eligible, and of those
        # only the ones that have not been corrected yet are offered.
        self.math_problems = [
            candidate for candidate in self.original_data
            if candidate.get('task_l1') == 'math'
        ]
        self.available_problems = [
            candidate for candidate in self.math_problems
            if candidate['id'] not in self.processed_ids
        ]

        summary = (
            f"Loaded {len(self.original_data)} total examples",
            f"Found {len(self.math_problems)} math problems",
            f"Already processed: {len(self.processed_ids)}",
            f"Available for correction: {len(self.available_problems)}",
        )
        for line in summary:
            print(line)
    
    def load_clean_dataset(self) -> List[Dict]:
        """Load existing clean dataset or create empty list."""
        if os.path.exists(self.clean_dataset_path):
            clean_data = []
            with open(self.clean_dataset_path, 'r') as f:
                for line in f:
                    clean_data.append(json.loads(line.strip()))
            print(f"Loaded {len(clean_data)} examples from existing clean dataset")
            return clean_data
        else:
            print("No existing clean dataset found. Starting fresh.")
            return []
    
    def save_clean_dataset(self):
        """Save clean dataset to file."""
        os.makedirs(os.path.dirname(self.clean_dataset_path), exist_ok=True)
        with open(self.clean_dataset_path, 'w') as f:
            for example in self.clean_data:
                f.write(json.dumps(example) + '\n')
        print(f"Saved {len(self.clean_data)} examples to {self.clean_dataset_path}")
    
    def get_random_problem(self) -> Optional[Dict]:
        """Get a random math problem not yet in clean dataset."""
        if not self.available_problems:
            print("No more problems available for correction!")
            return None
        
        problem = random.choice(self.available_problems)
        self.available_problems.remove(problem)
        return problem
    
    def return_problem_to_pool(self, problem: Dict):
        """Return a problem to the available pool (used when quitting mid-correction)."""
        self.available_problems.append(problem)
    
    def display_problem(self, example: Dict):
        """Display problem details with complete solution (no truncation)."""
        print("=" * 100)
        print(f"PROBLEM ID: {example['id']}")
        print(f"Task: {example.get('task_l1', 'N/A')} / {example.get('task_l2', 'N/A')}")
        print(f"Origin: {example.get('origin', 'N/A')}")
        print("=" * 100)
        
        # Display question
        print("\nQUESTION:")
        print("-" * 50)
        print(example.get('question', ''))
        print()
        
        # Display correct answer
        print("CORRECT ANSWER:")
        print("-" * 50)
        print(f"Answer: {example.get('answer', 'N/A')}")
        print()
        
        # Display complete correct solution (NO TRUNCATION)
        solution = example.get('solution', '')
        if solution:
            print("CORRECT SOLUTION:")
            print("-" * 50)
            print(solution)
            print()
        
        # Display provided answer from reasoning
        print("PROVIDED ANSWER (from reasoning):")
        print("-" * 50)
        print(f"Final Correct: {example.get('final_correct', 'N/A')} (0=incorrect reasoning, 1=correct)")
        print()
        
        # Display current error info
        error_sections = example.get('reason_error_section_numbers', [])
        unuseful_sections = example.get('reason_unuseful_section_numbers', [])
        print("CURRENT ANNOTATIONS:")
        print("-" * 50)
        print(f"Error sections: {error_sections}")
        print(f"Unuseful sections: {unuseful_sections}")
        print()
    
    def display_section(self, section_num: int, content: str, annotation: Dict):
        """Display a single section with its annotation (no solution display here)."""
        print(f"\n{'='*60}")
        print(f"SECTION {section_num}")
        print(f"{'='*60}")
        
        # Display section content (NO TRUNCATION)
        print("\nSECTION CONTENT:")
        print("-" * 30)
        print(content)
        
        # Display current annotations
        print("\nCURRENT ANNOTATIONS:")
        print("-" * 30)
        print(f"Reasoning Correctness: {annotation.get('reasoning_correctness', 'N/A')} (0=correct, 1=error)")
        print(f"Reasoning Usefulness: {annotation.get('reasoning_usefulness', 'N/A')} (0=unuseful, 1=useful)")
        print(f"Error Step: {annotation.get('error_step', 'N/A')}")
        print(f"Explanation: {annotation.get('explanation', 'N/A')}")
        print(f"Correction: {annotation.get('correction', 'N/A')}")
        print()
    
    def get_user_input(self, prompt: str, current_value: Any = None, field_type: str = "text") -> Any:
        """Get user input with current value display and skip option."""
        if current_value is not None and current_value != "":
            print(f"Current value: {current_value}")
        
        print(f"{prompt}")
        print("(Press ENTER to keep current value, type 'skip' to skip this section, type 'quit' to exit session)")
        
        user_input = input("> ").strip()
        
        if user_input.lower() == 'quit':
            return 'QUIT_SESSION'
        elif user_input.lower() == 'skip':
            return 'SKIP_SECTION'
        elif user_input == "":
            return current_value
        else:
            if field_type == "int":
                try:
                    return int(user_input)
                except ValueError:
                    print("Invalid integer. Keeping current value.")
                    return current_value
            elif field_type == "binary":
                if user_input in ['0', '1']:
                    return user_input
                else:
                    print("Invalid binary value (must be 0 or 1). Keeping current value.")
                    return current_value
            else:
                return user_input
    
    def correct_section_annotation(self, section_num: int, content: str, annotation: Dict) -> Dict:
        """Interactively correct a section's annotation."""
        self.display_section(section_num, content, annotation)
        
        # Ask if user wants to modify this section
        print("\nModify this section's annotations? (y/n/skip/quit)")
        print("- y: Modify annotations")
        print("- n: Keep current annotations")
        print("- skip: Skip this section")
        print("- quit: Exit correction session")
        
        modify = input("> ").strip().lower()
        
        if modify == 'quit':
            return 'QUIT_SESSION'
        elif modify == 'skip':
            return 'SKIP_SECTION'
        elif modify != 'y':
            return annotation  # Keep current annotation
        
        # Collect new annotations
        new_annotation = annotation.copy()
        
        print("\n" + "="*40)
        print("CORRECTING ANNOTATIONS")
        print("="*40)
        
        # Reasoning correctness
        correctness = self.get_user_input(
            "Reasoning Correctness (0=correct, 1=error):", 
            annotation.get('reasoning_correctness', ''),
            "binary"
        )
        if correctness == 'QUIT_SESSION':
            return 'QUIT_SESSION'
        elif correctness == 'SKIP_SECTION':
            return 'SKIP_SECTION'
        new_annotation['reasoning_correctness'] = correctness
        
        # Reasoning usefulness
        usefulness = self.get_user_input(
            "Reasoning Usefulness (0=unuseful, 1=useful):", 
            annotation.get('reasoning_usefulness', ''),
            "binary"
        )
        if usefulness == 'QUIT_SESSION':
            return 'QUIT_SESSION'
        elif usefulness == 'SKIP_SECTION':
            return 'SKIP_SECTION'
        new_annotation['reasoning_usefulness'] = usefulness
        
        # Error step (if it's an error)
        if correctness == '1':
            error_step = self.get_user_input(
                "Error Step (number):", 
                annotation.get('error_step', ''),
                "int"
            )
            if error_step == 'QUIT_SESSION':
                return 'QUIT_SESSION'
            elif error_step == 'SKIP_SECTION':
                return 'SKIP_SECTION'
            new_annotation['error_step'] = error_step
        
        # Explanation
        explanation = self.get_user_input(
            "Explanation:", 
            annotation.get('explanation', '')
        )
        if explanation == 'QUIT_SESSION':
            return 'QUIT_SESSION'
        elif explanation == 'SKIP_SECTION':
            return 'SKIP_SECTION'
        new_annotation['explanation'] = explanation
        
        # Correction
        correction = self.get_user_input(
            "Correction:", 
            annotation.get('correction', '')
        )
        if correction == 'QUIT_SESSION':
            return 'QUIT_SESSION'
        elif correction == 'SKIP_SECTION':
            return 'SKIP_SECTION'
        new_annotation['correction'] = correction
        
        return new_annotation
    
    def correct_example(self, example: Dict) -> Optional[Dict]:
        """Correct all annotations for an example."""
        self.display_problem(example)
        
        # Ask if user wants to process this example
        print("="*80)
        print("READY TO ANALYZE SECTIONS")
        print("="*80)
        print("Review the complete solution above, then proceed to analyze each section.")
        print("You can compare each section's reasoning against the correct solution you just read.")
        print()
        
        process = input("Process this example? (y/n/quit): ").strip().lower()
        
        if process == 'quit':
            return 'QUIT_SESSION'
        elif process != 'y':
            return None  # Skip this example
        
        # Create corrected copy
        corrected_example = example.copy()
        
        # Get sections
        sections_content = example.get('sections_content', '') or example.get('section_content', '')
        if not sections_content:
            print("No sections content found. Skipping example.")
            return None
        
        sections = self.dataset.parse_sections(sections_content)
        
        # Get current annotations
        sections_labeled_info = example.get('sections_labeled_info', [])
        annotations_dict = {info['section_number']: info for info in sections_labeled_info}
        
        # Correct each section
        new_annotations = []
        new_error_sections = []
        new_unuseful_sections = []
        has_changes = False  # Track if any changes were made
        
        for section_num, content in sections:
            if section_num in annotations_dict:
                current_annotation = annotations_dict[section_num]
                
                corrected_annotation = self.correct_section_annotation(
                    section_num, content, current_annotation
                )
                
                if corrected_annotation == 'QUIT_SESSION':
                    return 'QUIT_SESSION'
                elif corrected_annotation == 'SKIP_SECTION':
                    print("Skipping this section...")
                    new_annotations.append(current_annotation)
                    # Check if original annotation had errors/unuseful sections
                    if current_annotation.get('reasoning_correctness') == '1':
                        new_error_sections.append(section_num)
                    if current_annotation.get('reasoning_usefulness') == '0':
                        new_unuseful_sections.append(section_num)
                else:
                    new_annotations.append(corrected_annotation)
                    
                    # Check if changes were made
                    if corrected_annotation != current_annotation:
                        has_changes = True
                    
                    # Update error/unuseful section lists
                    if corrected_annotation.get('reasoning_correctness') == '1':
                        new_error_sections.append(section_num)
                    if corrected_annotation.get('reasoning_usefulness') == '0':
                        new_unuseful_sections.append(section_num)
            else:
                # No existing annotation - create empty one
                print(f"No annotation found for section {section_num}. Creating new one.")
                empty_annotation = {
                    'section_number': section_num,
                    'reasoning_correctness': '',
                    'reasoning_usefulness': '',
                    'error_step': '',
                    'explanation': '',
                    'correction': ''
                }
                corrected_annotation = self.correct_section_annotation(
                    section_num, content, empty_annotation
                )
                
                if corrected_annotation == 'QUIT_SESSION':
                    return 'QUIT_SESSION'
                elif corrected_annotation != 'SKIP_SECTION':
                    new_annotations.append(corrected_annotation)
                    has_changes = True
        
        # Update the example
        corrected_example['sections_labeled_info'] = new_annotations
        corrected_example['reason_error_section_numbers'] = new_error_sections
        corrected_example['reason_unuseful_section_numbers'] = new_unuseful_sections
        
        # Add correction metadata
        corrected_example['correction_metadata'] = {
            'corrected_by': 'manual',
            'correction_date': datetime.now().isoformat(),
            'original_error_sections': example.get('reason_error_section_numbers', []),
            'original_unuseful_sections': example.get('reason_unuseful_section_numbers', []),
            'has_changes': has_changes
        }
        
        return corrected_example
    
    def run_correction_session(self):
        """Run interactive correction session."""
        print("\n" + "="*80)
        print("STARTING ANNOTATION CORRECTION SESSION")
        print("="*80)
        print("Instructions:")
        print("- First, you'll see the complete problem, answer, and solution")
        print("- Then you'll go through each section to correct annotations")
        print("- For each section, you can modify annotations or press ENTER to keep current")
        print("- Type 'skip' to skip a section or entire example")
        print("- Type 'quit' to exit the session completely (partially corrected examples won't be saved)")
        print("- If you skip through all sections, the original annotations are kept (this is fine)")
        print("- Progress is saved after each fully processed example")
        print("- NO TRUNCATION: All content is shown in full for proper evaluation")
        print("="*80)
        
        while True:
            # Get next problem
            problem = self.get_random_problem()
            if problem is None:
                break
            
            # Correct the problem
            result = self.correct_example(problem)
            
            if result == 'QUIT_SESSION':
                print("\n" + "="*50)
                print("QUITTING CORRECTION SESSION")
                print("="*50)
                print("- Current problem was not fully processed and will remain available for future sessions")
                print("- Any previously completed problems have been saved")
                # Return the problem to the pool since it wasn't fully processed
                self.return_problem_to_pool(problem)
                break
            elif result is not None:
                # Save corrected example (either with changes or original annotations if all skipped)
                self.clean_data.append(result)
                self.processed_ids.add(result['id'])
                self.save_clean_dataset()
                
                change_status = "with changes" if result['correction_metadata']['has_changes'] else "original annotations kept"
                print(f"\n✓ Example {result['id']} processed ({change_status}) and saved!")
                print(f"Progress: {len(self.processed_ids)} examples processed")
                print(f"Remaining: {len(self.available_problems)} examples")
            else:
                print("\nExample skipped.")
        
        print("\nCorrection session complete!")
        print(f"Total processed examples: {len(self.clean_data)}")

# Build the module-level corrector that the utility functions and the
# session cell below operate on.
corrector = AnnotationCorrector(original_dataset_path='data/Deltabench_v1.jsonl')
print("\nAnnotation Corrector initialized and ready!")

## Usage Instructions

### Quick Start
Run the cell below to start the interactive correction session:

```python
corrector.run_correction_session()
```

### Interface Commands
- **ENTER**: Keep current annotation value
- **'skip'**: Skip current section/example
- **'quit'**: End correction session
- **Direct input**: Enter new value for annotation

### Annotation Fields
- **Reasoning Correctness**: 0 (correct) or 1 (error)
- **Reasoning Usefulness**: 0 (unuseful) or 1 (useful)
- **Error Step**: Step number where error occurs (if reasoning_correctness=1)
- **Explanation**: Description of the error/issue
- **Correction**: How to fix the error

### Progress Tracking
- Progress is automatically saved after each example
- Clean dataset is stored in `data/deltabench_clean.jsonl`
- Already processed examples are skipped automatically

## Utility Functions

Additional helper functions for managing the correction process:

In [None]:
# Check progress
def check_progress():
    """Print dataset sizes and, when it can be computed, the completion rate.

    Reads the module-level ``corrector``. The completion rate is the number of
    processed ids divided by the number of math problems; it is printed only
    when both are non-zero, so an empty set of math problems can no longer
    cause a ``ZeroDivisionError``.
    """
    total_math = len(corrector.math_problems)
    done = len(corrector.processed_ids)

    print(f"Original dataset: {len(corrector.original_data)} examples")
    print(f"Math problems: {total_math} examples")
    print(f"Already corrected: {done} examples")
    print(f"Available for correction: {len(corrector.available_problems)} examples")

    # Guard on the divisor as well as on "anything processed yet".
    if done and total_math:
        completion_rate = done / total_math * 100
        print(f"Completion rate: {completion_rate:.1f}%")

# Preview a random problem without correcting
def preview_problem():
    """Display one randomly drawn available problem without consuming it.

    The problem is drawn with ``corrector.get_random_problem`` (which removes
    it from the pool) and is always handed back through
    ``corrector.return_problem_to_pool``, even if displaying it raises, so a
    preview can never shrink the pool.
    """
    problem = corrector.get_random_problem()
    if problem is None:
        print("No problems available for preview.")
        return

    try:
        corrector.display_problem(problem)
    finally:
        # Use the same helper the session uses when a problem is abandoned.
        corrector.return_problem_to_pool(problem)

# Load a specific problem by ID
def load_problem_by_id(problem_id: str):
    """Display and return the first problem in the original dataset whose id matches.

    Searches ``corrector.original_data`` in order. Prints a notice and returns
    ``None`` when no problem has the given id.
    """
    match = next(
        (candidate for candidate in corrector.original_data if candidate['id'] == problem_id),
        None,
    )
    if match is None:
        print(f"Problem {problem_id} not found.")
        return None

    corrector.display_problem(match)
    return match

# Show statistics about current clean dataset
def show_clean_dataset_stats():
    """Print how many clean-dataset examples were actually corrected.

    An example counts as corrected when its ``correction_metadata``:
      * has a truthy ``has_changes`` flag, or
      * records original error sections that differ from the example's current
        ``reason_error_section_numbers``, or
      * records original unuseful sections that differ from the current
        ``reason_unuseful_section_numbers``.

    Examples without ``correction_metadata`` count as unchanged. Relying on the
    error-section comparison alone would miss corrections that only touched
    usefulness, explanations or other fields.
    """
    if not corrector.clean_data:
        print("No clean dataset examples yet.")
        return

    def was_corrected(ex) -> bool:
        if 'correction_metadata' not in ex:
            return False
        meta = ex['correction_metadata']
        if meta.get('has_changes'):
            return True
        orig_errors = set(meta.get('original_error_sections', []))
        if orig_errors != set(ex.get('reason_error_section_numbers', [])):
            return True
        # Only compare unuseful sections when the metadata recorded them, so
        # records lacking this key are not misreported as changed.
        if 'original_unuseful_sections' in meta:
            orig_unuseful = set(meta['original_unuseful_sections'])
            if orig_unuseful != set(ex.get('reason_unuseful_section_numbers', [])):
                return True
        return False

    total = len(corrector.clean_data)
    print(f"Clean dataset: {total} examples")

    total_corrections = sum(1 for ex in corrector.clean_data if was_corrected(ex))

    print(f"Examples with corrections: {total_corrections}")
    print(f"Examples unchanged: {total - total_corrections}")

# Confirm that the helper cell ran and list the helpers it defined.
print("Utility functions loaded!")
print("Available functions: check_progress(), preview_problem(), load_problem_by_id(), show_clean_dataset_stats()")

In [None]:
# Print the dataset sizes and the number of already-corrected examples
# before starting a session.
check_progress()

In [None]:
# Start the correction session.
# NOTE: this call is live - running this cell immediately begins the
# interactive prompts.
corrector.run_correction_session()