# Lab 20: Error Handling and Data Validation

## Overview

In this lab, we'll explore the error handling and data validation techniques used in Bonsai v3 to ensure robust performance even with imperfect input data. Effective error handling is critical for genetic genealogy applications, where data quality can vary significantly.

In [ ]:
# Standard imports
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from IPython.display import display, HTML, Markdown
import inspect
import importlib
import copy
import random
import logging
import traceback
import json
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Set, Optional, Union, Any, Callable

sys.path.append(os.path.dirname(os.getcwd()))

# Cross-compatibility setup
from scripts_support.lab_cross_compatibility import setup_environment, is_jupyterlite, save_results, save_plot

# Set up environment-specific paths
DATA_DIR, RESULTS_DIR = setup_environment()

# Set visualization styles
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_context("notebook")

In [ ]:
# Setup Bonsai module paths
if not is_jupyterlite():
    # In local environment, add the utils directory to system path
    utils_dir = os.getenv('PROJECT_UTILS_DIR', os.path.join(os.path.dirname(DATA_DIR), 'utils'))
    bonsaitree_dir = os.path.join(utils_dir, 'bonsaitree')
    
    # Add to path if it exists and isn't already there
    if os.path.exists(bonsaitree_dir) and bonsaitree_dir not in sys.path:
        sys.path.append(bonsaitree_dir)
        print(f"Added {bonsaitree_dir} to sys.path")
else:
    # In JupyterLite, use a simplified approach
    print("⚠️ Running in JupyterLite: Some Bonsai functionality may be limited.")
    print("This notebook is primarily designed for local execution where the Bonsai codebase is available.")

In [ ]:
# Helper functions for exploring modules
def display_module_classes(module_name):
    """Display classes and their docstrings from a module"""
    try:
        # Import the module
        module = importlib.import_module(module_name)
        
        # Find all classes
        classes = inspect.getmembers(module, inspect.isclass)
        
        # Filter classes defined in this module (not imported)
        classes = [(name, cls) for name, cls in classes if cls.__module__ == module_name]
        
        # Print info for each class
        for name, cls in classes:
            print(f"\n## {name}")
            
            # Get docstring
            doc = inspect.getdoc(cls)
            if doc:
                print(f"Docstring: {doc}")
            else:
                print("No docstring available")
            
            # Get methods
            methods = inspect.getmembers(cls, inspect.isfunction)
            if methods:
                print("\nMethods:")
                for method_name, method in methods:
                    if not method_name.startswith('_'):  # Skip private methods
                        print(f"- {method_name}")
    except ImportError as e:
        print(f"Error importing module {module_name}: {e}")
    except Exception as e:
        print(f"Error processing module {module_name}: {e}")

def display_module_functions(module_name):
    """Display functions and their docstrings from a module"""
    try:
        # Import the module
        module = importlib.import_module(module_name)
        
        # Find all functions
        functions = inspect.getmembers(module, inspect.isfunction)
        
        # Filter functions defined in this module (not imported)
        functions = [(name, func) for name, func in functions if func.__module__ == module_name]
        
        # Print info for each function
        for name, func in functions:
            if name.startswith('_'):  # Skip private functions
                continue
                
            print(f"\n## {name}")
            
            # Get signature
            sig = inspect.signature(func)
            print(f"Signature: {name}{sig}")
            
            # Get docstring
            doc = inspect.getdoc(func)
            if doc:
                print(f"Docstring: {doc}")
            else:
                print("No docstring available")
    except ImportError as e:
        print(f"Error importing module {module_name}: {e}")
    except Exception as e:
        print(f"Error processing module {module_name}: {e}")

def view_source(obj):
    """Display the source code of an object (function or class)"""
    try:
        source = inspect.getsource(obj)
        display(Markdown(f"```python\n{source}\n```"))
    except Exception as e:
        print(f"Error retrieving source: {e}")

## Check Bonsai Installation

Let's verify that the Bonsai v3 module is available for import:

In [ ]:
try:
    from utils.bonsaitree.bonsaitree import v3
    print("✅ Successfully imported Bonsai v3 module")
except ImportError as e:
    print(f"❌ Failed to import Bonsai v3 module: {e}")
    print("This lab requires access to the Bonsai v3 codebase.")
    print("Make sure you've properly set up your environment with the Bonsai repository.")

## Lab 20: Error Handling and Data Validation in Bonsai v3

In computational genetic genealogy, we often work with data that is imperfect, incomplete, or inconsistent. Effective error handling and data validation are critical for building robust applications that can gracefully handle these challenges. In this lab, we'll explore the error handling and data validation mechanisms used in Bonsai v3.

We'll focus on several key aspects:

1. **Custom Exception Hierarchy**: Understanding Bonsai's exception classes and how they help with targeted error handling
2. **Input Validation**: Techniques for validating data before processing
3. **Defensive Programming**: Strategies to anticipate and handle potential errors
4. **Graceful Degradation**: Continuing operation despite partial failures
5. **Logging and Debugging**: Tools for tracking and diagnosing issues

Let's implement simplified versions of these mechanisms to better understand how they work in Bonsai v3.

## Part 1: Custom Exception Hierarchy

Bonsai v3 uses a custom exception hierarchy to provide specific error types that make it easier to handle different categories of errors appropriately. Let's look at how this works:

In [ ]:
# Check if Bonsai exceptions are available for import
if not is_jupyterlite():
    try:
        from utils.bonsaitree.bonsaitree.v3.exceptions import BonsaiException
        
        # Display the source code if available
        print("Source code for BonsaiException base class:")
        view_source(BonsaiException)
    except (ImportError, AttributeError) as e:
        print(f"Could not import exception classes: {e}")
        print("We'll implement our own versions for this lab.")
else:
    print("Cannot display source code in JupyterLite environment.")

### 1.1 Implementing a Custom Exception Hierarchy

Let's implement a simplified version of Bonsai's exception hierarchy:

In [ ]:
class BonsaiException(Exception):
    """
    Base exception class for all Bonsai-specific exceptions.
    
    This provides a common ancestor for all Bonsai exceptions, making it
    easier to catch any Bonsai-related error with a single except clause.
    
    Attributes:
        message: Human-readable error message
        details: Optional dictionary with additional error details
    """
    def __init__(self, message, details=None):
        self.message = message
        self.details = details or {}
        super().__init__(self.message)
    
    def __str__(self):
        if self.details:
            details_str = ", ".join(f"{k}={v}" for k, v in self.details.items())
            return f"{self.message} ({details_str})"
        return self.message
    
    def to_dict(self):
        """Convert the exception to a dictionary representation."""
        return {
            "error_type": self.__class__.__name__,
            "message": self.message,
            "details": self.details
        }

# Input-related exceptions
class InputError(BonsaiException):
    """Base class for input-related errors."""
    pass

class ValidationError(InputError):
    """Raised when input data fails validation."""
    pass

class DataFormatError(InputError):
    """Raised when input data has incorrect format."""
    pass

class MissingDataError(InputError):
    """Raised when required data is missing."""
    pass

# Processing-related exceptions
class ProcessingError(BonsaiException):
    """Base class for processing-related errors."""
    pass

class CalculationError(ProcessingError):
    """Raised when a calculation fails."""
    pass

class IncompatibleDataError(ProcessingError):
    """Raised when data sources are incompatible."""
    pass

class AlgorithmError(ProcessingError):
    """Raised when an algorithm fails to produce a result."""
    pass

# Configuration-related exceptions
class ConfigurationError(BonsaiException):
    """Raised when there's an issue with the configuration."""
    pass

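# Resource-related exceptions
# Note: MemoryError and TimeoutError below shadow Python's built-in exceptions of the
# same names for the rest of this notebook, so `except TimeoutError` here catches this
# class rather than the built-in TimeoutError raised by the standard library.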
class ResourceError(BonsaiException):
    """Base class for resource-related errors."""
    pass

class MemoryError(ResourceError):
    """Raised when an operation would exceed available memory."""
    pass

class TimeoutError(ResourceError):
    """Raised when an operation takes too long."""
    pass

# Create a visual representation of the exception hierarchy
def visualize_exception_hierarchy():
    """Visualize the exception hierarchy using networkx."""
    # Create a directed graph
    G = nx.DiGraph()
    
    # Get all classes defined in this notebook
    all_classes = {name: cls for name, cls in globals().items() if isinstance(cls, type)}
    
    # Add exception classes to the graph
    exception_classes = {name: cls for name, cls in all_classes.items() 
                         if issubclass(cls, Exception) and cls != Exception}
    
    # Add nodes and edges
    for name, cls in exception_classes.items():
        G.add_node(name)
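        # Keep only bases that are part of this hierarchy (our classes or Exception itself)
        bases = [base.__name__ for base in cls.__bases__ 
                 if base.__name__ in exception_classes or base.__name__ == "Exception"]
        for base in bases:
            G.add_edge(base, name)
    
    # Use Graphviz's top-down "dot" layout when pygraphviz and Graphviz are installed.
    # nx.nx_agraph always exists, so test for the dependency by attempting the call.
    try:
        pos = nx.nx_agraph.graphviz_layout(G, prog="dot")
    except (ImportError, OSError, ValueError):
        pos = nx.spring_layout(G, seed=42)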
    
    # Create figure
    plt.figure(figsize=(12, 8))
    
    # Define node colors based on type
    node_colors = []
    for node in G.nodes():
        if node == "Exception" or node == "BonsaiException":
            node_colors.append("#6495ED")  # Base classes - blue
        elif "Input" in node:
            node_colors.append("#FF6347")  # Input errors - red
        elif "Processing" in node or "Algorithm" in node or "Calculation" in node:
            node_colors.append("#32CD32")  # Processing errors - green
        elif "Configuration" in node:
            node_colors.append("#FFD700")  # Configuration errors - yellow
        elif "Resource" in node or "Memory" in node or "Timeout" in node:
            node_colors.append("#9370DB")  # Resource errors - purple
        else:
            node_colors.append("#A9A9A9")  # Others - gray
    
    # Draw the graph
    nx.draw_networkx(G, pos, with_labels=True, node_color=node_colors, 
                    node_size=3000, alpha=0.8, arrows=True, 
                    arrowsize=20, arrowstyle='->', width=2)
    
    plt.title("Bonsai Exception Hierarchy")
    plt.axis("off")
    plt.tight_layout()
    plt.show()

# Visualize the exception hierarchy
visualize_exception_hierarchy()
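Because every class inherits from a category base, a single `except InputError` clause catches `ValidationError`, `DataFormatError` and `MissingDataError` alike. Python also tries `except` clauses top to bottom and runs the first match, so the most specific class has to come first. The sketch below illustrates both points with a hypothetical `classify_error` helper. It reuses the classes defined above when run in this notebook and otherwise defines minimal stand-ins so it also runs on its own.

```python
# Reuse the notebook's hierarchy if it is already defined; otherwise create
# minimal stand-ins so this cell runs on its own.
if "BonsaiException" not in globals():
    class BonsaiException(Exception):
        def __init__(self, message, details=None):
            super().__init__(message)
            self.message = message
            self.details = details or {}

    class InputError(BonsaiException): pass
    class ValidationError(InputError): pass
    class DataFormatError(InputError): pass
    class ProcessingError(BonsaiException): pass
    class CalculationError(ProcessingError): pass


def classify_error(exc):
    """Return which except clause handles `exc` (hypothetical helper)."""
    try:
        raise exc
    except ValidationError:      # most specific clause first
        return "validation"
    except InputError:           # any other input problem, e.g. DataFormatError
        return "other input"
    except BonsaiException:      # processing, configuration, resource errors
        return "other bonsai"
    except Exception:            # anything not raised as a Bonsai exception
        return "unexpected"


for exc in [ValidationError("negative cM"), DataFormatError("not a list"),
            CalculationError("division by zero"), KeyError("iid1")]:
    print(f"{type(exc).__name__:<16} -> {classify_error(exc)}")
```

If `except InputError` were moved above `except ValidationError`, validation errors would be reported as "other input", because the first matching clause wins.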

### 1.2 Using the Exception Hierarchy

Now that we have our exception hierarchy, let's see how it can be used in practice:

In [ ]:
def process_ibd_segments(segments, min_cm=7.0, min_snps=500):
    """
    Process IBD segments with input validation and error handling.
    
    Args:
        segments: List of IBD segments, each as (start_pos, end_pos, cM, snps)
        min_cm: Minimum centiMorgan threshold
        min_snps: Minimum SNP threshold
        
    Returns:
        List of segments that pass validation and both thresholds. Segments
        that fail validation are skipped rather than aborting the whole batch.
    
    Raises:
        MissingDataError: If `segments` is None
        DataFormatError: If `segments` is not a list
        ValidationError: If every segment fails validation
    """
    # Input validation
    if segments is None:
        raise MissingDataError("IBD segments data is missing")
    
    if not isinstance(segments, list):
        raise DataFormatError(
            "IBD segments must be provided as a list",
            details={"actual_type": type(segments).__name__}
        )
    
    if len(segments) == 0:
        # No segments is a valid case, just return empty list
        return []
    
    # Verify segment format
    valid_segments = []
    invalid_segments = []
    
    for i, segment in enumerate(segments):
        try:
            # Check if segment has the expected format
            if not isinstance(segment, tuple) or len(segment) != 4:
                raise DataFormatError(
                    f"Segment {i} has incorrect format",
                    details={"segment": segment, "expected_format": "(start_pos, end_pos, cM, snps)"}
                )
            
            start_pos, end_pos, cm, snps = segment
            
            # Validate values
            if not isinstance(start_pos, (int, float)) or start_pos < 0:
                raise ValidationError(
                    f"Segment {i} has invalid start position",
                    details={"start_pos": start_pos}
                )
            
            if not isinstance(end_pos, (int, float)) or end_pos <= start_pos:
                raise ValidationError(
                    f"Segment {i} has invalid end position",
                    details={"end_pos": end_pos, "start_pos": start_pos}
                )
            
            if not isinstance(cm, (int, float)) or cm < 0:
                raise ValidationError(
                    f"Segment {i} has invalid centiMorgan value",
                    details={"cM": cm}
                )
            
            if not isinstance(snps, (int, float)) or snps < 0:
                raise ValidationError(
                    f"Segment {i} has invalid SNP count",
                    details={"snps": snps}
                )
            
            # Apply thresholds
            if cm < min_cm:
                continue  # Skip segments below cM threshold
            
            if snps < min_snps:
                continue  # Skip segments below SNP threshold
            
            # If we reach here, the segment is valid
            valid_segments.append(segment)
            
        except BonsaiException as e:
            # Store invalid segment and continue processing
            invalid_segments.append((i, segment, str(e)))
    
    # Raise only if every segment failed validation (segments removed by the
    # cM/SNP thresholds are not failures)
    if len(invalid_segments) == len(segments):
        raise ValidationError(
            f"All {len(invalid_segments)} segments failed validation",
            details={"invalid_segments": invalid_segments}
        )
    
    return valid_segments

# Let's test the function with different inputs
def test_process_ibd_segments():
    # Valid input
    valid_segments = [
        (1000000, 5000000, 10.5, 1200),
        (10000000, 15000000, 8.2, 900),
        (20000000, 25000000, 6.5, 600),  # Below cM threshold, should be filtered out
        (30000000, 35000000, 12.0, 400)   # Below SNP threshold, should be filtered out
    ]
    
    # Different error cases
    missing_data = None
    wrong_type = "not a list"
    empty_list = []
    invalid_segment = [
        (1000000, 5000000, 10.5, 1200),
        "invalid",
        (20000000, 25000000, 6.5, 600)
    ]
    invalid_values = [
        (1000000, 5000000, 10.5, 1200),
        (15000000, 10000000, 8.2, 900),  # end_pos < start_pos
        (20000000, 25000000, -6.5, 600)  # negative cM
    ]
    all_invalid = [
        (15000000, 10000000, 8.2, 900),  # end_pos < start_pos
        (20000000, 25000000, -6.5, 600),  # negative cM
        ("invalid", "positions", "values", "here")  # wrong types
    ]
    
    # Test cases
    test_cases = [
        ("Valid segments", valid_segments, None),
        ("Missing data", missing_data, MissingDataError),
        ("Wrong type", wrong_type, DataFormatError),
        ("Empty list", empty_list, None),
        ("Invalid segment", invalid_segment, None),  # bad entry is skipped; 1 valid segment remains
        ("Invalid values", invalid_values, None),    # invalid entries are skipped; 1 valid segment remains
        ("All invalid", all_invalid, ValidationError)
    ]
    
    # Run the tests
    results = []
    for name, input_data, expected_error in test_cases:
        try:
            result = process_ibd_segments(input_data)
            results.append({
                "test_name": name,
                "status": "Pass" if expected_error is None else "Fail (expected error not raised)",
                "result": f"{len(result)} valid segments"
            })
        except Exception as e:
            if expected_error and isinstance(e, expected_error):
                results.append({
                    "test_name": name,
                    "status": "Pass (expected error raised)",
                    "result": str(e)
                })
            else:
                results.append({
                    "test_name": name,
                    "status": f"Fail (unexpected error: {type(e).__name__})",
                    "result": str(e)
                })
    
    # Display results as a table
    print("Test Results:")
    print("-" * 100)
    print(f"{'Test Name':<20} | {'Status':<35} | {'Result':<40}")
    print("-" * 100)
    
    for result in results:
        print(f"{result['test_name']:<20} | {result['status']:<35} | {result['result']:<40}")

# Run the test function
test_process_ibd_segments()
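`process_ibd_segments` records each rejected segment in `invalid_segments` but discards that list unless every segment fails, and segments dropped by the cM/SNP thresholds disappear without a trace. When auditing imperfect data it can help to see both. The sketch below is a hypothetical variant (`SegmentValidationReport` and `validate_segments_with_report` are illustrative names, not Bonsai APIs) that applies the same checks but returns accepted, rejected, and threshold-filtered segments separately instead of raising.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


@dataclass
class SegmentValidationReport:
    """Outcome of validating a batch of (start_pos, end_pos, cM, snps) tuples."""
    accepted: List[tuple] = field(default_factory=list)
    # (index, segment, reason) for segments with a bad format or bad values
    rejected: List[Tuple[int, Any, str]] = field(default_factory=list)
    # (index, segment, reason) for well-formed segments below a threshold
    filtered: List[Tuple[int, Any, str]] = field(default_factory=list)


def validate_segments_with_report(segments, min_cm=7.0, min_snps=500):
    """Apply the same checks as process_ibd_segments, but report instead of raising."""
    report = SegmentValidationReport()
    for i, seg in enumerate(segments):
        if not isinstance(seg, tuple) or len(seg) != 4:
            report.rejected.append((i, seg, "expected (start_pos, end_pos, cM, snps)"))
            continue
        start_pos, end_pos, cm, snps = seg
        if not all(isinstance(v, (int, float)) for v in seg):
            report.rejected.append((i, seg, "non-numeric field"))
        elif start_pos < 0 or end_pos <= start_pos:
            report.rejected.append((i, seg, "invalid start/end position"))
        elif cm < 0 or snps < 0:
            report.rejected.append((i, seg, "negative cM or SNP count"))
        elif cm < min_cm or snps < min_snps:
            report.filtered.append((i, seg, "below cM/SNP threshold"))
        else:
            report.accepted.append(seg)
    return report


report = validate_segments_with_report([
    (1000000, 5000000, 10.5, 1200),
    "invalid",
    (20000000, 25000000, 6.5, 600),
])
print(len(report.accepted), len(report.rejected), len(report.filtered))  # → 1 1 1
```

The caller then decides whether an empty `accepted` list is an error, and the `rejected` entries can be logged or shown to the user.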

### 1.3 Benefits of Targeted Error Handling

The custom exception hierarchy allows for more targeted error handling, letting us catch and respond to specific types of errors. Let's see how this works in practice:

In [ ]:
def ibd_analysis_pipeline(segments_data, iid1, iid2, min_cm=7.0, min_snps=500):
    """
    A simplified IBD analysis pipeline with targeted error handling.
    
    Args:
        segments_data: List of IBD segments
        iid1: ID of the first individual
        iid2: ID of the second individual
        min_cm: Minimum centiMorgan threshold
        min_snps: Minimum SNP threshold
        
    Returns:
        Dict with analysis results
    """
    try:
        # Process and validate the segments
        valid_segments = process_ibd_segments(segments_data, min_cm, min_snps)
        
        # Calculate statistics and relationship
        total_cm = sum(segment[2] for segment in valid_segments)
        longest_segment = max(valid_segments, key=lambda x: x[2])[2] if valid_segments else 0
        total_snps = sum(segment[3] for segment in valid_segments)
        
        # Simplified relationship inference
        relationship = "unknown"
        confidence = 0.0
        
        if total_cm > 3000:
            relationship = "parent-child"
            confidence = 0.95
        elif total_cm > 2000:
            relationship = "full-sibling"
            confidence = 0.9
        elif total_cm > 1000:
            relationship = "half-sibling/grandparent"
            confidence = 0.85
        elif total_cm > 500:
            relationship = "1st cousin"
            confidence = 0.8
        elif total_cm > 250:
            relationship = "2nd cousin"
            confidence = 0.7
        elif total_cm > 100:
            relationship = "3rd cousin"
            confidence = 0.6
        elif total_cm > 50:
            relationship = "4th cousin"
            confidence = 0.5
        elif total_cm > 20:
            relationship = "distant relative"
            confidence = 0.4
        else:
            relationship = "very distant/unrelated"
            confidence = 0.3
        
        # Return the results
        return {
            "iid1": iid1,
            "iid2": iid2,
            "segments_count": len(valid_segments),
            "total_cm": total_cm,
            "longest_segment_cm": longest_segment,
            "total_snps": total_snps,
            "inferred_relationship": relationship,
            "confidence": confidence,
            "status": "success"
        }
    
    except ValidationError as e:
        # Handle validation errors (raised here when every segment fails validation)
        print(f"Validation error: {str(e)}")
        return {
            "iid1": iid1,
            "iid2": iid2,
            "error": str(e),
            "error_type": "validation",
            "status": "error",
            "suggested_action": "Check input data format and values"
        }
    
    except MissingDataError as e:
        # Handle missing data errors
        print(f"Missing data error: {str(e)}")
        return {
            "iid1": iid1,
            "iid2": iid2,
            "error": str(e),
            "error_type": "missing_data",
            "status": "error",
            "suggested_action": "Ensure all required data is provided"
        }
    
    except DataFormatError as e:
        # Handle data format errors
        print(f"Data format error: {str(e)}")
        return {
            "iid1": iid1,
            "iid2": iid2,
            "error": str(e),
            "error_type": "data_format",
            "status": "error",
            "suggested_action": "Fix the format of the input data"
        }
    
    except BonsaiException as e:
        # Handle any other Bonsai-specific exceptions
        print(f"Bonsai error: {str(e)}")
        return {
            "iid1": iid1,
            "iid2": iid2,
            "error": str(e),
            "error_type": "bonsai_error",
            "status": "error",
            "suggested_action": "Check the error message for details"
        }
    
    except Exception as e:
        # Handle any other unexpected exceptions
        print(f"Unexpected error: {str(e)}")
        return {
            "iid1": iid1,
            "iid2": iid2,
            "error": str(e),
            "error_type": "unexpected",
            "status": "error",
            "suggested_action": "Contact support with the error details"
        }

# Let's test the pipeline with different error scenarios
def test_ibd_analysis_pipeline():
    # Test cases
    test_cases = [
        {
            "name": "Valid Data",
            "segments": [
                (1000000, 5000000, 10.5, 1200),
                (10000000, 15000000, 8.2, 900),
                (20000000, 25000000, 15.5, 1800)
            ],
            "iid1": "sample1",
            "iid2": "sample2"
        },
        {
            "name": "Missing Data",
            "segments": None,
            "iid1": "sample1",
            "iid2": "sample2"
        },
        {
            "name": "Wrong Data Format",
            "segments": "not a list",
            "iid1": "sample1",
            "iid2": "sample2"
        },
        {
            "name": "Invalid Segments",
            "segments": [
                (1000000, 5000000, 10.5, 1200),
                (10000000, 5000000, 8.2, 900),  # end < start
                (20000000, 25000000, -5.5, 1800)  # negative cM
            ],
            "iid1": "sample1",
            "iid2": "sample2"
        },
        {
            "name": "Empty Segments",
            "segments": [],
            "iid1": "sample1",
            "iid2": "sample2"
        },
        {
            "name": "Parent-Child Relationship",
            "segments": [
                (1000000, 5000000, 1000.5, 12000),
                (10000000, 15000000, 800.2, 9000),
                (20000000, 25000000, 1500.5, 18000)
            ],
            "iid1": "parent",
            "iid2": "child"
        }
    ]
    
    # Run the tests
    results = []
    for test_case in test_cases:
        print(f"\nRunning test: {test_case['name']}")
        result = ibd_analysis_pipeline(
            test_case["segments"], 
            test_case["iid1"], 
            test_case["iid2"]
        )
        print(f"Result status: {result['status']}")
        
        # Store result for later comparison
        results.append({
            "test_name": test_case["name"],
            "status": result["status"],
            "result": result
        })
    
    # Create a visualization of the test results
    plt.figure(figsize=(12, 6))
    
    # Count successes vs different error types
    counts = {"success": 0}
    for result in results:
        if result["status"] == "success":
            counts["success"] += 1
        else:
            error_type = result["result"]["error_type"]
            counts[error_type] = counts.get(error_type, 0) + 1
    
    # Create bar chart
    categories = list(counts.keys())
    values = list(counts.values())
    colors = ['#66b3ff' if cat == 'success' else '#ff9999' for cat in categories]
    
    plt.bar(categories, values, color=colors)
    plt.title('IBD Analysis Pipeline Test Results')
    plt.xlabel('Result Type')
    plt.ylabel('Count')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
    # Display full results table
    print("\nDetailed Test Results:")
    print("-" * 100)
    for result in results:
        print(f"Test: {result['test_name']}")
        print(f"Status: {result['status']}")
        
        r = result["result"]
        if result["status"] == "success":
            print(f"Segments: {r['segments_count']}")
            print(f"Total cM: {r['total_cm']:.2f}")
            print(f"Relationship: {r['inferred_relationship']} (confidence: {r['confidence']:.2f})")
        else:
            print(f"Error type: {r['error_type']}")
            print(f"Error message: {r['error']}")
            print(f"Suggested action: {r['suggested_action']}")
        
        print("-" * 100)

# Run the test function
test_ibd_analysis_pipeline()
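The five `except` blocks in `ibd_analysis_pipeline` differ only in `error_type` and `suggested_action`. One way to keep that mapping in a single place is a lookup table keyed by exception class, searched along the exception's method resolution order (`__mro__`) so that the most specific registered class wins, just as the ordered `except` clauses do. This is a sketch of an alternative design, not how Bonsai v3 is known to work; `ERROR_HANDLING` and `error_result` are hypothetical names, and the stand-in classes are defined only if the notebook's are missing.

```python
# Reuse the notebook's exception classes if defined; otherwise minimal stand-ins.
if "BonsaiException" not in globals():
    class BonsaiException(Exception): pass
    class InputError(BonsaiException): pass
    class ValidationError(InputError): pass
    class MissingDataError(InputError): pass
    class DataFormatError(InputError): pass

# (error_type, suggested_action) per class, copied from the except blocks above
ERROR_HANDLING = {
    ValidationError: ("validation", "Check input data format and values"),
    MissingDataError: ("missing_data", "Ensure all required data is provided"),
    DataFormatError: ("data_format", "Fix the format of the input data"),
    BonsaiException: ("bonsai_error", "Check the error message for details"),
    Exception: ("unexpected", "Contact support with the error details"),
}


def error_result(exc, iid1, iid2):
    """Build the pipeline's error dict using the most specific registered class."""
    # __mro__ starts with the exception's own class, then its bases in order,
    # so the first class found in the table is the most specific match.
    for cls in type(exc).__mro__:
        if cls in ERROR_HANDLING:
            error_type, action = ERROR_HANDLING[cls]
            break
    else:
        # Only BaseException subclasses outside Exception (e.g. KeyboardInterrupt) land here
        error_type, action = "unexpected", "Contact support with the error details"
    return {"iid1": iid1, "iid2": iid2, "error": str(exc), "error_type": error_type,
            "status": "error", "suggested_action": action}


print(error_result(MissingDataError("IBD segments data is missing"),
                   "sample1", "sample2")["error_type"])  # → missing_data
```

Inside the pipeline the five handlers could then collapse to `except Exception as e: return error_result(e, iid1, iid2)`, at the cost of the per-category `print` messages. Supporting a new exception class becomes one table row rather than another `except` block.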

## Part 2: Input Validation

Input validation is a critical part of error handling, helping to catch issues early before they propagate through the system. Let's explore the input validation techniques used in Bonsai v3.
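Catching problems early usually means converting low-level failures into Bonsai-style errors at the point where raw data enters the program. Raising with `raise ... from err` keeps the original exception on `__cause__`, so the traceback still shows what Python actually failed on. The sketch assumes a hypothetical tab-separated line format `start<TAB>end<TAB>cM<TAB>snps`; `parse_segment_line` is an illustrative name, not a Bonsai function.

```python
# Reuse the notebook's DataFormatError if defined; otherwise a minimal stand-in.
if "DataFormatError" not in globals():
    class DataFormatError(Exception):
        def __init__(self, message, details=None):
            super().__init__(message)
            self.message = message
            self.details = details or {}


def parse_segment_line(line, line_no):
    """Parse 'start<TAB>end<TAB>cM<TAB>snps' into a tuple (hypothetical format)."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 4:
        raise DataFormatError(
            f"Line {line_no}: expected 4 tab-separated fields",
            details={"line": line_no, "n_fields": len(fields)},
        )
    try:
        start_pos, end_pos = int(fields[0]), int(fields[1])
        cm, snps = float(fields[2]), int(fields[3])
    except ValueError as err:
        # 'from err' stores the ValueError on __cause__, so tracebacks show both errors
        raise DataFormatError(
            f"Line {line_no}: non-numeric field",
            details={"line": line_no, "fields": fields},
        ) from err
    return (start_pos, end_pos, cm, snps)


try:
    parse_segment_line("1000000\t5000000\tten\t1200", line_no=3)
except DataFormatError as e:
    print(e.message, "| caused by:", type(e.__cause__).__name__)
    # → Line 3: non-numeric field | caused by: ValueError
```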

### 2.1 Using Data Classes for Validation

One technique used in Bonsai v3 is to define data classes with built-in validation. Let's implement a simple example:

In [ ]:
@dataclass
class IBDSegment:
    """
    A data class representing an IBD segment with built-in validation.
    
    Attributes:
        start_pos: Start position (in base pairs)
        end_pos: End position (in base pairs)
        cm: Length in centiMorgans
        snps: Number of SNPs in the segment
        chromosome: Chromosome number (1-22, X, Y)
    """
    start_pos: int
    end_pos: int
    cm: float
    snps: int
    chromosome: str = "1"  # Default to chromosome 1
    
    def __post_init__(self):
        """Validate the segment after initialization."""
        # Check types
        if not isinstance(self.start_pos, int):
            raise ValidationError(
                "start_pos must be an integer",
                details={"start_pos": self.start_pos}
            )
        
        if not isinstance(self.end_pos, int):
            raise ValidationError(
                "end_pos must be an integer",
                details={"end_pos": self.end_pos}
            )
        
        if not isinstance(self.cm, (int, float)):
            raise ValidationError(
                "cm must be a number",
                details={"cm": self.cm}
            )
        
        if not isinstance(self.snps, int):
            raise ValidationError(
                "snps must be an integer",
                details={"snps": self.snps}
            )
        
        # Check values
        if self.start_pos < 0:
            raise ValidationError(
                "start_pos must be non-negative",
                details={"start_pos": self.start_pos}
            )
        
        if self.end_pos <= self.start_pos:
            raise ValidationError(
                "end_pos must be greater than start_pos",
                details={"start_pos": self.start_pos, "end_pos": self.end_pos}
            )
        
        if self.cm < 0:
            raise ValidationError(
                "cm must be non-negative",
                details={"cm": self.cm}
            )
        
        if self.snps < 0:
            raise ValidationError(
                "snps must be non-negative",
                details={"snps": self.snps}
            )
        
        # Validate chromosome
        valid_chromosomes = [str(i) for i in range(1, 23)] + ["X", "Y"]
        if self.chromosome not in valid_chromosomes:
            raise ValidationError(
                "chromosome must be 1-22, X, or Y",
                details={"chromosome": self.chromosome}
            )
    
    @property
    def length_bp(self):
        """Get the length of the segment in base pairs."""
        return self.end_pos - self.start_pos
    
    @property
    def density(self):
        """Get the SNP density (SNPs per centiMorgan)."""
        return self.snps / self.cm if self.cm > 0 else 0
    
    def overlaps(self, other):
        """Check if this segment overlaps with another segment."""
        if self.chromosome != other.chromosome:
            return False
        return self.start_pos < other.end_pos and self.end_pos > other.start_pos

# Test the data class with valid and invalid data
def test_ibd_segment_class():
    # Valid data
    valid_segments = [
        {"start_pos": 1000000, "end_pos": 5000000, "cm": 10.5, "snps": 1200, "chromosome": "1"},
        {"start_pos": 10000000, "end_pos": 15000000, "cm": 8.2, "snps": 900, "chromosome": "X"}
    ]
    
    # Invalid data
    invalid_segments = [
        {"start_pos": -1000, "end_pos": 5000000, "cm": 10.5, "snps": 1200, "chromosome": "1"},  # negative start
        {"start_pos": 15000000, "end_pos": 10000000, "cm": 8.2, "snps": 900, "chromosome": "X"},  # end < start
        {"start_pos": 1000000, "end_pos": 5000000, "cm": -10.5, "snps": 1200, "chromosome": "1"},  # negative cM
        {"start_pos": 1000000, "end_pos": 5000000, "cm": 10.5, "snps": -1200, "chromosome": "1"},  # negative SNPs
        {"start_pos": 1000000, "end_pos": 5000000, "cm": 10.5, "snps": 1200, "chromosome": "Z"}  # invalid chromosome
    ]
    
    # Test valid segments
    print("Testing valid segments:")
    valid_objects = []
    for i, segment_data in enumerate(valid_segments):
        try:
            segment = IBDSegment(**segment_data)
            valid_objects.append(segment)
            print(f"✅ Segment {i+1} is valid: {segment}")
            print(f"   Length: {segment.length_bp} bp, Density: {segment.density:.2f} SNPs/cM")
        except Exception as e:
            print(f"❌ Segment {i+1} failed unexpectedly: {e}")
    
    # Test overlapping segments
    if len(valid_objects) >= 2:
        overlaps = valid_objects[0].overlaps(valid_objects[1])
        print(f"\nSegments overlap: {overlaps}")
    
    # Test invalid segments
    print("\nTesting invalid segments:")
    for i, segment_data in enumerate(invalid_segments):
        try:
            segment = IBDSegment(**segment_data)
            print(f"❌ Segment {i+1} should have failed but didn't: {segment}")
        except ValidationError as e:
            print(f"✅ Segment {i+1} failed as expected: {e}")
        except Exception as e:
            print(f"⚠️ Segment {i+1} failed with unexpected error: {e}")

# Run the tests
test_ibd_segment_class()
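`IBDSegment` checks `isinstance(self.start_pos, int)`, which has two side effects worth knowing about: `True` and `False` pass (because `bool` is a subclass of `int`), while NumPy integers such as `numpy.int64`, common when values come out of NumPy arrays or pandas columns, are rejected because they do not inherit from `int`. The sketch below swaps in the `numbers.Integral` and `numbers.Real` abstract base classes, with which NumPy registers its scalar types, excludes `bool` explicitly, and then converts to built-in types. `CheckedSegment` is a hypothetical name and shows only the type checks.

```python
import numbers
from dataclasses import dataclass

# Reuse the notebook's ValidationError if defined; otherwise a minimal stand-in.
if "ValidationError" not in globals():
    class ValidationError(Exception):
        def __init__(self, message, details=None):
            super().__init__(message)
            self.message = message
            self.details = details or {}


def _is_integer(value):
    """True for Python ints and registered Integral types (e.g. NumPy ints); False for bool."""
    return isinstance(value, numbers.Integral) and not isinstance(value, bool)


def _is_real(value):
    """True for ints, floats and other registered Real types; False for bool."""
    return isinstance(value, numbers.Real) and not isinstance(value, bool)


@dataclass
class CheckedSegment:
    """Hypothetical variant of IBDSegment showing only the type checks."""
    start_pos: int
    end_pos: int
    cm: float
    snps: int

    def __post_init__(self):
        for name in ("start_pos", "end_pos", "snps"):
            value = getattr(self, name)
            if not _is_integer(value):
                raise ValidationError(
                    f"{name} must be an integer",
                    details={name: value, "type": type(value).__name__},
                )
        if not _is_real(self.cm):
            raise ValidationError(
                "cm must be a number",
                details={"cm": self.cm, "type": type(self.cm).__name__},
            )
        # Store plain built-in types so later code (e.g. json.dumps) never sees NumPy scalars
        self.start_pos = int(self.start_pos)
        self.end_pos = int(self.end_pos)
        self.snps = int(self.snps)
        self.cm = float(self.cm)
        # The value checks from IBDSegment (non-negative, end > start, ...) would follow here.


print(CheckedSegment(1000000, 5000000, 10, 1200))
# → CheckedSegment(start_pos=1000000, end_pos=5000000, cm=10.0, snps=1200)
```

With NumPy installed, `CheckedSegment(np.int64(1000000), np.int64(5000000), np.float64(10.5), np.int64(1200))` is accepted and stores plain Python numbers, whereas `IBDSegment` rejects the same `start_pos`. Whether to coerce or reject is a design choice; coercing once at the boundary keeps mixed numeric types out of the rest of the code.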

### 2.2 Validator Functions

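Another approach is to use separate validator functions that can be applied to different types of data. Because each function checks one kind of value and returns it in normalized form, the same check can be reused wherever that value appears instead of being tied to a single class: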

In [ ]:
class Validator:
    """A collection of validation functions for genetic data."""
    
    @staticmethod
    def validate_chromosome(chrom):
        """
        Validate a chromosome identifier.
        
        Args:
            chrom: Chromosome identifier to validate
            
        Returns:
            The validated chromosome identifier
            
        Raises:
            ValidationError: If the chromosome is invalid
        """
        valid_chromosomes = [str(i) for i in range(1, 23)] + ["X", "Y"]
        
        if not isinstance(chrom, str):
            raise ValidationError(
                "Chromosome must be a string",
                details={"chromosome": chrom, "type": type(chrom).__name__}
            )
        
        # Normalize the chromosome format: trim whitespace, uppercase,
        # and strip a leading "chr"/"CHR" prefix if present
        normalized = chrom.strip().upper()
        if normalized.startswith("CHR"):
            normalized = normalized[3:]
        
        if normalized not in valid_chromosomes:
            raise ValidationError(
                "Chromosome must be 1-22, X, or Y",
                details={"chromosome": chrom, "normalized": normalized}
            )
        
        return normalized
    
    @staticmethod
    def validate_position(pos, allow_zero=True):
        """
        Validate a genomic position.
        
        Args:
            pos: Position to validate
            allow_zero: Whether to allow position 0
            
        Returns:
            The validated position
            
        Raises:
            ValidationError: If the position is invalid
        """
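        # Accept Python and NumPy numbers (pandas columns yield NumPy scalars),
        # but not bool, which is a subclass of int
        if isinstance(pos, bool) or not isinstance(pos, (int, float, np.integer, np.floating)):
            raise ValidationError(
                "Position must be a number",
                details={"position": pos, "type": type(pos).__name__}
            )
        
        # NaN and infinity are floats but cannot be converted to a position
        if not np.isfinite(pos):
            raise ValidationError(
                "Position must be a finite number",
                details={"position": pos}
            )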
        
        if not allow_zero and pos == 0:
            raise ValidationError(
                "Position cannot be zero",
                details={"position": pos}
            )
        
        if pos < 0:
            raise ValidationError(
                "Position must be non-negative",
                details={"position": pos}
            )
        
        # Convert to integer
        return int(pos)
    
    @staticmethod
    def validate_centimorgans(cm):
        """
        Validate a centiMorgan value.
        
        Args:
            cm: centiMorgan value to validate
            
        Returns:
            The validated centiMorgan value
            
        Raises:
            ValidationError: If the centiMorgan value is invalid
        """
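        # Accept Python and NumPy numbers, but not bool (a subclass of int)
        if isinstance(cm, bool) or not isinstance(cm, (int, float, np.integer, np.floating)):
            raise ValidationError(
                "centiMorgan value must be a number",
                details={"cm": cm, "type": type(cm).__name__}
            )
        
        # NaN (e.g., a missing value from pandas) would otherwise pass the
        # non-negativity check below, because NaN < 0 is False
        if not np.isfinite(cm):
            raise ValidationError(
                "centiMorgan value must be a finite number",
                details={"cm": cm}
            )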
        
        if cm < 0:
            raise ValidationError(
                "centiMorgan value must be non-negative",
                details={"cm": cm}
            )
        
        return float(cm)
    
    @staticmethod
    def validate_snp_count(snps):
        """
        Validate a SNP count.
        
        Args:
            snps: SNP count to validate
            
        Returns:
            The validated SNP count
            
        Raises:
            ValidationError: If the SNP count is invalid
        """
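        # Accept Python and NumPy numbers, but not bool (a subclass of int)
        if isinstance(snps, bool) or not isinstance(snps, (int, float, np.integer, np.floating)):
            raise ValidationError(
                "SNP count must be a number",
                details={"snps": snps, "type": type(snps).__name__}
            )
        
        # NaN and infinity cannot be converted to an integer count
        if not np.isfinite(snps):
            raise ValidationError(
                "SNP count must be a finite number",
                details={"snps": snps}
            )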
        
        if snps < 0:
            raise ValidationError(
                "SNP count must be non-negative",
                details={"snps": snps}
            )
        
        # Convert to integer
        return int(snps)
    
    @staticmethod
    def validate_individual_id(iid):
        """
        Validate an individual identifier.
        
        Args:
            iid: Individual identifier to validate
            
        Returns:
            The validated individual identifier
            
        Raises:
            ValidationError: If the individual identifier is invalid
        """
        if not isinstance(iid, str):
            raise ValidationError(
                "Individual ID must be a string",
                details={"iid": iid, "type": type(iid).__name__}
            )
        
        if len(iid) == 0:
            raise ValidationError(
                "Individual ID cannot be empty",
                details={"iid": iid}
            )
        
        # No other restrictions on individual IDs
        return iid
    
    @staticmethod
    def validate_ibd_segment(segment_dict):
        """
        Validate an IBD segment dictionary.
        
        Args:
            segment_dict: Dictionary with IBD segment data
            
        Returns:
            Validated and normalized segment dictionary
            
        Raises:
            ValidationError: If the segment is invalid
            MissingDataError: If required fields are missing
        """
        if not isinstance(segment_dict, dict):
            raise ValidationError(
                "Segment must be a dictionary",
                details={"segment": segment_dict, "type": type(segment_dict).__name__}
            )
        
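        # Check for required fields (loop variable is named field_name so it
        # does not shadow dataclasses.field imported at the top of the notebook)
        required_fields = ["start_pos", "end_pos", "cm", "snps"]
        for field_name in required_fields:
            if field_name not in segment_dict:
                raise MissingDataError(
                    f"Missing required field: {field_name}",
                    details={"segment": segment_dict}
                )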
        
        # Validate and normalize each field
        normalized = {}
        
        # Optional chromosome field (default to "1")
        chrom = segment_dict.get("chromosome", "1")
        normalized["chromosome"] = Validator.validate_chromosome(chrom)
        
        # Required fields
        normalized["start_pos"] = Validator.validate_position(segment_dict["start_pos"])
        normalized["end_pos"] = Validator.validate_position(segment_dict["end_pos"])
        normalized["cm"] = Validator.validate_centimorgans(segment_dict["cm"])
        normalized["snps"] = Validator.validate_snp_count(segment_dict["snps"])
        
        # Additional validation for segment integrity
        if normalized["end_pos"] <= normalized["start_pos"]:
            raise ValidationError(
                "End position must be greater than start position",
                details={
                    "start_pos": normalized["start_pos"],
                    "end_pos": normalized["end_pos"]
                }
            )
        
        return normalized

# Test the validator functions
def test_validators():
    # Test chromosome validation
    print("Testing chromosome validation:")
    chrom_tests = ["1", "22", "X", "chrY", "chr5", "Z", 10]
    for chrom in chrom_tests:
        try:
            result = Validator.validate_chromosome(chrom)
            print(f"✅ Chromosome '{chrom}' is valid: {result}")
        except ValidationError as e:
            print(f"❌ Chromosome '{chrom}' is invalid: {e}")
    
    # Test position validation
    print("\nTesting position validation:")
    pos_tests = [10000, 0, -1, "not a number"]
    for pos in pos_tests:
        try:
            result = Validator.validate_position(pos)
            print(f"✅ Position {pos} is valid: {result}")
        except ValidationError as e:
            print(f"❌ Position {pos} is invalid: {e}")
    
    # Test centiMorgan validation
    print("\nTesting centiMorgan validation:")
    cm_tests = [10.5, 0, -5.2, "not a number"]
    for cm in cm_tests:
        try:
            result = Validator.validate_centimorgans(cm)
            print(f"✅ cM {cm} is valid: {result}")
        except ValidationError as e:
            print(f"❌ cM {cm} is invalid: {e}")
    
    # Test segment validation
    print("\nTesting segment validation:")
    segment_tests = [
        {
            "start_pos": 1000000,
            "end_pos": 5000000,
            "cm": 10.5,
            "snps": 1200,
            "chromosome": "1"
        },
        {
            "start_pos": 10000000,
            "end_pos": 5000000,  # end < start
            "cm": 8.2,
            "snps": 900,
            "chromosome": "X"
        },
        {
            "start_pos": 1000000,
            "end_pos": 5000000,
            "snps": 1200,  # missing cm
            "chromosome": "1"
        },
        "not a dictionary"
    ]
    
    for i, segment in enumerate(segment_tests):
        try:
            result = Validator.validate_ibd_segment(segment)
            print(f"✅ Segment {i+1} is valid: {result}")
        except (ValidationError, MissingDataError) as e:
            print(f"❌ Segment {i+1} is invalid: {e}")
        except Exception as e:
            print(f"⚠️ Segment {i+1} raised unexpected error: {e}")

# Run the tests
test_validators()
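`Validator.validate_ibd_segment` stops at the first problem it finds. That is fine for rejecting a single record, but it makes cleaning a whole file tedious, because you fix one error at a time. A hypothetical alternative is to drive validation from a table that maps each field to a small check function, run every check, and collect all the messages. The names below (`FIELD_CHECKS`, `collect_segment_errors`, and the check functions) are illustrative and are not part of Bonsai. Unlike `validate_ibd_segment`, which defaults a missing chromosome to `"1"`, this sketch treats `chromosome` as required.

```python
from typing import Any, Callable, Dict, List, Optional

VALID_CHROMOSOMES = {str(i) for i in range(1, 23)} | {"X", "Y"}


def check_non_negative_number(value: Any) -> Optional[str]:
    """Return an error message, or None if value is a non-negative number."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return f"expected a number, got {type(value).__name__}"
    if value < 0:
        return f"must be non-negative, got {value}"
    return None


def check_chromosome(value: Any) -> Optional[str]:
    """Return an error message, or None if value names chromosome 1-22, X, or Y."""
    if not isinstance(value, str):
        return f"expected a string, got {type(value).__name__}"
    name = value.strip().upper()
    if name.startswith("CHR"):
        name = name[3:]
    if name not in VALID_CHROMOSOMES:
        return f"unknown chromosome {value!r}"
    return None


# Hypothetical field -> check table
FIELD_CHECKS: Dict[str, Callable[[Any], Optional[str]]] = {
    "start_pos": check_non_negative_number,
    "end_pos": check_non_negative_number,
    "cm": check_non_negative_number,
    "snps": check_non_negative_number,
    "chromosome": check_chromosome,
}


def collect_segment_errors(record: Dict[str, Any]) -> List[str]:
    """Run every field check and return all problems found (empty if valid)."""
    errors = []
    for field_name, check in FIELD_CHECKS.items():
        if field_name not in record:
            errors.append(f"{field_name}: missing")
            continue
        message = check(record[field_name])
        if message:
            errors.append(f"{field_name}: {message}")
    # Cross-field rule, applied only when both positions passed their own checks
    if not any(e.startswith(("start_pos", "end_pos")) for e in errors):
        if record["end_pos"] <= record["start_pos"]:
            errors.append("end_pos: must be greater than start_pos")
    return errors


bad = {"start_pos": 10_000_000, "end_pos": 5_000_000, "cm": -8.2, "chromosome": "Z"}
for problem in collect_segment_errors(bad):
    print(problem)
```

With this design, adding a new field is a one-line change to `FIELD_CHECKS`. Cross-field rules such as `end_pos > start_pos` run only after the fields they depend on have passed, so they never see malformed values.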

## Part 3: Defensive Programming

Defensive programming means anticipating problems before they occur and handling them deliberately instead of letting them crash the program. Let's explore some of the defensive programming techniques used in Bonsai v3.
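Python code usually anticipates a problem in one of two styles. "Look before you leap" (LBYL) checks first. "Easier to ask forgiveness than permission" (EAFP) attempts the operation and handles the exception. The minimal sketch below applies both styles to a parent lookup in an `up_dict`-style pedigree dictionary like the one used in this section. The function names are hypothetical.

```python
from typing import Dict


def parents_lbyl(iid: int, up_dict: Dict[int, Dict[int, int]]) -> Dict[int, int]:
    """Look before you leap: test membership, then index."""
    if iid not in up_dict:
        raise KeyError(f"individual {iid} is not in the pedigree")
    return up_dict[iid]


def parents_eafp(iid: int, up_dict: Dict[int, Dict[int, int]]) -> Dict[int, int]:
    """Ask forgiveness: index directly and turn a failure into a clear error."""
    try:
        return up_dict[iid]
    except KeyError:
        raise KeyError(f"individual {iid} is not in the pedigree") from None


pedigree = {1: {}, 2: {}, 3: {1: 1, 2: 1}}
print(parents_lbyl(3, pedigree))  # {1: 1, 2: 1}
print(parents_eafp(3, pedigree))  # {1: 1, 2: 1}
```

LBYL reads naturally for simple precondition checks like these. EAFP performs a single lookup, and it avoids a gap between the check and the use if the dictionary can be modified elsewhere in the meantime.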

### 3.1 Checking Preconditions and Postconditions

One key technique in defensive programming is to check preconditions (conditions that must be true before a function runs) and postconditions (conditions that should be true after a function runs):

In [ ]:
def get_common_ancestors(id1, id2, up_dict):
    """
    Find common ancestors of two individuals in a pedigree.
    
    This function demonstrates defensive programming with
    precondition and postcondition checks.
    
    Args:
        id1: ID of the first individual
        id2: ID of the second individual
        up_dict: Dictionary mapping individual IDs to their parents
        
    Returns:
        Set of common ancestor IDs
        
    Raises:
        ValidationError: If input validation fails
        MissingDataError: If required data is missing
    """
    # Precondition checks
    if up_dict is None:
        raise MissingDataError("Pedigree data is missing (up_dict is None)")
    
    if not isinstance(up_dict, dict):
        raise ValidationError(
            "Pedigree data must be a dictionary",
            details={"actual_type": type(up_dict).__name__}
        )
    
    # Validate individual IDs
    for id_val, label in [(id1, "id1"), (id2, "id2")]:
        if id_val is None:
            raise MissingDataError(f"Individual ID is missing ({label} is None)")
        
        if id_val not in up_dict:
            raise ValidationError(
                "Individual not found in pedigree",
                details={"id": id_val, "label": label}
            )
    
    # Defensive check for cycles in the pedigree
    def check_for_cycles(iid, visited=None, path=None):
        """Check if there are cycles in the ancestry path."""
        if visited is None:
            visited = set()
        if path is None:
            path = []
        
        if iid in path:
            # Found a cycle
            cycle_path = path[path.index(iid):] + [iid]
            raise ValidationError(
                "Cycle detected in pedigree",
                details={"cycle": "->".join(str(i) for i in cycle_path)}
            )
        
        if iid in visited:
            # Already checked this individual
            return
        
        visited.add(iid)
        new_path = path + [iid]
        
        # Check all parents
        for parent_id in up_dict.get(iid, {}):
            check_for_cycles(parent_id, visited, new_path)
    
    # Check for cycles starting from both individuals
    try:
        check_for_cycles(id1)
        check_for_cycles(id2)
    except ValidationError as e:
        # Attach additional context to the error
        e.details["error_context"] = "Cycle check during get_common_ancestors"
        raise
    
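    # The actual function implementation
    ancestors1 = get_all_ancestors(id1, up_dict)
    ancestors2 = get_all_ancestors(id2, up_dict)
    
    # Find the common ancestors. Each individual is included in its own
    # lineage so that a direct ancestor counts as a common ancestor
    # (e.g., 1 is a common ancestor of itself and its descendant 5);
    # without this, the postconditions below would fail for that case.
    common_ancestors = (ancestors1 | {id1}) & (ancestors2 | {id2})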
    
    # Postcondition checks
    if id1 in ancestors2:
        # id1 is an ancestor of id2
        assert id1 in common_ancestors, "id1 should be in common ancestors if it's an ancestor of id2"
    
    if id2 in ancestors1:
        # id2 is an ancestor of id1
        assert id2 in common_ancestors, "id2 should be in common ancestors if it's an ancestor of id1"
    
    return common_ancestors

def get_all_ancestors(id_val, up_dict):
    """
    Get all ancestors of an individual in a pedigree.
    
    Args:
        id_val: ID of the individual
        up_dict: Dictionary mapping individual IDs to their parents
        
    Returns:
        Set of ancestor IDs
    """
    ancestors = set()
    to_process = [id_val]
    
    while to_process:
        current = to_process.pop()
        
        for parent in up_dict.get(current, {}):
            if parent not in ancestors:
                ancestors.add(parent)
                to_process.append(parent)
    
    return ancestors

# Create a test pedigree
def create_test_pedigree(include_cycle=False):
    """
    Create a test pedigree for demonstrating defensive programming.
    
    Args:
        include_cycle: Whether to include a cycle in the pedigree
        
    Returns:
        Dictionary representing the pedigree
    """
    # Create a basic pedigree
    #
    #      1       2
    #      |       |
    #      +---3---+
    #          |
    #      +---4---+
    #      |       |
    #      5       6
    #
    pedigree = {
        1: {},  # No parents (founder)
        2: {},  # No parents (founder)
        3: {1: 1, 2: 1},  # Child of 1 and 2
        4: {3: 1},  # Child of 3
        5: {4: 1},  # Child of 4
        6: {4: 1},  # Child of 4
    }
    
    if include_cycle:
        # Make 4 a parent of 1, closing the cycle 1 -> 3 -> 4 -> 1 (parent -> child)
        pedigree[1] = {4: 1}
    
    return pedigree

# Test the function with different inputs
def test_get_common_ancestors():
    # Create test pedigrees
    regular_pedigree = create_test_pedigree(include_cycle=False)
    cyclic_pedigree = create_test_pedigree(include_cycle=True)
    
    # Test cases
    test_cases = [
        {
            "name": "Valid case - siblings",
            "id1": 5,
            "id2": 6,
            "pedigree": regular_pedigree,
            "expected_success": True,
            "expected_ancestors": {1, 2, 3, 4}
        },
        {
            "name": "Valid case - direct ancestor",
            "id1": 1,
            "id2": 5,
            "pedigree": regular_pedigree,
            "expected_success": True,
            "expected_ancestors": {1}
        },
        {
            "name": "Missing individual",
            "id1": 5,
            "id2": 7,  # not in pedigree
            "pedigree": regular_pedigree,
            "expected_success": False,
            "expected_error": ValidationError
        },
        {
            "name": "Cyclic pedigree",
            "id1": 5,
            "id2": 6,
            "pedigree": cyclic_pedigree,
            "expected_success": False,
            "expected_error": ValidationError
        },
        {
            "name": "Missing pedigree",
            "id1": 5,
            "id2": 6,
            "pedigree": None,
            "expected_success": False,
            "expected_error": MissingDataError
        }
    ]
    
    # Run the tests
    results = []
    for test_case in test_cases:
        print(f"\nRunning test: {test_case['name']}")
        try:
            common = get_common_ancestors(
                test_case["id1"], 
                test_case["id2"], 
                test_case["pedigree"]
            )
            
            success = test_case["expected_success"]
            if success:
                expected_ancestors = test_case["expected_ancestors"]
                match = common == expected_ancestors
                if match:
                    print(f"✅ Test passed: Found expected common ancestors: {common}")
                else:
                    print(f"❌ Test failed: Expected {expected_ancestors}, got {common}")
                
                results.append({
                    "test": test_case["name"],
                    "status": "Pass" if match else "Fail",
                    "message": f"Expected {expected_ancestors}, got {common}" if not match else ""
                })
            else:
                print("❌ Test failed: Expected error but got success")
                results.append({
                    "test": test_case["name"],
                    "status": "Fail",
                    "message": "Expected error but got success"
                })
        
        except Exception as e:
            if not test_case["expected_success"]:
                expected_error = test_case["expected_error"]
                if isinstance(e, expected_error):
                    print(f"✅ Test passed: Got expected error: {e}")
                    results.append({
                        "test": test_case["name"],
                        "status": "Pass",
                        "message": f"Got expected error: {type(e).__name__}"
                    })
                else:
                    print(f"❌ Test failed: Expected {expected_error.__name__}, got {type(e).__name__}")
                    results.append({
                        "test": test_case["name"],
                        "status": "Fail",
                        "message": f"Expected {expected_error.__name__}, got {type(e).__name__}"
                    })
            else:
                print(f"❌ Test failed: Unexpected error: {e}")
                results.append({
                    "test": test_case["name"],
                    "status": "Fail",
                    "message": f"Unexpected error: {type(e).__name__}: {str(e)}"
                })
    
    # Display summary
    print("\nTest Summary:")
    print("-" * 100)
    for result in results:
        status_emoji = "✅" if result["status"] == "Pass" else "❌"
        print(f"{status_emoji} {result['test']}: {result['status']}")
        if result["message"]:
            print(f"   {result['message']}")
    
    # Count passes and failures
    passes = sum(1 for r in results if r["status"] == "Pass")
    failures = sum(1 for r in results if r["status"] == "Fail")
    
    print("-" * 100)
    print(f"Total: {len(results)} tests, {passes} passed, {failures} failed")
    
    # Visualize results
    plt.figure(figsize=(8, 5))
    plt.bar(["Passed", "Failed"], [passes, failures], color=["#66b3ff", "#ff9999"])
    plt.title("Test Results")
    plt.ylabel("Count")
    plt.grid(axis="y", linestyle="--", alpha=0.7)
    
    # Add text labels
    for i, v in enumerate([passes, failures]):
        plt.text(i, v + 0.1, str(v), ha="center")
    
    plt.tight_layout()
    plt.show()

# Run the tests
test_get_common_ancestors()
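`check_for_cycles` recurses once per generation. An extremely deep pedigree, or a long corrupted chain in synthetic data, could therefore exceed Python's default recursion limit of 1000 frames and fail with `RecursionError` rather than a clear `ValidationError`. Below is a minimal iterative sketch of the same depth-first check. It uses an explicit stack and the standard white/gray/black marking. `find_ancestry_cycle` is a hypothetical helper that returns the cycle instead of raising, and it assumes the same `up_dict` layout (child ID → dict of parent IDs).

```python
from typing import Dict, Hashable, List, Optional

WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
_EXHAUSTED = object()  # sentinel: an individual's parent iterator is used up


def find_ancestry_cycle(start: Hashable, up_dict: Dict) -> Optional[List]:
    """Return a cycle reachable from `start` by following parent links, or None.

    Iterative depth-first search: each stack entry pairs an individual with
    an iterator over its parents, so the search uses no Python recursion.
    """
    color = {start: GRAY}
    path = [start]
    stack = [(start, iter(up_dict.get(start, {})))]
    while stack:
        node, parents = stack[-1]
        parent = next(parents, _EXHAUSTED)
        if parent is _EXHAUSTED:
            # Every parent of `node` has been explored, so it leaves the path
            color[node] = BLACK
            path.pop()
            stack.pop()
            continue
        state = color.get(parent, WHITE)
        if state == GRAY:
            # `parent` is already on the current path: we have looped back
            return path[path.index(parent):] + [parent]
        if state == WHITE:
            color[parent] = GRAY
            path.append(parent)
            stack.append((parent, iter(up_dict.get(parent, {}))))
        # BLACK parents were fully explored already and are skipped,
        # the same shortcut check_for_cycles takes with `visited`
    return None


cyclic = {1: {4: 1}, 2: {}, 3: {1: 1, 2: 1}, 4: {3: 1}, 5: {4: 1}, 6: {4: 1}}
print(find_ancestry_cycle(5, cyclic))  # [4, 3, 1, 4]

# A 5,000-generation chain: far deeper than the default recursion limit
deep_chain = {i: {i + 1: 1} for i in range(5000)}
print(find_ancestry_cycle(0, deep_chain))  # None
```

The returned list follows parent links, so `[4, 3, 1, 4]` reads as "4's parent is 3, whose parent is 1, whose parent is 4".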

### 3.2 Implementing Fallback Mechanisms

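Another important aspect of defensive programming is providing fallbacks. When an input is malformed or a limit is reached, the code logs the problem and takes a safe default action instead of failing outright. The `IBDSegmentStore` below falls back in several ways: it tolerates bad initial segments, it evicts the shortest segment when the store is full, and it substitutes default thresholds for invalid filter arguments: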

In [ ]:
class IBDSegmentStore:
    """
    A store for IBD segments with fallback mechanisms.
    
    This class demonstrates defensive programming with
    fallback mechanisms and logging.
    """
    def __init__(self, segments=None, logger=None):
        """
        Initialize the IBD segment store.
        
        Args:
            segments: Initial segments to add
            logger: Logger instance
        """
        self.segments = []
        self.logger = logger or self._get_default_logger()
        
        self.config = {
            "min_cm": 7.0,
            "min_snps": 500,
            "max_segments": 10000,
            "allow_overlapping": True
        }
        
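        # Try to add initial segments if provided. The try block sits inside
        # the loop so that one bad segment does not stop the rest from being added.
        if segments:
            for segment in segments:
                try:
                    self.add_segment(segment)
                except Exception as e:
                    self.logger.warning(f"Skipped invalid initial segment {segment!r}: {e}")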
    
    def _get_default_logger(self):
        """Create a default logger if none is provided."""
        logger = logging.getLogger("IBDSegmentStore")
        logger.setLevel(logging.INFO)
        
        # Add console handler if not already present
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        
        return logger
    
    def add_segment(self, segment_data):
        """
        Add a segment to the store with validation.
        
        Args:
            segment_data: Segment data (tuple or dictionary)
            
        Returns:
            True if the segment was added, False otherwise
            
        Raises:
            ValidationError: If the segment is invalid
            MissingDataError: If a required field is missing
        """
        try:
            # Handle different input formats with fallbacks
            if isinstance(segment_data, tuple) and len(segment_data) >= 4:
                # Tuple format: (start_pos, end_pos, cm, snps, [chromosome])
                start_pos, end_pos, cm, snps = segment_data[:4]
                chromosome = segment_data[4] if len(segment_data) > 4 else "1"
                
                segment_dict = {
                    "start_pos": start_pos,
                    "end_pos": end_pos,
                    "cm": cm,
                    "snps": snps,
                    "chromosome": chromosome
                }
            elif isinstance(segment_data, dict):
                # Dictionary format
                segment_dict = segment_data.copy()
            else:
                raise ValidationError(
                    "Segment must be a tuple or dictionary",
                    details={"segment": segment_data, "type": type(segment_data).__name__}
                )
            
            # Validate the segment
            validated = Validator.validate_ibd_segment(segment_dict)
            
            # Check against configuration thresholds
            if validated["cm"] < self.config["min_cm"]:
                self.logger.debug(
                    f"Segment rejected: cM ({validated['cm']}) below threshold "
                    f"({self.config['min_cm']})"
                )
                return False
            
            if validated["snps"] < self.config["min_snps"]:
                self.logger.debug(
                    f"Segment rejected: SNPs ({validated['snps']}) below threshold "
                    f"({self.config['min_snps']})"
                )
                return False
            
            # Check for overlapping segments first, so that a segment rejected
            # for overlap never causes an existing segment to be evicted below
            if not self.config["allow_overlapping"]:
                # Create IBDSegment objects for easier comparison
                new_segment = IBDSegment(**validated)
                
                for existing in self.segments:
                    existing_obj = IBDSegment(**existing)
                    
                    if new_segment.chromosome == existing_obj.chromosome and new_segment.overlaps(existing_obj):
                        self.logger.debug(
                            f"Segment rejected: Overlaps with existing segment on "
                            f"chromosome {new_segment.chromosome}"
                        )
                        return False
            
            # Check for maximum segments limit
            if len(self.segments) >= self.config["max_segments"]:
                self.logger.warning(
                    f"Segment store is full ({self.config['max_segments']} segments limit)"
                )
                # Fallback: Remove the shortest segment if this one is longer
                if any(s["cm"] < validated["cm"] for s in self.segments):
                    shortest = min(self.segments, key=lambda s: s["cm"])
                    self.segments.remove(shortest)
                    self.logger.info(
                        f"Removed shortest segment ({shortest['cm']} cM) to make room "
                        f"for new segment ({validated['cm']} cM)"
                    )
                else:
                    # New segment is not longer than any existing segment
                    return False
            
            # Add the segment
            self.segments.append(validated)
            self.logger.debug(
                f"Added segment: chr{validated['chromosome']}:{validated['start_pos']}-"
                f"{validated['end_pos']} ({validated['cm']} cM, {validated['snps']} SNPs)"
            )
            return True
            
        except (ValidationError, MissingDataError) as e:
            # Log and re-raise validation errors
            self.logger.warning(f"Validation error: {e}")
            raise
        except Exception as e:
            # Log unexpected errors but don't raise
            self.logger.error(f"Unexpected error adding segment: {e}")
            return False
    
    def get_segments(self, chromosome=None, min_cm=None, min_snps=None):
        """
        Get segments with optional filtering.
        
        Args:
            chromosome: Optional chromosome filter
            min_cm: Optional minimum centiMorgan filter
            min_snps: Optional minimum SNP filter
            
        Returns:
            List of matching segments
        """
        # Start with a copy of all segments so callers cannot modify
        # the store's internal list through the returned value
        result = list(self.segments)
        
        # Apply filters if provided
        if chromosome is not None:
            try:
                chrom = Validator.validate_chromosome(chromosome)
                result = [s for s in result if s["chromosome"] == chrom]
            except ValidationError as e:
                # Fallback: Log the error but return empty list
                self.logger.warning(f"Invalid chromosome filter: {e}")
                return []
        
        if min_cm is not None:
            try:
                cm_threshold = Validator.validate_centimorgans(min_cm)
                result = [s for s in result if s["cm"] >= cm_threshold]
            except ValidationError as e:
                # Fallback: Use the configured threshold instead
                self.logger.warning(
                    f"Invalid min_cm filter ({min_cm}), using default: {self.config['min_cm']}"
                )
                result = [s for s in result if s["cm"] >= self.config["min_cm"]]
        
        if min_snps is not None:
            try:
                snp_threshold = Validator.validate_snp_count(min_snps)
                result = [s for s in result if s["snps"] >= snp_threshold]
            except ValidationError as e:
                # Fallback: Use the configured threshold instead
                self.logger.warning(
                    f"Invalid min_snps filter ({min_snps}), using default: {self.config['min_snps']}"
                )
                result = [s for s in result if s["snps"] >= self.config["min_snps"]]
        
        return result
    
    def get_total_cm(self, chromosome=None):
        """
        Get the total centiMorgans for all segments.
        
        Args:
            chromosome: Optional chromosome filter
            
        Returns:
            Total centiMorgans
        """
        segments = self.get_segments(chromosome=chromosome)
        return sum(segment["cm"] for segment in segments)
    
    def get_segment_count(self, chromosome=None):
        """
        Get the number of segments.
        
        Args:
            chromosome: Optional chromosome filter
            
        Returns:
            Number of segments
        """
        segments = self.get_segments(chromosome=chromosome)
        return len(segments)
    
    def get_statistics(self):
        """
        Get statistics about the segments.
        
        Returns:
            Dictionary with statistics
        """
        if not self.segments:
            return {
                "segment_count": 0,
                "total_cm": 0,
                "avg_cm": 0,
                "max_cm": 0,
                "chromosomes": []
            }
        
        total_cm = sum(segment["cm"] for segment in self.segments)
        # self.segments is non-empty here, so max() and the division are safe
        max_cm = max(segment["cm"] for segment in self.segments)
        chromosomes = sorted(set(segment["chromosome"] for segment in self.segments))
        
        return {
            "segment_count": len(self.segments),
            "total_cm": total_cm,
            "avg_cm": total_cm / len(self.segments),
            "max_cm": max_cm,
            "chromosomes": chromosomes
        }

# Test the IBD Segment Store
def test_ibd_segment_store():
    # Create a custom logger for testing
    test_logger = logging.getLogger("TestIBDSegmentStore")
    test_logger.setLevel(logging.DEBUG)
    
    # Add a handler that captures log messages
    log_messages = []
    
    class ListHandler(logging.Handler):
        def emit(self, record):
            log_messages.append(self.format(record))
    
    handler = ListHandler()
    formatter = logging.Formatter('%(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    test_logger.addHandler(handler)
    
    # Test with various scenarios
    print("Creating segment store with mixed valid and invalid segments")
    store = IBDSegmentStore([
        (1000000, 5000000, 10.5, 1200, "1"),  # Valid
        (10000000, 15000000, 8.2, 900, "X"),  # Valid
        (20000000, 15000000, 8.2, 900, "2"),  # Invalid (end < start)
        (30000000, 35000000, 5.5, 600, "3"),  # Valid but below default cM threshold
        (40000000, 45000000, 12.0, 400, "4"),  # Valid but below default SNP threshold
        "not a segment"  # Invalid type
    ], logger=test_logger)
    
    # Check what was added
    print(f"\nStore contains {store.get_segment_count()} segments")
    for i, segment in enumerate(store.segments):
        print(f"Segment {i+1}: chr{segment['chromosome']}:{segment['start_pos']}-{segment['end_pos']} "
              f"({segment['cm']} cM, {segment['snps']} SNPs)")
    
    # Try adding more segments with various fallback situations
    print("\nTesting fallback mechanisms:")
    
    # Add a segment with invalid chromosome but valid otherwise
    try:
        result = store.add_segment((50000000, 55000000, 15.0, 1500, "Z"))
        print(f"Added segment with invalid chromosome: {result}")
    except Exception as e:
        print(f"Error adding segment with invalid chromosome: {e}")
    
    # Modify configuration to disallow overlapping segments
    store.config["allow_overlapping"] = False
    
    # Try to add an overlapping segment
    overlap_result = store.add_segment((1000000, 3000000, 8.0, 800, "1"))
    print(f"Added overlapping segment: {overlap_result}")
    
    # Try filtering with invalid parameters
    print("\nTesting filtering with invalid parameters:")
    segments_invalid_chrom = store.get_segments(chromosome="Z")
    print(f"Segments with invalid chromosome: {len(segments_invalid_chrom)}")
    
    segments_invalid_cm = store.get_segments(min_cm="not a number")
    print(f"Segments with invalid min_cm: {len(segments_invalid_cm)}")
    
    # Get statistics
    stats = store.get_statistics()
    print("\nSegment Statistics:")
    for key, value in stats.items():
        print(f"{key}: {value}")
    
    # Test maximum segment limit
    print("\nTesting maximum segment limit:")
    
    # Create a store with a small maximum
    small_store = IBDSegmentStore(logger=test_logger)
    small_store.config["max_segments"] = 3
    
    # Add segments to trigger the limit
    for i in range(5):
        cm_value = 10.0 + i  # Increasing cM values
        result = small_store.add_segment((i * 10000000, (i + 1) * 10000000, cm_value, 1000, "1"))
        print(f"Added segment {i+1} (cM: {cm_value}): {result}")
    
    # Check what's in the store
    print(f"\nSmall store contains {small_store.get_segment_count()} segments")
    for i, segment in enumerate(small_store.segments):
        print(f"Segment {i+1}: ({segment['cm']} cM)")
    
    # Visualize statistics
    plt.figure(figsize=(10, 6))
    
    # Bar chart of segments by chromosome
    chrom_counts = {}
    for segment in store.segments:
        chrom = segment["chromosome"]
        chrom_counts[chrom] = chrom_counts.get(chrom, 0) + 1
    
    # Sort chromosomes naturally (1, 2, ..., 22, X, Y)
    def chrom_sort_key(chrom):
        try:
            return int(chrom)
        except ValueError:
            return 99 if chrom == "X" else 100  # X before Y
    
    sorted_chroms = sorted(chrom_counts.keys(), key=chrom_sort_key)
    counts = [chrom_counts[chrom] for chrom in sorted_chroms]
    
    plt.bar(sorted_chroms, counts, color="#66b3ff")
    plt.title("IBD Segments by Chromosome")
    plt.xlabel("Chromosome")
    plt.ylabel("Number of Segments")
    plt.grid(axis="y", linestyle="--", alpha=0.7)
    
    plt.tight_layout()
    plt.show()
    
    # Print some of the log messages
    print("\nSelected log messages:")
    for msg in log_messages[:10]:  # Show just the first few messages
        print(msg)

# Run the tests
test_ibd_segment_store()
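The maximum-segment test above adds segments with increasing cM to probe a capacity limit. Below is a minimal sketch of one way to enforce such a cap. It assumes the goal is to keep the `max_segments` segments with the highest cM, which is not necessarily the rule `IBDSegmentStore` applies. A min-heap keyed on cM keeps the smallest retained segment at the front, so each insertion past the limit can evict it in O(log n). `BoundedSegmentHeap` is a hypothetical class for illustration, not part of Bonsai v3.

```python
import heapq
import itertools
from typing import List, Optional, Tuple


class BoundedSegmentHeap:
    """Hypothetical container that keeps at most `max_segments` segments,
    retaining the ones with the largest cM values."""

    def __init__(self, max_segments: int):
        if max_segments <= 0:
            raise ValueError("max_segments must be positive")
        self.max_segments = max_segments
        # Min-heap of (cm, insertion_order, segment); the smallest cM sits at index 0.
        self._heap: List[Tuple[float, int, dict]] = []
        # Tie-breaker so two segments with equal cM never compare their dicts.
        self._counter = itertools.count()

    def add(self, segment: dict) -> Optional[dict]:
        """Add a segment; return the segment that was dropped (possibly the new one), or None."""
        entry = (float(segment["cm"]), next(self._counter), segment)
        if len(self._heap) < self.max_segments:
            heapq.heappush(self._heap, entry)
            return None
        if entry[0] <= self._heap[0][0]:
            # The new segment is no larger than the current minimum: reject it.
            return segment
        # Pop the smallest segment and push the new one in a single heap operation.
        evicted = heapq.heapreplace(self._heap, entry)
        return evicted[2]

    def segments(self) -> List[dict]:
        """Return the retained segments sorted by decreasing cM."""
        return [seg for _, _, seg in sorted(self._heap, key=lambda e: (-e[0], e[1]))]


# Mirror the notebook test: a cap of 3 and five segments with increasing cM.
capped = BoundedSegmentHeap(max_segments=3)
for i in range(5):
    capped.add({"chromosome": "1", "cm": 10.0 + i})
print([s["cm"] for s in capped.segments()])  # [14.0, 13.0, 12.0]
```

Returning the dropped segment lets a caller log exactly what was discarded, which fits the lab's theme of making fallback behavior visible rather than silent.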

## Part 4: Graceful Degradation

Graceful degradation means continuing to function, possibly with reduced capability, when parts of the system fail. Let's look at how this works in Bonsai v3.
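At its core, graceful degradation can be as simple as a fallback chain: try the most capable strategy first, record each failure instead of letting it propagate, and report how degraded the final answer is. Here is a minimal sketch, assuming each capability can be wrapped as a zero-argument callable. The function `first_successful` and the strategy names are hypothetical and not part of Bonsai v3.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


def first_successful(strategies: Sequence[Tuple[str, Callable[[], Any]]]) -> Dict[str, Any]:
    """Try each (name, strategy) pair in order and return the first result that succeeds.

    Failures are recorded instead of raised, so the caller learns both the
    answer and how much capability was lost on the way to it.
    """
    failures: List[Dict[str, str]] = []
    for name, strategy in strategies:
        try:
            result = strategy()
        except Exception as exc:  # deliberately broad: any failure moves on to the next fallback
            failures.append({"strategy": name, "error": f"{type(exc).__name__}: {exc}"})
            continue
        return {"result": result, "used": name, "failures": failures, "degraded": bool(failures)}
    # Every strategy failed: report that explicitly rather than raising.
    return {"result": None, "used": None, "failures": failures, "degraded": True}


# Hypothetical usage: a detailed model that fails, then a cruder lookup that succeeds.
def precise_estimate():
    raise RuntimeError("IBD segments unavailable")


outcome = first_successful([
    ("ibd_model", precise_estimate),
    ("total_cm_lookup", lambda: "1st cousin"),
])
print(outcome["used"], outcome["degraded"])  # total_cm_lookup True
```

The class below takes a related but different approach. Instead of picking one strategy, it runs every independent analyzer and combines whatever evidence survives.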

In [ ]:
class RelationshipAnalyzer:
    """
    A class that analyzes genetic relationships with graceful degradation.
    
    This demonstrates how to implement graceful degradation when components fail.
    """
    def __init__(self):
        """Initialize the relationship analyzer."""
        self.logger = logging.getLogger("RelationshipAnalyzer")
        self.logger.setLevel(logging.INFO)
        
        # Add console handler if not already present
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        
        # Initialize all analyzers
        self._init_analyzers()
    
    def _init_analyzers(self):
        """Initialize the individual analyzers."""
        # In a real system, these might be separate components that could fail
        self.analyzers = {
            "ibd": self._create_ibd_analyzer(),
            "age": self._create_age_analyzer(),
            "sex": self._create_sex_analyzer(),
            "ethnicity": self._create_ethnicity_analyzer()
        }
        
        # Track which analyzers are working
        self.available_analyzers = set(self.analyzers.keys())
        
        self.logger.info(f"Initialized {len(self.available_analyzers)} analyzers")
    
    def _create_ibd_analyzer(self):
        """Create the IBD analyzer component."""
        # This would initialize the IBD analysis component
        # For demonstration, we'll just return a function
        
        def analyze_ibd(data):
            """Analyze IBD data to infer relationships."""
            if not data.get("segments"):
                raise MissingDataError("IBD segments are missing")
            
            segments = data["segments"]
            total_cm = sum(segment.get("cm", 0) for segment in segments)
            
            # Simple relationship inference based on total cM
            if total_cm > 3000:
                return {"relationship": "parent-child", "confidence": 0.95}
            elif total_cm > 2000:
                return {"relationship": "full-sibling", "confidence": 0.9}
            elif total_cm > 1000:
                return {"relationship": "half-sibling/grandparent", "confidence": 0.8}
            elif total_cm > 500:
                return {"relationship": "1st cousin", "confidence": 0.7}
            elif total_cm > 250:
                return {"relationship": "2nd cousin", "confidence": 0.6}
            elif total_cm > 100:
                return {"relationship": "3rd cousin", "confidence": 0.5}
            elif total_cm > 50:
                return {"relationship": "4th cousin", "confidence": 0.4}
            elif total_cm > 20:
                return {"relationship": "distant relative", "confidence": 0.3}
            else:
                return {"relationship": "unrelated", "confidence": 0.2}
        
        return analyze_ibd
    
    def _create_age_analyzer(self):
        """Create the age analyzer component."""
        def analyze_age(data):
            """Analyze age data to constrain relationships."""
            if not data.get("ages"):
                raise MissingDataError("Age data is missing")
            
            ages = data["ages"]
            if len(ages) < 2:
                raise ValidationError("Need at least two ages for comparison")
            
            age1, age2 = ages[:2]
            age_diff = abs(age1 - age2)
            
            # Age-based relationship constraints
            if age_diff < 10:
                return {
                    "possible": ["sibling", "cousin", "unrelated"],
                    "unlikely": ["parent-child", "grandparent"],
                    "confidence": 0.7
                }
            elif 10 <= age_diff < 20:
                return {
                    "possible": ["half-sibling", "cousin", "unrelated", "parent-child"],
                    "unlikely": [],
                    "confidence": 0.6
                }
            elif 20 <= age_diff < 40:
                return {
                    "possible": ["parent-child", "aunt/uncle", "unrelated"],
                    "unlikely": ["sibling", "cousin"],
                    "confidence": 0.7
                }
            else:  # age_diff >= 40
                return {
                    "possible": ["grandparent", "unrelated"],
                    "unlikely": ["sibling", "cousin"],
                    "confidence": 0.8
                }
        
        return analyze_age
    
    def _create_sex_analyzer(self):
        """Create the sex analyzer component."""
        def analyze_sex(data):
            """Analyze sex data to constrain relationships."""
            if not data.get("sexes"):
                raise MissingDataError("Sex data is missing")
            
            sexes = data["sexes"]
            if len(sexes) < 2:
                raise ValidationError("Need at least two sexes for comparison")
            
            sex1, sex2 = sexes[:2]
            
            # In a real system, this would be more sophisticated
            if sex1 == sex2:
                return {
                    "possible": ["sibling", "cousin", "grandparent", "unrelated"],
                    "impossible": ["mother-son", "father-daughter"],
                    "confidence": 0.9
                }
            else:
                return {
                    "possible": ["all"],
                    "impossible": [],
                    "confidence": 0.5
                }
        
        return analyze_sex
    
    def _create_ethnicity_analyzer(self):
        """Create the ethnicity analyzer component."""
        def analyze_ethnicity(data):
            """Analyze ethnicity data to constrain relationships."""
            if not data.get("ethnicities"):
                raise MissingDataError("Ethnicity data is missing")
            
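            ethnicities = data["ethnicities"]
            # Accept either a list of profiles or a dict keyed by person ID
            # (the test cases below use the dict form; slicing a dict would raise TypeError)
            if isinstance(ethnicities, dict):
                ethnicities = list(ethnicities.values())
            if len(ethnicities) < 2:
                raise ValidationError("Need at least two ethnicity profiles for comparison")
            
            # Simplified ethnicity comparison
            # In a real system, this would compare detailed admixture proportions
            ethn1, ethn2 = ethnicities[:2]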
            
            # Calculate a similarity score (0-1)
            common_regions = set(ethn1.keys()).intersection(set(ethn2.keys()))
            if not common_regions:
                similarity = 0.0
            else:
                differences = sum(abs(ethn1.get(region, 0) - ethn2.get(region, 0)) 
                                 for region in common_regions)
                similarity = max(0, 1 - differences / len(common_regions))
            
            # Interpret the similarity
            if similarity > 0.8:
                return {
                    "assessment": "very similar ancestry",
                    "related_probability": 0.8,
                    "confidence": 0.7
                }
            elif similarity > 0.6:
                return {
                    "assessment": "similar ancestry",
                    "related_probability": 0.6,
                    "confidence": 0.6
                }
            elif similarity > 0.4:
                return {
                    "assessment": "somewhat similar ancestry",
                    "related_probability": 0.4,
                    "confidence": 0.5
                }
            else:
                return {
                    "assessment": "different ancestry",
                    "related_probability": 0.2,
                    "confidence": 0.6
                }
        
        return analyze_ethnicity
    
    def analyze(self, data):
        """
        Analyze genetic data to infer relationships with graceful degradation.
        
        Args:
            data: Dictionary with genetic data
            
        Returns:
            Dictionary with analysis results
        """
        if not data:
            raise MissingDataError("No data provided for analysis")
        
        # Track all analysis results and errors
        results = {}
        errors = {}
        
        # Try each analyzer and gracefully handle failures
        for name, analyzer in self.analyzers.items():
            if name not in self.available_analyzers:
                self.logger.debug(f"Skipping unavailable analyzer: {name}")
                continue
            
            try:
                self.logger.debug(f"Running {name} analyzer")
                results[name] = analyzer(data)
            except MissingDataError as e:
                self.logger.info(f"{name} analyzer skipped: {e}")
                errors[name] = {"error": "missing_data", "message": str(e)}
            except ValidationError as e:
                self.logger.info(f"{name} analyzer failed validation: {e}")
                errors[name] = {"error": "validation", "message": str(e)}
            except Exception as e:
                self.logger.warning(f"{name} analyzer failed unexpectedly: {e}")
                errors[name] = {"error": "unexpected", "message": str(e)}
                
                # Mark this analyzer as unavailable for future calls
                self.available_analyzers.remove(name)
        
        # If all analyzers failed, raise an error
        if not results and errors:
            raise ProcessingError(
                "All analyzers failed",
                details={"errors": errors}
            )
        
        # Try to combine results for a final assessment
        final_assessment = self._combine_results(results, errors)
        
        # Return the complete analysis
        return {
            "individual_results": results,
            "errors": errors,
            "final_assessment": final_assessment,
            "available_analyzers": list(self.available_analyzers),
            "status": "partial" if errors else "complete"
        }
    
    def _combine_results(self, results, errors):
        """
        Combine individual analyzer results into a final assessment.
        
        Args:
            results: Dictionary with individual analyzer results
            errors: Dictionary with analyzer errors
            
        Returns:
            Dictionary with combined assessment
        """
        # If no results available, return minimal information
        if not results:
            return {
                "relationship": "unknown",
                "confidence": 0.0,
                "reason": "No analyzers succeeded"
            }
        
        # Start with the IBD results if available, as they're most reliable
        if "ibd" in results:
            assessment = results["ibd"].copy()
            assessment["primary_evidence"] = "ibd"
            confidence = assessment.get("confidence", 0.5)
        else:
            # Without IBD, we have lower confidence
            assessment = {
                "relationship": "unknown",
                "confidence": 0.1,
                "primary_evidence": "none"
            }
            confidence = 0.1
        
        # Apply constraints from other analyzers
        if "age" in results:
            age_result = results["age"]
            relationship = assessment.get("relationship", "unknown")
            
            # Each penalty reads the current confidence, so successive adjustments accumulate
            # If current relationship is in the "unlikely" list, reduce confidence
            if relationship in age_result.get("unlikely", []):
                assessment["confidence"] = max(0.1, assessment["confidence"] - 0.2)
                assessment["notes"] = assessment.get("notes", []) + [
                    f"Age difference makes {relationship} relationship unlikely"
                ]
            
            # If current relationship is not in the "possible" list, reduce confidence
            possible = age_result.get("possible", [])
            if possible != ["all"] and relationship not in possible and relationship != "unknown":
                assessment["confidence"] = max(0.1, assessment["confidence"] - 0.3)
                assessment["notes"] = assessment.get("notes", []) + [
                    f"Age difference is not typical for {relationship} relationship"
                ]
        
        if "sex" in results:
            sex_result = results["sex"]
            relationship = assessment.get("relationship", "unknown")
            
            # If current relationship is in the "impossible" list, reduce confidence substantially
            if relationship in sex_result.get("impossible", []):
                assessment["confidence"] = 0.1
                assessment["notes"] = assessment.get("notes", []) + [
                    f"Sex combination makes {relationship} relationship impossible"
                ]
                assessment["relationship"] = "unknown"
        
        if "ethnicity" in results:
            ethnicity_result = results["ethnicity"]
            
            # Use ethnicity similarity to adjust confidence
            related_prob = ethnicity_result.get("related_probability", 0.5)
            
            # Only adjust slightly based on ethnicity
            adjustment = (related_prob - 0.5) * 0.1  # Small adjustment
            new_confidence = min(1.0, max(0.1, assessment.get("confidence", 0.5) + adjustment))
            assessment["confidence"] = new_confidence
            
            assessment["notes"] = assessment.get("notes", []) + [
                f"Ethnicity assessment: {ethnicity_result.get('assessment', 'unknown')}"
            ]
        
        # Note which analyzers were missing
        if errors:
            assessment["missing_evidence"] = list(errors.keys())
        
        return assessment

# Test the RelationshipAnalyzer with graceful degradation
def test_relationship_analyzer():
    # Create the analyzer
    analyzer = RelationshipAnalyzer()
    
    # Test cases
    test_cases = [
        {
            "name": "Complete data",
            "data": {
                "segments": [
                    {"start_pos": 1000000, "end_pos": 5000000, "cm": 10.5, "chromosome": "1"},
                    {"start_pos": 10000000, "end_pos": 15000000, "cm": 15.2, "chromosome": "2"},
                    {"start_pos": 20000000, "end_pos": 25000000, "cm": 8.7, "chromosome": "3"}
                ],
                "ages": [30, 60],
                "sexes": ["XX", "XY"],
                "ethnicities": {
                    "person1": {"European": 0.8, "East Asian": 0.1, "African": 0.1},
                    "person2": {"European": 0.7, "East Asian": 0.2, "African": 0.1}
                }
            }
        },
        {
            "name": "Missing IBD data",
            "data": {
                "ages": [25, 55],
                "sexes": ["XX", "XX"],
                "ethnicities": {
                    "person1": {"European": 0.9, "African": 0.1},
                    "person2": {"European": 0.5, "African": 0.5}
                }
            }
        },
        {
            "name": "Only IBD data",
            "data": {
                "segments": [
                    {"start_pos": 1000000, "end_pos": 5000000, "cm": 100.5, "chromosome": "1"},
                    {"start_pos": 10000000, "end_pos": 15000000, "cm": 150.2, "chromosome": "2"},
                    {"start_pos": 20000000, "end_pos": 25000000, "cm": 80.7, "chromosome": "3"}
                ]
            }
        },
        {
            "name": "Invalid age data",
            "data": {
                "segments": [
                    {"start_pos": 1000000, "end_pos": 5000000, "cm": 20.5, "chromosome": "1"}
                ],
                "ages": [30],  # Only one age
                "sexes": ["XX", "XY"],
                "ethnicities": {
                    "person1": {"European": 0.8, "East Asian": 0.1, "African": 0.1},
                    "person2": {"European": 0.2, "East Asian": 0.7, "African": 0.1}
                }
            }
        },
        {
            "name": "Empty data",
            "data": {}
        }
    ]
    
    # Run the tests
    results = []
    for test_case in test_cases:
        print(f"\nRunning test: {test_case['name']}")
        try:
            result = analyzer.analyze(test_case["data"])
            print(f"Analysis completed with status: {result['status']}")
            
            # Show the final assessment
            assessment = result["final_assessment"]
            print(f"Relationship: {assessment.get('relationship', 'unknown')} "
                  f"(confidence: {assessment.get('confidence', 0):.2f})")
            
            if "notes" in assessment:
                print("Notes:")
                for note in assessment["notes"]:
                    print(f"- {note}")
            
            # Show errors
            if result["errors"]:
                print("Errors:")
                for analyzer_name, error in result["errors"].items():
                    print(f"- {analyzer_name}: {error['message']}")
            
            results.append({
                "test": test_case["name"],
                "status": "success",
                "result": result
            })
            
        except Exception as e:
            print(f"Analysis failed: {e}")
            results.append({
                "test": test_case["name"],
                "status": "failure",
                "error": str(e)
            })
    
    # Visualize which analyzers were still available after each successful test
    analyzer_names = ["ibd", "age", "sex", "ethnicity"]
    successful = [r for r in results if r["status"] == "success"]
    test_names = [r["test"] for r in successful]
    
    # Build a 0/1 availability matrix (rows: tests, columns: analyzers)
    availability_matrix = [
        [1 if name in r["result"]["available_analyzers"] else 0 for name in analyzer_names]
        for r in successful
    ]
    
    # Create a heatmap on a single figure
    if availability_matrix:
        fig, ax = plt.subplots(figsize=(10, 5))
        sns.heatmap(availability_matrix, annot=True, cmap="YlGnBu", ax=ax,
                    xticklabels=analyzer_names, yticklabels=test_names,
                    cbar=False, linewidths=1, linecolor='white')
        ax.set_title("Analyzer Availability by Test Case")
        plt.tight_layout()
        plt.show()
    
    # Return the results for further analysis
    return results

# Run the tests
relationship_test_results = test_relationship_analyzer()
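The `analyze_ibd` component above encodes its cM cut-offs as an if/elif ladder. A data-driven alternative keeps the thresholds in one table that can be validated once, when the module loads. This is a sketch that uses the same thresholds, labels, and confidences as the demo. `bisect.bisect_left` returns how many thresholds are strictly below `total_cm`, which reproduces the ladder's strict `>` comparisons. The names `CM_THRESHOLDS`, `CM_LABELS`, and `classify_total_cm` are hypothetical. The values are still the simplified demo cut-offs, not calibrated relationship ranges.

```python
from bisect import bisect_left

# Same cut-offs as the demo analyze_ibd ladder, in ascending order.
CM_THRESHOLDS = [20, 50, 100, 250, 500, 1000, 2000, 3000]
# One more entry than thresholds: index i applies when exactly i thresholds are < total_cm.
CM_LABELS = [
    ("unrelated", 0.2), ("distant relative", 0.3), ("4th cousin", 0.4),
    ("3rd cousin", 0.5), ("2nd cousin", 0.6), ("1st cousin", 0.7),
    ("half-sibling/grandparent", 0.8), ("full-sibling", 0.9), ("parent-child", 0.95),
]

# Validate the table once, so a bad edit fails loudly at load time rather than mid-analysis.
if CM_THRESHOLDS != sorted(CM_THRESHOLDS):
    raise ValueError("CM_THRESHOLDS must be in ascending order")
if len(CM_LABELS) != len(CM_THRESHOLDS) + 1:
    raise ValueError("CM_LABELS needs exactly one entry per interval")


def classify_total_cm(total_cm: float) -> dict:
    """Map total shared cM to a relationship label using strict '>' cut-offs."""
    if total_cm < 0:
        raise ValueError(f"total_cm must be non-negative, got {total_cm}")
    relationship, confidence = CM_LABELS[bisect_left(CM_THRESHOLDS, total_cm)]
    return {"relationship": relationship, "confidence": confidence}


# 331.4 cM is the total in the "Only IBD data" test case above.
print(classify_total_cm(331.4))  # {'relationship': '2nd cousin', 'confidence': 0.6}
```

Boundary values behave as they do in the ladder. For example, exactly 3000 cM maps to "full-sibling", because `3000 > 3000` is false.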

## Part 5: Logging and Debugging

Effective logging and debugging are essential for diagnosing and resolving issues in production. Let's look at how Bonsai v3 handles this:
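`BonsaiLogger` below builds its structured `key=value` suffix by hand in `_log`. For comparison, here is a minimal sketch that produces the same kind of output with the standard library's `logging.LoggerAdapter`. Its `process(msg, kwargs)` method is the documented hook for rewriting each message before it reaches the underlying logger. The adapter name `KeyValueAdapter`, the `fields` keyword, and the logger name `bonsai.sketch` are hypothetical choices for this example. Output goes to an in-memory stream so it can be checked.

```python
import io
import logging


class KeyValueAdapter(logging.LoggerAdapter):
    """Hypothetical adapter that appends structured fields as 'key=value' pairs."""

    def process(self, msg, kwargs):
        # Merge adapter-wide context with per-call fields; other kwargs (exc_info, ...) pass through.
        fields = {**(self.extra or {}), **kwargs.pop("fields", {})}
        if fields:
            msg = f"{msg} [" + ", ".join(f"{k}={v}" for k, v in fields.items()) + "]"
        return msg, kwargs


# Route output to an in-memory stream with the same console format BonsaiLogger uses.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s - %(name)s - %(message)s"))

base = logging.getLogger("bonsai.sketch")
base.setLevel(logging.DEBUG)
base.handlers.clear()
base.addHandler(handler)
base.propagate = False  # avoid duplicate output through the root logger

log = KeyValueAdapter(base, {"run_id": "demo"})
log.info("Loaded IBD segments", fields={"count": 3})
print(stream.getvalue().strip())
# INFO - bonsai.sketch - Loaded IBD segments [run_id=demo, count=3]
```

The adapter approach keeps the standard `Logger` interface, including `exc_info=True` and level filtering, and adds context in one place. A custom class like `BonsaiLogger` makes it easier to attach extras such as counters and checkpoints.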

In [ ]:
import time  # used for checkpoint timing and the demo's simulated work

class BonsaiLogger:
    """
    A specialized logger for Bonsai operations.
    
    This demonstrates best practices for logging in genetic genealogy applications.
    """
    # Log levels
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    CRITICAL = logging.CRITICAL
    
    def __init__(self, name, level=logging.INFO, log_file=None, console=True):
        """
        Initialize the logger.
        
        Args:
            name: Logger name (usually the module name)
            level: Logging level
            log_file: Optional path to log file
            console: Whether to log to console
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        
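        # Remove and close existing handlers to avoid duplicate output and leaked file handles
        for existing_handler in list(self.logger.handlers):
            self.logger.removeHandler(existing_handler)
            existing_handler.close()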
        
        # Create formatters
        console_fmt = logging.Formatter(
            '%(levelname)s - %(name)s - %(message)s'
        )
        
        file_fmt = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # Add console handler if requested
        if console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(console_fmt)
            self.logger.addHandler(console_handler)
        
        # Add file handler if requested
        if log_file:
            try:
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(file_fmt)
                self.logger.addHandler(file_handler)
            except (IOError, PermissionError) as e:
                # Fallback: Log to console if file logging fails
                self.logger.warning(f"Failed to create log file {log_file}: {e}")
                self.logger.warning("Falling back to console logging only")
        
        # Capture the start time for performance tracking
        self.start_time = time.time()
        self.last_checkpoint = self.start_time
        
        # Event and error counters
        self.counters = {
            "warnings": 0,
            "errors": 0,
            "validations": 0,
            "calculations": 0
        }
    
    def debug(self, message, **kwargs):
        """Log a debug message with optional structured data."""
        self._log(logging.DEBUG, message, **kwargs)
    
    def info(self, message, **kwargs):
        """Log an info message with optional structured data."""
        self._log(logging.INFO, message, **kwargs)
    
    def warning(self, message, **kwargs):
        """Log a warning message with optional structured data."""
        self.counters["warnings"] += 1
        self._log(logging.WARNING, message, **kwargs)
    
    def error(self, message, **kwargs):
        """Log an error message with optional structured data."""
        self.counters["errors"] += 1
        self._log(logging.ERROR, message, **kwargs)
    
    def critical(self, message, **kwargs):
        """Log a critical message with optional structured data."""
        self.counters["errors"] += 1
        self._log(logging.CRITICAL, message, **kwargs)
    
    def _log(self, level, message, **kwargs):
        """Internal method to format and log messages with structured data."""
        # Append structured key=value data if provided (the formatter adds the timestamp)
        if kwargs:
            # Format structured data for readability
            data_str = ", ".join(f"{k}={self._format_value(v)}" for k, v in kwargs.items())
            full_message = f"{message} [{data_str}]"
        else:
            full_message = message
        
        # Log the message
        self.logger.log(level, full_message)
    
    def _format_value(self, value):
        """Format a value for structured logging."""
        if isinstance(value, (int, float, bool, str)):
            return str(value)
        elif isinstance(value, (list, tuple)) and len(value) <= 3:
            return str(value)
        elif isinstance(value, (list, tuple)):
            return f"[{len(value)} items]"
        elif isinstance(value, dict) and len(value) <= 3:
            return str(value)
        elif isinstance(value, dict):
            return f"{{{len(value)} items}}"
        else:
            return f"{type(value).__name__}"
    
    def checkpoint(self, name):
        """
        Log a performance checkpoint.
        
        Args:
            name: Checkpoint name
            
        Returns:
            Elapsed time since the last checkpoint
        """
        now = time.time()
        elapsed = now - self.last_checkpoint
        total_elapsed = now - self.start_time
        
        self.info(
            f"Checkpoint: {name}",
            elapsed_seconds=f"{elapsed:.3f}",
            total_elapsed=f"{total_elapsed:.3f}"
        )
        
        self.last_checkpoint = now
        return elapsed
    
    def log_exception(self, e, context=None):
        """
        Log an exception with context.
        
        Args:
            e: The exception
            context: Optional context dictionary
        """
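        # Format the traceback from the exception object itself, so this also works
        # when log_exception is called outside the original except block
        tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))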
        context_dict = context or {}
        
        # Extract exception details
        exc_type = type(e).__name__
        exc_message = str(e)
        
        # Make a clean traceback for logging
        tb_lines = tb.split('\n')
        if len(tb_lines) > 10:
            # Truncate if too long
            tb_summary = '\n'.join(tb_lines[:3] + ["..."] + tb_lines[-5:])
        else:
            tb_summary = tb
        
        # Log the exception
        self.error(
            f"Exception: {exc_type}: {exc_message}",
            exception_type=exc_type,
            **context_dict
        )
        
        # Log the traceback at debug level
        self.debug(f"Traceback:\n{tb_summary}")
    
    def log_validation(self, success, entity_type, details=None):
        """
        Log a validation event.
        
        Args:
            success: Whether validation succeeded
            entity_type: Type of entity being validated
            details: Optional validation details
        """
        self.counters["validations"] += 1
        details_dict = details or {}
        
        if success:
            self.debug(
                f"Validation succeeded: {entity_type}",
                **details_dict
            )
        else:
            self.warning(
                f"Validation failed: {entity_type}",
                **details_dict
            )
    
    def log_calculation(self, calc_type, details=None, result=None):
        """
        Log a calculation event.
        
        Args:
            calc_type: Type of calculation
            details: Optional calculation details
            result: Optional calculation result
        """
        self.counters["calculations"] += 1
        details_dict = details or {}
        
        # Log differently depending on result
        if result is None:
            self.debug(
                f"Starting calculation: {calc_type}",
                **details_dict
            )
        else:
            # Include condensed result in the log
            result_dict = {"result": self._format_value(result)}
            self.debug(
                f"Completed calculation: {calc_type}",
                **{**details_dict, **result_dict}
            )
    
    def get_summary(self):
        """
        Get a summary of logging activity.
        
        Returns:
            Dict with summary information
        """
        return {
            "total_time": time.time() - self.start_time,
            "counters": self.counters.copy()
        }

# Example usage of the logger in a pedigree reconstruction task
def demo_logging():
    # Create a logger
    log_file = os.path.join(RESULTS_DIR, "bonsai_demo.log")
    logger = BonsaiLogger("BonsaiDemo", level=logging.DEBUG, log_file=log_file)
    
    logger.info("Starting pedigree reconstruction demo")
    
    try:
        # Simulate a multi-step process with logging
        logger.checkpoint("Initialization")
        
        # Step 1: Load data
        logger.info("Loading IBD data")
        try:
            # Simulate a data loading step
            time.sleep(0.2)  # Simulate work
            segments = [
                {"start_pos": 1000000, "end_pos": 5000000, "cm": 10.5, "chromosome": "1"},
                {"start_pos": 10000000, "end_pos": 15000000, "cm": 8.2, "chromosome": "X"},
                {"start_pos": 20000000, "end_pos": 15000000, "cm": 8.2, "chromosome": "2"}  # Invalid: start_pos > end_pos
            ]
            logger.info("Loaded IBD segments", count=len(segments))
        except Exception as e:
            logger.log_exception(e, context={"step": "data_loading"})
            # Simulate fallback data
            segments = []
            logger.warning("Using empty segment list as fallback")
        
        # Step 2: Validate data
        logger.checkpoint("Data validation")
        valid_segments = []
        
        for i, segment in enumerate(segments):
            try:
                if "chromosome" not in segment:
                    raise ValidationError("Missing chromosome", details={"segment_index": i})
                
                if segment.get("start_pos", 0) >= segment.get("end_pos", 0):
                    raise ValidationError(
                        "Invalid positions", 
                        details={
                            "segment_index": i,
                            "start": segment.get("start_pos"),
                            "end": segment.get("end_pos")
                        }
                    )
                
                # Log successful validation
                logger.log_validation(True, "segment", {
                    "index": i,
                    "chromosome": segment.get("chromosome"),
                    "cm": segment.get("cm")
                })
                
                valid_segments.append(segment)
                
            except ValidationError as e:
                # Log failed validation
                logger.log_validation(False, "segment", {
                    "index": i,
                    "error": str(e),
                    **e.details
                })
        
        logger.info("Validated segments", valid=len(valid_segments), invalid=len(segments)-len(valid_segments))
        
        # Step 3: Perform calculations
        logger.checkpoint("Relationship inference")
        
        # Simulate a calculation that might fail
        try:
            logger.log_calculation("total_cm", {"segments": len(valid_segments)})
            total_cm = sum(segment.get("cm", 0) for segment in valid_segments)
            logger.log_calculation("total_cm", {"segments": len(valid_segments)}, total_cm)
            
            # Deliberately cause an error
            if random.random() < 0.3:  # 30% chance of error
                missing_value = None
                result = total_cm / len(missing_value)  # Will raise TypeError
            
            relationship = "unknown"
            if total_cm > 3000:
                relationship = "parent-child"
            elif total_cm > 2000:
                relationship = "full-sibling"
            elif total_cm > 1000:
                relationship = "half-sibling/grandparent"
            elif total_cm > 500:
                relationship = "1st cousin"
            
            logger.info("Inferred relationship", relationship=relationship, total_cm=total_cm)
            
        except Exception as e:
            logger.log_exception(e, context={"step": "relationship_inference", "valid_segments": len(valid_segments)})
            logger.warning("Failed to infer relationship, using fallback")
            relationship = "unknown"
        
        # Step 4: Generate report
        logger.checkpoint("Report generation")
        
        report = {
            "relationship": relationship,
            "segments": len(valid_segments),
            "total_cm": total_cm if 'total_cm' in locals() else None,
            "status": "success" if relationship != "unknown" else "partial"
        }
        
        logger.info("Generated report", status=report["status"])
        
        # Wrap up
        summary = logger.get_summary()
        logger.info(
            "Processing complete",
            total_time=f"{summary['total_time']:.3f}s",
            warnings=summary['counters']['warnings'],
            errors=summary['counters']['errors']
        )
        
        return report, summary
        
    except Exception as e:
        # Catch-all for unexpected errors
        logger.log_exception(e, context={"phase": "overall_process"})
        logger.critical("Processing failed with unhandled exception")
        
        # Even if we fail, return a summary
        summary = logger.get_summary()
        logger.info(
            "Processing failed",
            total_time=f"{summary['total_time']:.3f}s",
            warnings=summary['counters']['warnings'],
            errors=summary['counters']['errors']
        )
        
        return {"status": "failed", "error": str(e)}, summary

# Run the logging demo
report, summary = demo_logging()

# Display the results
print("\nReport:")
for key, value in report.items():
    print(f"{key}: {value}")

print("\nLogging Summary:")
print(f"Total time: {summary['total_time']:.3f} seconds")
print("Event counts:")
for event, count in summary['counters'].items():
    print(f"- {event}: {count}")

# Display a visualization of the event counts
plt.figure(figsize=(10, 5))
events = list(summary['counters'].keys())
counts = list(summary['counters'].values())

# Define colors based on event type
colors = []
for event in events:
    if event == "errors":
        colors.append("#ff9999")  # Red
    elif event == "warnings":
        colors.append("#ffcc99")  # Orange
    else:
        colors.append("#66b3ff")  # Blue

# Create the bar chart
plt.bar(events, counts, color=colors)
plt.title("Event Counts During Processing")
plt.ylabel("Count")
plt.grid(axis="y", linestyle="--", alpha=0.7)

# Add count labels on top of bars
for i, v in enumerate(counts):
    plt.text(i, v + 0.1, str(v), ha="center")

plt.tight_layout()
plt.show()

# If we created a log file, show its content
log_path = os.path.join(RESULTS_DIR, "bonsai_demo.log")
if os.path.exists(log_path):
    with open(log_path, "r") as f:
        log_content = f.readlines()
    
    print("\nLog File Content (first 10 lines):")
    for line in log_content[:10]:
        print(line.strip())
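
In `demo_logging`, `total_cm` is assigned only inside the `try` block, so the report has to check `locals()` to find out whether it exists. A common alternative, sketched here under our own assumptions, is to initialize every value the report needs before the `try` and to keep the relationship cutoffs in a table. `THRESHOLDS`, `classify_total_cm`, and `build_report` are hypothetical names, not part of Bonsai v3, and the cutoffs are the demo's simplified values rather than calibrated ranges.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical threshold table using the demo's simplified cutoffs.
# Real shared-cM ranges for these relationships overlap considerably.
THRESHOLDS: List[Tuple[float, str]] = [
    (3000, "parent-child"),
    (2000, "full-sibling"),
    (1000, "half-sibling/grandparent"),
    (500, "1st cousin"),
]


def classify_total_cm(total_cm: float) -> str:
    """Return the label of the first (highest) threshold that total_cm exceeds."""
    for threshold, label in THRESHOLDS:
        if total_cm > threshold:
            return label
    return "unknown"


def build_report(segments: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the same report keys as demo_logging without inspecting locals()."""
    # Initialise everything the report needs before anything can fail
    total_cm: Optional[float] = None
    relationship = "unknown"
    try:
        total_cm = sum(seg.get("cm", 0) for seg in segments)
        relationship = classify_total_cm(total_cm)
    except TypeError:
        # e.g. a segment whose "cm" is None or a string: fall back to "unknown"
        pass
    return {
        "relationship": relationship,
        "segments": len(segments),
        "total_cm": total_cm,
        "status": "success" if relationship != "unknown" else "partial",
    }
```

Catching only `TypeError` is deliberately narrower than the demo's `except Exception`: the expected bad-data failure degrades to a `"partial"` report, while genuinely unexpected bugs still surface.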

## Summary

In this lab, we explored the error handling and data validation techniques Bonsai v3 uses to keep operating reliably when it encounters imperfect data or unexpected conditions:

1. **Custom Exception Hierarchy**: We implemented a specialized exception hierarchy that allows for more targeted error handling and clearer error messages.

2. **Input Validation**: We validated inputs in two complementary ways: data classes that check their own fields when constructed, and standalone validator functions.

3. **Defensive Programming**: We showed how to use precondition and postcondition checks to catch problems early and ensure the correctness of our functions.

4. **Graceful Degradation**: We implemented a system that keeps functioning, possibly at reduced capability, when parts of it fail, making the most of whatever data remains usable.

5. **Logging and Debugging**: We built a structured logging system that records checkpoints, validation results, calculations, and exceptions, then summarizes elapsed time and event counts, making problems in complex genetic genealogy pipelines easier to diagnose and resolve.
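
The first four techniques fit together in a few dozen lines. The sketch below is a minimal, self-contained illustration under our own assumptions, not Bonsai v3's actual code: `BonsaiError`, `Segment`, `total_shared_cm`, and `parse_segments` are hypothetical names, and restricting chromosomes to autosomes 1 to 22 is an illustrative rule. The `ValidationError` here carries a `details` dict, mirroring how the logging demo unpacks `e.details`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


class BonsaiError(Exception):
    """Hypothetical root of the exception hierarchy."""


class ValidationError(BonsaiError):
    """Raised when an input record fails validation; keeps structured details."""

    def __init__(self, message: str, **details: Any):
        super().__init__(message)
        self.details = details


@dataclass
class Segment:
    chrom: int
    start: int
    end: int
    cm: float

    def __post_init__(self) -> None:
        # Input validation runs automatically when the object is constructed
        if not 1 <= self.chrom <= 22:
            raise ValidationError("chromosome must be an autosome (1-22)", chrom=self.chrom)
        if self.start >= self.end:
            raise ValidationError("start must be before end", start=self.start, end=self.end)
        if self.cm <= 0:
            raise ValidationError("cM length must be positive", cm=self.cm)


def total_shared_cm(segments: List[Segment]) -> float:
    # Precondition: fail fast on inputs that bypassed validation
    if not all(isinstance(s, Segment) for s in segments):
        raise BonsaiError("total_shared_cm expects validated Segment objects")
    total = sum(s.cm for s in segments)
    # Postcondition: guaranteed by validation today, but guards future changes
    if total < 0:
        raise BonsaiError(f"total cM must be non-negative, got {total}")
    return total


def parse_segments(
    raw: List[Dict[str, Any]],
) -> Tuple[List[Segment], List[Dict[str, Any]]]:
    """Graceful degradation: keep the valid records, collect errors for the rest."""
    valid: List[Segment] = []
    errors: List[Dict[str, Any]] = []
    for i, record in enumerate(raw):
        try:
            valid.append(Segment(**record))
        except ValidationError as e:
            errors.append({"index": i, "error": str(e), **e.details})
        except TypeError as e:
            # Missing/unexpected fields or values of the wrong type
            errors.append({"index": i, "error": str(e)})
    return valid, errors
```

`parse_segments` never raises on a bad record; it returns the valid segments alongside a list of error dictionaries, so a caller can continue with what survived and report the rest, while `total_shared_cm` refuses anything that skipped validation.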

These techniques are essential for genetic genealogy applications that must cope with the complexities and uncertainties of real-world genetic data. Robust error handling and validation let Bonsai v3 deliver more reliable results and a better user experience.

In the next lab, we'll explore pedigree rendering and visualization techniques used in Bonsai v3 to help users interpret and understand the results of pedigree reconstruction.