In [None]:
# LLM Event Results Evaluator - Complete Working Version
# Evaluates LLM event annotation results and integrates with GateNLP

import pandas as pd
import json
from pathlib import Path
from typing import Dict, List, Tuple, Any
from gatenlp import Document
from gatenlp.corpora.memory import ListCorpus
from GatenlpUtils import loadCorpus

class LLMEventResultsEvaluator:
    """
    Evaluates LLM event annotation results against gold standard annotations.

    Predictions are read from per-model folders of JSON files, attached to the
    matching corpus documents as "<model>_predictions" annotation sets, scored
    against the "consensus" annotation set using lenient (any-overlap)
    matching, and finally written back to disk as annotated documents.
    """

    # Suffix of the temporary annotation sets that hold a model's predictions.
    PREDICTION_SUFFIX = "_predictions"
    # Name of the annotation set used as the gold standard.
    GOLD_ANNSET_NAME = "consensus"
    # Folder inside output_dir that receives the annotated corpus; it must
    # never be mistaken for a model result folder.
    ANNOTATED_CORPUS_DIRNAME = "annotated_corpus_with_predictions"

    def __init__(self, output_dir: str):
        """Initialize evaluator with output directory containing pipeline results."""
        self.output_dir = Path(output_dir)
        self.corpus = loadCorpus()  # Load the corpus with gold standard annotations
        self.results_df = pd.DataFrame()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _doc_name(doc) -> str:
        """Return the file stem of the document's "gate.SourceURL" feature ("" if unset)."""
        return Path(doc.features.get("gate.SourceURL", "")).stem

    @classmethod
    def _prediction_annset_names(cls, doc) -> List[str]:
        """Return the names of all prediction annotation sets present in ``doc``."""
        return [name for name in doc.annset_names() if name.endswith(cls.PREDICTION_SUFFIX)]

    @classmethod
    def _model_name(cls, annset_name: str) -> str:
        """Strip the prediction suffix (and only the suffix) from an annotation set name."""
        if annset_name.endswith(cls.PREDICTION_SUFFIX):
            return annset_name[:-len(cls.PREDICTION_SUFFIX)]
        return annset_name

    @staticmethod
    def _to_offset(value) -> Any:
        """Convert a raw JSON offset to a non-negative int, or return None if impossible."""
        if isinstance(value, bool):
            return None
        if isinstance(value, float) and not value.is_integer():
            return None
        try:
            offset = int(value)
        except (TypeError, ValueError, OverflowError):
            return None
        return offset if offset >= 0 else None

    @staticmethod
    def _extract_events(response) -> List:
        """Return the raw event list of one response object ([] if none can be found)."""
        if not isinstance(response, dict):
            return []

        if 'events' in response:
            events = response['events']
        elif 'predicted_events' in response:
            events = response['predicted_events']
        elif 'response' in response:
            # Nested structure: the payload may itself be a JSON string.
            inner_response = response['response']
            if isinstance(inner_response, str):
                try:
                    inner_response = json.loads(inner_response)
                except ValueError:
                    return []
            if not isinstance(inner_response, dict):
                return []
            events = inner_response.get('events', [])
        else:
            return []

        return events if isinstance(events, list) else []

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------
    def find_result_folders(self) -> List[Path]:
        """Find all result folders (model output directories)."""
        folders = [f for f in self.output_dir.iterdir()
                   if f.is_dir() and not f.name.startswith('.')
                   and f.name != self.ANNOTATED_CORPUS_DIRNAME]
        print(f"Found {len(folders)} result folders: {[f.name for f in folders]}")
        return folders

    def find_result_jsons(self, folder: Path) -> List[Path]:
        """Find all JSON result files in a folder."""
        json_files = list(folder.glob("*.json"))
        print(f"  Found {len(json_files)} JSON files in {folder.name}")
        return json_files

    # ------------------------------------------------------------------
    # Parsing and attaching predictions
    # ------------------------------------------------------------------
    def parse_llm_event_predictions(self, json_path: Path) -> List[Dict]:
        """
        Parse LLM predictions from a JSON file.

        The file may contain a single response object or a list of them. Each
        response may carry its events under 'events', 'predicted_events', or a
        nested 'response' (dict or JSON string) with an 'events' list.

        Malformed responses and events with unusable offsets are skipped
        individually instead of discarding the whole file.

        Returns:
            A list of dicts with keys 'type', 'start', 'end', 'text' and
            'features' (all remaining event keys). Empty if the file cannot
            be read or parsed.
        """
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Error parsing {json_path}: {e}")
            return []

        # Handle both single response and list of responses
        responses = data if isinstance(data, list) else [data]

        predictions = []
        for response in responses:
            for event in self._extract_events(response):
                if not isinstance(event, dict):
                    continue

                start = self._to_offset(event.get('start', 0))
                end = self._to_offset(event.get('end', 0))
                if start is None or end is None or end < start:
                    # Offsets that cannot form a valid span would make the
                    # annotation step fail later, so drop the event here.
                    continue

                predictions.append({
                    'type': str(event.get('type') or 'Event'),
                    'start': start,
                    'end': end,
                    'text': event.get('text', ''),
                    'features': {k: v for k, v in event.items()
                                 if k not in ('type', 'start', 'end', 'text')}
                })

        return predictions

    def process_result_json(self, json_path: Path, model_name: str):
        """
        Process a result JSON file and add predictions to the corresponding document.

        The document is located by comparing the JSON file stem with the stem
        of each corpus document's "gate.SourceURL" feature. Predictions are
        written to the "<model_name>_predictions" annotation set, replacing any
        previous content of that set.
        """
        # Extract document name from JSON filename
        doc_name = json_path.stem

        # Find corresponding document in corpus
        target_doc = None
        for doc in self.corpus:
            if self._doc_name(doc) == doc_name:
                target_doc = doc
                break

        # Compare against None explicitly: do not rely on document truthiness.
        if target_doc is None:
            print(f"Warning: No corpus document found for {doc_name}")
            return

        # Parse predictions
        predictions = self.parse_llm_event_predictions(json_path)
        if not predictions:
            return

        # Create annotation set for this model's predictions
        pred_annset = target_doc.annset(f"{model_name}{self.PREDICTION_SUFFIX}")
        pred_annset.clear()  # Clear any existing predictions

        added = 0
        for pred in predictions:
            features = dict(pred['features'])
            features['source'] = 'llm_prediction'
            features['model'] = model_name

            try:
                # Positional arguments, matching save_corpus_with_annotations,
                # so the call does not depend on the keyword name of the
                # annotation-type parameter.
                pred_annset.add(pred['start'], pred['end'], pred['type'], features)
            except Exception as e:
                # One prediction with offsets the document rejects must not
                # abort the whole evaluation run.
                print(f"  Warning: skipped prediction {pred['start']}-{pred['end']} "
                      f"for {doc_name} ({model_name}): {e}")
                continue
            added += 1

        print(f"  Added {added} predictions for {doc_name} ({model_name})")

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------
    def calculate_annotation_overlap_metrics(self, gold_annset, pred_annset, annotation_type: str = None) -> Dict[str, float]:
        """
        Calculate precision, recall and F1 using lenient (any-overlap) matching.

        Precision is the share of predicted annotations that overlap at least
        one gold annotation; recall is the share of gold annotations that are
        overlapped by at least one prediction. Counting both sides separately
        keeps recall within [0, 1] even when several predictions hit the same
        gold annotation.

        Args:
            gold_annset: iterable of annotations with .type, .start and .end.
            pred_annset: iterable of annotations with .type, .start and .end.
            annotation_type: if given, only annotations of this type are scored.

        Returns:
            Dict with 'precision', 'recall', 'f1', 'gold_count', 'pred_count'.
        """
        # Filter by annotation type if specified
        if annotation_type:
            gold_anns = [ann for ann in gold_annset if ann.type == annotation_type]
            pred_anns = [ann for ann in pred_annset if ann.type == annotation_type]
        else:
            gold_anns = list(gold_annset)
            pred_anns = list(pred_annset)

        if not gold_anns and not pred_anns:
            return {'precision': 1.0, 'recall': 1.0, 'f1': 1.0, 'gold_count': 0, 'pred_count': 0}

        if not pred_anns or not gold_anns:
            return {'precision': 0.0, 'recall': 0.0, 'f1': 0.0,
                    'gold_count': len(gold_anns), 'pred_count': len(pred_anns)}

        def overlaps(first, second) -> bool:
            # Any shared character counts as a match.
            return first.start < second.end and first.end > second.start

        matched_preds = sum(
            1 for pred_ann in pred_anns
            if any(overlaps(pred_ann, gold_ann) for gold_ann in gold_anns)
        )
        matched_golds = sum(
            1 for gold_ann in gold_anns
            if any(overlaps(pred_ann, gold_ann) for pred_ann in pred_anns)
        )

        precision = matched_preds / len(pred_anns)
        recall = matched_golds / len(gold_anns)
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

        return {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'gold_count': len(gold_anns),
            'pred_count': len(pred_anns)
        }

    def evaluate_all_predictions(self) -> pd.DataFrame:
        """
        Evaluate all model predictions against the gold standard.

        For every document and every prediction annotation set, one row is
        produced per annotation type plus one 'OVERALL' row covering all types.
        The result is stored in ``self.results_df`` and returned.
        """
        results = []

        def make_row(doc_name: str, model_name: str, label: str, metrics: Dict[str, float]) -> Dict[str, Any]:
            return {
                'document': doc_name,
                'model': model_name,
                'annotation_type': label,
                'precision': metrics['precision'],
                'recall': metrics['recall'],
                'f1': metrics['f1'],
                'gold_count': metrics['gold_count'],
                'pred_count': metrics['pred_count']
            }

        for doc in self.corpus:
            doc_name = self._doc_name(doc)
            gold_annset = doc.annset(self.GOLD_ANNSET_NAME)  # Gold standard annotation set

            for pred_annset_name in self._prediction_annset_names(doc):
                model_name = self._model_name(pred_annset_name)
                pred_annset = doc.annset(pred_annset_name)

                # Sorted so that the row order (and the saved CSV) is deterministic.
                all_types = sorted(
                    {ann.type for ann in gold_annset} | {ann.type for ann in pred_annset}
                )

                # Evaluate each annotation type
                for ann_type in all_types:
                    metrics = self.calculate_annotation_overlap_metrics(
                        gold_annset, pred_annset, ann_type
                    )
                    results.append(make_row(doc_name, model_name, ann_type, metrics))

                # Overall evaluation (all types combined)
                overall_metrics = self.calculate_annotation_overlap_metrics(gold_annset, pred_annset)
                results.append(make_row(doc_name, model_name, 'OVERALL', overall_metrics))

        self.results_df = pd.DataFrame(results)
        return self.results_df

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def save_corpus_with_annotations(self):
        """
        Save the corpus with all model predictions as separate annotation sets.
        Files will be saved in JSON format within the pipeline results directory.

        For each document that has prediction sets, every "<model>_predictions"
        set is copied into an annotation set named "<model>" (without the
        'source' and 'model' bookkeeping features) before the document is
        dumped. A plain-text summary file is written next to the documents.

        Returns:
            The output directory as a string, or None if saving failed.
        """
        try:
            # Create output directory WITHIN the pipeline results folder
            output_corpus_dir = self.output_dir / self.ANNOTATED_CORPUS_DIRNAME
            output_corpus_dir.mkdir(parents=True, exist_ok=True)

            print(f"üìÅ Saving annotated corpus to: {output_corpus_dir}")

            saved_files = []

            for doc in self.corpus:
                doc_name = self._doc_name(doc)

                # Only documents that received predictions are written out.
                pred_annsets = self._prediction_annset_names(doc)
                if not pred_annsets:
                    continue

                try:
                    # Create permanent annotation sets from predictions
                    for annset_name in pred_annsets:
                        pred_annset = doc.annset(annset_name)
                        permanent_annset = doc.annset(self._model_name(annset_name))
                        permanent_annset.clear()

                        # Copy annotations
                        for ann in pred_annset:
                            features = dict(ann.features)
                            features.pop("source", None)  # Remove metadata
                            features.pop("model", None)
                            permanent_annset.add(ann.start, ann.end, ann.type, features)

                    # Save document in JSON format
                    output_file = output_corpus_dir / f"{doc_name}.json"
                    with open(output_file, 'w', encoding='utf-8') as f:
                        json.dump(doc.to_dict(), f, ensure_ascii=False, indent=2)

                    saved_files.append(doc_name)

                    # Print summary of what was saved
                    annset_summary = []
                    for annset_name in doc.annset_names():
                        if annset_name and not annset_name.endswith(self.PREDICTION_SUFFIX):  # Skip temporary sets
                            ann_count = len(doc.annset(annset_name))
                            if ann_count > 0:
                                annset_summary.append(f"{annset_name}({ann_count})")

                    if annset_summary:
                        print(f"  {doc_name}.json: {', '.join(annset_summary)}")
                except Exception as e:
                    print(f"  ‚ùå Failed to save {doc_name}: {e}")

            print(f"‚úÖ Saved {len(saved_files)} annotated documents")

            # Create a summary file
            summary_file = output_corpus_dir / "annotation_summary.txt"
            with open(summary_file, 'w', encoding='utf-8') as f:
                f.write("Annotated Corpus Summary\n")
                f.write("=" * 50 + "\n\n")
                f.write(f"Generated on: {pd.Timestamp.now()}\n")
                f.write(f"Pipeline results folder: {self.output_dir}\n")
                f.write(f"Total documents: {len(saved_files)}\n")
                f.write(f"Format: JSON (GateNLP BDOC format)\n\n")

                f.write("Annotation Sets Added:\n")
                f.write("-" * 30 + "\n")

                # Get all unique annotation set names across all documents
                all_annsets = set()
                for doc in self.corpus:
                    for annset_name in doc.annset_names():
                        if annset_name and not annset_name.endswith(self.PREDICTION_SUFFIX):
                            all_annsets.add(annset_name)

                for annset_name in sorted(all_annsets):
                    if annset_name not in [self.GOLD_ANNSET_NAME, ""]:  # Skip gold standard and default
                        f.write(f"- {annset_name} (LLM predictions)\n")

                f.write(f"\nFiles saved to: {output_corpus_dir}\n")
                f.write("\nTo view in Gate:\n")
                f.write("1. Open Gate Developer\n")
                f.write("2. Load documents from this directory\n")
                f.write("3. Select JSON/BDOC format when loading\n")
                f.write("4. View different annotation sets in the annotation sets panel\n")

            return str(output_corpus_dir)

        except Exception as e:
            print(f"‚ùå Error saving corpus: {e}")
            import traceback
            traceback.print_exc()
            return None

