Planen: ./../../libMultiRobotPlanning-master/build/ecbs -i test.yaml -o output.yaml

Visualisieren:
python3 ../../libMultiRobotPlanning-master/example/visualize.py test.yaml output.yaml

Basic Info:
[North, East, South, West]
DO_NOTHING= 0 MOVE_FORWARD= 2 MOVE_LEFT= 1 MOVE_RIGHT= 3 STOP_MOVING= 4

In [1]:
import yaml
import random
import sys
import time
from typing import Optional, List, Dict
import subprocess

import numpy as np

from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import complex_rail_generator
from flatland.envs.schedule_generators import complex_schedule_generator
from flatland.utils.ordered_set import OrderedSet
from flatland.utils.rendertools import RenderTool
from flatland.core.grid.grid4_utils import get_new_position

from flatland.utils.misc import str2bool


random.seed(100)
np.random.seed(100)

direction_to_str = {0: "North", 1: "East", 2: "South", 3: "West"}

In [2]:
class FirstStepsAgent:

    def __init__(self, schedule, handle):
        self.step = 0
        self.iterations = 100
        
        self.schedule = schedule
        self.handle = handle
        
        self.computed_schedule = []
        self.start_own_schedule = False
        
        

    def act(self, obs):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        
        # "Ignore" the first step and just start with a forward move
        if self.step == 0:
            self.step += 1
            return 2

        
        # If the count of steps is higher than the schedule, the agent is at the goal
        if self.step >= len(self.schedule):
            return 0
        
        
        # Get the position and the direction of the agent based on the observation
        pos = obs.position
        direction = obs.direction
        
        # The Agent has already reached the goal, maybe by compuing the schedule
        if pos == None:
            print(f"<{self.handle}>, has reached the target!")
            self.step = len(self.schedule)
            return 0
        
        
        # If there was an position, which wasn't reachable, then use the computed action path
        if len(self.computed_schedule) > 0 and self.start_own_schedule:
            
            print(self.computed_schedule)
            return self.computed_schedule.pop(0)
    
        # We have reached the end of the schedule and need to increase the steps by one
        elif len(self.computed_schedule) == 0 and self.start_own_schedule:
            self.start_own_schedule = False
            self.step += 1
        
        
        # Get the the calculated position where the agent has to move
        goal = (self.schedule[self.step]['x'], self.schedule[self.step]['y'])


        # Try one iteration to get to the calculated position
        result = a_star(self.handle, pos, direction, goal, iterations=1)
        
        
        # DEBUG
        print(f"{result}, from: {pos}, to: {goal}")

        

        # The calculated scheudule is really bad, so it can be that the next position isn't reachable in one iteration
        if result.status == A_Star_result.fail:
            print(f"<{self.handle}>, trys more iterations...")
        
            
            for i in range(2, len(self.schedule[self.step:])):
                # Try to get to the next position in the calculated scheudule
                self.step += 1
                
                goal = (self.schedule[self.step]['x'], self.schedule[self.step]['y'])
                
                # Increase the count of iterations to eventually reach the end
                result = a_star(self.handle, pos, direction, goal, iterations= i ** i)
                
                if result.status == A_Star_result.succes:
                    
                    print(f"<{self.handle}>, found the Solution Path: {result.actions} to {goal}")
                    
                    self.start_own_schedule = True
                    self.computed_schedule = result.actions[1:]
                    
                    print(result)
                    
                    return result.get_n_action(0)


            else:
                print(f"Couln't find a Solution to a calculated traget!")
                

        elif result.is_goal():
            print(f"<{self.handle}>, has reached the End!")
            self.step = len(self.schedule)
            return 0


        self.step += 1

        return result.get_n_action(0)


In [3]:
class Observation():
    def __init__(self, position, direction, target):
        self.position = position 
        self.direction = direction
        self.target = target

class ObservePredictions(ObservationBuilder):

    def __init__(self):
        super().__init__()

    def reset(self):
        pass

    def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
        observations = {}
        

        # Collect all the different observation for all the agents
        for h in handles:
            observations[h] = self.get(h)

        return observations

    def get(self, handle: int = 0) -> np.ndarray:
        
        agent = self.env.agents[handle]
        
        # self.env.dev_pred_dict[handle] = get_computed_path(handle)
        
        return Observation(agent.position, agent.direction, agent.target)

    def set_env(self, env: Environment):
        super().set_env(env)

In [10]:
render = True

