# A* algorithm

Given that we have structured our states, we can apply the A* algorithm. We require a
- generate_neighbours function, which sets both the path cost $f$ and remaining estimated cost $h$
- boolean method to determine whether a state is a target 

In [1]:
import copy
from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, Set, Self, Dict

T = TypeVar("T")

@dataclass
class Path:
    state: T
    previous_state: Optional[Self] = None
    message: Optional[str] = None
    f: float = 0
    h: float = float("inf")

    @property
    def id(self):
        return self.state.id

class AbstractAStarAlgorithm(ABC, Generic[T]):
    # https://en.wikipedia.org/wiki/A*_search_algorithm

    def __init__(self, initial_configuration: T):
        self.next_paths: Dict[int, Path] = {
            initial_configuration.id: Path(state=initial_configuration)
        }
        self.explored_states: Dict[int, T] = dict()

    @abstractmethod
    def neighbours(self, path: Path) -> Set[Path]:
        pass

    @abstractmethod
    def is_target_state(self, path: Path) -> bool:
        pass

    def add_new_path(self, path: Path) -> None:
        if path.id in self.explored_states.keys():
            self.explored_states[path.id] = min(
                [self.explored_states[path.id], path],
                key = lambda p: p.f
            )
        if path.id in self.next_paths.keys():
            # Keep the path leading to the current node, with the least distance travelled
            self.next_paths[path.id] = min(
                [self.next_paths[path.id], path],
                key = lambda p: p.f
            )
        else: self.next_paths[path.id] = self.explored_states[path.id]
                
    def explore_next_path(self) -> None:

        current_path = min(self.next_paths.values(), key=lambda pp: pp.f + pp.h)
        
        new_paths = self.neighbours(current_path)
        for path in new_paths:
            self.add_new_path(path)
        
        self.explored_states[current_path.id] = current_path
        del self.next_paths[current_path.id]

    def run_astar_algorithm(self):
        
        while True:
            
            is_target_reached = len([
                path
                for path
                in self.next_paths.values()
                if self.is_target_state(path)
            ]) > 0 
            if is_target_reached:
                return next(path for path in self.next_paths.values() if self.is_target_state(path))
            
            assert self.next_paths, "Exhausted all paths without reaching target"
            
            self.explore_next_path()

# Application: service migration

We want to migrate our machines and apps to a new system. To do so we consider having:
- $N$ machines, each having different CPU/Memory abilities.
- $A$ apps, which have distinct CPU/memory requirements and can run in any subset of the $N$ machines
- A source and target cluster manager. At first All physical machines are running in the source, and we want the all to run in the target cluster.

In [2]:
from dataclasses import dataclass
from typing import Set

@dataclass(frozen=True)
class Machine:
    name: str
    cpu: int
    memory: int
    migrated: bool = False
    
    @property 
    def id(self):
        return self.__hash__()

@dataclass(frozen=True)
class App:
    name: str
    cpu: int
    memory: int
    machines: Set[int]
    
    @property 
    def id(self):
        return self.__hash__()

In [3]:
@dataclass(frozen=True)
class State:
    machines: Dict[int, Machine]
    apps: Dict[int, App]
    
    @property 
    def id(self):
        return self.__hash__()
    
    # This method is good enough for finding solutions, but better slack functions exist
    def minimum_slack_for_an_app(self) -> float:
        minimum_cpu_slack = float("inf")
        minimum_memory_slack = float("inf")
        
        for app_id, app in self.apps:
            
            cpu_slack = 0
            memory_slack = 0
            
            for machine in app.machines.values():
                cpu_needed = sum([app.cpu for app in self.apps if machine in app.machines])
                cpu_slack += machine.cpu - cpu_needed
                memory_needed = sum([app.memory for app in self.apps if machine in app.machines])
                memory_slack += machine.cpu - memory_needed

            minimum_cpu_slack = min(minimum_cpu_slack, cpu_slack)
            minimum_memory_slack = min(minimum_memory_slack, memory_slack)
            
        return min(minimum_cpu_slack, minimum_memory_slack)


    def n_machines_to_migrate(self) -> int:
        return len([m for m in self.machines.values() if not m.migrated])
    
    def least_number_of_apps_in_non_migrated_machine(self) -> float:
        assert len([m for m in self.machines.values() if m.migrated==False]), "All machines have been migrated"
        return min([
            len([app for app in self.apps.values() if machine.id in app.machines.keys()])
            for machine
            in self.machines.values()
            if machine.migrated==False
        ])

In [4]:
import itertools

class MigratePlan(AbstractAStarAlgorithm[State]):
    
    def neighbours(self, path: Path) -> Set[Path]:
        next_paths = set()
        
        # Migrate machines that have no running apps
        for machine_id, machine in path.state.machines:
            
            running_apps = [
                app for app
                in path.state.apps.values()
                if machine_id not in app.machines.keys()
            ]
            
            if not running_apps:
                new_state = copy.deepcopy(path.state)
                new_state.machines[machine_id] = Machine(
                    name=machine.name,
                    cpu=machine.cpu,
                    memory=machine.memory,
                    migrated=True
                )
                next_paths.add(Path(
                    state= new_state,
                    previous_state=path.state,
                    message=f"Migrate machine {machine.name}",
                    f=path.f,
                    h=path.state.h - 1,
                ))

        for app_id, machine_id in itertools.product(path.state.apps.keys(), path.state.machines.keys()):
            
            # Remove app if in machine
            if machine_id in path.state.apps[app_id].machines.keys():
                new_state = copy.deepcopy(path.state)
                new_state.apps[app_id] = App(
                    name=new_state.apps[app_id].name,
                    cpu=new_state.apps[app_id].cpu,
                    memory=new_state.apps[app_id].memory,
                    machines=new_state.apps[app_id].machines - {machine_id}
                )
                next_paths.add(Path(
                    state= new_state,
                    previous_state=path.state,
                    message=f"Remove app {new_state.apps[app_id].name} from machine {new_state.machines[machine_id].name}",
                    f=path.f + .01,
                    h=new_state.n_machines_to_migrate(),
                ))
                
            # Add app if not in machine
            else:
                new_state = copy.deepcopy(path.state)
                new_state.apps[app_id] = App(
                    name=new_state.apps[app_id].name,
                    cpu=new_state.apps[app_id].cpu,
                    memory=new_state.apps[app_id].memory,
                    machines=new_state.apps[app_id].machines + {machine_id}
                )
                next_paths.add(Path(
                    state= new_state,
                    previous_state=path.state,
                    message=f"Remove app {new_state.apps[app_id].name} from machine {new_state.machines[machine_id].name}",
                    f=path.f + .01,
                    h=new_state.n_machines_to_migrate(),
                ))
                
        return next_paths
                
    def is_target_state(self, path: Path) -> bool:
        return all([machine.migrated for machine in path.state.machines])

# Example Run

In [8]:
machines = [
    Machine(name="M1", cpu=5, memory=6, migrated=False),
    Machine(name="M2", cpu=3, memory=4, migrated=False),
    Machine(name="M3", cpu=2, memory=2, migrated=False),
]
apps = [
    App(name="A1", cpu=1, memory=1, machines=frozenset(machines[0:2])),
    App(name="A2", cpu=1, memory=1, machines=frozenset(machines[2:3])),
]
state = State(
    machines={m.id: m for m in machines},
    apps={a.id:a for a in apps},
)

In [9]:
mp = MigratePlan(initial_configuration=state)

TypeError: unhashable type: 'dict'

In [10]:
mp.run_astar_algorithm()

TypeError: cannot unpack non-iterable Machine object

In [None]:
[] == []