# Cell-level confirmation that the class definition above executed without errors.
print("‚úÖ LLMEventResultsEvaluator class loaded successfully!")

In [None]:
# Visual Analysis and Enhanced Results Display
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

def create_visual_analysis(results_df: pd.DataFrame):
    """
    Create a 3x3 grid of plots summarising the evaluation results.

    Args:
        results_df: one row per (document, model, annotation_type) with the
            columns 'precision', 'recall', 'f1', 'gold_count' and
            'pred_count'. Rows whose annotation_type is 'OVERALL' hold the
            all-types-combined scores.

    Returns:
        None. The figure is displayed with ``plt.show()``; nothing is drawn
        when ``results_df`` is empty.
    """

    if results_df.empty:
        print("No results to visualize")
        return

    # Set up the plotting style
    plt.style.use('default')
    sns.set_palette("husl")

    # Create a large figure with multiple subplots
    fig = plt.figure(figsize=(20, 16))

    # 1. Heatmap: F1 scores by Model and Annotation Type
    plt.subplot(3, 3, 1)
    pivot_f1 = results_df.pivot_table(values='f1', index='model', columns='annotation_type', aggfunc='mean')
    sns.heatmap(pivot_f1, annot=True, fmt='.3f', cmap='RdYlGn', center=0.5,
                cbar_kws={'label': 'F1 Score'})
    plt.title('F1 Scores by Model and Annotation Type')
    plt.xlabel('Annotation Type')
    plt.ylabel('Model')

    # 2. Bar plot: Average F1 by Model
    plt.subplot(3, 3, 2)
    model_f1 = results_df.groupby('model')['f1'].mean().sort_values(ascending=True)
    model_f1.plot(kind='barh', color='skyblue')
    plt.title('Average F1 Score by Model')
    plt.xlabel('F1 Score')
    plt.grid(axis='x', alpha=0.3)

    # 3. Bar plot: Average F1 by Annotation Type
    plt.subplot(3, 3, 3)
    ann_f1 = results_df.groupby('annotation_type')['f1'].mean().sort_values(ascending=True)
    ann_f1.plot(kind='barh', color='lightcoral')
    plt.title('Average F1 Score by Annotation Type')
    plt.xlabel('F1 Score')
    plt.grid(axis='x', alpha=0.3)

    # 4. Precision vs Recall scatter plot
    plt.subplot(3, 3, 4)
    for model in results_df['model'].unique():
        model_data = results_df[results_df['model'] == model]
        plt.scatter(model_data['recall'], model_data['precision'],
                    label=model, alpha=0.7, s=60)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision vs Recall by Model')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(alpha=0.3)

    # 5. Distribution of F1 scores
    plt.subplot(3, 3, 5)
    results_df.boxplot(column='f1', by='model', ax=plt.gca())
    plt.title('F1 Score Distribution by Model')
    plt.suptitle('')  # Remove the default title
    plt.xticks(rotation=45)
    plt.ylabel('F1 Score')

    # 6. Document-level performance heatmap
    plt.subplot(3, 3, 6)
    doc_model_f1 = results_df.groupby(['document', 'model'])['f1'].mean().reset_index()
    if not doc_model_f1.empty:
        doc_model_pivot = doc_model_f1.pivot(index='document', columns='model', values='f1')
        sns.heatmap(doc_model_pivot, annot=True, fmt='.2f', cmap='RdYlGn', center=0.3,
                    cbar_kws={'label': 'F1 Score'})
        plt.title('F1 Scores by Document and Model')
        plt.xlabel('Model')
        plt.ylabel('Document')

    # 7. Gold vs Predicted annotations count
    plt.subplot(3, 3, 7)
    # The 'OVERALL' rows already contain the per-document totals across all
    # types; summing them together with the per-type rows would count every
    # annotation twice. Fall back to all rows only if no 'OVERALL' rows exist.
    count_rows = results_df[results_df['annotation_type'] == 'OVERALL']
    if count_rows.empty:
        count_rows = results_df
    total_gold = count_rows.groupby('model')['gold_count'].sum()
    total_pred = count_rows.groupby('model')['pred_count'].sum()

    x = np.arange(len(total_gold))
    width = 0.35

    plt.bar(x - width/2, total_gold.values, width, label='Gold Standard', color='gold', alpha=0.8)
    plt.bar(x + width/2, total_pred.values, width, label='Predicted', color='steelblue', alpha=0.8)

    plt.xlabel('Model')
    plt.ylabel('Total Annotations')
    plt.title('Gold vs Predicted Annotation Counts')
    plt.xticks(x, total_gold.index, rotation=45)
    plt.legend()
    plt.grid(axis='y', alpha=0.3)

    # 8. Annotation type performance comparison
    plt.subplot(3, 3, 8)
    ann_metrics = results_df.groupby('annotation_type')[['precision', 'recall', 'f1']].mean()
    ann_metrics.plot(kind='bar', ax=plt.gca(), color=['lightblue', 'lightgreen', 'lightcoral'])
    plt.title('Average Metrics by Annotation Type')
    plt.ylabel('Score')
    plt.xticks(rotation=45)
    plt.legend()
    plt.grid(axis='y', alpha=0.3)

    # 9. Model performance ranking
    plt.subplot(3, 3, 9)
    model_ranking = results_df.groupby('model')[['precision', 'recall', 'f1']].mean().sort_values('f1', ascending=False)
    model_ranking.plot(kind='bar', ax=plt.gca(), color=['lightblue', 'lightgreen', 'lightcoral'])
    plt.title('Model Performance Ranking')
    plt.ylabel('Score')
    plt.xticks(rotation=45)
    plt.legend()
    plt.grid(axis='y', alpha=0.3)

    plt.tight_layout()
    plt.show()

