In [1]:
'''Grid'''
class Map:
    '''
    Rectangular grid with 4-connected moves.

    - width: number of columns
    - height: number of rows
    - cells: list of rows; cells[i][j] is 0 for a free cell and 1 for an obstacle
    '''

    def __init__(self):
        '''
        Default constructor: an empty grid of size 0 x 0.
        '''
        self.width = 0
        self.height = 0
        self.cells = []

    def ReadFromString(self, cellStr, width, height):
        '''
        Converting a string (with '#' representing obstacles and '.' representing free cells) to a grid.

        Empty lines are skipped, and so is every character other than '.' and '#'.
        The map is only updated after the whole string has been validated.

        Raises ValueError when a row does not contain exactly `width` cells or
        when the number of rows differs from `height` (this covers rows/maps
        that are too large as well as too small).
        '''
        rows = []
        for line in cellStr.split("\n"):
            if len(line) == 0:
                continue
            row = [1 if c == '#' else 0 for c in line if c in ('.', '#')]
            if len(row) != width:
                raise ValueError(f"Size Error. Map width = {len(row)}, but must be {width}")
            rows.append(row)

        if len(rows) != height:
            raise ValueError(f"Size Error. Map height = {len(rows)}, but must be {height}")

        self.width = width
        self.height = height
        self.cells = rows

    def SetGridCells(self, width, height, gridCells):
        '''
        Initialization of map by list of cells.

        The list is stored as given (no copy, no size validation).
        '''
        self.width = width
        self.height = height
        self.cells = gridCells

    def inBounds(self, i, j):
        '''
        Check if the cell is on a grid (i is the row index, j is the column index).
        '''
        return (0 <= j < self.width) and (0 <= i < self.height)

    def Traversable(self, i, j):
        '''
        Check if the cell is not an obstacle.
        '''
        return not self.cells[i][j]

    def GetNeighbors(self, i, j):
        '''
        Get a list of neighbouring cells as (i,j) tuples.

        The first entry is always the cell (i, j) itself (it is added without
        any bounds/obstacle check), so a caller iterating over the result also
        gets the "stay in place" option. It is followed by the in-bounds,
        traversable cells to the right, below, left and above, in that order.
        '''
        neighbors = [(i, j)]
        for di, dj in ((0, 1), (1, 0), (0, -1), (-1, 0)):
            ni, nj = i + di, j + dj
            if self.inBounds(ni, nj) and self.Traversable(ni, nj):
                neighbors.append((ni, nj))
        return neighbors

In [20]:
class HighNode:
    '''
    HighNode class represents a high-level search node

    - vertexCons: vertex constraints of the node
    - edgeCons: edge constraints of the node
    - sol: solution of the node
    - g: cost of the solution of the node 
    - h: h-value of the node
    - F: f-value of the node
    - parent: pointer to the parent-node 

    '''

    def __init__(self, vertexCons=None, edgeCons=None, sol=None, g=0, h=0, F=None, parent=None):
        '''
        Omitted vertexCons / edgeCons / sol each default to a fresh empty list,
        so nodes never share a list through the default arguments.
        Lists passed in explicitly are stored as given (not copied).
        F defaults to g + h when it is not supplied.
        '''
        self.vertexCons = [] if vertexCons is None else vertexCons
        self.edgeCons = [] if edgeCons is None else edgeCons
        self.sol = [] if sol is None else sol
        self.g = g
        self.h = h
        if F is None:
            self.F = g + h
        else:
            self.F = F
        self.parent = parent

    def __eq__(self, other):
        '''
        Nodes are equal when their constraints, solution and cost coincide.
        '''
        if not isinstance(other, HighNode):
            return NotImplemented
        return (self.vertexCons == other.vertexCons) and (self.edgeCons == other.edgeCons) \
        and (self.sol == other.sol) and (self.g == other.g)

    def __lt__(self, other):
        '''
        Order by F, then by h. Note that a full tie (equal F and equal h)
        also compares as True, i.e. this behaves like "<=" on (F, h).
        '''
        return (self.F, self.h) <= (other.F, other.h)