try:
    
    
    # Pass the Predictor to the observation builder
    custom_obs_builder = ObservePredictions()

    # Initiate Environment
    env = RailEnv(width=10, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999,
                                                        seed=1), schedule_generator=complex_schedule_generator(),
                  number_of_agents=3, obs_builder_object=custom_obs_builder)

    obs, info = env.reset()
    env_renderer = RenderTool(env, screen_width=2000, screen_height=2000)

    # We render the initial step
    if render : env_renderer.render_env(show=True, frames=False, show_observations=False, show_predictions=False)
        
        
    # Calculate all the schedule of the Agents
    schedules = calc_schedules(env)["schedule"]
        
        
    # Initialize the agent    
    agent = [FirstStepsAgent(schedules["agent" + str(i)], i) for i in range(env.get_num_agents())]
    
    
    # Empty action dictionary which has the predicted actions in it for each step
    action_dict = dict()
    
    
    
    # For Loop with all the steps predicted by the agent
    for step in range(25):
        
        for a in range(env.get_num_agents()):
        
            action_dict[a] = agent[a].act(obs[a])
        
        print()
        # Do the actual step in the Enviroment based on the action_dict computed previously 
        obs, all_rewards, done, info = env.step(action_dict)
        
        for handle, action in action_dict.items():
            print(f"<{handle}> Action: |{action}|, Position: {env.agents[handle].position}, Target: {env.agents[handle].target}, Direction: {direction_to_str[env.agents[handle].direction]}")
        
    

        print("Rewards: ", all_rewards, "  [done=", done, "]", end="\n" + 50 * "-" + "\n\n")
        
        if render: env_renderer.render_env(show=True, frames=False, show_observations=False, show_predictions=True)
            
        if render: time.sleep(1)

        if done["__all__"]:
            print("All done!")
            break
            
finally:
    if render : env_renderer.close_window()


open_window - pyglet

<0> Action: |2|, Position: (13, 0), Target: (4, 2), Direction: South
<1> Action: |2|, Position: (0, 2), Target: (7, 6), Direction: North
<2> Action: |2|, Position: (5, 9), Target: (10, 2), Direction: East
Rewards:  {0: -1.0, 1: -1.0, 2: -1.0}   [done= {0: False, 1: False, 2: False, '__all__': False} ]
--------------------------------------------------

Succes! actions: [2], positions: [(12, 0)], from: (13, 0), to: (12, 0)
Succes! actions: [2], positions: [(1, 2)], from: (0, 2), to: (1, 2)
Succes! actions: [2], positions: [(5, 8)], from: (5, 9), to: (5, 8)

<0> Action: |2|, Position: (12, 0), Target: (4, 2), Direction: North
<1> Action: |2|, Position: (1, 2), Target: (7, 6), Direction: South
<2> Action: |2|, Position: (5, 8), Target: (10, 2), Direction: West
Rewards:  {0: -1.0, 1: -1.0, 2: -1.0}   [done= {0: False, 1: False, 2: False, '__all__': False} ]
--------------------------------------------------

Succes! actions: [2], positions: [(11, 0)], from: (12, 0), t

TypeError: exceptions must derive from BaseException

In [57]:
schedules["agent1"]

[{'x': 0, 'y': 2, 't': 0},
 {'x': 1, 'y': 2, 't': 1},
 {'x': 2, 'y': 2, 't': 2},
 {'x': 3, 'y': 2, 't': 3},
 {'x': 4, 'y': 2, 't': 4},
 {'x': 5, 'y': 2, 't': 5},
 {'x': 5, 'y': 3, 't': 6},
 {'x': 6, 'y': 3, 't': 7},
 {'x': 7, 'y': 3, 't': 8},
 {'x': 7, 'y': 4, 't': 9},
 {'x': 7, 'y': 5, 't': 10},
 {'x': 7, 'y': 6, 't': 11}]

In [5]:
custom_obs_builder = ObservePredictions()

env = RailEnv(width=10, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999,
                                                        seed=1), schedule_generator=complex_schedule_generator(),
                  number_of_agents=3, obs_builder_object=custom_obs_builder)

obs, info = env.reset()

env_renderer = RenderTool(env, screen_width=2000, screen_height=2000)
env_renderer.render_env(show=True, frames=False, show_observations=False, show_predictions=False)


open_window - pyglet