def create_detailed_results_table(results_df: pd.DataFrame):
    """
    Print a detailed, sectioned text report of the evaluation results.

    Args:
        results_df: one row per (document, model, annotation_type) with the
            columns 'precision', 'recall', 'f1', 'gold_count' and
            'pred_count'. Rows whose annotation_type is 'OVERALL' hold the
            all-types-combined values.

    Returns:
        None. Everything is written to stdout; an empty frame only prints a
        short notice.
    """

    if results_df.empty:
        print("No results to display")
        return

    metric_aggregation = {
        'precision': ['mean', 'std'],
        'recall': ['mean', 'std'],
        'f1': ['mean', 'std'],
    }

    print("\n" + "="*120)
    print("DETAILED EVALUATION RESULTS")
    print("="*120)

    # Summary by model
    print("\nüìä SUMMARY BY MODEL:")
    print("-" * 80)
    model_summary = results_df.groupby('model').agg(metric_aggregation).round(3)

    # Flatten column names
    model_summary.columns = ['_'.join(col).strip() for col in model_summary.columns.values]

    # Annotation totals come from the 'OVERALL' rows only: they already cover
    # all types, so adding the per-type rows on top would double every count.
    # Fall back to all rows when the frame carries no 'OVERALL' rows.
    count_rows = results_df[results_df['annotation_type'] == 'OVERALL']
    if count_rows.empty:
        count_rows = results_df
    model_counts = (
        count_rows.groupby('model')[['gold_count', 'pred_count']]
        .sum()
        .reindex(model_summary.index, fill_value=0)
    )
    model_summary['gold_count_sum'] = model_counts['gold_count']
    model_summary['pred_count_sum'] = model_counts['pred_count']
    print(model_summary.to_string())

    # Summary by annotation type ('OVERALL' forms its own group here, so plain
    # sums are correct).
    print("\n\nüìã SUMMARY BY ANNOTATION TYPE:")
    print("-" * 80)
    ann_summary = results_df.groupby('annotation_type').agg({
        **metric_aggregation,
        'gold_count': 'sum',
        'pred_count': 'sum'
    }).round(3)

    ann_summary.columns = ['_'.join(col).strip() for col in ann_summary.columns.values]
    print(ann_summary.to_string())

    # Best and worst performers
    report_columns = ['model', 'annotation_type', 'document', 'f1', 'precision', 'recall']

    print("\n\nüèÜ TOP PERFORMERS:")
    print("-" * 50)
    top_performers = results_df.nlargest(10, 'f1')[report_columns]
    print(top_performers.to_string(index=False))

    print("\n\n‚ö†Ô∏è  LOWEST PERFORMERS:")
    print("-" * 50)
    low_performers = results_df.nsmallest(10, 'f1')[report_columns]
    print(low_performers.to_string(index=False))

    # Document-level analysis
    print("\n\nüìÑ DOCUMENT-LEVEL ANALYSIS:")
    print("-" * 50)
    doc_analysis = results_df.groupby('document').agg({
        'f1': ['mean', 'std', 'min', 'max'],
        'model': 'count'
    }).round(3)
    doc_analysis.columns = ['_'.join(col).strip() for col in doc_analysis.columns.values]
    print(doc_analysis.to_string())

    # Final summary
    print("\n\nüéØ OVERALL SUMMARY:")
    print("-" * 30)
    print(f"Total evaluations performed: {len(results_df)}")
    print(f"Documents analyzed: {results_df['document'].nunique()}")
    print(f"Models compared: {results_df['model'].nunique()}")
    print(f"Annotation types evaluated: {results_df['annotation_type'].nunique()}")
    print(f"Average F1 score: {results_df['f1'].mean():.3f}")
    print(f"Best performing model: {results_df.groupby('model')['f1'].mean().idxmax()}")
    print(f"Best annotation type: {results_df.groupby('annotation_type')['f1'].mean().idxmax()}")