In [3]:
class LowNode:
    '''
    LowNode class represents a low-level search node

    - i, j: coordinates of corresponding grid element
    - g: g-value of the node 
    - h: h-value of the node
    - F: f-value of the node
    - parent: pointer to the parent-node 
    '''

    def __init__(self, coord, g=0, h=0, F=None, parent=None):
        '''
        coord is any indexable pair (i, j); g defaults to 0 so that a start
        node can be built from its coordinates alone. F defaults to g + h.
        '''
        self.i = coord[0]
        self.j = coord[1]
        self.g = g
        self.h = h
        if F is None:
            self.F = g + h
        else:
            self.F = F
        self.parent = parent

    def __eq__(self, other):
        '''
        Nodes are equal when they share the cell and the g-value; h, F and
        parent are not taken into account.
        '''
        return (self.i == other.i) and (self.j == other.j) and (self.g == other.g)

    def __lt__(self, other):
        '''
        Order by F, then by h. Note that a full tie (equal F and equal h)
        also compares as True, i.e. this behaves like "<=" on (F, h).
        '''
        return (self.F, self.h) <= (other.F, other.h)

In [4]:
class MetaAgent:
    '''
    Container for the agents that make up a meta-agent.

    - agents: the collection of agents of the meta-agent (stored as given)
    '''

    def __init__(self, agents):
        '''
        Keep a reference to the supplied collection of agents.
        '''
        self.agents = agents

In [8]:
class OpenHigh:
    '''
    OPEN list of the high-level search.

    - heap: binary heap of nodes (ordered by the nodes' own "<")
    - elements: dictionary with the currently stored node for every key

    A node is indexed by its vertex constraints, edge constraints and g.
    The solution is deliberately not part of the key: it is a list of paths
    and therefore cannot be hashed.
    '''

    def __init__(self):
        self.heap = []
        self.elements = {}

    def __iter__(self):
        return iter(self.elements.values())

    def __len__(self):
        return len(self.elements)

    @staticmethod
    def _Key(node):
        '''
        Hashable dictionary key of a high-level node.
        Assumes every single constraint is itself hashable (e.g. a tuple).
        '''
        return (tuple(node.vertexCons), tuple(node.edgeCons), node.g)

    def isEmpty(self):
        '''
        True when no node is registered in `elements`.
        '''
        return not self.elements

    def AddNode(self, node : "HighNode", *args):
        '''
        Store the node unless a node with the same key is already stored and
        the new one does not compare as "<" to it.
        A replaced node stays in the heap as a stale entry.
        '''
        # Local import keeps the class usable even when heapq has not been
        # imported at module level.
        from heapq import heappush

        key = self._Key(node)
        known = self.elements.get(key)
        if known is None or node < known:
            self.elements[key] = node
            heappush(self.heap, node)
        return

    def GetBestNode(self, CLOSED, *args):
        '''
        Pop and return the smallest node that CLOSED does not report as expanded.
        Raises IndexError if the heap runs out of nodes.
        '''
        from heapq import heappop

        best = heappop(self.heap)
        while CLOSED.WasExpanded(best):
            best = heappop(self.heap)
        # pop() instead of del: a stale heap entry may outlive its dictionary record.
        self.elements.pop(self._Key(best), None)
        return best