In [4]:
def calc_schedules(env):
    agents = []

    # Map the Coordinates from Top left Origin ---> Bottom left Origin
    # return [position[1] , env.height - 1 - position[0]]
    
    
    def calc_coord(position):
        return [position[0] , position[1]]


    for handle, a in enumerate(env.agents):
        agent = {"goal" : calc_coord(a.target), "name" : "agent" + str(handle), "start" : calc_coord(a.initial_position)}

        agents.append(agent)


    obstacles = []
    for y in range(env.width):
        for x in range(env.height):
            if not True in env.get_valid_directions_on_grid(x, y):
                obstacles.append(calc_coord((x, y)))

    res_dict = {"agents" : agents, "map" : {"dimensions" : [env.height, env.width], "obstacles" : obstacles}}


    f = open("test.yaml", "w")
    yaml.dump(res_dict, f)
    f.close()


    subprocess.run(["./../../libMultiRobotPlanning-master/build/ecbs",  "-i",  "test.yaml" , "-o", "output.yaml"], check=True)

    with open("output.yaml") as output_file:
          yaml_out = yaml.load(output_file, Loader=yaml.FullLoader)
            
    return yaml_out

In [5]:

Debug = False

# Position, action, cost, direction
def calc_next_cell(position, direction, handle):
    
    possible_transitions = env.rail.get_transitions(*position, direction)

    if all(d == 0 for d in possible_transitions):
        print("This Direction is not permissable!")
        return -1
    
    next_cells = []
    # Loop trough all the possible dirrections the agent can reach from current direction
    for d in [(direction + i) % 4 for i in range(-1, 2)]:
        
        if possible_transitions[d]:
            
            # Die neue Position, wenn man die jeweilige direction 
            new_position = get_new_position(position, d)
            
            
            # Die Distanz von einer Position zum Ziel des jeweiligen Agenten
            dist = env.distance_map.get()[handle, new_position[0], new_position[1], d]
            
            # Check the given directions and map it to the corresponding action
            if d == direction:
                if Debug: print(f"Action forward, to: {new_position}, dist: {dist}")
                next_cells.append((new_position, 2, dist, d))
                
            
            elif (d + 1) % 4 == direction:
                if Debug: print(f"Action left, to: {new_position}, dist: {dist}")
                next_cells.append((new_position, 1, dist, d))
                
            elif (d - 1) % 4 == direction:
                if Debug: print(f"Action right, to: {new_position}, dist: {dist}") 
                next_cells.append((new_position, 3, dist, d))
    
    else:
    
        # Check if the transition is an dead End
        if possible_transitions[(direction + 2) % 4] == 1:
            direction = (direction + 2) % 4

            # Die neue Position, wenn man die jeweilige direction 
            new_position = get_new_position(position, direction)


            # Die Distanz von einer Position zum Ziel des jeweiligen Agenten
            dist = env.distance_map.get()[handle, new_position[0], new_position[1], direction]

            if Debug: print(f"Dead End, to: {new_position}, dist: {dist}")
            next_cells.append((new_position, 2, dist, direction))
    
    return next_cells
            

for i in range(4):
    print(direction_to_str[i], i)
    print(calc_next_cell((2, 1), i, 2))
    print("-" * 50 + "\n")

North 0


NameError: name 'env' is not defined

In [6]:
import heapq



class A_Star_result():
    succes = 1
    fail = 2
    
    def __init__(self, status, actions=[], positions=[], iterations=-1):
        self.status = status
        self.actions = actions
        self.positions = positions
        self.iterations = iterations

    def __repr__(self):
        if self.status == A_Star_result.succes:
            return f"Succes! actions: {self.actions}, positions: {self.positions}"
        else:
            return f"Failed in {self.iterations} iterations..."
        
    def is_goal(self):
        return len(self.actions) == 0
    
    def get_n_action(self, n):
        if n >= len(self.actions):
            raise "Error: Trying to get acces to an action with an undifinded timestep!"
            
        return self.actions[n]
    
    


class Node:
    def __init__(self, position, parent, action, cost, direction):
        self.position = position
        self.parent = parent
        self.action = action
        self.cost = cost
        self.direction = direction
        
        self.state = (position, direction)
    
    def __lt__(self, other):
        return self.cost < other.cost
    
    def __repr__(self):
        return f"|p: {self.position}, a: {self.action}, c: {self.cost}|"
        
def make_root_node(postion, direction):
    return Node(postion, None, None, 0, direction)

def make_node(position, parent, action, cost, direction):
    return Node(position, parent, action, parent.cost + cost, direction)

def extract_solution(node):
    sol = []
    sol_pos = []
    while node.parent is not None:
        sol.append(node.action)
        sol_pos.append(node.position)
        node = node.parent
    
    return (sol[::-1], sol_pos[::-1])



