In [1]:
import random
import time
import numpy as np
from typing import Dict, List, Optional, Tuple
import copy

class HillClimbingBudgetPlanner:
    """Allocate a salary across spending categories with local-search heuristics.

    Each non-fixed category takes a value between its ``min`` and ``max``.
    A solution's quality is the priority-weighted sum of its amounts minus a
    small penalty for unspent money; any solution whose total exceeds the
    salary is scored ``-inf``.

    Attributes:
        salary: Total amount available for allocation.
        categories: Maps a category name to ``{'min': ..., 'max': ...}``.
        fixed_assignments: Categories whose amount is pinned and never moved.
        priorities: Weight per category (missing categories weigh 0).
        tuple_data: Spending records used to derive the per-category step.
        all_categories: Category names, ``categories`` first, then any extra
            names that only occur in ``fixed_assignments``.
        deltas: Step size per category, from ``compute_delta_from_data``.
        domains: Sorted candidate values per category.
    """

    # Lower bound for the annealing temperature.  Repeated cooling eventually
    # underflows to 0.0, and dividing the score difference by 0.0 raises
    # ZeroDivisionError.
    _MIN_TEMPERATURE = 1e-12
    # Tolerance used when comparing generated domain values with a bound.
    _EPS = 1e-9

    def __init__(self, salary: float, categories: Dict[str, Dict[str, float]],
                 fixed_assignments: Dict[str, float], priorities: Dict[str, float],
                 tuple_data: List[Dict[str, float]]):
        """Store the problem definition and precompute deltas and domains."""
        self.salary = salary
        self.categories = categories
        self.fixed_assignments = fixed_assignments
        self.priorities = priorities
        self.tuple_data = tuple_data

        # dict.fromkeys de-duplicates while keeping insertion order, so the
        # category order (and therefore any seeded random run) is reproducible;
        # a plain set would reorder string keys between interpreter runs.
        self.all_categories = list(dict.fromkeys([*categories, *fixed_assignments]))

        # Deltas first: the domain grid is built from them.
        self.deltas = {cat: self.compute_delta_from_data(cat) for cat in self.all_categories}
        self.domains = self._initialize_domains()

    def compute_delta_from_data(self, category: str) -> float:
        """Return the step size for ``category`` derived from ``tuple_data``.

        Returns:
            1.0 when no record mentions the category; ``max(1% of the value,
            1.0)`` when only one distinct value was observed; otherwise the
            median gap between consecutive distinct observed values.
        """
        observed = sorted({row[category] for row in self.tuple_data if category in row})
        if not observed:
            return 1.0
        if len(observed) == 1:
            return max(observed[0] * 0.01, 1.0)

        gaps = [upper - lower for lower, upper in zip(observed, observed[1:])]
        return float(np.median(gaps))

    def _initialize_domains(self) -> Dict[str, List[float]]:
        """Build the sorted list of candidate values for every category.

        A fixed category has exactly one candidate (its fixed amount).  Any
        other category gets ``min, min + delta, ...`` up to ``max``, with
        ``max`` itself always included.
        """
        domains = {}
        for category in self.all_categories:
            if category in self.fixed_assignments:
                domains[category] = [self.fixed_assignments[category]]
                continue

            lower = self.categories[category]['min']
            upper = self.categories[category]['max']
            step = self.deltas[category]

            # Multiply instead of accumulating so rounding error does not
            # grow with the number of steps.
            values = []
            index = 0
            while lower + index * step <= upper + self._EPS:
                # A value may land within the tolerance just above ``upper``;
                # never let the grid leave the allowed range.
                values.append(min(lower + index * step, upper))
                index += 1

            if not values or abs(values[-1] - upper) > self._EPS:
                values.append(upper)

            domains[category] = sorted(set(values))
        return domains

    def generate_random_solution(self) -> Dict[str, float]:
        """Draw a random starting allocation.

        Fixed assignments are copied verbatim.  Every other category receives
        a random domain value that fits in the money still available *after*
        reserving the minimum of each category not yet assigned, so an early
        draw cannot starve a later category.  If even the minimum does not
        fit, the minimum is used anyway and the result is over budget
        (``evaluate_solution`` then scores it ``-inf``).
        """
        solution = dict(self.fixed_assignments)
        free_categories = [cat for cat in self.all_categories if cat not in solution]
        remaining_budget = self.salary - sum(solution.values())
        # Money that must stay available for the minimums of unassigned categories.
        reserved = sum(self.categories[cat]['min'] for cat in free_categories)

        for cat in free_categories:
            floor = self.categories[cat]['min']
            reserved -= floor  # this category's own minimum is now being spent
            cap = min(self.categories[cat]['max'], remaining_budget - reserved)

            candidates = [value for value in self.domains[cat] if value <= cap]
            if not candidates:
                candidates = [floor]  # nothing fits: fall back to the minimum

            solution[cat] = random.choice(candidates)
            remaining_budget -= solution[cat]

        return solution

    def evaluate_solution(self, solution: Dict[str, float]) -> float:
        """Score ``solution``; higher is better.

        Returns:
            ``-inf`` when the total exceeds the salary by more than 0.01,
            otherwise the priority-weighted sum of the amounts minus 0.01 per
            unit of unspent salary.
        """
        total = sum(solution.values())
        if total > self.salary + 0.01:  # small floating point tolerance
            return -float('inf')

        weighted = sum(amount * self.priorities.get(cat, 0) for cat, amount in solution.items())
        # Mild push towards spending the whole budget.
        return weighted - (self.salary - total) * 0.01

    def get_neighbors(self, solution: Dict[str, float]) -> List[Dict[str, float]]:
        """Return the solutions reachable by moving one category one step.

        Each non-fixed category is moved down and up by its delta.  A step
        that would leave ``[min, max]`` is clamped to the bound instead of
        being dropped, so both bounds stay reachable even when the range is
        not a multiple of the delta; a step that does not change the value is
        skipped.
        """
        neighbors = []
        for cat, value in solution.items():
            if cat in self.fixed_assignments:
                continue

            lower = self.categories[cat]['min']
            upper = self.categories[cat]['max']
            step = self.deltas[cat]

            for direction in (-1, 1):
                candidate = min(max(value + direction * step, lower), upper)
                if candidate == value:
                    continue  # already sitting on that bound
                neighbor = solution.copy()
                neighbor[cat] = candidate
                neighbors.append(neighbor)

        return neighbors

    def simple_hill_climbing(self, max_iter: int = 1000) -> Dict[str, float]:
        """First-improvement hill climbing.

        Scans the neighbors in order and moves to the first one that scores
        strictly better; stops when no neighbor improves or after
        ``max_iter`` moves.
        """
        current = self.generate_random_solution()
        current_score = self.evaluate_solution(current)

        for _ in range(max_iter):
            for neighbor in self.get_neighbors(current):
                neighbor_score = self.evaluate_solution(neighbor)
                if neighbor_score > current_score:
                    current, current_score = neighbor, neighbor_score
                    break
            else:
                break  # local optimum: no neighbor improves the score

        return current

    def steepest_ascent_hill_climbing(self, max_iter: int = 1000) -> Dict[str, float]:
        """Steepest-ascent hill climbing.

        Moves to the best-scoring neighbor each iteration and stops as soon as
        that neighbor is not strictly better than the current solution.
        """
        current = self.generate_random_solution()
        current_score = self.evaluate_solution(current)

        for _ in range(max_iter):
            neighbors = self.get_neighbors(current)
            if not neighbors:
                break

            scored = [(self.evaluate_solution(neighbor), neighbor) for neighbor in neighbors]
            best_score, best_neighbor = max(scored, key=lambda pair: pair[0])
            if best_score <= current_score:
                break

            current, current_score = best_neighbor, best_score

        return current

    def stochastic_hill_climbing(self, max_iter: int = 1000) -> Dict[str, float]:
        """Stochastic hill climbing.

        Each iteration samples one random neighbor and moves to it only if it
        scores strictly better; runs for ``max_iter`` iterations unless there
        are no neighbors at all.
        """
        current = self.generate_random_solution()
        current_score = self.evaluate_solution(current)

        for _ in range(max_iter):
            neighbors = self.get_neighbors(current)
            if not neighbors:
                break

            neighbor = random.choice(neighbors)
            neighbor_score = self.evaluate_solution(neighbor)
            if neighbor_score > current_score:
                current, current_score = neighbor, neighbor_score

        return current

    def random_restart_hill_climbing(self, restarts: int = 10, max_iter: int = 100) -> Dict[str, float]:
        """Run steepest-ascent climbs from random starts and keep the best.

        At least one climb is always performed, and the first result is kept
        even when it is infeasible, so a solution (never ``None``) is returned.
        """
        best_solution = None
        best_score = -float('inf')

        for _ in range(max(restarts, 1)):
            candidate = self.steepest_ascent_hill_climbing(max_iter)
            candidate_score = self.evaluate_solution(candidate)
            if best_solution is None or candidate_score > best_score:
                best_solution, best_score = candidate, candidate_score

        return best_solution

    def simulated_annealing(self, max_iter: int = 1000, initial_temp: float = 100.0,
                           cooling_rate: float = 0.95) -> Dict[str, float]:
        """Simulated annealing; returns the best solution seen.

        Better neighbors are always accepted.  A feasible neighbor that is not
        better is accepted with probability ``exp(score_difference / temp)``.
        Over-budget neighbors (score ``-inf``) are never accepted.  The
        temperature is multiplied by ``cooling_rate`` after every iteration
        and kept at or above a tiny positive floor.
        """
        current = self.generate_random_solution()
        current_score = self.evaluate_solution(current)
        best = current.copy()
        best_score = current_score
        temp = max(initial_temp, self._MIN_TEMPERATURE)

        for _ in range(max_iter):
            neighbors = self.get_neighbors(current)
            if not neighbors:
                break

            neighbor = random.choice(neighbors)
            neighbor_score = self.evaluate_solution(neighbor)

            if neighbor_score > current_score:
                current, current_score = neighbor, neighbor_score
                if current_score > best_score:
                    best = current.copy()
                    best_score = current_score
            elif neighbor_score != -float('inf'):
                # Both scores are finite here, so the difference is a finite
                # non-positive number and the probability lies in [0, 1].
                score_drop = neighbor_score - current_score
                if random.random() < np.exp(score_drop / temp):
                    current, current_score = neighbor, neighbor_score

            temp = max(temp * cooling_rate, self._MIN_TEMPERATURE)

        return best

    def optimize(self, method: str = 'steepest', **kwargs) -> Dict[str, float]:
        """Run the search variant named by ``method``.

        Args:
            method: One of ``'simple'``, ``'steepest'``, ``'stochastic'``,
                ``'random_restart'`` or ``'annealing'``.
            **kwargs: Forwarded unchanged to the selected method.

        Raises:
            ValueError: If ``method`` is not one of the names above.
        """
        strategies = {
            'simple': self.simple_hill_climbing,
            'steepest': self.steepest_ascent_hill_climbing,
            'stochastic': self.stochastic_hill_climbing,
            'random_restart': self.random_restart_hill_climbing,
            'annealing': self.simulated_annealing,
        }
        strategy = strategies.get(method)
        if strategy is None:
            raise ValueError(f"Unknown method: {method}")
        return strategy(**kwargs)