In [9]:
class OpenLow:
    '''
    OPEN list of the low-level search.

    - heap: binary heap of nodes (ordered by the nodes' own "<")
    - elements: dictionary with the currently stored node for every (i, j, g) key
    '''

    def __init__(self):
        self.heap = []
        self.elements = {}

    def __iter__(self):
        return iter(self.elements.values())

    def __len__(self):
        return len(self.elements)

    def isEmpty(self):
        '''
        True when no node is registered in `elements`.
        '''
        return not self.elements

    def AddNode(self, node : "LowNode", *args):
        '''
        Store the node unless a node with the same (i, j, g) is already stored
        and the new one does not compare as "<" to it.
        A replaced node stays in the heap as a stale entry.
        '''
        # Local import keeps the class usable even when heapq has not been
        # imported at module level.
        from heapq import heappush

        key = (node.i, node.j, node.g)
        known = self.elements.get(key)
        if known is None or node < known:
            self.elements[key] = node
            heappush(self.heap, node)
        return

    def GetBestNode(self, CLOSED, *args):
        '''
        Pop and return the smallest node that CLOSED does not report as expanded.
        Raises IndexError if the heap runs out of nodes.
        '''
        from heapq import heappop

        best = heappop(self.heap)
        while CLOSED.WasExpanded(best):
            best = heappop(self.heap)
        # pop() instead of del: a stale heap entry may outlive its dictionary record.
        self.elements.pop((best.i, best.j, best.g), None)
        return best

In [16]:
class ClosedHigh:
    '''
    CLOSED set of the high-level search.

    - elements: dictionary of the expanded nodes

    A node is indexed by its vertex constraints, edge constraints and g.
    The solution is deliberately not part of the key: it is a list of paths
    and therefore cannot be hashed.
    '''

    def __init__(self):
        self.elements = {}

    def __iter__(self):
        return iter(self.elements.values())

    def __len__(self):
        return len(self.elements)

    @staticmethod
    def _Key(node):
        '''
        Hashable dictionary key of a high-level node.
        Assumes every single constraint is itself hashable (e.g. a tuple).
        '''
        return (tuple(node.vertexCons), tuple(node.edgeCons), node.g)

    def AddNode(self, node : "HighNode"):
        '''
        Register the node as expanded (overwrites a node with the same key).
        '''
        self.elements[self._Key(node)] = node

    def WasExpanded(self, node : "HighNode"):
        '''
        Check whether a node with the same key has been registered.
        '''
        return self.elements.get(self._Key(node)) is not None

In [17]:
class ClosedLow:
    '''
    CLOSED set of the low-level search.

    Expanded nodes are kept in a dictionary indexed by the (i, j, g) triple of the node.
    '''

    def __init__(self):
        self.elements = dict()

    def __iter__(self):
        storedNodes = self.elements.values()
        return iter(storedNodes)

    def __len__(self):
        return len(self.elements)

    def AddNode(self, node : LowNode):
        '''
        Register the node under its (i, j, g) triple, replacing any previous entry.
        '''
        key = (node.i, node.j, node.g)
        self.elements[key] = node

    def WasExpanded(self, node : LowNode):
        '''
        Check whether a node with the same (i, j, g) triple has been registered.
        '''
        key = (node.i, node.j, node.g)
        return self.elements.get(key) is not None

In [12]:
def ManhattanDistance(i1, j1, i2, j2):
    '''
    Manhattan (L1) distance between the cells (i1, j1) and (i2, j2).
    '''
    rowGap = abs(i1 - i2)
    colGap = abs(j1 - j2)
    return rowGap + colGap

In [13]:
def MakePath(goal):
    '''
    Rebuild the path that ends in the goal node by following parent pointers.

    Returns a pair: the list of nodes ordered from the start node to the goal
    node, and the length of the path (the g-value of the goal node).
    '''
    length = goal.g

    # Collect the chain goal -> ... -> start, then flip it.
    chain = [goal]
    node = goal
    while node.parent:
        node = node.parent
        chain.append(node)
    chain.reverse()

    return chain, length