# Cell-level confirmation that the plotting and reporting functions above were defined.
print("‚úÖ Visual analysis and results display functions loaded successfully!")

✅ Visual analysis and results display functions loaded successfully!
These functions will be called automatically by the main execution pipeline.


In [None]:
# Utility Functions for Manual Operations and Testing
from pathlib import Path

def run_evaluation_on_folder(output_dir: str = "output/pipeline_results_20250718_085458"):
    """
    Run the complete evaluation on a specific pipeline results folder.

    Steps: load the corpus, attach every model's JSON predictions to the
    matching documents, score them against the gold standard, write
    "evaluation_results.csv" and the annotated corpus, then show the plots and
    the text report.

    If ``output_dir`` contains no result folders, sibling folders in its
    parent directory that contain JSON files are used instead and the parent
    becomes the evaluator's output directory.

    Args:
        output_dir: pipeline results folder containing one sub-folder per model.

    Returns:
        ``(evaluator, results_df)`` on success, ``None`` on any failure (the
        error is printed together with a traceback).
    """

    print(f"? Running evaluation on: {output_dir}")
    print("=" * 60)

    try:
        # Initialize evaluator
        evaluator = LLMEventResultsEvaluator(output_dir)

        # Find and process all result folders
        result_folders = evaluator.find_result_folders()

        if not result_folders:
            print("‚ùå No result folders found!")
            # Try fallback to parent directory
            parent_dir = Path(output_dir).parent
            print(f"üîç Checking parent directory: {parent_dir}")

            model_folders = [
                folder for folder in parent_dir.iterdir()
                if folder.is_dir()
                and any(folder.glob("*.json"))
                and folder.name != "annotated_corpus_with_predictions"
            ]

            if not model_folders:
                print("‚ùå No model folders found!")
                return None

            print(f"Found {len(model_folders)} model folders in parent directory")
            evaluator.output_dir = parent_dir
            result_folders = model_folders

        # Process all result JSON files
        print(f"\nüìä Processing prediction results from {len(result_folders)} folders...")
        for folder in result_folders:
            for json_path in evaluator.find_result_jsons(folder):
                evaluator.process_result_json(json_path, folder.name)

        # Check if we have predictions
        docs_with_predictions = sum(
            1 for doc in evaluator.corpus
            if any(name.endswith("_predictions") for name in doc.annset_names())
        )

        print(f"Documents with predictions: {docs_with_predictions}")

        if docs_with_predictions == 0:
            print("‚ùå No predictions found in corpus!")
            return None

        # Run evaluation
        print("\nüîç Evaluating predictions against gold standard...")
        results_df = evaluator.evaluate_all_predictions()

        if results_df.empty:
            print("‚ùå No evaluation results generated!")
            return None

        # Save results next to the annotated corpus. evaluator.output_dir (not
        # the raw argument) is used so that the CSV lands in the directory that
        # was actually evaluated when the parent-directory fallback kicked in.
        results_file = evaluator.output_dir / "evaluation_results.csv"
        results_df.to_csv(results_file, index=False)
        print(f"üíæ Results saved to: {results_file}")

        # Save annotated corpus
        print("\nüìÅ Saving annotated corpus...")
        corpus_path = evaluator.save_corpus_with_annotations()

        if corpus_path:
            print(f"‚úÖ Annotated corpus saved to: {corpus_path}")

        # Generate visualizations
        print("\nüìà Generating visual analysis...")
        create_visual_analysis(results_df)

        print("\nüìã Generating detailed results...")
        create_detailed_results_table(results_df)

        return evaluator, results_df

    except Exception as e:
        print(f"‚ùå Evaluation failed: {e}")
        import traceback
        traceback.print_exc()
        return None