In [None]:
def _prompt_float(prompt, error_message):
    """Ask with ``prompt`` until the reply parses as a float; return that float."""
    while True:
        try:
            return float(input(prompt))
        except ValueError:
            print(error_message)


def run_budget_optimizer():
    """Interactively configure a budget problem and compare every search method.

    Uses a built-in set of categories and sample spending records, asks on
    stdin for a priority per category, optional fixed amounts and the total
    budget, then runs each method offered by ``HillClimbingBudgetPlanner`` and
    prints the individual results, a comparison table sorted by score, and
    the best-scoring allocation.
    """
    print("=== HILL CLIMBING BUDGET OPTIMIZER ===")

    # Sample configuration (can be modified or taken from user input)
    categories = {
        'housing': {'min': 12000, 'max': 20000},
        'food': {'min': 5000, 'max': 10000},
        'transport': {'min': 3000, 'max': 8000},
        'education': {'min': 2000, 'max': 15000},
        'savings': {'min': 0, 'max': 15000},
        'entertainment': {'min': 1000, 'max': 5000},
    }

    # Synthetic historical spending data; the planner derives its step sizes from it.
    tuple_data = [
        {'housing': 15000, 'food': 7000, 'transport': 5000, 'education': 5000, 'savings': 3000, 'entertainment': 2000},
        {'housing': 18000, 'food': 8000, 'transport': 4000, 'education': 3000, 'savings': 5000, 'entertainment': 1000},
        {'housing': 16000, 'food': 6000, 'transport': 6000, 'education': 7000, 'savings': 2000, 'entertainment': 3000},
    ]

    print("\n=== PRIORITIES ===")
    priorities = {
        cat: _prompt_float(
            f"Priority for {cat} (higher = more important, e.g., 1.0-5.0): ",
            "Please enter a valid number",
        )
        for cat in categories
    }

    print("\n=== FIXED ASSIGNMENTS ===")
    fixed_assignments = {}
    for cat, bounds in categories.items():
        answer = input(f"Is {cat} amount fixed? (y/n): ").lower().strip()
        if answer != 'y':
            continue
        # Keep asking until the amount is both numeric and inside the range.
        while True:
            val = _prompt_float(
                f"Enter fixed amount for {cat} ({bounds['min']}-{bounds['max']}): ",
                "Invalid amount!",
            )
            if bounds['min'] <= val <= bounds['max']:
                fixed_assignments[cat] = val
                break
            print(f"Must be between {bounds['min']} and {bounds['max']}")

    # Retry on a typo like every other numeric prompt instead of aborting with
    # a ValueError after all the answers above have been entered.
    salary = _prompt_float("\nEnter total budget: ", "Please enter a valid number")

    planner = HillClimbingBudgetPlanner(
        salary=salary,
        categories=categories,
        fixed_assignments=fixed_assignments,
        priorities=priorities,
        tuple_data=tuple_data,
    )

    # Method id understood by planner.optimize -> display name.
    methods = {
        'simple': "Simple Hill Climbing",
        'steepest': "Steepest Ascent Hill Climbing",
        'stochastic': "Stochastic Hill Climbing",
        'random_restart': "Random Restart Hill Climbing",
        'annealing': "Simulated Annealing",
    }

    print("\n=== OPTIMIZATION RESULTS ===")
    results = {}

    for method_id, method_name in methods.items():
        start_time = time.time()
        solution = planner.optimize(method=method_id)
        elapsed = time.time() - start_time

        score = planner.evaluate_solution(solution)
        total = sum(solution.values())
        unused = salary - total

        results[method_id] = {
            'solution': solution,
            'score': score,
            'time': elapsed,
            'unused': unused,
        }

        print(f"\n{method_name.upper():<30} | Score: {score:.2f} | Time: {elapsed:.4f}s")
        print(f"{'Category':<15} {'Amount':>10} {'Priority':>10}")
        print("-" * 40)
        for cat, amount in solution.items():
            print(f"{cat:<15} ${amount:>9.2f} {priorities.get(cat, 0):>10.1f}")
        print(f"\nTotal allocated: ${total:.2f} of ${salary:.2f}")
        print(f"Unused budget: ${unused:.2f}")

    # Comparison table, best score first.
    print("\n=== METHOD COMPARISON ===")
    print(f"{'Method':<25} {'Score':>10} {'Time (s)':>10} {'Unused $':>10}")
    print("-" * 55)
    for method_id, result in sorted(results.items(), key=lambda item: -item[1]['score']):
        print(f"{methods[method_id]:<25} {result['score']:>10.2f} {result['time']:>10.4f} {result['unused']:>10.2f}")

    # Detailed view of the highest-scoring allocation.
    best_method = max(results.items(), key=lambda item: item[1]['score'])[0]
    best_solution = results[best_method]['solution']
    best_total = sum(best_solution.values())

    print(f"\n=== BEST SOLUTION ({methods[best_method]}) ===")
    for cat, amount in best_solution.items():
        print(f"{cat.capitalize():<15}: ${amount:>9.2f} (Priority: {priorities.get(cat, 0):.1f})")
    print(f"\nTotal: ${best_total:.2f} of ${salary:.2f}")
    print(f"Unused budget: ${salary - best_total:.2f}")
    print(f"Quality score: {results[best_method]['score']:.2f}")

# Start the interactive optimizer only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_budget_optimizer()

=== HILL CLIMBING BUDGET OPTIMIZER ===

=== FINAL BUDGET ALLOCATION ===
Food: $5,000.00
Transport: $3,000.00
Education: $15,000.00
Savings: $12,000.00
Housing: $15,000.00
TOTAL: $50,000.00
TARGET: $50,000.00
