---
title: "System Tests"
execute:
  enabled: false
format:
  html:
    code-fold: false
    code-line-numbers: true
    code-copy: true
    highlight-style: github
    toc: true
---

System tests validate hazelbean as a whole: smoke tests verify that basic functionality works, and workflow tests exercise complete end-to-end ProjectFlow pipelines.

## Overview

The system test suite covers:

- **Smoke Tests** - Quick validation that core features work
- **Workflow Tests** - Complete ProjectFlow workflows
- **Environment Tests** - Testing in different environments
- **Installation Tests** - Verifying correct installation
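
To run only part of the suite, pytest's `-m` option selects tests by marker. The sketch below builds the argument list for such a run. The `smoke` marker and the `hazelbean_tests/system` path are taken from the test sources on this page; the helper name `build_pytest_args` is hypothetical and is not part of hazelbean.

```python
def build_pytest_args(test_dir="hazelbean_tests/system", marker=None, verbose=True):
    """Return a pytest argument list (hypothetical helper, not a hazelbean API)."""
    args = [test_dir]
    if marker:
        # -m restricts the run to tests carrying the given marker
        args += ["-m", marker]
    if verbose:
        args.append("-v")
    return args


if __name__ == "__main__":
    # Print the equivalent command line for a smoke-only run
    print(" ".join(["pytest"] + build_pytest_args(marker="smoke")))
```

The same list can be passed to `pytest.main(...)` from a Python session, assuming pytest is installed in the environment.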

---

## ProjectFlow Workflow Tests

**Source File:** `hazelbean_tests/system/test_project_flow_workflows.py`

### test_complete_project_creation_to_cleanup_lifecycle()

Test full project lifecycle from creation to cleanup.

::: {.callout-note collapse="true"}
## Source code

```python
    def test_complete_project_creation_to_cleanup_lifecycle(self, temp_dir):
        """Test full project lifecycle from creation to cleanup."""
        lifecycle_log = []
        
        # Phase 1: Project Creation
        project_dir = os.path.join(temp_dir, 'test_project')
        p = hb.ProjectFlow(project_dir)
        
        lifecycle_log.append("project_created")
        
        # Validate project directories were created
        assert os.path.exists(p.project_dir)
        # Note: intermediate_dir is created during execution, not at project creation
        
        # Phase 2: Task Definition
        def data_preparation_task(p):
            """Prepare initial data files."""
            lifecycle_log.append("data_preparation_executed")
            
            # Create input data
            input_file = os.path.join(p.cur_dir, 'input_data.txt')
            with open(input_file, 'w') as f:
                f.write("Initial data for processing")
        
        def processing_iterator(p):
            """Process data in multiple scenarios."""
            lifecycle_log.append("processing_iterator_executed")
            
            # Set up processing scenarios
            p.iterator_replacements = {
                'process_type': ['clean', 'validate', 'transform'],
                'cur_dir_parent_dir': [
                    os.path.join(p.intermediate_dir, 'clean'),
                    os.path.join(p.intermediate_dir, 'validate'),
                    os.path.join(p.intermediate_dir, 'transform')
                ]
            }
            
            for dir_path in p.iterator_replacements['cur_dir_parent_dir']:
                os.makedirs(dir_path, exist_ok=True)
        
        def scenario_processing_task(p):
            """Process each scenario."""
            lifecycle_log.append(f"scenario_processing_executed_{p.process_type}")
            
            # Read input data
            input_file = os.path.join(p.data_preparation_task_dir, 'input_data.txt')
            if os.path.exists(input_file):
                with open(input_file, 'r') as f:
                    input_data = f.read()
                
                # Create scenario output
                output_file = os.path.join(p.cur_dir, f'{p.process_type}_result.txt')
                with open(output_file, 'w') as f:
                    f.write(f"{p.process_type.title()} processing: {input_data}")
            else:
                pytest.fail(f"Input data not found for {p.process_type} processing")
        
        def results_compilation_task(p):
            """Compile all processing results."""
            lifecycle_log.append("results_compilation_executed")
            
            # Collect results from all scenarios - files are in task subdirectories
            scenario_results = []
            for process_type in ['clean', 'validate', 'transform']:
                result_file = os.path.join(
                    p.intermediate_dir, process_type, 'scenario_processing_task', f'{process_type}_result.txt'
                )
                if os.path.exists(result_file):
                    with open(result_file, 'r') as f:
                        scenario_results.append(f.read())
            
            # Create final compilation
            final_output = os.path.join(p.cur_dir, 'final_results.txt')
            with open(final_output, 'w') as f:
                f.write("Compiled Results:\n")
                f.write("\n".join(scenario_results))
        
        # Build project workflow
        data_prep = p.add_task(data_preparation_task)
        processing = p.add_iterator(processing_iterator, run_in_parallel=False)
        scenario_task = p.add_task(scenario_processing_task, parent=processing)
        results = p.add_task(results_compilation_task)
        
        lifecycle_log.append("tasks_defined")
        
        # Phase 3: Execution
        p.execute()
        lifecycle_log.append("execution_completed")
        
        # Phase 4: Validation
        assert len(lifecycle_log) == 9  # 1 created + 1 defined + 1 data_prep + 1 iterator + 3 scenarios + 1 compilation + 1 execution_completed
        
        # Validate execution order
        expected_order = [
            "project_created",
            "tasks_defined", 
            "data_preparation_executed",
            "processing_iterator_executed",
            "scenario_processing_executed_clean",
            "scenario_processing_executed_validate", 
            "scenario_processing_executed_transform",
            "results_compilation_executed",
            "execution_completed"
        ]
        
        for i, expected in enumerate(expected_order):
            assert lifecycle_log[i].startswith(expected) or lifecycle_log[i] == expected
        
        # Validate file outputs
        input_file = os.path.join(p.data_preparation_task_dir, 'input_data.txt')
        final_results = os.path.join(p.results_compilation_task_dir, 'final_results.txt')
        
        assert os.path.exists(input_file)
        assert os.path.exists(final_results)
        
        # Validate final results content
        with open(final_results, 'r') as f:
            content = f.read()
            assert "Clean processing: Initial data" in content
            assert "Validate processing: Initial data" in content
            assert "Transform processing: Initial data" in content
        
        # Phase 5: Cleanup (handled by temp_dir fixture)
        lifecycle_log.append("cleanup_ready")

```
:::
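
The iterator in the test above sets `p.iterator_replacements` to a dict of equal-length lists, and the assertions expect one child-task run per index (`clean`, `validate`, `transform`). The sketch below shows one way to read that structure, assuming each index across the lists defines one scenario. It is an illustration of the data layout only, not hazelbean's implementation, and `expand_replacements` is a hypothetical name.

```python
import os


def expand_replacements(replacements):
    """Turn a dict of equal-length lists into one dict per scenario."""
    keys = list(replacements)
    lengths = {len(replacements[key]) for key in keys}
    if len(lengths) > 1:
        raise ValueError(f"replacement lists differ in length: {sorted(lengths)}")
    columns = [replacements[key] for key in keys]
    # zip(*columns) pairs up the i-th entry of every list
    return [dict(zip(keys, row)) for row in zip(*columns)]


intermediate_dir = os.path.join("project", "intermediate")  # placeholder path
names = ["clean", "validate", "transform"]
replacements = {
    "process_type": names,
    "cur_dir_parent_dir": [os.path.join(intermediate_dir, name) for name in names],
}

scenarios = expand_replacements(replacements)
for scenario in scenarios:
    print(scenario["process_type"], "->", scenario["cur_dir_parent_dir"])
```

Under this reading, a length mismatch between the lists is a configuration error, which is why the sketch raises `ValueError` instead of silently truncating.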

---

### test_multi_stage_project_workflow_with_dependencies()

Test complex multi-stage project with task dependencies.

::: {.callout-note collapse="true"}
## Source code

```python
    def test_multi_stage_project_workflow_with_dependencies(self, temp_dir):
        """Test complex multi-stage project with task dependencies."""
        stage_tracking = {'current_stage': 0, 'completed_stages': []}
        
        project_dir = os.path.join(temp_dir, 'multi_stage_project')
        p = hb.ProjectFlow(project_dir)
        
        def stage_1_setup(p):
            """Stage 1: Initial setup and configuration."""
            stage_tracking['current_stage'] = 1
            stage_tracking['completed_stages'].append(1)
            
            config_file = os.path.join(p.cur_dir, 'project_config.txt')
            with open(config_file, 'w') as f:
                f.write("project_version=1.0\nprocessing_mode=batch\noutput_format=csv")
        
        def stage_2_data_collection_iterator(p):
            """Stage 2: Collect data from multiple sources."""
            stage_tracking['current_stage'] = 2
            stage_tracking['completed_stages'].append(2)
            
            # Verify stage 1 completed
            config_file = os.path.join(p.stage_1_setup_dir, 'project_config.txt')
            assert os.path.exists(config_file), "Stage 1 dependency not met"
            
            # Set up data collection scenarios
            p.iterator_replacements = {
                'data_source': ['database', 'api', 'files'],
                'cur_dir_parent_dir': [
                    os.path.join(p.cur_dir, 'database_collection'),
                    os.path.join(p.cur_dir, 'api_collection'),
                    os.path.join(p.cur_dir, 'files_collection')
                ]
            }
            
            for dir_path in p.iterator_replacements['cur_dir_parent_dir']:
                os.makedirs(dir_path, exist_ok=True)
        
        def data_collection_task(p):
            """Collect data from specific source."""
            collected_file = os.path.join(p.cur_dir, f'{p.data_source}_data.txt')
            with open(collected_file, 'w') as f:
                f.write(f"Data collected from {p.data_source} source")
        
        def stage_3_processing(p):
            """Stage 3: Process all collected data."""
            stage_tracking['current_stage'] = 3
            stage_tracking['completed_stages'].append(3)
            
            # Verify stage 2 dependencies - files are in task subdirectories
            required_files = [
                'database_collection/data_collection_task/database_data.txt',
                'api_collection/data_collection_task/api_data.txt', 
                'files_collection/data_collection_task/files_data.txt'
            ]
            
            stage_2_dir = p.stage_2_data_collection_iterator_dir
            for req_file in required_files:
                full_path = os.path.join(stage_2_dir, req_file)
                assert os.path.exists(full_path), f"Stage 2 dependency missing: {req_file}"
            
            # Process all data
            processed_data = []
            for req_file in required_files:
                full_path = os.path.join(stage_2_dir, req_file)
                with open(full_path, 'r') as f:
                    processed_data.append(f.read())
            
            # Create processed output
            output_file = os.path.join(p.cur_dir, 'processed_data.txt')
            with open(output_file, 'w') as f:
                f.write("Processed Data:\n")
                f.write("\n".join(processed_data))
        
        def stage_4_final_output(p):
            """Stage 4: Generate final project outputs."""
            stage_tracking['current_stage'] = 4
            stage_tracking['completed_stages'].append(4)
            
            # Verify all previous stage dependencies
            dependencies = [
                (p.stage_1_setup_dir, 'project_config.txt'),
                (p.stage_3_processing_dir, 'processed_data.txt')
            ]
            
            for dep_dir, dep_file in dependencies:
                dep_path = os.path.join(dep_dir, dep_file)
                assert os.path.exists(dep_path), f"Dependency missing: {dep_path}"
            
            # Generate final output
            final_output = os.path.join(p.cur_dir, 'project_final_output.txt')
            with open(final_output, 'w') as f:
                f.write("Project completed successfully\n")
                f.write(f"Completed stages: {stage_tracking['completed_stages']}\n")
                f.write(f"Final stage: {stage_tracking['current_stage']}")
        
        # Build multi-stage workflow
        stage_1 = p.add_task(stage_1_setup)
        stage_2_iterator = p.add_iterator(stage_2_data_collection_iterator, run_in_parallel=False)
        collection_task = p.add_task(data_collection_task, parent=stage_2_iterator)
        stage_3 = p.add_task(stage_3_processing)
        stage_4 = p.add_task(stage_4_final_output)
        
        # Execute complete workflow
        p.execute()
        
        # Validate workflow completion
        assert stage_tracking['current_stage'] == 4
        assert stage_tracking['completed_stages'] == [1, 2, 3, 4]
        
        # Validate final output exists and is correct
        final_output_path = os.path.join(p.stage_4_final_output_dir, 'project_final_output.txt')
        assert os.path.exists(final_output_path)
        
        with open(final_output_path, 'r') as f:
            content = f.read()
            assert "Project completed successfully" in content
            assert "[1, 2, 3, 4]" in content
            assert "Final stage: 4" in content

```
:::
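
Stages 2 to 4 in the test above each assert that files from earlier stages exist before doing their own work. A minimal sketch of that check as a reusable function follows. The name `missing_dependencies` is hypothetical, and the directory layout simply mirrors the relative paths used in the test.

```python
import os
import tempfile


def missing_dependencies(base_dir, relative_paths):
    """Return the relative paths that do not exist under base_dir."""
    return [
        rel for rel in relative_paths
        if not os.path.exists(os.path.join(base_dir, rel))
    ]


with tempfile.TemporaryDirectory() as stage_dir:
    required = [
        os.path.join("database_collection", "data_collection_task", "database_data.txt"),
        os.path.join("api_collection", "data_collection_task", "api_data.txt"),
    ]

    # Create only the first dependency so the second is reported as missing
    first = os.path.join(stage_dir, required[0])
    os.makedirs(os.path.dirname(first), exist_ok=True)
    with open(first, "w") as f:
        f.write("Data collected from database source")

    missing = missing_dependencies(stage_dir, required)
    print("Missing:", missing)
```

Returning the full list of missing paths, instead of failing on the first one, tends to give a more useful failure message when several upstream outputs are absent.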

---

### test_error_handling_in_complete_project_workflow()

Test error handling and recovery in complete project workflows.

::: {.callout-note collapse="true"}
## Source code

```python
    def test_error_handling_in_complete_project_workflow(self, temp_dir):
        """Test error handling and recovery in complete project workflows."""
        error_tracking = {'errors_encountered': [], 'recovery_successful': False}
        
        project_dir = os.path.join(temp_dir, 'error_handling_project')
        p = hb.ProjectFlow(project_dir)
        
        def reliable_setup_task(p):
            """Task that always succeeds."""
            setup_file = os.path.join(p.cur_dir, 'setup_success.txt')
            with open(setup_file, 'w') as f:
                f.write("Setup completed successfully")
        
        def error_prone_iterator(p):
            """Iterator that may encounter issues but handles them gracefully."""
            try:
                p.iterator_replacements = {
                    'scenario': ['success', 'warning', 'recoverable'],
                    'cur_dir_parent_dir': [
                        os.path.join(p.cur_dir, 'success_scenario'),
                        os.path.join(p.cur_dir, 'warning_scenario'),
                        os.path.join(p.cur_dir, 'recoverable_scenario')
                    ]
                }
                
                for dir_path in p.iterator_replacements['cur_dir_parent_dir']:
                    os.makedirs(dir_path, exist_ok=True)
                    
            except Exception as e:
                error_tracking['errors_encountered'].append(f"Iterator setup error: {e}")
                # Recovery: set minimal scenarios
                p.iterator_replacements = {
                    'scenario': ['recovery'],
                    'cur_dir_parent_dir': [os.path.join(p.cur_dir, 'recovery_scenario')]
                }
                os.makedirs(p.iterator_replacements['cur_dir_parent_dir'][0], exist_ok=True)
        
        def scenario_handling_task(p):
            """Task that handles different scenario types."""
            scenario_type = p.scenario
            
            if scenario_type == 'success':
                # Normal processing
                output_file = os.path.join(p.cur_dir, 'success_result.txt')
                with open(output_file, 'w') as f:
                    f.write("Successfully processed")
                    
            elif scenario_type == 'warning':
                # Processing with warnings
                error_tracking['errors_encountered'].append(f"Warning in {scenario_type}")
                output_file = os.path.join(p.cur_dir, 'warning_result.txt')
                with open(output_file, 'w') as f:
                    f.write("Processed with warnings")
                    
            elif scenario_type == 'recoverable':
                # Recoverable error handling
                try:
                    # Simulate recoverable error
                    if not os.path.exists('/nonexistent/path'):
                        raise FileNotFoundError("Simulated recoverable error")
                except FileNotFoundError as e:
                    error_tracking['errors_encountered'].append(f"Recoverable error: {e}")
                    # Recovery action
                    output_file = os.path.join(p.cur_dir, 'recovered_result.txt')
                    with open(output_file, 'w') as f:
                        f.write("Recovered from error")
                        
            elif scenario_type == 'recovery':
                # Recovery scenario
                error_tracking['recovery_successful'] = True
                output_file = os.path.join(p.cur_dir, 'recovery_result.txt')
                with open(output_file, 'w') as f:
                    f.write("Recovery scenario executed")
        
        def cleanup_and_summary_task(p):
            """Final task that summarizes error handling."""
            summary_file = os.path.join(p.cur_dir, 'error_summary.txt')
            with open(summary_file, 'w') as f:
                f.write(f"Errors encountered: {len(error_tracking['errors_encountered'])}\n")
                f.write(f"Recovery successful: {error_tracking['recovery_successful']}\n")
                f.write("Error details:\n")
                for error in error_tracking['errors_encountered']:
                    f.write(f"- {error}\n")
        
        # Build error-handling workflow
        setup = p.add_task(reliable_setup_task)
        error_iterator = p.add_iterator(error_prone_iterator, run_in_parallel=False)
        scenario_task = p.add_task(scenario_handling_task, parent=error_iterator)
        cleanup = p.add_task(cleanup_and_summary_task)
        
        # Execute workflow (should complete despite errors)
        p.execute()
        
        # Validate error handling
        assert len(error_tracking['errors_encountered']) > 0  # Should have encountered some issues
        
        # Validate workflow completed
        summary_file = os.path.join(p.cleanup_and_summary_task_dir, 'error_summary.txt')
        assert os.path.exists(summary_file)
        
        # Validate error summary
        with open(summary_file, 'r') as f:
            content = f.read()
            assert "Errors encountered:" in content
            assert "Recovery successful:" in content

```
:::
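
The `recoverable` branch in the test above follows a log-then-fall-back pattern: catch a known exception, record it, and produce a substitute result. A minimal standalone sketch of that pattern is shown here. `run_with_recovery` and `read_missing_input` are hypothetical helpers written for illustration and do not exist in hazelbean.

```python
def run_with_recovery(action, fallback, error_log, label):
    """Run action(); on OSError, log the error and return fallback() instead."""
    try:
        return action()
    except OSError as exc:
        error_log.append(f"{label}: {exc}")
        return fallback()


def read_missing_input():
    # Opening a file under a directory that does not exist raises
    # FileNotFoundError, which is a subclass of OSError.
    with open("/nonexistent/path/input.txt") as f:
        return f.read()


errors = []
recovered = run_with_recovery(
    read_missing_input, lambda: "Recovered from error", errors, "recoverable"
)
clean = run_with_recovery(
    lambda: "Successfully processed", lambda: "unused", errors, "success"
)
print(recovered, "|", clean, "|", len(errors), "error(s) logged")
```

Catching a narrow exception type keeps unexpected failures visible, so a genuine bug still fails the workflow instead of being logged and ignored.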

---

### test_file_lifecycle_operations_workflow()

Test complete file lifecycle operations in project workflow.

::: {.callout-note collapse="true"}
## Source code

```python
    def test_file_lifecycle_operations_workflow(self, temp_dir):
        """Test complete file lifecycle operations in project workflow."""
        file_operations = []
        
        project_dir = os.path.join(temp_dir, 'file_lifecycle_project')
        p = hb.ProjectFlow(project_dir)
        
        def file_creation_task(p):
            """Create various types of files."""
            file_operations.append("file_creation_started")
            
            # Create different file types
            files_to_create = [
                ('data.txt', 'Sample text data'),
                ('config.json', '{"setting": "value", "enabled": true}'),
                ('results.csv', 'name,value,status\ntest,100,pass')
            ]
            
            for filename, content in files_to_create:
                file_path = os.path.join(p.cur_dir, filename)
                with open(file_path, 'w') as f:
                    f.write(content)
                file_operations.append(f"created_{filename}")
                assert os.path.exists(file_path)
        
        def file_modification_iterator(p):
            """Modify files in different ways."""
            file_operations.append("file_modification_started")
            
            p.iterator_replacements = {
                'modification_type': ['append', 'update', 'backup'],
                'cur_dir_parent_dir': [
                    os.path.join(p.cur_dir, 'append_mod'),
                    os.path.join(p.cur_dir, 'update_mod'),
                    os.path.join(p.cur_dir, 'backup_mod')
                ]
            }
            
            for dir_path in p.iterator_replacements['cur_dir_parent_dir']:
                os.makedirs(dir_path, exist_ok=True)
        
        def modification_task(p):
            """Apply specific modification type."""
            file_operations.append(f"modification_{p.modification_type}_started")
            
            source_dir = p.file_creation_task_dir
            
            if p.modification_type == 'append':
                # Append to existing files
                source_file = os.path.join(source_dir, 'data.txt')
                if os.path.exists(source_file):
                    target_file = os.path.join(p.cur_dir, 'appended_data.txt')
                    # Copy original and append
                    shutil.copy2(source_file, target_file)
                    with open(target_file, 'a') as f:
                        f.write("\nAppended content")
                    file_operations.append("append_completed")
                    
            elif p.modification_type == 'update':
                # Update file contents
                source_file = os.path.join(source_dir, 'config.json')
                if os.path.exists(source_file):
                    target_file = os.path.join(p.cur_dir, 'updated_config.json')
                    with open(target_file, 'w') as f:
                        f.write('{"setting": "updated_value", "enabled": false, "version": 2}')
                    file_operations.append("update_completed")
                    
            elif p.modification_type == 'backup':
                # Create backup copies
                source_files = ['data.txt', 'config.json', 'results.csv']
                for filename in source_files:
                    source_path = os.path.join(source_dir, filename)
                    if os.path.exists(source_path):
                        backup_name = f'backup_{filename}'
                        backup_path = os.path.join(p.cur_dir, backup_name)
                        shutil.copy2(source_path, backup_path)
                        file_operations.append(f"backed_up_{filename}")
        
        def file_validation_task(p):
            """Validate all file operations completed correctly."""
            file_operations.append("validation_started")
            
            validation_results = []
            
            # Check original files
            original_files = ['data.txt', 'config.json', 'results.csv']
            for filename in original_files:
                file_path = os.path.join(p.file_creation_task_dir, filename)
                if os.path.exists(file_path):
                    validation_results.append(f"original_{filename}_exists")
            
            # Check modified files
            append_file = os.path.join(p.file_modification_iterator_dir, 'append_mod', 'appended_data.txt')
            update_file = os.path.join(p.file_modification_iterator_dir, 'update_mod', 'updated_config.json')
            
            if os.path.exists(append_file):
                validation_results.append("append_file_exists")
            if os.path.exists(update_file):
                validation_results.append("update_file_exists")
            
            # Check backup files
            backup_dir = os.path.join(p.file_modification_iterator_dir, 'backup_mod')
            backup_files = ['backup_data.txt', 'backup_config.json', 'backup_results.csv']
            for backup_file in backup_files:
                backup_path = os.path.join(backup_dir, backup_file)
                if os.path.exists(backup_path):
                    validation_results.append(f"{backup_file}_exists")
            
            # Write validation report
            report_file = os.path.join(p.cur_dir, 'validation_report.txt')
            with open(report_file, 'w') as f:
                f.write("File Lifecycle Validation Report\n")
                f.write(f"Operations performed: {len(file_operations)}\n")
                f.write(f"Validations passed: {len(validation_results)}\n")
                f.write("\nOperation details:\n")
                for op in file_operations:
                    f.write(f"- {op}\n")
                f.write("\nValidation results:\n")
                for result in validation_results:
                    f.write(f"- {result}\n")
        
        # Build file operations workflow
        creation = p.add_task(file_creation_task)
        modification_iter = p.add_iterator(file_modification_iterator, run_in_parallel=False)
        modification = p.add_task(modification_task, parent=modification_iter)
        validation = p.add_task(file_validation_task)
        
        # Execute file operations workflow
        p.execute()
        
        # Validate workflow completion
        assert len(file_operations) >= 10  # Should have many file operations
        
        # Validate final report exists
        report_file = os.path.join(p.file_validation_task_dir, 'validation_report.txt')
        assert os.path.exists(report_file)
        
        # Validate report content
        with open(report_file, 'r') as f:
            content = f.read()
            assert "File Lifecycle Validation Report" in content
            assert "Operations performed:" in content
            assert "Validations passed:" in content

```
:::
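
The `append` and `backup` modifications above never touch the files written by `file_creation_task`; each one works on a copy made with `shutil.copy2`. The sketch below isolates that copy-then-modify step outside of ProjectFlow, using a temporary directory as a stand-in for the task directories.

```python
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as work_dir:
    # Original file, as written by the creation step
    source = os.path.join(work_dir, "data.txt")
    with open(source, "w") as f:
        f.write("Sample text data")

    # Append: copy first, then open the copy in append mode
    appended = os.path.join(work_dir, "appended_data.txt")
    shutil.copy2(source, appended)
    with open(appended, "a") as f:
        f.write("\nAppended content")

    # Backup: a plain copy under a prefixed name
    backup = os.path.join(work_dir, "backup_data.txt")
    shutil.copy2(source, backup)

    with open(source) as f:
        original_text = f.read()
    with open(appended) as f:
        appended_text = f.read()
    with open(backup) as f:
        backup_text = f.read()

print(repr(original_text))
print(repr(appended_text))
```

Because every modification writes to a new path, the validation step can still compare against the unmodified originals.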

---

## Smoke Tests

**Source File:** `hazelbean_tests/system/test_smoke.py`

### test_hazelbean_imports_successfully()

Test that hazelbean can be imported without errors.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_hazelbean_imports_successfully(self):
        """Test that hazelbean can be imported without errors"""
        # This is already handled by the import above, but let's be explicit
        import hazelbean as hb
        assert hb is not None
        assert hasattr(hb, "ProjectFlow")

```
:::

---

### test_projectflow_imports()

Test that ProjectFlow is available and can be instantiated.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_projectflow_imports(self):
        """Test that ProjectFlow is available and can be imported"""
        assert hasattr(hb, "ProjectFlow")
        
        # Test that we can instantiate ProjectFlow
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            assert p is not None

```
:::

---

### test_hazelbean_import_performance()

Benchmark the import time of hazelbean module.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    @pytest.mark.benchmark
    def test_hazelbean_import_performance(self, benchmark):
        """Benchmark the import time of hazelbean module."""
        def import_hazelbean():
            import hazelbean as hb
            return hb
        
        result = benchmark(import_hazelbean)
        assert hasattr(result, "ProjectFlow")

```
:::
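
One caveat when reading import benchmarks: after the first import, Python serves the module from the `sys.modules` cache, so a function that is called repeatedly will mostly measure cached lookups. The sketch below contrasts a fresh import with a cached one. It uses the small standard-library module `colorsys` as a stand-in, since timing hazelbean itself depends on the installation.

```python
import importlib
import sys
import time


def time_import(module_name):
    """Import a module by name and return (module, elapsed_seconds)."""
    start = time.perf_counter()
    module = importlib.import_module(module_name)
    return module, time.perf_counter() - start


module_name = "colorsys"  # stand-in module; swap in another name to experiment

# Drop any cached copy so the first call performs a real import
sys.modules.pop(module_name, None)
_, cold_seconds = time_import(module_name)

# The second call finds the module in sys.modules
module, cached_seconds = time_import(module_name)

print(f"fresh import:  {cold_seconds:.6f} s")
print(f"cached import: {cached_seconds:.6f} s")
```

Measuring a true cold start for a large package is more reliably done in a fresh interpreter process, because removing one entry from `sys.modules` leaves its already-imported dependencies cached.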

---

### test_projectflow_basic_functionality()

Test that basic ProjectFlow functionality works.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_projectflow_basic_functionality(self):
        """Test basic ProjectFlow functionality works"""
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # Test basic get_path functionality with an existing file
            test_file = os.path.join(temp_dir, "test_file.txt")
            with open(test_file, 'w') as f:
                f.write("test content")
            
            path = p.get_path("test_file.txt")
            assert path is not None
            assert "test_file.txt" in path
            assert os.path.exists(path)

```
:::

---

### test_common_hazelbean_functions_available()

Test that common hazelbean functions are available.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_common_hazelbean_functions_available(self):
        """Test that common hazelbean functions are available"""
        # Test that key functions are available
        assert hasattr(hb, "temp")
        assert hasattr(hb, "get_path") 
        assert hasattr(hb, "describe")
        assert hasattr(hb, "save_array_as_npy")
        
        # Test that we can call temp function
        temp_path = hb.temp('.txt', remove_at_exit=True)
        assert temp_path is not None
        assert temp_path.endswith('.txt')

```
:::

---

### test_numpy_integration()

Test basic NumPy integration with hazelbean.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_numpy_integration(self):
        """Test basic numpy integration with hazelbean"""
        import numpy as np
        
        # Create test array
        test_array = np.random.rand(10, 10)
        
        # Test saving with hazelbean
        temp_path = hb.temp('.npy', remove_at_exit=True)
        hb.save_array_as_npy(test_array, temp_path)
        
        # Verify file was created
        assert os.path.exists(temp_path)
        
        # Test describe function
        result = hb.describe(temp_path, surpress_print=True, surpress_logger=True)
        assert result is not None

```
:::

---

### test_basic_error_handling()

Test that get_path raises NameError for unresolvable paths.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_basic_error_handling(self):
        """Test that get_path raises NameError for unresolvable paths.
        
        Note: get_path() intentionally raises NameError (not FileNotFoundError) because
        it performs complex path resolution logic beyond simple file existence checking:
        - Resolves paths relative to project structure
        - Searches multiple possible_dirs
        - Attempts cloud bucket downloads
        
        NameError semantically indicates "name/reference resolution failed" which is
        more accurate than "file at specific path not found".
        See docs/plans/exception-handling-analysis.md for detailed rationale.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # get_path should raise NameError when path cannot be resolved
            with pytest.raises(NameError) as exc_info:
                path = p.get_path("definitely_does_not_exist.txt")
            
            # Verify error message provides useful context
            error_msg = str(exc_info.value)
            assert "does not exist" in error_msg.lower()

```
:::
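
The docstring above explains the choice of `NameError` by pointing to multi-location resolution. The sketch below imitates only the directory-search part of that idea, with no cloud lookup. `resolve_path` is a hypothetical stand-in written for this page; it is not hazelbean's `get_path` and makes no claim about its internals.

```python
import os
import tempfile


def resolve_path(relative_path, possible_dirs):
    """Return the first existing match, or raise NameError if none is found."""
    for directory in possible_dirs:
        candidate = os.path.join(directory, relative_path)
        if os.path.exists(candidate):
            return candidate
    raise NameError(f"{relative_path} does not exist in any of: {possible_dirs}")


error_message = None
with tempfile.TemporaryDirectory() as project_dir:
    with open(os.path.join(project_dir, "present.txt"), "w") as f:
        f.write("test content")

    # A file that exists resolves to a full path
    found_name = os.path.basename(resolve_path("present.txt", [project_dir]))

    # A file that exists nowhere raises NameError with a descriptive message
    try:
        resolve_path("definitely_does_not_exist.txt", [project_dir])
    except NameError as exc:
        error_message = str(exc)

print(found_name)
print(error_message is not None)
```

Callers who want to handle missing files should therefore catch `NameError`, as the test does with `pytest.raises(NameError)`; an `except FileNotFoundError` clause would not match.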

---

### test_get_path_generates_doc()

Smoke-test `get_path` and write an example QMD file.

::: {.callout-note collapse="true"}
## Source code

```python
    def test_get_path_generates_doc(self, tmp_path):
        """Smoke-test + write example QMD."""
        # ---------- Test behaviour -------------------------------------------
        p = hb.ProjectFlow(project_dir=str(tmp_path))     # cast Path → str
        
        # Create the file so get_path can find it
        test_file = tmp_path / "foo.txt"
        test_file.write_text("test content")
        
        resolved = p.get_path("foo.txt")

        assert resolved.endswith("foo.txt")               # file name correct
        assert str(tmp_path) in resolved                  # lives in project dir

        # ---------- Generate documentation -----------------------------------
        qmd = DOCS_DIR / "get_path_example.qmd"

        qmd.write_text(textwrap.dedent(f"""
        ---
        title: "Hazelbean example – get_path"
        execute: true          # run the chunk when rendering
        freeze: auto
        ---

        ```{{python}}
        import hazelbean as hb, tempfile, os
        with tempfile.TemporaryDirectory() as tmp:
            p = hb.ProjectFlow(project_dir=tmp)
            print(p.get_path("foo.txt"))
        ```

        Above we create a throw-away project directory and ask Hazelbean for
        `"foo.txt"`.  The printed path shows how *get_path* resolves files relative
        to the project workspace.
        """))
        
        # Verify documentation was generated
        assert qmd.exists()
        assert qmd.stat().st_size > 0  # File has content
        
        # Verify content contains expected elements
        content = qmd.read_text()
        assert "get_path" in content
        assert "hazelbean" in content
        assert "ProjectFlow" in content

```
:::
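
Two details of the template above are easy to miss. Inside an f-string, `{{python}}` is needed to emit a literal `{python}`. Also, `textwrap.dedent` keeps the newline that follows the opening triple quote, so the generated text starts with a blank line before the front matter. The sketch below demonstrates both; whether Quarto tolerates the leading blank line is not verified here, and stripping it is shown only as a low-risk precaution.

```python
import textwrap

fence = "`" * 3  # built programmatically so this example nests cleanly in a code block
title = "Hazelbean example"

raw = textwrap.dedent(f"""
    ---
    title: "{title}"
    ---

    {fence}{{python}}
    print("hello")
    {fence}
    """)

# dedent removes the common indentation but keeps the leading newline
qmd_text = raw.lstrip("\n")

print(repr(raw[:4]))
print(repr(qmd_text[:3]))
```

An alternative is to place a backslash directly after the opening triple quote, which suppresses the first newline at the source.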

---

### test_error_handling_documentation()

Test documentation generation for error handling scenarios.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_error_handling_documentation(self):
        """Test documentation generation for error handling scenarios"""
        qmd = DOCS_DIR / "get_path_error_handling.qmd"

        qmd.write_text(textwrap.dedent("""
        ---
        title: "Hazelbean – get_path Error Handling"
        execute: true
        freeze: auto
        ---

        ## Error Handling Examples

        ```{python}
        import hazelbean as hb, tempfile

        # Test with non-existent file
        with tempfile.TemporaryDirectory() as tmp:
            p = hb.ProjectFlow(project_dir=tmp)

            # This should work even if file doesn't exist
            try:
                path = p.get_path("missing_file.txt")
                print(f"Resolved path: {path}")
                print("get_path handles missing files gracefully")
            except Exception as e:
                print(f"Error occurred: {e}")
        ```

        The above demonstrates how `get_path` handles missing files.
        """))
        
        # Verify documentation was generated
        assert qmd.exists()
        assert qmd.stat().st_size > 0

```
:::

---

### test_performance_documentation()

Test that a QMD performance guide for `get_path` is written and is non-empty.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_performance_documentation(self):
        """Test documentation generation for performance examples"""
        qmd = DOCS_DIR / "get_path_performance_guide.qmd"

        qmd.write_text(textwrap.dedent("""
        ---
        title: "Hazelbean – get_path Performance Guide"
        execute: true
        freeze: auto
        ---

        ## Performance Characteristics

        ```{python}
        import hazelbean as hb, tempfile, time

        with tempfile.TemporaryDirectory() as tmp:
            p = hb.ProjectFlow(project_dir=tmp)

            # Create a test file
            test_file = "performance_test.txt"
            with open(f"{tmp}/{test_file}", 'w') as f:
                f.write("test content")

            # Benchmark single call
            start = time.time()
            path = p.get_path(test_file)
            single_call_time = time.time() - start

            print(f"Single get_path call: {single_call_time:.6f} seconds")
            print(f"Resolved path: {path}")

            # Benchmark multiple calls
            start = time.time()
            for i in range(100):
                path = p.get_path(test_file)
            multiple_calls_time = time.time() - start

            print(f"100 get_path calls: {multiple_calls_time:.6f} seconds")
            print(f"Average per call: {multiple_calls_time/100:.6f} seconds")
        ```

        This shows typical performance characteristics of the `get_path` method.
        """))
        
        # Verify documentation was generated
        assert qmd.exists()
        assert qmd.stat().st_size > 0

```
:::
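
The generated guide times calls with `time.time()`. For short intervals, the standard library also provides `time.perf_counter()`, which is documented as the clock with the highest available resolution for measuring short durations. A minimal sketch, where `resolve` is a hypothetical stand-in for the function being timed (not hazelbean's `get_path`) and the path strings are placeholders:

```python
import os
import time


def resolve(base_dir: str, name: str) -> str:
    """Hypothetical stand-in for the call being measured."""
    return os.path.join(base_dir, name)


def average_call_time(func, *args, repeats: int = 100) -> float:
    """Return the mean wall-clock seconds per call over `repeats` calls."""
    start = time.perf_counter()
    for _ in range(repeats):
        func(*args)
    return (time.perf_counter() - start) / repeats


avg = average_call_time(resolve, "/tmp/project", "performance_test.txt")
print(f"Average per call: {avg:.9f} seconds")
```

Averaging over many calls, as the original loop of 100 does, smooths out timer granularity and one-off startup costs.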

---

### test_file_formats_documentation()

Test that a QMD page covering `get_path` with different file formats is written and is non-empty.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_file_formats_documentation(self):
        """Test documentation generation for different file formats"""
        qmd = DOCS_DIR / "get_path_file_formats.qmd"

        qmd.write_text(textwrap.dedent("""
        ---
        title: "Hazelbean – get_path File Format Support"
        execute: true
        freeze: auto
        ---

        ## Supported File Formats

        ```{python}
        import hazelbean as hb, tempfile, os

        with tempfile.TemporaryDirectory() as tmp:
            p = hb.ProjectFlow(project_dir=tmp)

            # Test different file extensions
            file_types = [
                "data.csv",
                "raster.tif",
                "vector.shp",
                "config.json",
                "script.py",
                "document.txt"
            ]

            print("Testing file format resolution:")
            for file_type in file_types:
                path = p.get_path(file_type)
                print(f"  {file_type} -> {os.path.basename(path)}")
        ```

        The `get_path` method works with various file formats commonly used
        in geospatial and scientific computing workflows.
        """))
        
        # Verify documentation was generated
        assert qmd.exists()
        assert qmd.stat().st_size > 0

```
:::

---

### test_temp_directory_creation()

Test that a `ProjectFlow` works inside a temporary directory and that `get_path` builds nested paths when `raise_error_if_fail=False`.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_temp_directory_creation(self):
        """Test that temporary directory creation works"""
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # Should be able to create subdirectories
            sub_path = os.path.join(temp_dir, "subdir", "nested")
            os.makedirs(sub_path, exist_ok=True)
            
            assert os.path.exists(sub_path)
            
            # get_path should construct paths without validation when raise_error_if_fail=False
            nested_file_path = p.get_path("subdir/nested/test.txt", raise_error_if_fail=False)
            assert "nested" in nested_file_path
            assert "test.txt" in nested_file_path

```
:::

---

### test_multiple_projectflow_instances()

Test that multiple `ProjectFlow` instances can coexist, each resolving paths in its own directory.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_multiple_projectflow_instances(self):
        """Test that multiple ProjectFlow instances can coexist"""
        with tempfile.TemporaryDirectory() as temp_dir1:
            with tempfile.TemporaryDirectory() as temp_dir2:
                p1 = hb.ProjectFlow(temp_dir1)
                p2 = hb.ProjectFlow(temp_dir2)
                
                # Each should resolve to its own directory (without validation)
                path1 = p1.get_path("test.txt", raise_error_if_fail=False)
                path2 = p2.get_path("test.txt", raise_error_if_fail=False)
                
                assert temp_dir1 in path1
                assert temp_dir2 in path2
                assert path1 != path2

```
:::

---

### test_relative_vs_absolute_paths()

Test `get_path` with relative and absolute inputs that point to existing files.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_relative_vs_absolute_paths(self):
        """Test handling of relative vs absolute paths with actual files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # Create test file for relative path testing
            rel_file = os.path.join(temp_dir, "relative_file.txt")
            with open(rel_file, 'w') as f:
                f.write("relative test content")
            
            # Test relative path
            rel_path = p.get_path("relative_file.txt")
            assert "relative_file.txt" in rel_path
            assert os.path.exists(rel_path)
            
            # Create test file for absolute path testing
            abs_file = os.path.join(temp_dir, "absolute_file.txt")
            with open(abs_file, 'w') as f:
                f.write("absolute test content")
            
            # Test absolute path
            abs_input = os.path.abspath(abs_file)
            abs_path = p.get_path(abs_input)
            assert "absolute_file.txt" in abs_path
            assert os.path.exists(abs_path)

```
:::
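
One way a resolver can accept both kinds of input is to pass absolute paths through unchanged and join relative ones onto a base directory. The sketch below illustrates that idea with the standard library only. `resolve_in_project` is a hypothetical helper and is not a description of how hazelbean's `get_path` is implemented.

```python
import os
import tempfile


def resolve_in_project(project_dir: str, path: str) -> str:
    """Return `path` unchanged if absolute, else joined onto `project_dir`."""
    if os.path.isabs(path):
        return path
    return os.path.join(project_dir, path)


with tempfile.TemporaryDirectory() as project_dir:
    target = os.path.join(project_dir, "data.txt")
    with open(target, "w") as f:
        f.write("content")

    from_relative = resolve_in_project(project_dir, "data.txt")
    from_absolute = resolve_in_project(project_dir, os.path.abspath(target))
    # Both inputs should lead to the same file on disk.
    same_file = os.path.samefile(from_relative, from_absolute)
    print(same_file)
```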

---

### test_special_characters_in_paths()

Test `get_path` with file names containing underscores, hyphens, dots, spaces, digits, and mixed case, using actual files.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_special_characters_in_paths(self):
        """Test handling of special characters in file paths with actual files"""
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # Test various special characters (that are valid in file names)
            special_files = [
                "file_with_underscores.txt",
                "file-with-hyphens.txt",
                "file.with.dots.txt",
                "file with spaces.txt",  # May not be supported on all systems
                "file123numbers.txt",
                "UPPERCASE.TXT",
                "mixedCase.TxT"
            ]
            
            for special_file in special_files:
                # Create file with special characters
                file_path = os.path.join(temp_dir, special_file)
                try:
                    with open(file_path, 'w') as f:
                        f.write(f"test content for {special_file}")
                    
                    # Test get_path with actual file
                    path = p.get_path(special_file)
                    assert special_file in path or os.path.basename(path) == special_file
                    assert os.path.exists(path)
                except Exception as e:
                    # Some special characters might not be supported on some systems
                    # NameError indicates path resolution failure (hazelbean's design)
                    assert isinstance(e, (ValueError, OSError, NameError))

```
:::

---

### test_concurrent_access()

Test `get_path` from three threads that share one `ProjectFlow`, using files created before the threads start.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_concurrent_access(self):
        """Test basic concurrent access patterns with actual files"""
        import threading
        import time
        
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            results = []
            errors = []
            
            # Pre-create all files before threads start (avoid race conditions)
            for thread_id in range(3):
                for i in range(10):
                    file_path = os.path.join(temp_dir, f"thread_{thread_id}_file_{i}.txt")
                    with open(file_path, 'w') as f:
                        f.write(f"Thread {thread_id} file {i}")
            
            def worker_thread(thread_id):
                try:
                    for i in range(10):
                        path = p.get_path(f"thread_{thread_id}_file_{i}.txt")
                        results.append((thread_id, path))
                        time.sleep(0.001)  # Small delay
                except Exception as e:
                    errors.append((thread_id, e))
            
            # Create and start multiple threads
            threads = []
            for i in range(3):
                t = threading.Thread(target=worker_thread, args=(i,))
                threads.append(t)
                t.start()
            
            # Wait for all threads to complete
            for t in threads:
                t.join()
            
            # Verify results
            assert len(errors) == 0, f"Errors in concurrent access: {errors}"
            assert len(results) == 30, f"Expected 30 results, got {len(results)}"
            
            # Verify all paths exist and group them by thread
            thread_paths = {}
            for thread_id, path in results:
                assert os.path.exists(path), f"Path {path} doesn't exist"
                if thread_id not in thread_paths:
                    thread_paths[thread_id] = []
                thread_paths[thread_id].append(path)
            
            assert len(thread_paths) == 3, "Should have results from all 3 threads"

```
:::
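
The same fan-out pattern can also be written with `concurrent.futures.ThreadPoolExecutor`, which hands back each worker's return value and re-raises worker exceptions when the result is read, so shared `results` and `errors` lists are not needed. A minimal sketch, assuming a hypothetical `resolve` function in place of `p.get_path`:

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def resolve(base_dir: str, name: str) -> str:
    """Hypothetical stand-in for a path lookup such as p.get_path."""
    path = os.path.join(base_dir, name)
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    return path


def worker(base_dir: str, thread_id: int) -> list:
    """Resolve the ten files that belong to one worker."""
    return [resolve(base_dir, f"thread_{thread_id}_file_{i}.txt") for i in range(10)]


with tempfile.TemporaryDirectory() as temp_dir:
    # Create every file up front so workers only read.
    for thread_id in range(3):
        for i in range(10):
            name = f"thread_{thread_id}_file_{i}.txt"
            with open(os.path.join(temp_dir, name), "w") as f:
                f.write(f"Thread {thread_id} file {i}")

    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(worker, temp_dir, tid) for tid in range(3)]
        # result() re-raises any exception raised inside the worker.
        paths_per_worker = [future.result() for future in futures]

all_paths = [path for paths in paths_per_worker for path in paths]
print(len(all_paths), len(set(all_paths)))
```

Comparing `len(all_paths)` with `len(set(all_paths))` is one way to check that every resolved path is distinct.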

---

### test_python_version_compatibility()

Test that the interpreter is Python 3.7 or newer and that `get_path` resolves an existing file.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_python_version_compatibility(self):
        """Test Python version compatibility with actual file"""
        import sys
        
        # Should work with Python 3.7+
        assert sys.version_info >= (3, 7), f"Python version {sys.version_info} may not be supported"
        
        # Test that hazelbean works with current Python version
        with tempfile.TemporaryDirectory() as temp_dir:
            p = hb.ProjectFlow(temp_dir)
            
            # Create test file
            test_file = os.path.join(temp_dir, "test.txt")
            with open(test_file, 'w') as f:
                f.write("Python version compatibility test")
            
            # Test basic get_path functionality
            path = p.get_path("test.txt")
            assert path is not None
            assert os.path.exists(path)

```
:::

---

### test_required_dependencies_available()

Test that NumPy is importable and that hazelbean can save a NumPy array to a temporary `.npy` file.

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_required_dependencies_available(self):
        """Test that required dependencies are available"""
        # Test numpy
        import numpy as np
        assert hasattr(np, 'array')
        
        # Test that hazelbean can use numpy
        arr = np.random.rand(5, 5)
        temp_path = hb.temp('.npy', remove_at_exit=True)
        hb.save_array_as_npy(arr, temp_path)
        assert os.path.exists(temp_path)

```
:::

---

### test_file_system_permissions()

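Test that directories and files can be created and read in a temporary directory, and that `get_path` resolves a file in a subdirectory.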

::: {.callout-note collapse="true"}
## Source code

```python
    @pytest.mark.smoke
    def test_file_system_permissions(self):
        """Test basic file system permissions"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Test directory creation
            test_dir = os.path.join(temp_dir, "permission_test")
            os.makedirs(test_dir)
            assert os.path.exists(test_dir)
            
            # Test file creation
            test_file = os.path.join(test_dir, "test.txt")
            with open(test_file, 'w') as f:
                f.write("permission test")
            assert os.path.exists(test_file)
            
            # Test file reading
            with open(test_file, 'r') as f:
                content = f.read()
            assert content == "permission test"
            
            # Test hazelbean can work in this environment
            p = hb.ProjectFlow(temp_dir)
            path = p.get_path("permission_test/test.txt")
            assert "test.txt" in path

```
:::
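
Where creating files is undesirable, `os.access` offers a lighter preliminary check. A hedged sketch with a hypothetical `describe_permissions` helper follows. Note that `os.access` reports what the current user is allowed to do at the time of the call and does not guarantee that a later operation will succeed.

```python
import os
import tempfile


def describe_permissions(path: str) -> dict:
    """Report read/write/traverse access for the current user (hypothetical helper)."""
    return {
        "exists": os.path.exists(path),
        "readable": os.access(path, os.R_OK),
        "writable": os.access(path, os.W_OK),
        "traversable": os.access(path, os.X_OK),
    }


with tempfile.TemporaryDirectory() as temp_dir:
    report = describe_permissions(temp_dir)
    print(report)
```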

---


## Running System Tests

To run these tests:

```{.bash}
# Activate the hazelbean environment
conda activate hazelbean_env

# Run all system tests
pytest hazelbean_tests/system/ -v

# Run specific test file
pytest hazelbean_tests/system/<test_file>.py -v

# Run with coverage
pytest hazelbean_tests/system/ --cov=hazelbean --cov-report=html
```

## Test Organization

Tests are organized by:

- **Test Files** - Each file tests a specific module or feature
- **Test Functions** - Individual test cases within files
- **Test Classes** - Grouped related tests (where applicable)
- **Fixtures** - Shared test setup and teardown

## Related Documentation

- [Test Strategy](../README.md) - Overall testing approach
- [Contributing](../../CONTRIBUTING.md) - How to write tests
- [CI/CD](../../.github/workflows/) - Automated testing