def check_corpus_status(output_dir: str = "output/pipeline_results_20250718_085458"):
    """
    Report which corpus documents carry prediction annotation sets.

    A fresh evaluator (and therefore a freshly loaded corpus) is created for
    ``output_dir``; every annotation set whose name ends with "_predictions"
    is listed together with its annotation count.

    Args:
        output_dir: pipeline results folder passed to the evaluator.

    Returns:
        ``(docs_with_preds, total_docs)``.
    """

    print("üîç Checking corpus annotation status")
    print("-" * 40)

    evaluator = LLMEventResultsEvaluator(output_dir)

    docs_with_preds = 0
    total_docs = len(evaluator.corpus)

    for doc in evaluator.corpus:
        doc_name = Path(doc.features.get("gate.SourceURL", "")).stem
        pred_annsets = [name for name in doc.annset_names() if name.endswith("_predictions")]

        if pred_annsets:
            docs_with_preds += 1
            print(f"üìã {doc_name}: {len(pred_annsets)} prediction sets")
            for annset_name in pred_annsets:
                ann_count = len(doc.annset(annset_name))
                print(f"    - {annset_name}: {ann_count} annotations")

    # An empty corpus has no meaningful coverage; report 0% instead of
    # failing with a division by zero.
    coverage = docs_with_preds / total_docs * 100 if total_docs else 0.0

    print(f"\nüìä Summary:")
    print(f"   Total documents: {total_docs}")
    print(f"   Documents with predictions: {docs_with_preds}")
    print(f"   Coverage: {coverage:.1f}%")

    return docs_with_preds, total_docs