def a_star(handle, position, direction, goal_position, iterations=100):
    open_list = []
    heapq.heappush(open_list, make_root_node(position, direction))
    
    closed_list = []
    distance = {}
    
    i = 0
    while len(open_list) > 0 and i <= iterations:
        
        node = heapq.heappop(open_list)
        
        if node.state not in closed_list or node.cost < distance[node.state]:
            
            closed_list.append(node.state)
            distance[node.state] = node.cost
            
            if node.position == goal_position:
                res = extract_solution(node)
                return A_Star_result(A_Star_result.succes, res[0], res[1])
            
            succ = calc_next_cell(node.position, node.direction, handle)
            
            for n in succ:
                heapq.heappush(open_list, make_node(n[0], node, n[1], n[2], n[3]))
        
        
        
        i += 1  # Make sure the Iterations are not that big
                
    return A_Star_result(A_Star_result.fail, iterations=i)

# Position, action, direction, goal position
a_star(2, (7, 5), 2, (8, 3))
# a_star(2, (2, 0), 3, (2, 4))

NameError: name 'env' is not defined

In [65]:
# Initiate Environment
env = RailEnv(width=10, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999,
                                                        seed=1), schedule_generator=complex_schedule_generator(),
                  number_of_agents=3, obs_builder_object=custom_obs_builder)

obs, info = env.reset()
env.agents[0]


EnvAgent(initial_position=(13, 0), initial_direction=2, direction=2, target=(4, 2), moving=False, speed_data={'position_fraction': 0.0, 'speed': 1.0, 'transition_action_on_cellexit': 0.0}, malfunction_data={'malfunction': 0, 'malfunction_rate': 0.0, 'next_malfunction': 0, 'nr_malfunctions': 0, 'moving_before_malfunction': False}, handle=0, status=<RailAgentStatus.READY_TO_DEPART: 0>, position=None, old_direction=None, old_position=None)

In [7]:
S = calc_schedules(env)["schedule"]



def get_computed_path(handle):
    global S

    s = S["agent" + str(handle)]
    
    position_dict = []
    for t in range(len(s)):
        x, y, _ = s[t].values()

        position_dict.append((x, y))

    return(position_dict)


NameError: name 'env' is not defined

In [156]:
S = calc_schedules(env)["schedule"]
S

{'agent0': [{'x': 13, 'y': 0, 't': 0},
  {'x': 12, 'y': 0, 't': 1},
  {'x': 11, 'y': 0, 't': 2},
  {'x': 10, 'y': 0, 't': 3},
  {'x': 9, 'y': 0, 't': 4},
  {'x': 8, 'y': 0, 't': 5},
  {'x': 7, 'y': 0, 't': 6},
  {'x': 6, 'y': 0, 't': 7},
  {'x': 6, 'y': 1, 't': 8},
  {'x': 5, 'y': 1, 't': 9},
  {'x': 5, 'y': 2, 't': 10},
  {'x': 4, 'y': 2, 't': 11}],
 'agent1': [{'x': 0, 'y': 2, 't': 0},
  {'x': 1, 'y': 2, 't': 1},
  {'x': 2, 'y': 2, 't': 2},
  {'x': 3, 'y': 2, 't': 3},
  {'x': 4, 'y': 2, 't': 4},
  {'x': 5, 'y': 2, 't': 5},
  {'x': 5, 'y': 3, 't': 6},
  {'x': 6, 'y': 3, 't': 7},
  {'x': 7, 'y': 3, 't': 8},
  {'x': 7, 'y': 4, 't': 9},
  {'x': 7, 'y': 5, 't': 10},
  {'x': 7, 'y': 6, 't': 11}],
 'agent2': [{'x': 5, 'y': 9, 't': 0},
  {'x': 5, 'y': 8, 't': 1},
  {'x': 5, 'y': 7, 't': 2},
  {'x': 5, 'y': 6, 't': 3},
  {'x': 6, 'y': 6, 't': 4},
  {'x': 7, 'y': 6, 't': 5},
  {'x': 7, 'y': 5, 't': 6},
  {'x': 7, 'y': 4, 't': 7},
  {'x': 8, 'y': 4, 't': 8},
  {'x': 9, 'y': 4, 't': 9},
  {'x': 

In [11]:
env.agents[1]
        
        
        
    

EnvAgent(initial_position=(0, 2), initial_direction=0, direction=2, target=(7, 6), moving=False, speed_data={'position_fraction': 0.0, 'speed': 1.0, 'transition_action_on_cellexit': 3}, malfunction_data={'malfunction': 0, 'malfunction_rate': 0.0, 'next_malfunction': 0, 'nr_malfunctions': 0, 'moving_before_malfunction': False}, handle=1, status=<RailAgentStatus.DONE_REMOVED: 3>, position=None, old_direction=1, old_position=None)