# LogManager Comprehensive Testing Notebook

This notebook provides comprehensive testing for the LogManager class, covering:
- Basic logging features and CRUD operations for loggers and handlers
- Duplicate file warnings when several HDFS copy operations target the same files
- HDFS copy functionality
- Distributed coordination features

## Prerequisites
- LogManager class properly installed
- HDFS access (or a local directory standing in for the HDFS destination when testing)
- Required dependencies installed

## 1. Setup Environment and Import Libraries

In [7]:
# Import required libraries
import os
import sys
import time
import tempfile
import shutil
from pathlib import Path

# Import LogManager
from utilities import LogManager

# Confirm the imports above resolved, then record the interpreter version and
# the directory the notebook is running from.
print(
    "✅ All libraries imported successfully",
    f"Python version: {sys.version}",
    f"Working directory: {os.getcwd()}",
    sep="\n",
)

✅ All libraries imported successfully
Python version: 3.13.1 (tags/v3.13.1:0671451, Dec  3 2024, 19:06:28) [MSC v.1942 64 bit (AMD64)]
Working directory: c:\Users\Lisa Tan\Desktop\Projects\utilities\examples\logger


## 2. Basic Logging Configuration and CRUD Operations

Test basic LogManager functionality including creating, reading, updating, and deleting loggers.

In [8]:
# Build the manager without arguments; every later cell reuses this instance.
log_manager = LogManager()

# Echo the config path in use and the handler/logger names registered so far.
print(
    "✅ LogManager created successfully",
    f"Config path: {log_manager._config_path}",
    f"Current handlers: {[*log_manager._handlers_map.keys()]}",
    f"Current loggers: {[*log_manager._loggers_map.keys()]}",
    sep="\n",
)

Config file not provided, initializing logger with class default config.
HDFS copy enabled (default behavior)
Signal handlers registered
✅ LogManager created successfully
Config path: C:\Users\Lisa Tan\Desktop\Projects\utilities\src\main\logger\_default_logger_config.yaml
Current handlers: ['handler_file', 'handler_console']
Current loggers: ['default_task']


In [9]:
# Logger CRUD, part 1: CREATE two loggers, then READ them back and emit records.
print("=== TESTING LOGGER CRUD OPERATIONS ===")

# CREATE — each logger is declared as a list of handler/level pairs.
try:
    # 'test_app' is attached to both handlers, with a lower level on the console.
    log_manager.add_logger(
        "test_app",
        [
            dict(handler="handler_console", level="DEBUG"),
            dict(handler="handler_file", level="INFO"),
        ],
    )
    print("✅ Created logger 'test_app'")

    # 'test_database' is attached to the file handler only.
    log_manager.add_logger(
        "test_database",
        [dict(handler="handler_file", level="WARNING")],
    )
    print("✅ Created logger 'test_database'")
except Exception as exc:
    print(f"❌ Error creating loggers: {exc}")

# READ — fetch both loggers by name and send a few records through them.
try:
    app_logger = log_manager.get_logger("test_app")
    db_logger = log_manager.get_logger("test_database")

    app_logger.info("This is a test message from app logger")
    app_logger.debug("Debug message from app logger")
    db_logger.warning("Warning message from database logger")

    print("✅ Successfully retrieved and used loggers")
except Exception as exc:
    print(f"❌ Error using loggers: {exc}")

print(f"Current loggers: {[*log_manager._loggers_map.keys()]}")

