In [None]:
"""
Excel Model Validator - Phase 2: Testing and Validation
Comprehensive testing framework to ensure model integrity after optimization
"""

In [None]:
import xlwings as xw
import json
import time
import numpy as np
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import pandas as pd
from collections import defaultdict

In [None]:
@dataclass
class ValidationResult:
    """Outcome of a single validation test, with timing and diagnostic text."""
    test_name: str
    passed: bool
    actual_value: Any
    expected_value: Any = None
    tolerance: float = 0.001
    message: str = ""
    execution_time: float = 0.0

    def to_dict(self) -> Dict:
        """Return the result as a plain dictionary.

        ``execution_time`` is rounded to four decimal places; every other
        field is passed through unchanged (no copying is performed).
        """
        passthrough_fields = (
            'test_name',
            'passed',
            'actual_value',
            'expected_value',
            'tolerance',
            'message',
        )
        payload = {name: getattr(self, name) for name in passthrough_fields}
        payload['execution_time'] = round(self.execution_time, 4)
        return payload

In [None]:
class ExcelModelValidator:
    """
    Comprehensive Excel Model Validation and Testing Framework
    Ensures model integrity before and after optimization
    """
    
    def __init__(self, excel_file_path: str, baseline_data: Optional[Dict] = None):
        self.excel_file_path = Path(excel_file_path)
        self.baseline_data = baseline_data
        self.app = None
        self.wb = None
        
        # Test results tracking
        self.validation_results = []
        self.performance_metrics = {}
        self.functional_tests = {}
        
        # Test configuration
        self.test_config = {
            'numerical_tolerance': 0.001,
            'performance_tolerance': 2.0,  # 2x slower is acceptable
            'required_macros': ['UpdateFinancialModel', 'SolveForLeasePayment', 'ResetModel'],
            'critical_cells': {
                'Inputs': ['B4', 'B5', 'B13', 'B14'],  # Capex, Production, Target IRR, Lease Payment
                'Results': ['B4', 'B5', 'B6']  # IRR, NPV, Lease Payment
            },
            'test_scenarios': [
                {'name': 'Default Values', 'inputs': {}},
                {'name': 'High Growth', 'inputs': {'B5': 150000, 'B7': 0.05}},
                {'name': 'Low IRR Target', 'inputs': {'B13': 0.08}},
                {'name': 'High Capex', 'inputs': {'B4': 15000000}}
            ]
        }
        
        print(f"🧪 Excel Model Validator initialized for: {self.excel_file_path.name}")
    
    def __enter__(self):
        """Open Excel and the workbook, then return this validator.

        Lets the validator be used as ``with ExcelModelValidator(...) as v:``.
        Any exception raised by ``_open_excel`` propagates to the caller.
        """
        self._open_excel()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the workbook and Excel when the ``with`` block ends.

        The exception arguments are not inspected. Nothing is returned, so an
        exception raised inside the ``with`` body is not suppressed.
        """
        self._close_excel()
    
    def _open_excel(self):
        """Start a hidden Excel instance and open the workbook under test.

        On success ``self.app`` and ``self.wb`` hold the xlwings handles.
        On failure the Excel instance started here is shut down again and both
        attributes are reset to ``None`` before the error is re-raised.

        Raises:
            Exception: whatever was raised while starting Excel or opening the
                file; it is reported on stdout and re-raised unchanged.
        """
        app = None
        try:
            app = xw.App(visible=False)
            self.app = app
            self.wb = app.books.open(self.excel_file_path)
            print("✅ Excel workbook opened for validation")
        except Exception as e:
            print(f"❌ Failed to open Excel file: {e}")
            if app is not None:
                # A ``with`` statement never calls __exit__ when __enter__
                # raises, so nobody else would quit this Excel instance.
                self.app = None
                self.wb = None
                try:
                    app.quit()
                except Exception as cleanup_error:
                    # Best effort: report, but keep the original error primary.
                    print(f"⚠️ Could not quit Excel after the failed open: {cleanup_error}")
            raise
    
    def _close_excel(self):
        """Close Excel application"""
        if self.wb:
            self.wb.close()
        if self.app:
            self.app.quit()
    
    def _run_test(self, test_func, test_name: str) -> ValidationResult:
        """Call ``test_func`` and return a timed ``ValidationResult``.

        A ``ValidationResult`` returned by ``test_func`` is passed through with
        only its ``execution_time`` filled in (its own ``test_name`` is kept).
        Any other return value is wrapped, with its truthiness deciding
        pass/fail. An exception becomes a failed result carrying the error
        text, so this method itself does not raise for test failures.

        Args:
            test_func: Zero-argument callable that performs the test.
            test_name: Name used when a result object has to be created here.
        """
        began = time.time()

        try:
            outcome = test_func()
            if not isinstance(outcome, ValidationResult):
                # Plain return value: wrap it in a result object.
                outcome = ValidationResult(
                    test_name=test_name,
                    passed=bool(outcome),
                    actual_value=outcome
                )
            outcome.execution_time = time.time() - began
            return outcome
        except Exception as e:
            elapsed = time.time() - began
            return ValidationResult(
                test_name=test_name,
                passed=False,
                actual_value=None,
                message=f"Test failed with error: {e}",
                execution_time=elapsed
            )
    
    def test_model_functionality(self) -> Dict:
        """
        Step 1: Test that optimized model functions identically to original

        Runs one macro test per entry in ``test_config['required_macros']``,
        followed by the calculation, formula and data-consistency tests. Every
        result is appended to ``self.validation_results``.

        The pass/total line printed at the end counts only the tests run by
        this call, not results accumulated from earlier steps or calls.

        Returns:
            Dict with the keys ``macro_tests`` (one entry per macro name),
            ``calculation_tests``, ``formula_integrity`` and
            ``data_consistency``, each holding ``ValidationResult.to_dict()``
            output.
        """
        print("\n🔧 Step 1: Testing Model Functionality...")

        functionality_results = {
            'macro_tests': {},
            'calculation_tests': {},
            'formula_integrity': {},
            'data_consistency': {}
        }

        # Results produced by this call only; used for the summary line.
        step_results = []

        def record(result):
            step_results.append(result)
            self.validation_results.append(result)

        # Test 1: Macro Functionality
        print("   Testing macro functionality...")
        for macro_name in self.test_config['required_macros']:
            test_result = self._run_test(
                # Bind the name as a default so each lambda keeps its own macro.
                lambda m=macro_name: self._test_macro_execution(m),
                f"Macro: {macro_name}"
            )
            functionality_results['macro_tests'][macro_name] = test_result.to_dict()
            record(test_result)

        # Tests 2-4: (result key, progress message, test callable, test name)
        single_tests = [
            ('calculation_tests', "   Testing calculation integrity...",
             self._test_calculation_integrity, "Calculation Integrity"),
            ('formula_integrity', "   Testing formula integrity...",
             self._test_formula_integrity, "Formula Integrity"),
            ('data_consistency', "   Testing data consistency...",
             self._test_data_consistency, "Data Consistency"),
        ]
        for result_key, progress_message, test_func, test_name in single_tests:
            print(progress_message)
            test_result = self._run_test(test_func, test_name)
            functionality_results[result_key] = test_result.to_dict()
            record(test_result)

        passed_tests = sum(1 for r in step_results if r.passed)
        total_tests = len(step_results)

        print(f"   ✅ Functionality tests: {passed_tests}/{total_tests} passed")

        return functionality_results
    
    def _test_macro_execution(self, macro_name: str) -> ValidationResult:
        """Run a workbook macro and check the key result cells afterwards.

        The test passes when the macro runs without raising and the values
        captured by ``_capture_key_cell_values`` satisfy
        ``_validate_results_reasonableness``. The workbook state before the
        macro is not compared, so no pre-run snapshot is taken.

        Args:
            macro_name: Name of the macro to execute.

        Returns:
            A ``ValidationResult``; a raised exception is reported as a failed
            result rather than propagated.
        """
        test_name = f"Macro Execution: {macro_name}"
        try:
            # Execute macro
            self.wb.macro(macro_name)()

            # Verify the model still holds reasonable results after the run
            new_values = self._capture_key_cell_values()
            results_valid = self._validate_results_reasonableness(new_values)

            return ValidationResult(
                test_name=test_name,
                passed=results_valid,
                actual_value=f"Macro executed, results valid: {results_valid}",
                message="Macro executed successfully" if results_valid else "Macro executed but results seem invalid"
            )

        except Exception as e:
            return ValidationResult(
                test_name=test_name,
                passed=False,
                actual_value=None,
                message=f"Macro execution failed: {str(e)}"
            )
    
    def _test_calculation_integrity(self) -> ValidationResult:
        """Write known inputs, recalculate, and sanity-check the outputs.

        Sets four cells on the ``Inputs`` sheet, runs the
        ``UpdateFinancialModel`` macro, then reads IRR, NPV and lease payment
        from ``Results`` B4:B6. The inputs are left at the test values.

        Returns:
            A ``ValidationResult`` whose ``actual_value`` holds the three
            outputs; any exception is reported as a failed result.
        """
        try:
            inputs_sheet = self.wb.sheets['Inputs']
            results_sheet = self.wb.sheets['Results']

            known_inputs = (
                ('B4', 10000000),   # Capex
                ('B5', 100000),     # Production
                ('B13', 0.12),      # Target IRR
                ('B14', 1200000),   # Lease Payment
            )
            for address, test_value in known_inputs:
                inputs_sheet.range(address).value = test_value

            # Recalculate the model through its own macro
            self.wb.macro('UpdateFinancialModel')()

            irr = results_sheet.range('B4').value
            npv = results_sheet.range('B5').value
            lease_payment = results_sheet.range('B6').value

            def is_number(candidate):
                return isinstance(candidate, (int, float))

            # Label -> outcome, in reporting order
            checks = {
                'IRR is numeric': is_number(irr),
                'IRR is reasonable': is_number(irr) and 0.01 <= irr <= 1.0,
                'NPV is numeric': is_number(npv),
                'Lease Payment matches input': (
                    is_number(lease_payment) and abs(lease_payment - 1200000) < 1000
                ),
            }
            failed_checks = [label for label, ok in checks.items() if not ok]

            if failed_checks:
                summary = f"Failed checks: {failed_checks}"
            else:
                summary = "All calculations valid"

            return ValidationResult(
                test_name="Calculation Integrity",
                passed=not failed_checks,
                actual_value={
                    'IRR': irr,
                    'NPV': npv,
                    'Lease Payment': lease_payment
                },
                message=summary
            )

        except Exception as e:
            return ValidationResult(
                test_name="Calculation Integrity",
                passed=False,
                actual_value=None,
                message=f"Calculation test failed: {e}"
            )
    
    def _test_formula_integrity(self) -> ValidationResult:
        """Scan every sheet's used range for formulas that evaluate to an error.

        Only cells whose formula text starts with ``=`` are counted as
        formulas. Each formula's value is read with ``err_to_str=True`` so that
        an Excel error arrives as its ``#...`` text and can be detected.

        Returns:
            A ``ValidationResult`` that passes when no formula evaluates to an
            error. ``actual_value`` summarises the counts and ``message`` lists
            at most the first five offending cells. Any exception is reported
            as a failed result.
        """
        try:
            error_cells = []
            total_formulas = 0

            # Check all sheets for formula errors
            for sheet in self.wb.sheets:
                used_range = sheet.used_range
                if not used_range:
                    continue

                for cell in used_range:
                    formula = cell.formula
                    # NOTE(review): Excel reports a constant cell's content as
                    # its "formula", so a truthiness test would also count
                    # constants. Real formulas start with "=".
                    if not (isinstance(formula, str) and formula.startswith('=')):
                        continue
                    total_formulas += 1

                    # NOTE(review): per the xlwings converter options, error
                    # cells are read as None unless err_to_str=True is given -
                    # confirm against the installed xlwings version.
                    value = cell.options(err_to_str=True).value

                    # Check for common Excel errors (#DIV/0!, #REF!, ...)
                    if isinstance(value, str) and value.startswith('#'):
                        error_cells.append(f"{sheet.name}!{cell.address}: {value}")

            has_errors = len(error_cells) > 0

            return ValidationResult(
                test_name="Formula Integrity",
                passed=not has_errors,
                actual_value=f"{total_formulas} formulas checked, {len(error_cells)} errors",
                message=f"Formula errors found: {error_cells[:5]}" if has_errors else "All formulas valid"
            )

        except Exception as e:
            return ValidationResult(
                test_name="Formula Integrity",
                passed=False,
                actual_value=None,
                message=f"Formula integrity test failed: {str(e)}"
            )
    
    def _test_data_consistency(self) -> ValidationResult:
        """Check that related input and result cells agree with each other.

        Compares the lease payment on ``Inputs`` (B14) with the one on
        ``Results`` (B6), and checks that the IRR on ``Results`` (B4) lies in
        ``[0.01, 1.0]``.

        A value that is missing or not a number counts as a failed check. The
        test therefore cannot pass just because nothing could be compared.

        Returns:
            A ``ValidationResult`` whose ``actual_value`` holds the four values
            read; any exception is reported as a failed result.
        """
        try:
            inputs = self.wb.sheets['Inputs']
            results = self.wb.sheets['Results']

            # Get key values
            input_lease = inputs.range('B14').value
            result_lease = results.range('B6').value
            target_irr = inputs.range('B13').value
            actual_irr = results.range('B4').value

            def is_number(candidate) -> bool:
                # bool is a subclass of int; TRUE/FALSE cells are not numbers here.
                return isinstance(candidate, (int, float)) and not isinstance(candidate, bool)

            consistency_checks = []

            # Check lease payment consistency
            if is_number(input_lease) and is_number(result_lease):
                lease_consistent = abs(input_lease - result_lease) < 1000
                consistency_checks.append(('Lease Payment Consistency', lease_consistent))
            else:
                consistency_checks.append(('Lease Payment values are numeric', False))

            # Check IRR reasonableness
            if is_number(actual_irr) and is_number(target_irr):
                irr_reasonable = 0.01 <= actual_irr <= 1.0
                consistency_checks.append(('IRR Reasonableness', irr_reasonable))
            else:
                consistency_checks.append(('IRR values are numeric', False))

            failed_checks = [label for label, ok in consistency_checks if not ok]
            all_consistent = not failed_checks

            return ValidationResult(
                test_name="Data Consistency",
                passed=all_consistent,
                actual_value={
                    'input_lease': input_lease,
                    'result_lease': result_lease,
                    'target_irr': target_irr,
                    'actual_irr': actual_irr
                },
                message=f"Failed consistency checks: {failed_checks}" if failed_checks else "All data consistent"
            )

        except Exception as e:
            return ValidationResult(
                test_name="Data Consistency",
                passed=False,
                actual_value=None,
                message=f"Data consistency test failed: {str(e)}"
            )
    
    def test_solver_performance(self) -> Dict:
        """
        Step 2: Test solver and optimization performance

        Runs the Goal Seek test, one convergence test per configured scenario,
        and - only when baseline data was supplied - the baseline performance
        comparison. Every result is appended to ``self.validation_results``.

        Returns:
            Dict with ``goal_seek_tests``, ``convergence_tests`` (keyed by
            scenario name) and ``performance_comparison`` (empty when there is
            no baseline data).
        """
        print("\n🎯 Step 2: Testing Solver Performance...")

        def convergence_runner(scenario):
            # One closure per scenario, so each test keeps its own scenario.
            return lambda: self._test_scenario_convergence(scenario)

        # Goal Seek
        print("   Testing Goal Seek performance...")
        goal_seek_outcome = self._run_test(
            self._test_goal_seek_performance,
            "Goal Seek Performance"
        )
        goal_seek_summary = goal_seek_outcome.to_dict()
        self.validation_results.append(goal_seek_outcome)

        # Scenario convergence
        print("   Testing convergence scenarios...")
        convergence_summary = {}
        for scenario in self.test_config['test_scenarios']:
            scenario_name = scenario['name']
            outcome = self._run_test(
                convergence_runner(scenario),
                f"Convergence: {scenario_name}"
            )
            convergence_summary[scenario_name] = outcome.to_dict()
            self.validation_results.append(outcome)

        # Baseline comparison (optional)
        comparison_summary = {}
        if self.baseline_data:
            print("   Comparing with baseline performance...")
            comparison_outcome = self._run_test(
                self._test_performance_comparison,
                "Performance Comparison"
            )
            comparison_summary = comparison_outcome.to_dict()
            self.validation_results.append(comparison_outcome)

        print("   ✅ Solver tests completed")

        return {
            'goal_seek_tests': goal_seek_summary,
            'convergence_tests': convergence_summary,
            'performance_comparison': comparison_summary,
        }
    
    def _test_goal_seek_performance(self) -> ValidationResult:
        """Time an Excel Goal Seek on the lease payment and check the outcome.

        Seeds three input cells, then asks Excel to change ``Inputs!B14`` until
        ``Results!B4`` reaches 0.12. The test passes when the achieved value is
        within 0.001 of the goal and the call took less than 10 seconds.

        Returns:
            A ``ValidationResult`` with timing and convergence details in
            ``actual_value``; any exception is reported as a failed result.
        """
        try:
            inputs_sheet = self.wb.sheets['Inputs']
            results_sheet = self.wb.sheets['Results']

            # Starting point for the search
            seed_values = (
                ('B4', 10000000),   # Capex
                ('B13', 0.12),      # Target IRR
                ('B14', 1000000),   # Initial lease payment
            )
            for address, seed in seed_values:
                inputs_sheet.range(address).value = seed

            # The timed region includes the range lookups as well as Goal Seek
            started = time.time()

            irr_cell = results_sheet.range('B4')     # IRR
            lease_cell = inputs_sheet.range('B14')   # Lease Payment
            goal = 0.12

            irr_cell.api.GoalSeek(Goal=goal, ChangingCell=lease_cell.api)

            elapsed = time.time() - started

            achieved_irr = irr_cell.value
            final_lease = lease_cell.value

            # A non-numeric IRR counts as an infinite miss, i.e. not converged
            if isinstance(achieved_irr, (int, float)):
                convergence_error = abs(achieved_irr - goal)
            else:
                convergence_error = float('inf')
            converged = convergence_error < 0.001

            outcome_word = 'converged' if converged else 'failed'

            return ValidationResult(
                test_name="Goal Seek Performance",
                passed=converged and elapsed < 10.0,  # Should converge in < 10 seconds
                actual_value={
                    'execution_time': elapsed,
                    'achieved_irr': achieved_irr,
                    'target_irr': goal,
                    'convergence_error': convergence_error,
                    'final_lease_payment': final_lease
                },
                message=f"Goal Seek {outcome_word} in {elapsed:.2f}s"
            )

        except Exception as e:
            return ValidationResult(
                test_name="Goal Seek Performance",
                passed=False,
                actual_value=None,
                message=f"Goal Seek test failed: {e}"
            )
    
    def _test_scenario_convergence(self, scenario: Dict) -> ValidationResult:
        """Apply a scenario's inputs, run the solver, and check convergence.

        The cells named in ``scenario['inputs']`` are overridden on the
        ``Inputs`` sheet, the ``SolveForLeasePayment`` macro is run, and the
        achieved IRR (``Results!B4``) is compared with the target
        (``Inputs!B13``) using a tolerance of 0.001.

        The overridden cells are put back afterwards, whether or not the
        solver succeeded, so one scenario's inputs do not leak into the next.

        Args:
            scenario: Dict with a ``name`` and an ``inputs`` mapping of cell
                address to value.

        Returns:
            A ``ValidationResult`` with timing and convergence details in
            ``actual_value``; any exception is reported as a failed result.
        """
        try:
            inputs = self.wb.sheets['Inputs']

            # Cell address -> content before this scenario touched it.
            # NOTE(review): the snapshot uses .formula so a cell holding a
            # formula is restored as a formula; for constant cells Excel is
            # expected to hand back the constant as text and re-enter it on
            # write - confirm for any non-numeric inputs.
            overridden = {}
            try:
                # Apply scenario inputs
                for cell, value in scenario['inputs'].items():
                    target = inputs.range(cell)
                    overridden[cell] = target.formula
                    target.value = value

                # Run solver
                start_time = time.time()
                self.wb.macro('SolveForLeasePayment')()
                solve_time = time.time() - start_time

                # Read the outcome while the scenario inputs are still in place
                results = self.wb.sheets['Results']
                target_irr = inputs.range('B13').value
                achieved_irr = results.range('B4').value
            finally:
                for cell, original_content in overridden.items():
                    inputs.range(cell).formula = original_content

            if isinstance(achieved_irr, (int, float)) and isinstance(target_irr, (int, float)):
                convergence_error = abs(achieved_irr - target_irr)
                converged = convergence_error < 0.001
            else:
                converged = False
                convergence_error = float('inf')

            return ValidationResult(
                test_name=f"Scenario Convergence: {scenario['name']}",
                passed=converged,
                actual_value={
                    'scenario': scenario['name'],
                    'solve_time': solve_time,
                    'converged': converged,
                    'convergence_error': convergence_error,
                    'target_irr': target_irr,
                    'achieved_irr': achieved_irr
                },
                message=f"Scenario {'converged' if converged else 'failed'} in {solve_time:.2f}s"
            )

        except Exception as e:
            return ValidationResult(
                test_name=f"Scenario Convergence: {scenario['name']}",
                passed=False,
                actual_value=None,
                message=f"Scenario test failed: {str(e)}"
            )
    
    def _test_performance_comparison(self) -> ValidationResult:
        """Time the solver macro and compare it with the baseline duration.

        Without baseline data, or without a ``solver_performance`` entry in
        it, the test is reported as passed and skipped. Otherwise the
        ``SolveForLeasePayment`` macro is timed and the ratio to the baseline
        ``goal_seek`` duration must not exceed
        ``test_config['performance_tolerance']``. A baseline duration that is
        not positive is treated as acceptable with a ratio of 1.0.

        Returns:
            A ``ValidationResult``; any exception is reported as a failed
            result.
        """
        try:
            baseline = self.baseline_data
            if not baseline or 'solver_performance' not in baseline:
                return ValidationResult(
                    test_name="Performance Comparison",
                    passed=True,
                    actual_value="No baseline data available",
                    message="Skipped: No baseline performance data"
                )

            # Current solver timing
            started = time.time()
            self.wb.macro('SolveForLeasePayment')()
            current_time = time.time() - started

            # Baseline timing; missing entries fall back to 0
            goal_seek_baseline = baseline['solver_performance'].get('goal_seek', {})
            baseline_time = goal_seek_baseline.get('duration_seconds', 0)

            performance_ratio = 1.0
            acceptable_performance = True
            if baseline_time > 0:
                performance_ratio = current_time / baseline_time
                acceptable_performance = performance_ratio <= self.test_config['performance_tolerance']

            verdict = 'acceptable' if acceptable_performance else 'degraded'

            return ValidationResult(
                test_name="Performance Comparison",
                passed=acceptable_performance,
                actual_value={
                    'current_time': current_time,
                    'baseline_time': baseline_time,
                    'performance_ratio': performance_ratio,
                    'acceptable': acceptable_performance
                },
                message=f"Performance {verdict}: {performance_ratio:.1f}x baseline"
            )

        except Exception as e:
            return ValidationResult(
                test_name="Performance Comparison",
                passed=False,
                actual_value=None,
                message=f"Performance comparison failed: {e}"
            )
    
    def measure_optimization_metrics(self) -> Dict:
        """
        Step 3: Measure and track optimization metrics
        """
        print("\n📊 Step 3: Measuring Optimization Metrics...")
        
        metrics = {
            'performance_metrics': {},
            'memory_metrics': {},
            'calculation_metrics': {},
            'file_metrics': {}
        }
        
        # Performance metrics
        print("   Measuring performance metrics...")
        perf_tests = [
            ('Full Recalculation', lambda: self.app.api.Calculate()),
            ('Update Model Macro', lambda: self.wb.macro('UpdateFinancialModel')()),
            ('Solve Lease Payment', lambda: self.wb.macro('SolveForLeasePayment')())
        ]
        
        for test_name, test_func in perf_tests:
            times = []
            for _ in range(3):  # 3 runs for average
                start_time = time.time()
                try:
                    test_func()
                    times.append(time.time() - start_time)
                except:
                    times.append(float('inf'))
            
            metrics['performance_metrics'][test_name.lower().replace(' ', '_')] = {
                'average_time': sum(times) / len(times),
                'min_time': min(times),
                'max_time': max(times),
                'runs': times
            }
        
        # File metrics
        file_size = self.excel_file_path.stat().st_size / (1024 * 1024)  # MB
        metrics['file_metrics'] = {
            'file_size_mb': round(file_size, 2),
            'worksheet_count': len(self.wb.sheets)
        }
        
        print(f"   ✅ Optimization metrics measured")
        
        return metrics
    
    def _capture_key_cell_values(self) -> Dict:
        """Capture values from key cells for comparison"""
        values = {}
        
        try:
            for sheet_name, cells in self.test_config['critical_cells'].items():
                sheet = self.wb.sheets[sheet_name]
                values[sheet_name] = {}
                
                for cell_addr in cells:
                    try:
                        values[sheet_name][cell_addr] = sheet.range(cell_addr).value
                    except:
                        values[sheet_name][cell_addr] = None
        except:
            pass
        
        return values
    
    def _validate_results_reasonableness(self, values: Dict) -> bool:
        """Check if results are reasonable/valid"""
        try:
            results = values.get('Results', {})
            
            # Check IRR
            irr = results.get('B4')
            if not isinstance(irr, (int, float)) or not (0.01 <= irr <= 1.0):
                return False
            
            # Check NPV
            npv = results.get('B5')
            if not isinstance(npv, (int, float)):
                return False
            
            # Check Lease Payment
            lease = results.get('B6')
            if not isinstance(lease, (int, float)) or lease <= 0:
                return False
            
            return True
            
        except:
            return False
    
    def run_full_validation(self) -> Dict:
        """
        Run complete validation suite

        Executes the functionality tests, the solver tests and the metric
        measurements in that order, then assembles a summary.

        Returns:
            Dict with the completion timestamp, total duration, workbook path,
            per-step results, pass/fail statistics and the details of every
            recorded test.

        Raises:
            Exception: any error from a step is reported on stdout and
                re-raised.
        """
        print(f"🧪 Starting Full Model Validation: {self.excel_file_path.name}")
        print("=" * 60)

        began = time.time()

        try:
            # Dict values are evaluated top to bottom, so steps run in order 1-3.
            test_results = {
                'functionality_tests': self.test_model_functionality(),
                'solver_tests': self.test_solver_performance(),
                'optimization_metrics': self.measure_optimization_metrics(),
            }

            total_time = time.time() - began

            outcomes = self.validation_results
            total_count = len(outcomes)
            passed_count = sum(1 for outcome in outcomes if outcome.passed)
            failed_count = sum(1 for outcome in outcomes if not outcome.passed)
            # max(..., 1) avoids dividing by zero when no tests were recorded
            pass_rate = round(passed_count / max(total_count, 1) * 100, 1)

            validation_summary = {
                'validation_completed': datetime.now().isoformat(),
                'total_validation_time': round(total_time, 2),
                'excel_file': str(self.excel_file_path),
                'test_results': test_results,
                'summary_stats': {
                    'total_tests_run': total_count,
                    'tests_passed': passed_count,
                    'tests_failed': failed_count,
                    'pass_rate': pass_rate,
                },
                'all_test_details': [outcome.to_dict() for outcome in outcomes],
            }

            print(f"\n🎉 Validation Complete! Total time: {total_time:.2f}s")
            print(f"📊 Tests passed: {passed_count}/{total_count} ({pass_rate}%)")

            return validation_summary

        except Exception as e:
            print(f"❌ Validation failed: {e}")
            raise

In [None]:
def main():
    """
    Validate the lease model workbook and write the results to JSON.

    Looks for ``project_finance_lease_model.xlsm`` relative to the current
    directory and returns early with a message when it is absent. When a
    ``<stem>_profiling_results.json`` file exists, its ``analysis_results``
    entry is passed to the validator as baseline data. The summary is written
    to ``<stem>_validation_results.json``.
    """
    excel_file = "project_finance_lease_model.xlsm"
    workbook_path = Path(excel_file)

    if not workbook_path.exists():
        print(f"❌ Excel file not found: {excel_file}")
        return

    stem = workbook_path.stem
    baseline_file = f"{stem}_profiling_results.json"
    output_file = f"{stem}_validation_results.json"

    # Optional baseline from an earlier profiling run
    baseline_data = None
    if Path(baseline_file).exists():
        with open(baseline_file, 'r') as handle:
            profiling = json.load(handle)
        baseline_data = profiling.get('analysis_results', {})
        print(f"📂 Loaded baseline data from: {baseline_file}")

    # Results are written while the workbook is still open
    with ExcelModelValidator(excel_file, baseline_data) as validator:
        results = validator.run_full_validation()

        with open(output_file, 'w') as handle:
            json.dump(results, handle, indent=2)

        print(f"💾 Validation results saved to: {output_file}")

In [None]:
# Run the validation only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()