In [1]:
class AstarTimesteps:
    '''
    Low-level planner: A* for a single agent over (cell, timestep) states.

    - gridMap: the grid; must provide GetNeighbors(i, j), width and height
    - iStart, jStart / iGoal, jGoal: start and goal cells
    - vertexCons: vertex constraints; entry [1] is the forbidden cell and
      entry [2] the timestep at which it is forbidden
    - edgeCons: edge constraints; entries [1] and [2] are the source and the
      destination cell of the forbidden move, entry [3] the arrival timestep
    - OPEN, CLOSED: containers of the search
    '''

    def __init__(self, gridMap, iStart, jStart, iGoal, jGoal, vertexCons, edgeCons):
        self.vertexCons = vertexCons
        self.edgeCons = edgeCons
        self.CLOSED = ClosedLow()
        self.gridMap = gridMap
        self.iStart = iStart
        self.jStart = jStart
        self.iGoal = iGoal
        self.jGoal = jGoal
        self.OPEN = OpenLow()
        self.path = []

    def CheckMove(self, i1, j1, i2, j2, t):
        '''
        Check if moving from (i1, j1) at time t to (i2, j2) at time t + 1
        violates none of the constraints.
        '''
        arrival = t + 1

        for obs in self.vertexCons:
            if obs[2] == arrival and obs[1] == (i2, j2):
                return False

        for obs in self.edgeCons:
            if obs[3] == arrival and obs[1] == (i1, j1) and obs[2] == (i2, j2):
                return False

        return True

    def _TimeHorizon(self):
        '''
        Latest timestep worth exploring.

        After the last constrained timestep nothing changes any more, so a
        shortest path never needs more than width * height further steps
        (a longer tail would have to revisit a cell). Without this bound the
        search would never terminate for an unreachable goal, because waiting
        in place creates a new (i, j, g) state at every timestep.
        '''
        lastConsTime = max(
            [obs[2] for obs in self.vertexCons] + [obs[3] for obs in self.edgeCons],
            default=0,
        )
        return lastConsTime + self.gridMap.width * self.gridMap.height

    def FindPath(self):
        '''
        Run the search.

        Returns (True, goalNode, OPEN, CLOSED) when the goal is reached and
        (False, None, OPEN, CLOSED) when no path exists within the time horizon.
        '''
        horizon = self._TimeHorizon()
        startNode = LowNode(coord=(self.iStart, self.jStart))
        self.OPEN.AddNode(startNode)

        while not self.OPEN.isEmpty():
            s = self.OPEN.GetBestNode(self.CLOSED)
            self.CLOSED.AddNode(s)
            if s.i == self.iGoal and s.j == self.jGoal:
                return (True, s, self.OPEN, self.CLOSED)
            if s.g + 1 > horizon:
                # Successors would lie beyond the horizon: do not expand.
                continue
            for nbr in self.gridMap.GetNeighbors(s.i, s.j):
                if self.CheckMove(s.i, s.j, nbr[0], nbr[1], s.g) and \
                not self.CLOSED.WasExpanded(LowNode(coord=nbr, g=s.g + 1)):
                    nbrNode = LowNode(coord=nbr, g=s.g + 1, h=ManhattanDistance(nbr[0], nbr[1], self.iGoal, self.jGoal), \
                                      parent=s)
                    self.OPEN.AddNode(nbrNode)
        return (False, None, self.OPEN, self.CLOSED)

In [2]:
def _FindConflicts(sol):
    '''
    Collect the conflicts between every pair of paths of the solution.

    Returns two lists:
    - vertex conflicts (a, b, (i, j), t): agents a and b are both in (i, j) at timestep t
    - edge conflicts (a, b, (i1, j1), (i2, j2), t): agent a moves (i1, j1) -> (i2, j2)
      while agent b moves (i2, j2) -> (i1, j1), both arriving at timestep t

    Paths are only compared over their common length.
    '''
    vertexConflicts = []
    edgeConflicts = []

    for a in range(len(sol)):
        for b in range(a + 1, len(sol)):
            pathA, pathB = sol[a], sol[b]
            common = min(len(pathA), len(pathB))
            for step in range(common):
                curA = (pathA[step].i, pathA[step].j)
                curB = (pathB[step].i, pathB[step].j)
                if curA == curB:
                    vertexConflicts.append((a, b, curA, step))
                if step + 1 < common:
                    nextA = (pathA[step + 1].i, pathA[step + 1].j)
                    nextB = (pathB[step + 1].i, pathB[step + 1].j)
                    if curA == nextB and nextA == curB:
                        # The timestep is the arrival time, which is what
                        # AstarTimesteps.CheckMove compares edge constraints with.
                        edgeConflicts.append((a, b, curA, nextA, step + 1))

    return vertexConflicts, edgeConflicts