def list_available_results(base_dir: str = "output"):
    """
    List all available pipeline result directories below ``base_dir``.

    A pipeline result directory is any sub-directory whose name starts with
    "pipeline_results_". For each one the (first three) model folders and the
    state of the annotated corpus are printed.

    Args:
        base_dir: directory that contains the pipeline result directories.

    Returns:
        The pipeline result directories as a list of ``Path`` objects, sorted
        in the same order as the printed numbering. Empty if ``base_dir`` does
        not exist or is not a directory.
    """

    print("üìÅ Available pipeline result directories:")
    print("-" * 50)

    base_path = Path(base_dir)
    if not base_path.is_dir():
        # This function runs unguarded at notebook start-up, so a missing
        # folder must not raise.
        print(f"Base directory not found: {base_path}")
        return []

    # Sort once so that the returned list matches the printed numbering.
    pipeline_dirs = sorted(
        d for d in base_path.iterdir()
        if d.is_dir() and d.name.startswith("pipeline_results_")
    )

    for i, directory in enumerate(pipeline_dirs, 1):
        print(f"{i}. {directory.name}")

        # Check if it has result folders
        result_folders = [f for f in directory.iterdir()
                          if f.is_dir() and not f.name.startswith('.') and f.name != "annotated_corpus_with_predictions"]
        if result_folders:
            print(f"   Models: {[f.name for f in result_folders[:3]]}{'...' if len(result_folders) > 3 else ''}")

        # Check if annotated corpus exists
        corpus_dir = directory / "annotated_corpus_with_predictions"
        if corpus_dir.exists():
            corpus_files = list(corpus_dir.glob("*.json"))
            print(f"   Annotated corpus: ‚úÖ ({len(corpus_files)} documents)")
        else:
            print(f"   Annotated corpus: ‚ùå (not created)")

    return pipeline_dirs