=== TESTING LOGGER CRUD OPERATIONS ===
✅ Created logger 'test_app'
✅ Created logger 'test_database'
 [32m2025-08-22 22:03:50[0m | [1mINFO    [0m | [31mtest_app[0m | [36m2872554959.py    | <module>[0m : 26 - [37mThis is a test message from app logger[0m
 [32m2025-08-22 22:03:50[0m | [34m[1mDEBUG   [0m | [31mtest_app[0m | [36m2872554959.py    | <module>[0m : 27 - [37mDebug message from app logger[0m
✅ Successfully retrieved and used loggers
Current loggers: ['default_task', 'test_app', 'test_database']


In [10]:
# Logger CRUD, part 2: UPDATE 'test_app', then DELETE 'test_database'.
print("\n=== UPDATING LOGGER CONFIGURATION ===")

try:
    # Replace the handler list of 'test_app' with a single console entry at ERROR.
    log_manager.update_logger(
        "test_app",
        [dict(handler="handler_console", level="ERROR")],
    )

    # Fetch the logger again and send one record below and one at the new level.
    updated_logger = log_manager.get_logger("test_app")
    updated_logger.info("This INFO should not appear (level is ERROR)")
    updated_logger.error("This ERROR should appear")

    print("✅ Successfully updated logger configuration")
except Exception as exc:
    print(f"❌ Error updating logger: {exc}")

print("\n=== DELETING LOGGER ===")

try:
    log_manager.remove_logger("test_database")
    print("✅ Successfully removed 'test_database' logger")
    print(f"Remaining loggers: {[*log_manager._loggers_map.keys()]}")
except Exception as exc:
    print(f"❌ Error removing logger: {exc}")


=== UPDATING LOGGER CONFIGURATION ===
 [32m2025-08-22 22:03:50[0m | [31m[1mERROR   [0m | [31mtest_app[0m | [36m1768015933.py    | <module>[0m : 13 - [37mThis ERROR should appear[0m
✅ Successfully updated logger configuration

=== DELETING LOGGER ===
✅ Successfully removed 'test_database' logger
Remaining loggers: ['default_task', 'test_app']


## 3. Create and Manage Multiple Handlers

Test CRUD operations for different handler types and configurations.

In [11]:
# Handler CRUD, part 1: CREATE one file handler and one console handler.
print("=== TESTING HANDLER CRUD OPERATIONS ===")

try:
    # File handler definition: sink, level, format name and rotation setting.
    custom_file_handler = dict(
        sink="test_custom.log",
        level="DEBUG",
        format="format_detailed",
        rotation="10 MB",
    )
    log_manager.add_handler("custom_file_handler", custom_file_handler)
    print("✅ Created custom file handler")

    # Console handler definition targeting stderr.
    custom_console_handler = dict(
        sink="sys.stderr",
        level="WARNING",
        format="format_simple",
    )
    log_manager.add_handler("custom_console_handler", custom_console_handler)
    print("✅ Created custom console handler")
except Exception as exc:
    print(f"❌ Error creating handlers: {exc}")

print(f"All handlers: {[*log_manager._handlers_map.keys()]}")

=== TESTING HANDLER CRUD OPERATIONS ===
✅ Created custom file handler
✅ Created custom console handler
All handlers: ['handler_file', 'handler_console', 'custom_file_handler', 'custom_console_handler']


 ⚠️ The format referenced by handler 'custom_file_handler' is not defined in the 'formats' section of the config file. Using the format as is: 
	 format_detailed 

 ⚠️ The format referenced by handler 'custom_console_handler' is not defined in the 'formats' section of the config file. Using the format as is: 
	 format_simple 



In [12]:
# Handler CRUD, part 2: UPDATE the custom console handler, then create a
# logger that routes through both custom handlers.
print("\n=== UPDATING HANDLER CONFIGURATION ===")

try:
    # New definition for the console handler: stdout sink, INFO level.
    updated_console_handler = dict(
        sink="sys.stdout",
        level="INFO",
        format="format_detailed",
    )
    log_manager.update_handler("custom_console_handler", updated_console_handler)
    print("✅ Successfully updated console handler")
except Exception as exc:
    print(f"❌ Error updating handler: {exc}")

try:
    # Attach a new logger to the two custom handlers.
    log_manager.add_logger(
        "test_custom",
        [
            dict(handler="custom_file_handler", level="DEBUG"),
            dict(handler="custom_console_handler", level="INFO"),
        ],
    )

    # Send one record at each of three levels through the new logger.
    custom_logger = log_manager.get_logger("test_custom")
    custom_logger.debug("Debug message to custom handlers")
    custom_logger.info("Info message to custom handlers")
    custom_logger.warning("Warning message to custom handlers")

    print("✅ Successfully created and tested custom logger")
except Exception as exc:
    print(f"❌ Error with custom logger: {exc}")


=== UPDATING HANDLER CONFIGURATION ===
✅ Successfully updated console handler
format_detailed
format_detailed
✅ Successfully created and tested custom logger


 ⚠️ The format referenced by handler 'custom_console_handler' is not defined in the 'formats' section of the config file. Using the format as is: 
	 format_detailed 



In [13]:
# Handler CRUD, part 3: DELETE the custom file handler and list what is left.
print("\n=== DELETING HANDLERS ===")

try:
    log_manager.remove_handler("custom_file_handler")
    print("✅ Successfully removed custom file handler")

    # Show the handler and logger names still registered after the removal.
    print(f"Remaining handlers: {[*log_manager._handlers_map.keys()]}")
    print(f"Remaining loggers: {[*log_manager._loggers_map.keys()]}")
except Exception as exc:
    print(f"❌ Error removing handler: {exc}")


=== DELETING HANDLERS ===
✅ Successfully removed custom file handler
Remaining handlers: ['handler_file', 'handler_console', 'custom_console_handler']
Remaining loggers: ['default_task', 'test_app', 'test_custom']


## 4. Testing Duplicate File Warnings

Create multiple HDFS copy operations that target the same files to test duplicate detection.

In [14]:
# Build a throwaway directory tree holding four small log files; the HDFS copy
# cells below use `log_dir` as the source and `hdfs_dir` as the destination.
print("=== SETTING UP DUPLICATE FILE TEST ENVIRONMENT ===")

# Fresh temp root with one sub-directory for sources and one for destinations.
test_dir = Path(tempfile.mkdtemp(prefix="logmanager_test_"))
log_dir, hdfs_dir = test_dir / "logs", test_dir / "hdfs_destination"
log_dir.mkdir(parents=True)
hdfs_dir.mkdir(parents=True)

# The four source files, all directly inside log_dir.
test_files = [
    log_dir / file_name
    for file_name in ("app1.log", "app2.log", "shared.log", "database.log")
]

# Each file gets the same three-line body: its own name, a timestamp, a fixed note.
for file_path in test_files:
    with file_path.open('w') as f:
        f.writelines([
            f"Test log content for {file_path.name}\n",
            f"Timestamp: {time.time()}\n",
            "This is test log data for duplicate detection testing.\n",
        ])

print(f"✅ Created test environment at: {test_dir}")
print(f"Log files: {[f.name for f in test_files]}")
print(f"Log directory: {log_dir}")
print(f"HDFS destination: {hdfs_dir}")

=== SETTING UP DUPLICATE FILE TEST ENVIRONMENT ===
✅ Created test environment at: C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs
Log files: ['app1.log', 'app2.log', 'shared.log', 'database.log']
Log directory: C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\logs
HDFS destination: C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\hdfs_destination


In [None]:
# Register three copy operations whose source patterns overlap on purpose, so
# the same files are claimed by more than one operation.
print("\n=== TESTING DUPLICATE FILE WARNINGS ===")

try:
    # Operation 1: every .log file in log_dir, copied every 30 seconds.
    log_manager.start_hdfs_copy(
        copy_name="copy_all_logs",
        hdfs_destination=str(hdfs_dir / "all_logs"),
        path_patterns=[str(log_dir / "*.log")],
        copy_interval=30,
    )
    print("✅ Started copy operation 'copy_all_logs'")

    # Operation 2: three named files (all also matched by operation 1), every 25 seconds.
    log_manager.start_hdfs_copy(
        copy_name="copy_app_logs",
        hdfs_destination=str(hdfs_dir / "app_logs"),
        path_patterns=[
            str(log_dir / file_name)
            for file_name in ("app1.log", "app2.log", "shared.log")
        ],
        copy_interval=25,
    )
    print("✅ Started copy operation 'copy_app_logs'")

    # Operation 3: shared.log (in both operations above) and database.log
    # (in operation 1), every 20 seconds.
    log_manager.start_hdfs_copy(
        copy_name="copy_shared_logs",
        hdfs_destination=str(hdfs_dir / "shared_logs"),
        path_patterns=[
            str(log_dir / file_name)
            for file_name in ("shared.log", "database.log")
        ],
        copy_interval=20,
    )
    print("✅ Started copy operation 'copy_shared_logs'")

    print("\n⚠️  Check the output above for duplicate file warnings!")
except Exception as exc:
    print(f"❌ Error setting up duplicate file test: {exc}")

# Report every registered copy operation together with its liveness flag.
operations = log_manager.list_hdfs_copy_operations()
print(f"\nActive HDFS copy operations: {len(operations)}")
for op in operations:
    print(f"  - {op['name']}: {op['is_alive']}")

Attempt 2 failed for C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\logs\shared.log: Unsupported file format: .log. Supported formats: ['csv', 'txt', 'text', 'sql', 'json', 'yaml', 'yml', 'arrow', 'feather', 'parquet', 'pickle', 'pkl']. Retrying in 5s...
Attempt 3 failed for C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\logs\shared.log: Unsupported file format: .log. Supported formats: ['csv', 'txt', 'text', 'sql', 'json', 'yaml', 'yml', 'arrow', 'feather', 'parquet', 'pickle', 'pkl']. Retrying in 5s...
Attempt 3 failed for C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\logs\shared.log: Unsupported file format: .log. Supported formats: ['csv', 'txt', 'text', 'sql', 'json', 'yaml', 'yml', 'arrow', 'feather', 'parquet', 'pickle', 'pkl']. Retrying in 5s...
Failed to copy C:\Users\LISATA~1\AppData\Local\Temp\logmanager_test_092zj4fs\logs\shared.log after 4 attempts: Unsupported file format: .log. Supported formats: ['csv', 'txt', 'text', 'sql',

In [None]:
# Run every registered copy operation once, right now, so any duplicate
# warnings show up without waiting for the scheduled intervals.
print("=== TRIGGERING MANUAL COPY TO SEE DUPLICATE WARNINGS ===")

# Short pause so the operations started in the previous cell can initialize.
time.sleep(2)

try:
    # Called without a name: triggers all copy operations.
    log_manager.trigger_hdfs_copy_now()
    print(
        "✅ Triggered all copy operations manually",
        "\n⚠️  Look for duplicate file warnings in the output above!",
        sep="\n",
    )
except Exception as exc:
    print(f"❌ Error triggering manual copy: {exc}")

## 5. HDFS Copy Operations

Test various HDFS copy scenarios including error handling and different configurations.

In [None]:
# Lay out a two-level directory tree of log files (app/ and database/) under
# the temp root; the structure-preservation cell below copies from it.
print("=== TESTING VARIOUS HDFS COPY CONFIGURATIONS ===")

structured_dir = test_dir / "structured_logs"
app_dir, db_dir = structured_dir / "app", structured_dir / "database"
app_dir.mkdir(parents=True)
db_dir.mkdir(parents=True)

# Two files per sub-directory.
structured_files = [
    app_dir / "app_2024.log",
    app_dir / "app_errors.log",
    db_dir / "db_queries.log",
    db_dir / "db_errors.log",
]

# Each file records its own full path and a creation timestamp.
for file_path in structured_files:
    with file_path.open('w') as f:
        f.writelines([
            f"Structured log content for {file_path}\n",
            f"Created at: {time.time()}\n",
        ])

print("✅ Created structured log files")
print(f"App logs: {[f.name for f in app_dir.glob('*.log')]}")
print(f"DB logs: {[f.name for f in db_dir.glob('*.log')]}")

In [None]:
# Start two copy operations over the same recursive pattern: one asks for the
# source directory structure to be preserved, the other does not.
print("\n=== TESTING COPY WITH STRUCTURE PRESERVATION ===")

try:
    # Structured copy: passes root_dir and asks for destination directories to be created.
    log_manager.start_hdfs_copy(
        copy_name="copy_with_structure",
        hdfs_destination=str(hdfs_dir / "structured_copy"),
        path_patterns=[str(structured_dir / "**" / "*.log")],
        root_dir=str(structured_dir),
        preserve_structure=True,
        create_dest_dirs=True,
        copy_interval=40,
    )
    print("✅ Started structured copy operation")
except Exception as exc:
    print(f"❌ Error with structured copy: {exc}")

try:
    # Flat copy: no structure preservation, with explicit retry settings.
    log_manager.start_hdfs_copy(
        copy_name="copy_flat_structure",
        hdfs_destination=str(hdfs_dir / "flat_copy"),
        path_patterns=[str(structured_dir / "**" / "*.log")],
        preserve_structure=False,
        max_retries=2,
        retry_delay=3,
        copy_interval=45,
    )
    print("✅ Started flat copy operation")
except Exception as exc:
    print(f"❌ Error with flat copy: {exc}")

# List every registered operation with its liveness flag and thread name.
operations = log_manager.list_hdfs_copy_operations()
print(f"\nTotal active operations: {len(operations)}")
for op in operations:
    print(f"  - {op['name']}: alive={op['is_alive']}, thread={op['thread_name']}")

In [None]:
# Test manual triggering of specific operations
# Triggers the two operations registered in the previous cell by name, one
# after the other, with a one-second pause in between. Any exception from
# either trigger is caught and printed rather than raised.
print("\n=== TESTING MANUAL COPY TRIGGERS ===")

try:
    # Trigger specific operation
    print("Triggering 'copy_with_structure' operation...")
    log_manager.trigger_hdfs_copy_now("copy_with_structure")
    
    # Pause between the two triggers (presumably so their output does not interleave — TODO confirm).
    time.sleep(1)
    
    # Trigger specific operation
    print("\nTriggering 'copy_flat_structure' operation...")
    log_manager.trigger_hdfs_copy_now("copy_flat_structure")
    
    print("\n✅ Manual triggers completed")
    
except Exception as e:
    print(f"❌ Error with manual triggers: {e}")

## 6. Distributed Coordination Testing

Test the distributed coordination features using environment variables.

In [None]:
# Test distributed coordination status
# Reads the manager's HDFS copy status and prints three of its entries, then
# asks the manager's private coordination check whether copying should run.
# Each step has its own try/except so a failure in one does not skip the other.
print("=== TESTING DISTRIBUTED COORDINATION ===")

# Check current coordination status
try:
    # The status is indexed by three keys below; a missing key is reported via the except branch.
    status = log_manager.get_hdfs_copy_status()
    print("Current HDFS Copy Status:")
    print(f"  Enabled: {status['hdfs_copy_enabled']}")
    print(f"  Reason: {status['reason']}")
    print(f"  Environment: {status['environment_variable']}")
    
except Exception as e:
    print(f"❌ Error getting status: {e}")

# Test the internal coordination methods
try:
    # NOTE(review): this calls a private (underscore-prefixed) method directly.
    should_run = log_manager._should_run_hdfs_copy()
    print(f"\nShould run HDFS copy: {should_run}")
    
except Exception as e:
    print(f"❌ Error checking coordination: {e}")

In [None]:
# Create a second LogManager while DISABLE_HDFS_COPY=true and check how it
# responds to a copy request; the variable is put back afterwards either way.
print("\n=== TESTING COORDINATION WITH ENVIRONMENT CHANGES ===")

# Remember the current value (None when unset) so it can be restored.
original_env = os.environ.get('DISABLE_HDFS_COPY')

try:
    print("Setting DISABLE_HDFS_COPY=true...")
    os.environ['DISABLE_HDFS_COPY'] = 'true'

    # A fresh manager is created only after the variable has been set.
    disabled_log_manager = LogManager()
    print(f"New LogManager HDFS enabled: {disabled_log_manager.hdfs_copy_enabled}")

    # Request a copy operation; the expectation stated here is that it is skipped.
    disabled_log_manager.start_hdfs_copy(
        copy_name="should_be_skipped",
        hdfs_destination=str(hdfs_dir / "skipped"),
        path_patterns=[str(log_dir / "*.log")],
        copy_interval=60,
    )

    operations = disabled_log_manager.list_hdfs_copy_operations()
    print(f"Operations after disabled start: {len(operations)}")
except Exception as exc:
    print(f"❌ Error testing disabled coordination: {exc}")
finally:
    # Put the variable back exactly as it was: re-set it, or remove it if it was unset.
    if original_env is not None:
        os.environ['DISABLE_HDFS_COPY'] = original_env
    else:
        os.environ.pop('DISABLE_HDFS_COPY', None)
    print("\n✅ Environment restored")

In [None]:
# Simulate distributed environment scenarios
print("\n=== SIMULATING DISTRIBUTED SCENARIOS ===")

def simulate_worker_node():
    """Run a LogManager the way a worker node would, with DISABLE_HDFS_COPY=true.

    Sets the environment variable, creates a new LogManager, requests a copy
    operation named "worker_copy", and collects the manager's status and
    operation list. The variable's previous value (or absence) is restored
    before returning, including when an exception is raised.

    Returns:
        dict with keys 'node_type' (always 'worker'), 'hdfs_enabled',
        'reason' (both taken from the manager's status) and
        'operations_count' (length of the manager's operation list).
    """
    previous_flag = os.environ.get('DISABLE_HDFS_COPY')

    try:
        os.environ['DISABLE_HDFS_COPY'] = 'true'

        # The manager is created only after the flag is in place.
        worker_manager = LogManager()
        worker_manager.start_hdfs_copy(
            copy_name="worker_copy",
            path_patterns=[str(log_dir / "*.log")],
            hdfs_destination=str(hdfs_dir / "worker"),
            copy_interval=60,
        )

        copy_status = worker_manager.get_hdfs_copy_status()
        registered = worker_manager.list_hdfs_copy_operations()

        return dict(
            node_type='worker',
            hdfs_enabled=copy_status['hdfs_copy_enabled'],
            reason=copy_status['reason'],
            operations_count=len(registered),
        )
    finally:
        # Re-set the old value, or remove the variable if it was not set before.
        if previous_flag is not None:
            os.environ['DISABLE_HDFS_COPY'] = previous_flag
        else:
            os.environ.pop('DISABLE_HDFS_COPY', None)

def simulate_coordinator_node():
    """Run a LogManager the way a coordinator node would, with DISABLE_HDFS_COPY unset.

    Removes the environment variable, creates a new LogManager, starts a copy
    operation named "coordinator_copy", and collects the manager's status and
    operation list while that operation is still registered.

    Cleanup always runs, on success and on error:
      * the copy operation is stopped if it was started, so a failure while
        collecting status cannot leave it running;
      * DISABLE_HDFS_COPY is restored to the value it had on entry (or left
        unset if it was unset), so the simulation does not permanently change
        the process environment.

    Returns:
        dict with keys 'node_type' (always 'coordinator'), 'hdfs_enabled',
        'reason' (both taken from the manager's status) and
        'operations_count' (length of the manager's operation list, taken
        before the operation is stopped).
    """
    original_env = os.environ.get('DISABLE_HDFS_COPY', None)
    coordinator_manager = None
    copy_started = False

    try:
        # Ensure no disable flag is set before the manager is created
        os.environ.pop('DISABLE_HDFS_COPY', None)

        # Create coordinator LogManager
        coordinator_manager = LogManager()

        # Start copy operation
        coordinator_manager.start_hdfs_copy(
            copy_name="coordinator_copy",
            path_patterns=[str(log_dir / "*.log")],
            hdfs_destination=str(hdfs_dir / "coordinator"),
            copy_interval=60
        )
        copy_started = True

        status = coordinator_manager.get_hdfs_copy_status()
        operations = coordinator_manager.list_hdfs_copy_operations()

        return {
            'node_type': 'coordinator',
            'hdfs_enabled': status['hdfs_copy_enabled'],
            'reason': status['reason'],
            'operations_count': len(operations)
        }

    finally:
        try:
            # Stop the operation for cleanup; skipped when start itself failed,
            # so a stop error cannot mask the original exception.
            if copy_started:
                coordinator_manager.stop_hdfs_copy("coordinator_copy")
        finally:
            # Restore environment, even if stopping raised
            if original_env is None:
                os.environ.pop('DISABLE_HDFS_COPY', None)
            else:
                os.environ['DISABLE_HDFS_COPY'] = original_env

# Run the worker simulation, then the coordinator simulation, and print the
# three reported fields of each result under its own heading.
try:
    worker_result = simulate_worker_node()
    coordinator_result = simulate_coordinator_node()

    print("Simulation Results:")
    for _heading, _outcome in (
        ("Worker Node:", worker_result),
        ("\nCoordinator Node:", coordinator_result),
    ):
        print(_heading)
        print(f"  - HDFS Enabled: {_outcome['hdfs_enabled']}")
        print(f"  - Reason: {_outcome['reason']}")
        print(f"  - Operations: {_outcome['operations_count']}")

    print("\n✅ Distributed coordination simulation completed")
except Exception as exc:
    print(f"❌ Error in simulation: {exc}")

## 7. Cleanup and Resource Management

Properly stop all operations and clean up resources.

In [None]:
# Ask the manager to stop every HDFS copy operation, report any that did not
# stop, then show how many operations are still listed.
print("=== CLEANUP: STOPPING ALL HDFS OPERATIONS ===")

try:
    # The return value is treated as the collection of operations that failed to stop.
    failed_operations = log_manager.stop_all_hdfs_copy(verbose=True, timeout=15.0)

    if not failed_operations:
        print("\n✅ All HDFS operations stopped successfully")
    else:
        print(f"\n⚠️  Some operations failed to stop: {failed_operations}")

    # Confirm what is still registered after the stop request.
    remaining_operations = log_manager.list_hdfs_copy_operations()
    print(f"Remaining operations: {len(remaining_operations)}")
except Exception as exc:
    print(f"❌ Error during operation cleanup: {exc}")

In [None]:
# Delete the temp directory tree created for the copy tests, then remove any
# test_*.log files left in the current working directory.
print("\n=== CLEANUP: REMOVING TEST FILES ===")

try:
    # The whole temp tree goes in one call, if it is still there.
    if test_dir.exists():
        shutil.rmtree(test_dir)
        print(f"✅ Removed test directory: {test_dir}")

    # Log files matching test_*.log in the working directory are removed one by
    # one; a file that cannot be removed is reported and the loop moves on.
    current_dir = Path.cwd()
    log_files = [*current_dir.glob("test_*.log")]

    for log_file in log_files:
        try:
            log_file.unlink()
            print(f"✅ Removed log file: {log_file.name}")
        except Exception as exc:
            print(f"⚠️  Could not remove {log_file.name}: {exc}")

    print("\n✅ File cleanup completed")
except Exception as exc:
    print(f"❌ Error during file cleanup: {exc}")

In [None]:
# Final status and summary
# Prints the manager's end state and the list of areas the notebook exercised.
# The success banner is shown only when no HDFS copy operations remain
# registered; otherwise a warning names the leftover operation count, so a
# failed cleanup is not reported as success.
# NOTE(review): earlier cells catch and print their own errors, so this cell
# cannot detect them — the banner reflects only the leftover-operation check.
print("\n=== FINAL TEST SUMMARY ===")

try:
    # Get final status
    final_status = log_manager.get_hdfs_copy_status()
    final_operations = log_manager.list_hdfs_copy_operations()

    print("LogManager Final State:")
    print(f"  - Handlers: {len(log_manager._handlers_map)}")
    print(f"  - Loggers: {len(log_manager._loggers_map)}")
    print(f"  - HDFS Copy Enabled: {final_status['hdfs_copy_enabled']}")
    print(f"  - Active Operations: {len(final_operations)}")

    if final_operations:
        # Cleanup did not remove every operation: do not claim success.
        print(f"\n⚠️  TESTING FINISHED WITH {len(final_operations)} HDFS COPY OPERATION(S) STILL REGISTERED")
    else:
        print("\n🎉 COMPREHENSIVE TESTING COMPLETED SUCCESSFULLY!")

    print("\nTests covered:")
    print("  ✅ Logger CRUD operations (Create, Read, Update, Delete)")
    print("  ✅ Handler CRUD operations")
    print("  ✅ Duplicate file warning detection")
    print("  ✅ HDFS copy operations with various configurations")
    print("  ✅ Distributed coordination with environment variables")
    print("  ✅ Manual copy triggering")
    print("  ✅ Resource cleanup and management")

except Exception as e:
    print(f"❌ Error getting final status: {e}")

print("\n" + "="*50)
print("LogManager testing completed. Review the output above for any warnings or errors.")
print("="*50)