def _MakeChild(gridMap, Starts, Goals, parent, agent, vertexCons, edgeCons):
    '''
    Build a child of `parent` with the given constraints and a re-planned path
    for `agent`. Returns None when the agent has no path under the constraints.
    '''
    child = HighNode(
        vertexCons=vertexCons,
        edgeCons=edgeCons,
        sol=list(parent.sol),  # own list: replacing one path must not touch the parent
        parent=parent,
    )

    agentVertexCons = [con for con in child.vertexCons if con[0] == agent]
    agentEdgeCons = [con for con in child.edgeCons if con[0] == agent]

    planner = AstarTimesteps(gridMap, Starts[agent].i, Starts[agent].j, Goals[agent].i, Goals[agent].j,
                             agentVertexCons, agentEdgeCons)
    result = planner.FindPath()
    if not result[0]:
        return None

    path, _ = MakePath(result[1])
    child.sol[agent] = path
    child.g = sum(len(p) for p in child.sol)  # SIC (sum of individual costs); another cost could be used
    child.F = child.g + child.h  # keep F in sync with g so that OPEN is ordered by cost
    return child


def CBS(gridMap, Starts, Goals):
    '''
    Conflict-based search for several agents on a grid.

    - gridMap: the grid
    - Starts, Goals: start / goal of every agent (objects with attributes i and j)

    Returns the high-level node with a conflict-free solution on success and
    (False, None, OPEN, CLOSED) when no solution is found.

    NOTE(review): a conflict at timestep 0 (two agents with the same start
    cell) cannot be resolved by a constraint - presumably starts are pairwise
    distinct; confirm against the callers.
    '''
    root = HighNode(vertexCons=[], edgeCons=[], sol=[])
    OPEN = OpenHigh()
    CLOSED = ClosedHigh()
    agentsNum = len(Starts)

    for a in range(agentsNum):
        planner = AstarTimesteps(gridMap, Starts[a].i, Starts[a].j, Goals[a].i, Goals[a].j, [], [])
        result = planner.FindPath()
        if not result[0]:
            return (False, None, OPEN, CLOSED)
        path, _ = MakePath(result[1])
        root.sol.append(path)

    root.g = sum(len(p) for p in root.sol)
    root.F = root.g + root.h
    OPEN.AddNode(root)

    while not OPEN.isEmpty():
        s = OPEN.GetBestNode(CLOSED)
        CLOSED.AddNode(s)

        vertexConflicts, edgeConflicts = _FindConflicts(s.sol)
        if not vertexConflicts and not edgeConflicts:
            return s

        # Meta-agent merging will go here

        # For now vertex conflicts are resolved first, then edge conflicts
        if vertexConflicts:
            a, b, cell, t = vertexConflicts[0]
            branches = [
                (a, s.vertexCons + [(a, cell, t)], list(s.edgeCons)),
                (b, s.vertexCons + [(b, cell, t)], list(s.edgeCons)),
            ]
        else:
            a, b, cellFrom, cellTo, t = edgeConflicts[0]
            branches = [
                (a, list(s.vertexCons), s.edgeCons + [(a, cellFrom, cellTo, t)]),
                # Agent b crosses the same edge in the opposite direction.
                (b, list(s.vertexCons), s.edgeCons + [(b, cellTo, cellFrom, t)]),
            ]

        for agent, vertexCons, edgeCons in branches:
            child = _MakeChild(gridMap, Starts, Goals, s, agent, vertexCons, edgeCons)
            if child is not None:
                OPEN.AddNode(child)

    return (False, None, OPEN, CLOSED)