print("üîß Utility functions loaded:")
print("   - run_evaluation_on_folder(): Run complete evaluation")
print("   - check_corpus_status(): Check annotation status")
print("   - list_available_results(): List available directories")

🔧 Utility functions loaded:
   - run_evaluation_on_folder(): Run complete evaluation
   - check_corpus_status(): Check annotation status
   - list_available_results(): List available directories

Use these for debugging or manual operations if needed.


In [None]:
# Main Execution with GateNLP Integration and Corpus Viewer
from gatenlp.visualization.corpusviewer import CorpusViewer

def enhanced_main(output_dir: str = "output/pipeline_results_20250718_085458"):
    """
    Run the full evaluation for ``output_dir`` and open the GateNLP corpus viewer.

    Delegates the actual work to ``run_evaluation_on_folder``; on success the
    corpus viewer is shown (a viewer failure is reported but not fatal) and a
    short summary plus the output locations are printed.

    Returns:
        ``(evaluator, results_df)`` on success, otherwise ``None``.
    """

    print("üöÄ Starting Enhanced LLM Event Evaluation")
    print("=" * 60)

    # Run the evaluation
    result = run_evaluation_on_folder(output_dir)

    # Guard clause: nothing more to do when the evaluation produced no result.
    if not result:
        print("‚ùå Evaluation failed. Check if result folders and JSON files exist.")
        return None

    evaluator, results_df = result

    print("\n?Ô∏è Opening GateNLP Corpus Viewer...")
    try:
        # Create and display corpus viewer
        corpus_viewer = CorpusViewer(evaluator.corpus)
        corpus_viewer.show()
        print("‚úÖ Corpus viewer opened successfully!")
        print("   - Use the viewer to explore documents and annotations")
        print("   - Switch between annotation sets to see different model predictions")
        print("   - Compare predictions with gold standard (consensus)")
    except Exception as viewer_error:
        print(f"‚ö†Ô∏è Corpus viewer failed to open: {viewer_error}")

    # Summary statistics
    evaluation_count = len(results_df)
    document_count = results_df['document'].nunique()
    model_count = results_df['model'].nunique()
    mean_f1 = results_df['f1'].mean()
    best_model = results_df.groupby('model')['f1'].mean().idxmax()

    print(f"\n? Final Summary:")
    print(f"   Total evaluations: {evaluation_count}")
    print(f"   Documents processed: {document_count}")
    print(f"   Models evaluated: {model_count}")
    print(f"   Average F1 score: {mean_f1:.3f}")
    print(f"   Best performing model: {best_model}")

    # Where the artefacts of this run were written.
    print(f"\nüéØ OUTPUTS GENERATED:")
    print(f"üìä Evaluation results: {evaluator.output_dir}/evaluation_results.csv")
    print(f"üìÅ Annotated corpus: {evaluator.output_dir}/annotated_corpus_with_predictions/")
    print(f"   ‚Üí Open these JSON files in Gate to view model predictions!")

    return evaluator, results_df

