# 03: Uniform Cost Search

## Learning Objectives
- Understand how UCS finds optimal paths with variable costs
- Implement UCS using a priority queue
- Compare UCS with BFS and DFS
- Explore real-world applications with different cost functions
- Understand the relationship between UCS and Dijkstra's algorithm

## 1. Introduction to UCS

**Uniform Cost Search (UCS)** always expands the frontier node with the lowest cumulative path cost from the start node.

### Key Differences from BFS:
- BFS: Expands by **depth** (number of steps)
- UCS: Expands by **cost** (cumulative path cost)

### Properties:
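- **Complete**: Yes, if every step cost is at least some ε > 0 and the branching factor b is finite
- **Optimal**: Yes, for non-negative step costs
- **Time**: O(b^(1 + ⌊C*/ε⌋)), where b is the branching factor, C* is the optimal solution cost, and ε is the smallest step cost
- **Space**: O(b^(1 + ⌊C*/ε⌋))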
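
To make the depth-versus-cost distinction concrete, here is a minimal sketch on a toy graph. The graph and the helper names `bfs_path` and `ucs_path` are assumptions made for illustration only; they are not part of the classes defined later in this notebook.

```python
import heapq
from collections import deque

# Hypothetical toy graph: node -> list of (neighbor, step_cost)
toy_graph = {
    'S': [('G', 10), ('A', 1)],
    'A': [('B', 1)],
    'B': [('G', 1)],
    'G': [],
}

def bfs_path(graph, start, goal):
    """Fewest-steps path; step costs are ignored."""
    frontier = deque([[start]])
    visited = {start}
    while frontier:
        path = frontier.popleft()
        if path[-1] == goal:
            return path
        for neighbor, _ in graph[path[-1]]:
            if neighbor not in visited:
                visited.add(neighbor)
                frontier.append(path + [neighbor])
    return None

def ucs_path(graph, start, goal):
    """Cheapest path; the frontier is ordered by cumulative cost."""
    frontier = [(0, [start])]
    best = {start: 0}
    while frontier:
        cost, path = heapq.heappop(frontier)
        node = path[-1]
        if node == goal:
            return cost, path
        if cost > best.get(node, float('inf')):
            continue  # stale entry: a cheaper route to this node was found
        for neighbor, step in graph[node]:
            new_cost = cost + step
            if new_cost < best.get(neighbor, float('inf')):
                best[neighbor] = new_cost
                heapq.heappush(frontier, (new_cost, path + [neighbor]))
    return float('inf'), None

bfs_result = bfs_path(toy_graph, 'S', 'G')
ucs_cost, ucs_result = ucs_path(toy_graph, 'S', 'G')
print(bfs_result)            # ['S', 'G']
print(ucs_cost, ucs_result)  # 3 ['S', 'A', 'B', 'G']
```

BFS returns the one-step path of cost 10, while UCS returns the three-step path of cost 3.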

In [None]:
# Import required modules
import heapq
from typing import Any, List, Tuple, Dict, Set
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from collections import defaultdict

In [None]:
# Base classes
class SearchProblem:
    def get_start_state(self) -> Any:
        raise NotImplementedError
    
    def is_goal_state(self, state: Any) -> bool:
        raise NotImplementedError
    
    def get_successors(self, state: Any) -> List[Tuple[Any, str, float]]:
        raise NotImplementedError

## 2. Priority Queue Implementation

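UCS needs a priority queue that can return the lowest-cost entry efficiently. Python's `heapq` provides a binary min-heap but no decrease-key operation, so the class below updates a priority by marking the old entry as removed and pushing a new one: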

In [None]:
class PriorityQueue:
    """Priority queue for UCS and other algorithms"""
    
    def __init__(self):
        self.heap = []
        self.counter = 0  # Tie-breaker for equal priorities
        self.entry_finder = {}  # Map from state to heap entry
        self.REMOVED = object()  # Unique sentinel for removed entries (cannot equal a real state)
    
    def push(self, state: Any, priority: float, data: Any = None):
        """Add state with priority"""
        # Remove old entry if exists
        if state in self.entry_finder:
            self.remove(state)
        
        # Add new entry
        entry = [priority, self.counter, state, data]
        self.counter += 1
        self.entry_finder[state] = entry
        heapq.heappush(self.heap, entry)
    
    def pop(self) -> Tuple[Any, float, Any]:
        """Remove and return (state, priority, data) for the lowest-priority (cheapest) entry"""
        while self.heap:
            priority, _, state, data = heapq.heappop(self.heap)
            if state != self.REMOVED:
                del self.entry_finder[state]
                return state, priority, data
        raise KeyError('Pop from empty priority queue')
    
    def remove(self, state: Any):
        """Mark state as removed"""
        entry = self.entry_finder.pop(state)
        entry[2] = self.REMOVED
    
    def is_empty(self) -> bool:
        """Check if queue is empty"""
        return len(self.entry_finder) == 0
    
    def __len__(self) -> int:
        return len(self.entry_finder)
    
    def __contains__(self, state: Any) -> bool:
        return state in self.entry_finder

# Test priority queue
pq = PriorityQueue()
pq.push('A', 5)
pq.push('B', 2)
pq.push('C', 7)
pq.push('B', 1)  # Update priority

print("Priority Queue Test:")
while not pq.is_empty():
    state, priority, _ = pq.pop()
    print(f"  {state}: priority={priority}")
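
The class above handles priority updates by overwriting the old entry with a sentinel. A common alternative, sketched below under the assumption that priorities are plain numbers, leaves old entries in the heap and skips them on pop by comparing against a dictionary of current priorities. The function name `pop_order` is hypothetical and exists only for this illustration.

```python
import heapq

def pop_order(updates):
    """Return (state, priority) pairs in pop order, skipping stale entries.

    `updates` is a list of (state, priority) pushes; a later push for the
    same state replaces the earlier one.
    """
    heap = []
    current = {}  # state -> latest priority
    for counter, (state, priority) in enumerate(updates):
        current[state] = priority
        # counter breaks ties so states themselves are never compared
        heapq.heappush(heap, (priority, counter, state))

    order = []
    while heap:
        priority, _, state = heapq.heappop(heap)
        # An entry is stale if the state was re-pushed with another priority
        # or has already been popped.
        if current.get(state) != priority:
            continue
        del current[state]
        order.append((state, priority))
    return order

result = pop_order([('A', 5), ('B', 2), ('C', 7), ('B', 1)])
print(result)  # [('B', 1), ('A', 5), ('C', 7)]
```

The pushes mirror the test above: the stale `('B', 2)` entry is discarded, and each state comes out once in order of priority.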

## 3. Uniform Cost Search Implementation

In [None]:
class UniformCostSearch:
    """UCS implementation with path tracking"""
    
    def __init__(self):
        self.nodes_expanded = 0
        self.max_frontier_size = 0
        self.exploration_order = []
        self.cost_when_expanded = {}  # Track cost when each node was expanded
    
    def search(self, problem: SearchProblem) -> Tuple[List[str], float]:
        """
        Search for optimal solution using UCS
        Returns: (actions, total_cost)
        """
        start_state = problem.get_start_state()
        
        # Check if start is goal
        if problem.is_goal_state(start_state):
            return [], 0
        
        # Frontier: each entry is a state, with its path cost as priority and its action list as data
        frontier = PriorityQueue()
        frontier.push(start_state, 0, [])
        
        # Track best cost to each state
        best_costs = {start_state: 0}
        
        # Explored set
        explored = set()
        
        while not frontier.is_empty():
            # Update statistics
            self.max_frontier_size = max(self.max_frontier_size, len(frontier))
            
            # Get lowest cost node
            state, cost, path = frontier.pop()
            
            # Skip if already explored
            if state in explored:
                continue
            
            # Mark as explored
            explored.add(state)
            self.nodes_expanded += 1
            self.exploration_order.append(state)
            self.cost_when_expanded[state] = cost
            
            # Check if goal
            if problem.is_goal_state(state):
                return path, cost
            
            # Expand node
            for successor, action, step_cost in problem.get_successors(state):
                if successor not in explored:
                    new_cost = cost + step_cost
                    
                    # Only add if we found a better path
                    if successor not in best_costs or new_cost < best_costs[successor]:
                        best_costs[successor] = new_cost
                        frontier.push(successor, new_cost, path + [action])
        
        # No solution found
        return None, float('inf')
    
    def get_statistics(self):
        return {
            'nodes_expanded': self.nodes_expanded,
            'max_frontier_size': self.max_frontier_size,
            'states_explored': len(self.exploration_order)
        }
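
The implementation above stores a full action list with every frontier entry, which copies the list on each push (`path + [action]`). One alternative, sketched here as an assumption rather than as part of the class, is to keep a single parent pointer per state and rebuild the path once the goal is popped. The function `ucs_with_parents` and the `toy` graph are hypothetical names used only for this example.

```python
import heapq

def ucs_with_parents(successors, start, is_goal):
    """UCS that records one (parent, action) per state instead of a path per entry.

    `successors(state)` is assumed to return (next_state, action, step_cost)
    triples, mirroring SearchProblem.get_successors.
    """
    frontier = [(0, 0, start)]          # (cost, tie_breaker, state)
    best_cost = {start: 0}
    parent = {start: None}              # state -> (previous_state, action)
    counter = 1

    while frontier:
        cost, _, state = heapq.heappop(frontier)
        if cost > best_cost[state]:
            continue                    # stale entry: a cheaper route was found
        if is_goal(state):
            # Walk the parent links backwards to rebuild the action list.
            actions = []
            while parent[state] is not None:
                state, action = parent[state]
                actions.append(action)
            return actions[::-1], cost
        for nxt, action, step_cost in successors(state):
            new_cost = cost + step_cost
            if new_cost < best_cost.get(nxt, float('inf')):
                best_cost[nxt] = new_cost
                parent[nxt] = (state, action)
                heapq.heappush(frontier, (new_cost, counter, nxt))
                counter += 1
    return None, float('inf')

# Hypothetical toy graph for illustration.
toy = {
    'S': [('A', 'S->A', 2), ('B', 'S->B', 5)],
    'A': [('B', 'A->B', 1), ('G', 'A->G', 7)],
    'B': [('G', 'B->G', 2)],
    'G': [],
}
actions, total = ucs_with_parents(lambda s: toy[s], 'S', lambda s: s == 'G')
print(actions, total)  # ['S->A', 'A->B', 'B->G'] 5
```

The trade-off is one dictionary entry per discovered state in exchange for not copying a list per frontier entry.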

## 4. Variable Cost Grid Problem

Let's create a grid where each terrain type has its own movement cost. The cost of a move is the cost of the cell being entered:

In [None]:
class TerrainGridProblem(SearchProblem):
    """Grid with variable terrain costs"""
    
    # Terrain types and their costs
    TERRAIN_COSTS = {
        '.': 1,    # Plains (easy)
        '~': 3,    # Water (slow)
        '^': 5,    # Mountains (difficult)
        '#': float('inf')  # Wall (impassable)
    }
    
    def __init__(self, terrain_map: List[str], start: Tuple[int, int], goal: Tuple[int, int]):
        self.terrain = terrain_map
        self.start = start
        self.goal = goal
        self.rows = len(terrain_map)
        self.cols = len(terrain_map[0]) if terrain_map else 0
    
    def get_start_state(self):
        return self.start
    
    def is_goal_state(self, state):
        return state == self.goal
    
    def get_successors(self, state):
        successors = []
        row, col = state
        
        moves = [
            ((-1, 0), 'UP'),
            ((1, 0), 'DOWN'),
            ((0, -1), 'LEFT'),
            ((0, 1), 'RIGHT')
        ]
        
        for (dr, dc), action in moves:
            new_row, new_col = row + dr, col + dc
            
            if 0 <= new_row < self.rows and 0 <= new_col < self.cols:
                terrain_type = self.terrain[new_row][new_col]
                cost = self.TERRAIN_COSTS.get(terrain_type, 1)
                
                if cost != float('inf'):  # Not a wall
                    successors.append(((new_row, new_col), action, cost))
        
        return successors
    
    def visualize(self, path=None, exploration_order=None, costs=None):
        """Visualize the terrain and solution"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
        
        # Create color maps for terrain
        terrain_colors = {
            '.': [0.9, 0.9, 0.9],  # Light gray for plains
            '~': [0.3, 0.5, 0.8],  # Blue for water
            '^': [0.5, 0.3, 0.1],  # Brown for mountains
            '#': [0, 0, 0]         # Black for walls
        }
        
        # Left plot: Terrain with path
        display1 = np.zeros((self.rows, self.cols, 3))
        for i in range(self.rows):
            for j in range(self.cols):
                display1[i, j] = terrain_colors[self.terrain[i][j]]
        
        # Mark path if provided
        if path:
            current = self.start
            for action in path:
                if action == 'UP': current = (current[0]-1, current[1])
                elif action == 'DOWN': current = (current[0]+1, current[1])
                elif action == 'LEFT': current = (current[0], current[1]-1)
                elif action == 'RIGHT': current = (current[0], current[1]+1)
                
                if current != self.goal:
                    display1[current[0], current[1]] = [1, 1, 0]  # Yellow for path
        
        # Mark start and goal
        display1[self.start[0], self.start[1]] = [0, 1, 0]  # Green
        display1[self.goal[0], self.goal[1]] = [1, 0, 0]    # Red
        
        ax1.imshow(display1)
        ax1.set_title("Terrain Map with Solution Path")
        ax1.axis('off')
        
        # Add terrain costs as text
        for i in range(self.rows):
            for j in range(self.cols):
                if self.terrain[i][j] != '#':
                    cost = self.TERRAIN_COSTS[self.terrain[i][j]]
                    ax1.text(j, i, str(cost), ha='center', va='center', 
                            color='white' if self.terrain[i][j] == '^' else 'black',
                            fontsize=8, fontweight='bold')
        
        # Right plot: Exploration costs
        if costs:
            display2 = np.ones((self.rows, self.cols)) * -1
            max_cost = max(costs.values()) if costs else 1
            
            for state, cost in costs.items():
                display2[state[0], state[1]] = cost
            
            # Mask unexplored areas
            masked = np.ma.masked_where(display2 < 0, display2)
            
            im = ax2.imshow(masked, cmap='YlOrRd', vmin=0, vmax=max_cost)
            ax2.set_title("Cost When Expanded (UCS)")
            ax2.axis('off')
            
            # Add cost values as text
            for state, cost in costs.items():
                ax2.text(state[1], state[0], f"{cost:.1f}", 
                        ha='center', va='center', fontsize=8)
            
            plt.colorbar(im, ax=ax2, label='Path Cost')
        
        # Add legend
        legend_elements = [
            patches.Patch(color='green', label='Start'),
            patches.Patch(color='red', label='Goal'),
            patches.Patch(color='yellow', label='Path'),
            patches.Patch(color=[0.9,0.9,0.9], label='Plains (1)'),
            patches.Patch(color=[0.3,0.5,0.8], label='Water (3)'),
            patches.Patch(color=[0.5,0.3,0.1], label='Mountains (5)'),
            patches.Patch(color='black', label='Wall (∞)')
        ]
        ax1.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(1, 1))
        
        plt.tight_layout()
        plt.show()

## 5. Test UCS on Variable Terrain

In [None]:
# Create a terrain map
terrain_map = [
    "......~~~~~",
    "..##..~~~~.",
    "..##...~~..",
    "...^^^^....",
    "..^^^^^^...",
    "...^^^^....",
    "......~~...",
    ".....~~~~.."
]

# Create problem
terrain_problem = TerrainGridProblem(terrain_map, start=(0, 0), goal=(7, 10))

# Run UCS
ucs = UniformCostSearch()
solution, cost = ucs.search(terrain_problem)

print("=" * 50)
print("UNIFORM COST SEARCH ON VARIABLE TERRAIN")
print("=" * 50)
print(f"Solution found: {solution is not None}")
print(f"Total path cost: {cost}")
print(f"Path length: {len(solution) if solution else 'N/A'}")
print(f"Nodes expanded: {ucs.nodes_expanded}")
print(f"\nActions: {solution}")

# Visualize
terrain_problem.visualize(solution, ucs.exploration_order, ucs.cost_when_expanded)
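
When a plot is not available (for example in a terminal session), the path can also be overlaid on the map as text. The helper below is a minimal sketch; `render_path` and `demo_map` are hypothetical names, and the action strings are assumed to be the four used by `TerrainGridProblem`.

```python
def render_path(terrain_map, start, actions):
    """Return the map as a list of strings with the path overlaid.

    'S' marks the start, 'G' the final cell, '*' the cells in between.
    """
    deltas = {'UP': (-1, 0), 'DOWN': (1, 0), 'LEFT': (0, -1), 'RIGHT': (0, 1)}
    grid = [list(row) for row in terrain_map]
    row, col = start
    cells = [(row, col)]
    for action in actions:
        dr, dc = deltas[action]
        row, col = row + dr, col + dc
        cells.append((row, col))
    for r, c in cells[1:-1]:
        grid[r][c] = '*'
    grid[cells[0][0]][cells[0][1]] = 'S'
    grid[cells[-1][0]][cells[-1][1]] = 'G'
    return [''.join(line) for line in grid]

# Hypothetical 3x4 map and hand-written action list.
demo_map = ["..~.",
            ".#~.",
            "...."]
rendered = render_path(demo_map, (0, 0), ['DOWN', 'DOWN', 'RIGHT', 'RIGHT', 'RIGHT'])
print('\n'.join(rendered))
# S.~.
# *#~.
# ***G
```

With the variables from the cell above, a call such as `render_path(terrain_map, (0, 0), solution)` should produce the same kind of overlay for the UCS result.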

## 6. UCS vs BFS Comparison

BFS minimizes the number of steps and ignores terrain costs, so its path may be shorter in steps but more expensive. Let's run both on the same problem:

In [None]:
from collections import deque

class BFSForComparison:
    """BFS that ignores costs"""
    
    def __init__(self):
        self.nodes_expanded = 0
        self.exploration_order = []
    
    def search(self, problem):
        start = problem.get_start_state()
        if problem.is_goal_state(start):
            return [], 0
        
        frontier = deque([(start, [], 0)])
        explored = set()
        
        while frontier:
            state, path, cost = frontier.popleft()
            
            if state in explored:
                continue
            
            explored.add(state)
            self.nodes_expanded += 1
            self.exploration_order.append(state)
            
            for successor, action, step_cost in problem.get_successors(state):
                if successor not in explored:
                    if problem.is_goal_state(successor):
                        return path + [action], cost + step_cost
                    frontier.append((successor, path + [action], cost + step_cost))
        
        return None, float('inf')

# Run BFS for comparison
bfs = BFSForComparison()
bfs_solution, bfs_cost = bfs.search(terrain_problem)

# Recompute the cost of the BFS path from its actions (an independent check; it should equal bfs_cost)
def calculate_path_cost(problem, actions):
    """Calculate total cost of a path"""
    if not actions:
        return 0
    
    total_cost = 0
    current = problem.get_start_state()
    
    for action in actions:
        # Find the successor matching this action
        for successor, act, cost in problem.get_successors(current):
            if act == action:
                total_cost += cost
                current = successor
                break
    
    return total_cost

bfs_actual_cost = calculate_path_cost(terrain_problem, bfs_solution)

# Compare results
print("\n" + "=" * 50)
print("COMPARISON: UCS vs BFS")
print("=" * 50)
print(f"{'Algorithm':<15} {'Path Length':<15} {'Path Cost':<15} {'Nodes Expanded'}")
print("-" * 60)
print(f"{'UCS':<15} {len(solution):<15} {cost:<15.1f} {ucs.nodes_expanded}")
print(f"{'BFS':<15} {len(bfs_solution):<15} {bfs_actual_cost:<15.1f} {bfs.nodes_expanded}")
print(f"\nCost difference: {bfs_actual_cost - cost:.1f} (BFS is {(bfs_actual_cost/cost - 1)*100:.1f}% more expensive)")

## 7. Real-World Applications

### Example 1: Navigation with Traffic

In [None]:
class CityNavigationProblem(SearchProblem):
    """City navigation with time-based costs"""
    
    def __init__(self, city_graph, start, goal, traffic_multipliers=None):
        """
        city_graph: dict mapping location -> [(neighbor, base_time)]
        traffic_multipliers: dict mapping (from, to) -> multiplier
        """
        self.graph = city_graph
        self.start = start
        self.goal = goal
        self.traffic = traffic_multipliers or {}
    
    def get_start_state(self):
        return self.start
    
    def is_goal_state(self, state):
        return state == self.goal
    
    def get_successors(self, state):
        successors = []
        
        for neighbor, base_time in self.graph.get(state, []):
            # Apply traffic multiplier if exists
            multiplier = self.traffic.get((state, neighbor), 1.0)
            actual_time = base_time * multiplier
            
            action = f"{state}->{neighbor}"
            successors.append((neighbor, action, actual_time))
        
        return successors

# Create a simple city network
city_graph = {
    'Home': [('A', 10), ('B', 15)],
    'A': [('C', 12), ('D', 8), ('Home', 10)],
    'B': [('C', 5), ('E', 20), ('Home', 15)],
    'C': [('Work', 15), ('A', 12), ('B', 5)],
    'D': [('Work', 25), ('A', 8)],
    'E': [('Work', 10), ('B', 20)],
    'Work': []
}

# Simulate traffic (rush hour)
traffic = {
    ('A', 'C'): 2.5,  # Heavy traffic
    ('C', 'Work'): 2.0,  # Heavy traffic
    ('D', 'Work'): 1.5,  # Moderate traffic
}

# Test with and without traffic
print("=" * 50)
print("CITY NAVIGATION EXAMPLE")
print("=" * 50)

for traffic_data, label in [(None, "Normal"), (traffic, "Rush Hour")]:
    city_problem = CityNavigationProblem(city_graph, 'Home', 'Work', traffic_data)
    ucs = UniformCostSearch()
    solution, travel_time = ucs.search(city_problem)
    
    print(f"\n{label} Traffic:")
    route = [action.split('->')[0] for action in solution] + ['Work']
    print(f"  Best route: {' -> '.join(route)}")
    print(f"  Total time: {travel_time:.1f} minutes")
    print(f"  Nodes expanded: {ucs.nodes_expanded}")
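
On a graph this small, the UCS result can be cross-checked by brute force. The sketch below enumerates every simple path with depth-first search and keeps the cheapest. The function `cheapest_simple_path` is a hypothetical helper, and the graph and traffic data are copied from the cell above so the block runs on its own.

```python
def cheapest_simple_path(graph, start, goal, multipliers=None):
    """Enumerate every simple path with DFS and return (cost, path) of the cheapest."""
    multipliers = multipliers or {}
    best = (float('inf'), None)

    def dfs(node, cost, path):
        nonlocal best
        if node == goal:
            if cost < best[0]:
                best = (cost, path)
            return
        for neighbor, base_time in graph.get(node, []):
            if neighbor not in path:  # keep paths simple (no revisits)
                step = base_time * multipliers.get((node, neighbor), 1.0)
                dfs(neighbor, cost + step, path + [neighbor])

    dfs(start, 0.0, [start])
    return best

# Same data as the city example, repeated so this sketch is self-contained.
city_graph_copy = {
    'Home': [('A', 10), ('B', 15)],
    'A': [('C', 12), ('D', 8), ('Home', 10)],
    'B': [('C', 5), ('E', 20), ('Home', 15)],
    'C': [('Work', 15), ('A', 12), ('B', 5)],
    'D': [('Work', 25), ('A', 8)],
    'E': [('Work', 10), ('B', 20)],
    'Work': [],
}
rush_hour_copy = {('A', 'C'): 2.5, ('C', 'Work'): 2.0, ('D', 'Work'): 1.5}

normal = cheapest_simple_path(city_graph_copy, 'Home', 'Work')
rush = cheapest_simple_path(city_graph_copy, 'Home', 'Work', rush_hour_copy)
print(normal)  # (35.0, ['Home', 'B', 'C', 'Work'])
print(rush)    # (45.0, ['Home', 'B', 'E', 'Work'])
```

Exhaustive enumeration grows exponentially with graph size, so it is only useful as a sanity check on toy inputs like this one.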

### Example 2: Resource-Constrained Path Planning

In [None]:
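class FuelConstrainedProblem(SearchProblem):
    """Path planning with fuel constraints.

    The search state is (location, fuel_remaining), so the same location
    reached with different fuel levels counts as a different state.
    """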
    
    def __init__(self, graph, start, goal, initial_fuel, fuel_costs, fuel_stations):
        self.graph = graph
        self.start = start
        self.goal = goal
        self.initial_fuel = initial_fuel
        self.fuel_costs = fuel_costs  # Fuel needed for each edge
        self.fuel_stations = fuel_stations  # Locations where we can refuel
    
    def get_start_state(self):
        # State = (location, fuel_remaining)
        return (self.start, self.initial_fuel)
    
    def is_goal_state(self, state):
        location, fuel = state
        return location == self.goal
    
    def get_successors(self, state):
        location, fuel = state
        successors = []
        
        # Option 1: Refuel if at a station
        if location in self.fuel_stations and fuel < self.initial_fuel:
            # Refueling fills the tank to capacity and adds a fixed cost of 5 to the path
            successors.append(
                ((location, self.initial_fuel), 'REFUEL', 5)
            )
        
        # Option 2: Move to neighbors
        for neighbor, distance in self.graph.get(location, []):
            fuel_needed = self.fuel_costs.get((location, neighbor), distance)
            
            if fuel >= fuel_needed:
                new_fuel = fuel - fuel_needed
                action = f"GO:{location}->{neighbor}"
                successors.append(
                    ((neighbor, new_fuel), action, distance)
                )
        
        return successors

# Create a graph with fuel constraints
fuel_graph = {
    'Start': [('A', 30), ('B', 50)],
    'A': [('C', 40), ('D', 20)],
    'B': [('D', 60), ('E', 30)],
    'C': [('Goal', 50)],
    'D': [('Goal', 70), ('C', 25)],
    'E': [('Goal', 40)],
    'Goal': []
}

# Fuel costs (some routes are more fuel-efficient)
fuel_costs = {
    ('Start', 'A'): 25,  # Efficient route
    ('Start', 'B'): 60,  # Inefficient route
    ('A', 'C'): 35,
    ('A', 'D'): 20,
    ('B', 'D'): 50,
    ('B', 'E'): 25,
    ('C', 'Goal'): 45,
    ('D', 'Goal'): 60,
    ('D', 'C'): 20,
    ('E', 'Goal'): 35
}

# Test with different fuel capacities
print("\n" + "=" * 50)
print("FUEL-CONSTRAINED PATH PLANNING")
print("=" * 50)

for fuel_capacity in [100, 80, 60]:
    fuel_problem = FuelConstrainedProblem(
        fuel_graph, 'Start', 'Goal', 
        fuel_capacity, fuel_costs, 
        fuel_stations={'A', 'D'}  # Can refuel at A and D
    )
    
    ucs = UniformCostSearch()
    solution, cost = ucs.search(fuel_problem)
    
    print(f"\nFuel capacity: {fuel_capacity}")
    if solution:
        print(f"  Solution found! Cost: {cost}")
        print(f"  Actions: {solution}")
    else:
        print("  No solution possible with this fuel capacity")

## 8. UCS Optimizations

### Optimization 1: Early Goal Test

In [None]:
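class OptimizedUCS(UniformCostSearch):
    """UCS variant that runs the goal test before the explored check.

    The goal is still tested when a node is popped, not when it is generated,
    so optimality is preserved. The goal node is simply not counted as expanded.
    """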
    
    def search(self, problem: SearchProblem) -> Tuple[List[str], float]:
        start_state = problem.get_start_state()
        
        if problem.is_goal_state(start_state):
            return [], 0
        
        frontier = PriorityQueue()
        frontier.push(start_state, 0, [])
        best_costs = {start_state: 0}
        explored = set()
        
        while not frontier.is_empty():
            state, cost, path = frontier.pop()
            
            # Goal test right after pop, before the node is counted as expanded
            if problem.is_goal_state(state):
                return path, cost
            
            if state in explored:
                continue
                
            explored.add(state)
            self.nodes_expanded += 1
            
            for successor, action, step_cost in problem.get_successors(state):
                if successor not in explored:
                    new_cost = cost + step_cost
                    if successor not in best_costs or new_cost < best_costs[successor]:
                        best_costs[successor] = new_cost
                        frontier.push(successor, new_cost, path + [action])
        
        return None, float('inf')

# Compare standard vs optimized
print("=" * 50)
print("UCS OPTIMIZATION COMPARISON")
print("=" * 50)

standard_ucs = UniformCostSearch()
optimized_ucs = OptimizedUCS()

# Run on terrain problem
sol1, cost1 = standard_ucs.search(terrain_problem)
sol2, cost2 = optimized_ucs.search(terrain_problem)

print(f"Standard UCS:  Nodes expanded: {standard_ucs.nodes_expanded}, Cost: {cost1}")
print(f"Optimized UCS: Nodes expanded: {optimized_ucs.nodes_expanded}, Cost: {cost2}")
print(f"Savings: {standard_ucs.nodes_expanded - optimized_ucs.nodes_expanded} nodes")
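
Both versions above test for the goal when a node is popped. Testing when a node is generated, as BFS can safely do, is not safe for UCS because the first route found to the goal need not be the cheapest. The sketch below illustrates this on a hypothetical three-node graph; the `ucs` function and its `test_on_generation` flag are assumptions for this example only.

```python
import heapq

def ucs(graph, start, goal, test_on_generation=False):
    """Minimal UCS on a dict graph; returns the cost of the path it reports."""
    frontier = [(0, start)]
    best = {start: 0}
    while frontier:
        cost, node = heapq.heappop(frontier)
        if node == goal:
            return cost                      # goal test on pop (safe)
        if cost > best[node]:
            continue                         # stale entry
        for neighbor, step in graph[node]:
            new_cost = cost + step
            if test_on_generation and neighbor == goal:
                return new_cost              # goal test on generation (unsafe)
            if new_cost < best.get(neighbor, float('inf')):
                best[neighbor] = new_cost
                heapq.heappush(frontier, (new_cost, neighbor))
    return float('inf')

# Hypothetical graph: a direct but expensive edge and a cheap detour.
detour_graph = {'S': [('G', 10), ('A', 1)], 'A': [('G', 1)], 'G': []}
on_pop = ucs(detour_graph, 'S', 'G')
on_generation = ucs(detour_graph, 'S', 'G', test_on_generation=True)
print(on_pop, on_generation)  # 2 10
```

Testing on generation returns the direct edge of cost 10, while testing on pop waits until the detour of cost 2 reaches the front of the queue.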

## 9. Practice Exercises

### Exercise 1: Dijkstra's Algorithm
Implement Dijkstra's algorithm (UCS that finds shortest paths to all nodes):

In [None]:
class DijkstraAlgorithm:
    """
    TODO: Implement Dijkstra's algorithm
    - Find shortest paths from start to ALL reachable nodes
    - Return dictionary of {node: (cost, path)}
    """
    
    def find_all_paths(self, problem: SearchProblem) -> Dict:
        # TODO: Implement Dijkstra's algorithm
        # Hint: Similar to UCS but don't stop at goal
        pass

# Test your implementation
# dijkstra = DijkstraAlgorithm()
# all_paths = dijkstra.find_all_paths(terrain_problem)
# for state, (cost, path) in all_paths.items():
#     print(f"{state}: cost={cost}, path_length={len(path)}")

### Exercise 2: Multi-Goal UCS
Modify UCS to find the nearest of multiple goals:

In [None]:
class MultiGoalUCS:
    """
    TODO: Implement UCS for multiple goals
    - Find the shortest path to ANY of the goals
    - Return which goal was reached
    """
    
    def search(self, problem: SearchProblem, goals: Set) -> Tuple[Any, List[str], float]:
        # TODO: Return (goal_reached, path, cost)
        pass

### Exercise 3: Bounded Cost Search
Find all states reachable within a cost bound:

In [None]:
class BoundedCostSearch:
    """
    TODO: Find all states reachable within a given cost bound
    """
    
    def find_reachable(self, problem: SearchProblem, max_cost: float) -> Set:
        # TODO: Return set of all states reachable with cost <= max_cost
        pass

## 10. Key Takeaways

### UCS Properties:
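✅ **Optimal**: Finds a least-cost path when step costs are non-negative  
✅ **Complete**: Finds a solution if one exists, provided every step cost is at least some ε > 0  
✅ **Systematic**: Explores in order of increasing cost  
❌ **Memory intensive**: Must store the entire frontier and explored set  
❌ **No heuristic**: Expands in every direction, with no sense of where the goal lies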

### When to Use UCS:
- When you need the **optimal solution**
- When costs are **non-uniform**
- When you have **enough memory**
- When the problem is **not too large**

### Relationship to Other Algorithms:
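- **BFS**: Equivalent to UCS when every step has the same cost
- **Dijkstra**: Same expansion order as UCS, but run until every reachable node is settled instead of stopping at a goal
- **A***: UCS with priority g(n) + h(n), where h is a heuristic estimate of the remaining cost (next notebook!)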
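
The BFS relationship can be checked with a small sketch: when every edge has the same cost, the UCS cost to the goal is the BFS step count times that cost. The graph and the function names `bfs_steps` and `ucs_cost` below are hypothetical and independent of the classes in this notebook.

```python
import heapq
from collections import deque

# Hypothetical unweighted graph: node -> list of neighbors.
unit_graph = {
    'S': ['A', 'B'],
    'A': ['C'],
    'B': ['C', 'D'],
    'C': ['G'],
    'D': [],
    'G': [],
}

def bfs_steps(graph, start, goal):
    """Number of edges on a fewest-steps path, or None if unreachable."""
    depth = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            return depth[node]
        for neighbor in graph[node]:
            if neighbor not in depth:
                depth[neighbor] = depth[node] + 1
                queue.append(neighbor)
    return None

def ucs_cost(graph, start, goal, step_cost=1):
    """Least path cost when every edge costs `step_cost`, or None if unreachable."""
    best = {start: 0}
    heap = [(0, start)]
    while heap:
        cost, node = heapq.heappop(heap)
        if node == goal:
            return cost
        if cost > best[node]:
            continue  # stale entry
        for neighbor in graph[node]:
            new_cost = cost + step_cost
            if new_cost < best.get(neighbor, float('inf')):
                best[neighbor] = new_cost
                heapq.heappush(heap, (new_cost, neighbor))
    return None

steps = bfs_steps(unit_graph, 'S', 'G')
cost_unit = ucs_cost(unit_graph, 'S', 'G')
cost_scaled = ucs_cost(unit_graph, 'S', 'G', step_cost=4)
print(steps, cost_unit, cost_scaled)  # 3 3 12
```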

## Next Steps
Next notebook: A* Search - Adding heuristics for dramatic speedups!