# Test with existing predictions in corpus
def test_existing_predictions(output_dir: str = "output/pipeline_results_20250718_085458"):
    """
    Evaluate prediction annotation sets that already exist in the loaded corpus.

    No JSON result files are read here: only annotation sets whose name ends
    with "_predictions" and that are present right after loading the corpus
    are scored. When such sets exist, the plots, the text report and the
    corpus viewer are shown.

    Args:
        output_dir: pipeline results folder handed to the evaluator. The
            default keeps the previously hard-coded folder.

    Returns:
        ``(evaluator, results_df)`` if results were produced, otherwise
        ``None`` (also on any error, which is printed with a traceback).
    """

    print("üß™ Testing with existing corpus predictions")
    print("-" * 50)

    try:
        evaluator = LLMEventResultsEvaluator(output_dir)

        # Check for existing predictions
        docs_with_preds = 0
        total_pred_sets = 0

        for doc in evaluator.corpus:
            doc_name = Path(doc.features.get("gate.SourceURL", "")).stem
            pred_annsets = [name for name in doc.annset_names() if name.endswith("_predictions")]

            if pred_annsets:
                docs_with_preds += 1
                total_pred_sets += len(pred_annsets)
                print(f"üìã {doc_name}: {pred_annsets}")

        print(f"\nFound {total_pred_sets} prediction sets in {docs_with_preds} documents")

        if total_pred_sets > 0:
            # Run evaluation on existing predictions
            results_df = evaluator.evaluate_all_predictions()

            if not results_df.empty:
                print(f"‚úÖ Generated {len(results_df)} evaluation results")

                # Quick summary
                print("\nüìä Quick Summary:")
                model_summary = results_df.groupby('model')['f1'].mean().sort_values(ascending=False)
                for model, f1 in model_summary.items():
                    print(f"  {model}: F1 = {f1:.3f}")

                # Generate visualizations
                create_visual_analysis(results_df)
                create_detailed_results_table(results_df)

                # Show corpus viewer
                print("\nüëÅÔ∏è Opening corpus viewer...")
                viewer = CorpusViewer(evaluator.corpus)
                viewer.show()

                return evaluator, results_df
            else:
                print("‚ùå No evaluation results generated")
        else:
            print("‚ùå No prediction sets found in corpus")

    except Exception as e:
        print(f"‚ùå Test failed: {e}")
        import traceback
        traceback.print_exc()

    return None

# Auto-run for this cell: score predictions already present in the corpus and,
# only if there are none, fall back to the full pipeline.
print("üöÄ Auto-starting evaluation...")

# Overview of the result folders that are available on disk.
list_available_results()

# Stage 1: predictions that are already stored in the corpus.
result = test_existing_predictions()
_success_message = "\n‚úÖ Evaluation completed using existing predictions!"

if not result:
    # Stage 2: read the JSON results and run everything from scratch.
    print("\n‚ö†Ô∏è No existing predictions found, running full pipeline...")
    result = enhanced_main()
    _success_message = "\n‚úÖ Full evaluation pipeline completed!"

if result:
    # Expose the outcome to later cells under the usual names.
    evaluator, results_df = result
    print(_success_message)
else:
    print("\n‚ùå Both existing predictions test and full pipeline failed.")
    print("Please check your data and try manual evaluation with:")
    print("   run_evaluation_on_folder('your_output_directory')")

print("\nüéØ Evaluation complete! Use the corpus viewer to explore results.")

🚀 Starting evaluation...
🧪 Testing with existing corpus predictions
--------------------------------------------------
Loaded input/updated/annotated\dev\CASE OF ALTAY v. TURKEY (No. 2).xml into corpus
Loaded input/updated/annotated\dev\CASE OF BELYAYEV AND OTHERS v. UKRAINE.xml into corpus
Loaded input/updated/annotated\dev\CASE OF BIGUN v. UKRAINE.xml into corpus
Loaded input/updated/annotated\test\CASE OF CABUCAK v. GERMANY.xml into corpus
Loaded input/updated/annotated\test\CASE OF CAN v. TURKEY.xml into corpus
Loaded input/updated/annotated\test\CASE OF CRISTIAN CATALIN UNGUREANU v. ROMANIA.xml into corpus
Loaded input/updated/annotated\train\CASE OF DOKTOROV v. BULGARIA.xml into corpus
Loaded input/updated/annotated\train\CASE OF EGILL EINARSSON v. ICELAND (No. 2).xml into corpus
Loaded input/updated/annotated\train\CASE OF HOINESS v. NORWAY.xml into corpus
Loaded input/updated/annotated\train\CASE OF KOSAITE - CYPIENE AND OTHERS v. LITHUANIA.xml into corpus
Loaded input/up