In [1]:
import os
import random
import pickle
from typing import Tuple, Union
import warnings
warnings.simplefilter("ignore", UserWarning)
from tqdm import tqdm
import numpy as np
import torch
import torch.multiprocessing as mp
from environment import Environment
from model import Network
import config
import copy
import ollama
from openai import OpenAI
import json

# Never hardcode the API key in the notebook. Export OPENAI_API_KEY before
# starting the kernel; setdefault keeps an existing key instead of overwriting
# it with an empty string (the empty fallback matches the previous behaviour
# when the variable is unset).
os.environ.setdefault("OPENAI_API_KEY", "")
client = OpenAI()

# NOTE(security): pickle.load can execute arbitrary code; only load test sets
# that come from a trusted source.
with open('./test_set/{}_{}agents.pth'.format('warehouse', 32), 'rb') as f:
    tests = pickle.load(f)

# Load the first test case into the environment and take the initial observations.
# presumably each test case is (map, agent positions, goal positions) - TODO confirm
env = Environment()
first_case = tests[0]
env.load(np.array(first_case[0]), np.array(first_case[1]), np.array(first_case[2]))
obs = env.observe()
obs_agents = env.observe_agents()

# Direction definitions: direction name -> action index, plus the reverse lookup.
directiondict = {
    'stay': 4, 'north': 0, 'south': 1, 'west': 2, 'east': 3
}
reverse_directiondict = {v: k for k, v in directiondict.items()}


In [2]:
# Replace the loaded test case with a small hand-built instance.
# presumably the arguments are (map, agent positions, goal positions) and 1 marks
# a blocked cell - TODO confirm against Environment.load
env.load(
    np.array([
        [1, 1, 1, 1, 1, 1, 1],
        [1, 1, 1, 0, 1, 1, 1],
        [1, 0, 0, 0, 0, 0, 1],
        [1, 1, 1, 0, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1],
    ]),
    np.array([[2, 1], [2, 2], [2, 3], [2, 4], [2, 5]]),
    np.array([[2, 5], [2, 4], [2, 3], [2, 2], [2, 1]]),
)
# Refresh the observations so that later cells see the new instance.
obs = env.observe()
obs_agents = env.observe_agents()

In [3]:
def get_sorted_agents(agent_groups, environment=None):
    """Pick one representative per group and order them by distance to goal.

    For every non-empty group the agent with the largest Manhattan distance
    between its position and its goal is selected. The selected agents are
    returned sorted by that distance, farthest first; ties keep the order of
    the input groups.

    Args:
        agent_groups: iterable of collections of agent indices. Empty groups
            are skipped (previously they made ``max()`` raise ``ValueError``).
        environment: object exposing ``agents_pos`` and ``goals_pos``; defaults
            to the notebook-level ``env``.

    Returns:
        List of agent indices, one per non-empty group.
    """
    world = env if environment is None else environment

    def goal_distance(agent):
        # Manhattan distance between the agent and its goal.
        return np.sum(np.abs(world.agents_pos[agent] - world.goals_pos[agent]))

    ranked = []
    for group in agent_groups:
        if len(group) == 0:
            continue
        representative = max(group, key=goal_distance)
        ranked.append((representative, goal_distance(representative)))

    # Stable sort, farthest first (same tie behaviour as sorted(..., reverse=True)).
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return [agent for agent, _ in ranked]

In [4]:
# Backtracking implementation of the super-heuristic move

def get_possible_directions(obs, obs_agents, agent_idx, agents_not_exchangeable, agents_fixed):
    """List the moves available to a pushed agent.

    A direction is kept when the neighbouring cell in layer 1 of the agent's
    observation is 0 and the agent occupying that cell (if any) is neither in
    ``agents_not_exchangeable`` nor in ``agents_fixed``.
    # presumably layer 1 is the obstacle map of a view centred on (4, 4) - TODO confirm

    Returns:
        List of ``(direction, pushed_agent)`` tuples in the order north, south,
        west, east; ``pushed_agent`` is ``None`` when the target cell holds no
        agent (``obs_agents`` stores agent id + 1, with 0 meaning empty).
    """
    blocked_layer = obs[0][agent_idx][1]
    occupancy = obs_agents[agent_idx]
    # Occupants are read for all four neighbours up front, as before.
    neighbours = (
        ('north', (3, 4), occupancy[3, 4] - 1),
        ('south', (5, 4), occupancy[5, 4] - 1),
        ('west', (4, 3), occupancy[4, 3] - 1),
        ('east', (4, 5), occupancy[4, 5] - 1),
    )

    moves = []
    for name, cell, occupant in neighbours:
        if not (blocked_layer[cell] == 0):
            continue
        if occupant in agents_not_exchangeable or occupant in agents_fixed:
            continue
        moves.append((name, None if occupant == -1 else occupant))
    return moves


def get_possible_directions_super(obs, obs_agents, agent_idx, agents_fixed):
    """List the moves available to the leading ("super") agent.

    A direction is kept when its observation layer (2: north, 3: south,
    4: west, 5: east) equals 1 at the centre cell (4, 4) and the agent
    occupying the neighbouring cell (if any) is not in ``agents_fixed``.
    # presumably layers 2-5 are per-direction heuristic maps - TODO confirm

    Returns:
        List of ``(direction, pushed_agent)`` tuples in the order north, south,
        west, east; ``pushed_agent`` is ``None`` when the target cell holds no
        agent (``obs_agents`` stores agent id + 1, with 0 meaning empty).
    """
    layers = obs[0][agent_idx]
    occupancy = obs_agents[agent_idx]
    # (direction, layer index, occupant of the neighbouring cell)
    candidates = (
        ('north', 2, occupancy[3, 4] - 1),
        ('south', 3, occupancy[5, 4] - 1),
        ('west', 4, occupancy[4, 3] - 1),
        ('east', 5, occupancy[4, 5] - 1),
    )

    moves = []
    for name, layer_idx, occupant in candidates:
        if not (layers[layer_idx][4, 4] == 1):
            continue
        if occupant in agents_fixed:
            continue
        moves.append((name, None if occupant == -1 else occupant))
    return moves


def push_recursive(obs, obs_agents, agent_super, agents_fixed):
    """Build a chain of push moves that starts at ``agent_super``.

    The super agent picks a random direction from
    ``get_possible_directions_super``. If that cell holds another agent, that
    agent must pick a direction from ``get_possible_directions``, and so on
    until a move targets a cell without an agent. When an agent has no option
    left, the search backtracks to the most recent agent with untried directions.

    Args:
        obs: observation structure forwarded to the direction helpers.
        obs_agents: per-agent neighbour-occupancy views forwarded to the helpers.
        agent_super: index of the agent that starts the chain.
        agents_fixed: agents that must not be pushed (forwarded to the helpers).

    Returns:
        List of ``(agent, direction)`` tuples in push order, or
        ``[(agent_super, 'stay')]`` when no chain can be found.
    """
    relayed_actions = []
    agents_not_exchangeable = []

    current_agent = agent_super
    depth = 0

    # Stack of (agent, remaining directions, depth) entries used for backtracking
    stack = []

    while True:
        # Compute the possible directions for the current agent
        if current_agent == agent_super:
            possible_directions = get_possible_directions_super(obs, obs_agents, current_agent, agents_fixed)
        else:
            possible_directions = get_possible_directions(obs, obs_agents, current_agent, agents_not_exchangeable, agents_fixed)

        while not possible_directions:
            if not stack:
                # Nothing left to backtrack to: give up
                return [(agent_super, 'stay')]

            # Backtrack to the previous state from the stack
            last_agent, last_possible_directions, last_depth = stack.pop()

            # If untried directions remain, pick one of them and proceed
            if last_possible_directions:
                relayed_actions = relayed_actions[:last_depth]  # discard the previous choice and choose again
                current_agent = last_agent
                possible_directions = last_possible_directions
                depth = last_depth
                # NOTE(review): agents_not_exchangeable is not restored here, so agents
                # added on the abandoned branch stay in it - confirm this is intended.
            else:
                # No direction left to retry here; keep backtracking
                possible_directions = []

        # Randomly pick one of the possible directions
        choosen_action = random.choice(possible_directions)
        # Removing it from the list means a later backtrack only sees untried directions
        possible_directions.remove(choosen_action)

        relayed_actions.append((current_agent, choosen_action[0]))

        # Stop when there is no further agent to push
        if choosen_action[1] is None:
            break

        # Update agents_not_exchangeable depending on depth: the list is reset at
        # depth 1, which drops the depth-0 agent from it
        if depth == 1:
            agents_not_exchangeable = []
        agents_not_exchangeable.append(current_agent)

        # Save the current state on the stack (the same list object as possible_directions)
        stack.append((current_agent, possible_directions, depth))

        # Move on to the pushed agent and continue the loop
        current_agent = choosen_action[1]
        depth += 1

    return relayed_actions

In [12]:
# Demo: push chain starting at agent 0 with no fixed agents. The result can differ
# between runs because push_recursive picks directions with random.choice.
push_recursive(obs, obs_agents, 0, [])

[(0, 'east'), (1, 'east'), (2, 'north')]

In [6]:
# Backtracking-based radial (outward) move implementation
def get_possible_directions_radiation(obs, obs_agents, center_coordinates, agent_idx, agents_fixed):
    """List the moves that do not bring the agent closer to a centre point.

    A direction is kept when the neighbouring cell in layer 1 of the agent's
    observation is 0, the move does not point towards ``center_coordinates``
    along its axis, and the agent occupying the target cell (if any) is not in
    ``agents_fixed``. The agent's own position is read from ``obs[2][agent_idx]``
    as (row, column).

    Returns:
        List of ``(direction, pushed_agent)`` tuples in the order north, south,
        west, east; ``pushed_agent`` is ``None`` when the target cell holds no
        agent (``obs_agents`` stores agent id + 1, with 0 meaning empty).
    """
    blocked_layer = obs[0][agent_idx][1]
    occupancy = obs_agents[agent_idx]

    row_diff = center_coordinates[0] - obs[2][agent_idx][0]
    col_diff = center_coordinates[1] - obs[2][agent_idx][1]

    # Directions that would move the agent back towards the centre.
    inward = []
    if row_diff < 0:  # agent is below the centre, so north is not allowed
        inward.append('north')
    elif row_diff > 0:  # agent is above the centre, so south is not allowed
        inward.append('south')
    if col_diff < 0:  # agent is right of the centre, so west is not allowed
        inward.append('west')
    elif col_diff > 0:  # agent is left of the centre, so east is not allowed
        inward.append('east')

    # Occupants are read for all four neighbours up front, as before.
    neighbours = (
        ('north', (3, 4), occupancy[3, 4] - 1),
        ('south', (5, 4), occupancy[5, 4] - 1),
        ('west', (4, 3), occupancy[4, 3] - 1),
        ('east', (4, 5), occupancy[4, 5] - 1),
    )

    moves = []
    for name, cell, occupant in neighbours:
        if not (blocked_layer[cell] == 0):
            continue
        if name in inward or occupant in agents_fixed:
            continue
        moves.append((name, None if occupant == -1 else occupant))
    return moves

def push_recursive_radiation(obs, obs_agents, center_coordinates, agent_idx, agents_fixed):
    """Build a chain of push moves that moves ``agent_idx`` away from a centre.

    The starting agent picks a random direction from
    ``get_possible_directions_radiation``. If that cell holds another agent,
    that agent must pick a direction from ``get_possible_directions``, and so on
    until a move targets a cell without an agent. When an agent has no option
    left, the search backtracks to the most recent agent with untried directions.

    Args:
        obs: observation structure forwarded to the direction helpers.
        obs_agents: per-agent neighbour-occupancy views forwarded to the helpers.
        center_coordinates: (row, column) point the starting agent must not approach.
        agent_idx: index of the agent that starts the chain.
        agents_fixed: agents that must not be pushed (forwarded to the helpers).

    Returns:
        List of ``(agent, direction)`` tuples in push order, or
        ``[(agent_idx, 'stay')]`` when no chain can be found.
    """

    relayed_actions = []
    agents_not_exchangeable = []
    
    current_agent = agent_idx
    depth = 0

    # Stack of (agent, remaining directions, depth) entries used for backtracking
    stack = []

    while True:
        # Compute the possible directions for the current agent
        if current_agent == agent_idx:
            possible_directions = get_possible_directions_radiation(obs, obs_agents, center_coordinates, current_agent, agents_fixed)
        else:
            possible_directions = get_possible_directions(obs, obs_agents, current_agent, agents_not_exchangeable, agents_fixed)

        while not possible_directions:
            if not stack:
                # Nothing left to backtrack to: give up
                return [(agent_idx, 'stay')]

            # Backtrack to the previous state from the stack
            last_agent, last_possible_directions, last_depth = stack.pop()

            # If untried directions remain, pick one of them and proceed
            if last_possible_directions:
                relayed_actions = relayed_actions[:last_depth]  # discard the previous choice and choose again
                current_agent = last_agent
                possible_directions = last_possible_directions
                depth = last_depth
                # NOTE(review): agents_not_exchangeable is not restored here, so agents
                # added on the abandoned branch stay in it - confirm this is intended.
            else:
                # No direction left to retry here; keep backtracking
                possible_directions = []

        # Randomly pick one of the possible directions
        choosen_action = random.choice(possible_directions)
        # Removing it from the list means a later backtrack only sees untried directions
        possible_directions.remove(choosen_action)

        relayed_actions.append((current_agent, choosen_action[0]))

        # Stop when there is no further agent to push
        if choosen_action[1] is None:
            break

        # Update agents_not_exchangeable depending on depth: the list is reset at
        # depth 1, which drops the depth-0 agent from it
        if depth == 1:
            agents_not_exchangeable = []
        agents_not_exchangeable.append(current_agent)

        # Save the current state on the stack (the same list object as possible_directions)
        stack.append((current_agent, possible_directions, depth))

        # Move on to the pushed agent and continue the loop
        current_agent = choosen_action[1]
        depth += 1

    return relayed_actions

In [7]:
# Demo: push chain for agent 2 moving away from the centre point (2, 2), with no
# fixed agents. The result can differ between runs (directions are chosen randomly).
push_recursive_radiation(obs, obs_agents, (2, 2), 2, [])

[(2, 'south')]

In [8]:
# Move implementation for agents that are not in a deadlock

def get_possible_directions_not_deadlock(obs, obs_agents, agent_idx, agent_action, agents_fixed):
    """Return the planned move of an agent if it does not push a fixed agent.

    Only ``agent_action`` is considered. It is returned when it is one of
    north/south/west/east and the agent occupying the target cell (if any) is
    not in ``agents_fixed``. ``obs`` is accepted for signature parity with the
    other helpers but is not read.

    Returns:
        ``[(agent_action, pushed_agent)]`` or ``[]``; ``pushed_agent`` is
        ``None`` when the target cell holds no agent (``obs_agents`` stores
        agent id + 1, with 0 meaning empty).
    """
    occupancy = obs_agents[agent_idx]
    requested = [agent_action]
    # Occupants are read for all four neighbours up front, as before.
    neighbours = (
        ('north', occupancy[3, 4] - 1),
        ('south', occupancy[5, 4] - 1),
        ('west', occupancy[4, 3] - 1),
        ('east', occupancy[4, 5] - 1),
    )

    moves = []
    for name, occupant in neighbours:
        if name not in requested:
            continue
        if occupant in agents_fixed:
            continue
        moves.append((name, None if occupant == -1 else occupant))
    return moves


def push_recursive_not_deadlock(obs, obs_agents, agent_idx, agent_action, agents_fixed):
    """Build a chain of push moves that starts with a planned action.

    The starting agent may only take ``agent_action`` (see
    ``get_possible_directions_not_deadlock``). If that cell holds another agent,
    that agent must pick a direction from ``get_possible_directions``, and so on
    until a move targets a cell without an agent. When an agent has no option
    left, the search backtracks to the most recent agent with untried directions.

    Args:
        obs: observation structure forwarded to the direction helpers.
        obs_agents: per-agent neighbour-occupancy views forwarded to the helpers.
        agent_idx: index of the agent that starts the chain.
        agent_action: direction name the starting agent wants to take.
        agents_fixed: agents that must not be pushed (forwarded to the helpers).

    Returns:
        List of ``(agent, direction)`` tuples in push order, or
        ``[(agent_idx, 'stay')]`` when no chain can be found.
    """

    relayed_actions = []
    agents_not_exchangeable = []
    
    current_agent = agent_idx
    depth = 0

    # Stack of (agent, remaining directions, depth) entries used for backtracking
    stack = []

    while True:
        # Compute the possible directions for the current agent
        if current_agent == agent_idx:
            possible_directions = get_possible_directions_not_deadlock(obs, obs_agents, agent_idx, agent_action, agents_fixed)
        else:
            possible_directions = get_possible_directions(obs, obs_agents, current_agent, agents_not_exchangeable, agents_fixed)

        while not possible_directions:
            if not stack:
                # Nothing left to backtrack to: give up
                return [(agent_idx, 'stay')]

            # Backtrack to the previous state from the stack
            last_agent, last_possible_directions, last_depth = stack.pop()

            # If untried directions remain, pick one of them and proceed
            if last_possible_directions:
                relayed_actions = relayed_actions[:last_depth]  # discard the previous choice and choose again
                current_agent = last_agent
                possible_directions = last_possible_directions
                depth = last_depth
                # NOTE(review): agents_not_exchangeable is not restored here, so agents
                # added on the abandoned branch stay in it - confirm this is intended.
            else:
                # No direction left to retry here; keep backtracking
                possible_directions = []

        # Randomly pick one of the possible directions
        choosen_action = random.choice(possible_directions)
        # Removing it from the list means a later backtrack only sees untried directions
        possible_directions.remove(choosen_action)

        relayed_actions.append((current_agent, choosen_action[0]))

        # Stop when there is no further agent to push
        if choosen_action[1] is None:
            break

        # Update agents_not_exchangeable depending on depth: the list is reset at
        # depth 1, which drops the depth-0 agent from it
        if depth == 1:
            agents_not_exchangeable = []
        agents_not_exchangeable.append(current_agent)

        # Save the current state on the stack (the same list object as possible_directions)
        stack.append((current_agent, possible_directions, depth))

        # Move on to the pushed agent and continue the loop
        current_agent = choosen_action[1]
        depth += 1

    return relayed_actions

In [20]:
# Demo: agent 4 takes its planned action 'east' with no fixed agents.
push_recursive_not_deadlock(obs, obs_agents, 4, 'east', [])

[(4, 'east')]

In [19]:
# NOTE(review): agent_action is passed as [] here instead of a direction name, so no
# direction matches and the helper returns [] - presumably a scratch call; confirm.
get_possible_directions_not_deadlock(obs, obs_agents, 4, [], [])

[]

In [22]:
# Sample deadlock classification: one record per agent group. Records flagged
# 'yes' also name the resolution strategy under 'solution'.
json_data = [
    {'agent_id': [24, 25, 26], 'deadlock': 'yes', 'solution': 'prime'},
    {'agent_id': [30], 'deadlock': 'yes', 'solution': 'prime'},
    {'agent_id': [31], 'deadlock': 'no'},
    {'agent_id': [1, 2], 'deadlock': 'yes', 'solution': 'radiation'},
    {'agent_id': [3, 4], 'deadlock': 'yes', 'solution': 'radiation'},
    {'agent_id': [7], 'deadlock': 'no'},
]

In [27]:
num_agents = 32
# Every agent starts with the 'stay' action; manual_actions is filled in below.
manual_actions = [directiondict['stay'] for _ in range(num_agents)]
ml_planned_actions = [directiondict['stay'] for _ in range(num_agents)]
ml_planned_actions[7] = directiondict['east']

deadlock_exists = any(item.get('deadlock') == 'yes' for item in json_data)


def _select_agent_groups(records, deadlock, solution=None):
    """Return the agent-id groups of the records matching deadlock/solution.

    Ids outside ``0 <= id < num_agents`` are dropped, and groups left empty by
    that filter are skipped, so later max() and average computations never see
    an empty group. Records without a 'solution' key simply do not match.
    """
    groups = []
    for item in records:
        if item.get('deadlock') != deadlock:
            continue
        if solution is not None and item.get('solution') != solution:
            continue
        group = [agent for agent in item.get('agent_id', []) if 0 <= agent < num_agents]
        if group:
            groups.append(group)
    return groups


def _apply_relayed_actions(relayed_actions, actions, fixed):
    """Write a push chain into ``actions`` and mark its agents as fixed."""
    for agent, direction in relayed_actions:
        actions[agent] = directiondict[direction]
        fixed.append(agent)


prime_agents = _select_agent_groups(json_data, 'yes', 'prime')
radiation_agents = _select_agent_groups(json_data, 'yes', 'radiation')
no_deadlock_agents = _select_agent_groups(json_data, 'no')

sorted_prime_agents = get_sorted_agents(prime_agents)
sorted_no_deadlock_agents = get_sorted_agents(no_deadlock_agents)

# Agents that already received an action; later chains must not push them.
fixed_agents = []

# 1) 'prime' groups: the representative of each group leads a push chain.
for super_agent in sorted_prime_agents:
    _apply_relayed_actions(push_recursive(obs, obs_agents, super_agent, fixed_agents),
                           manual_actions, fixed_agents)

# 2) 'radiation' groups: every member moves away from the group's mean position.
#    The loop variable is no longer called `set`, which rebound the builtin for
#    the rest of the notebook.
for radiation_group in radiation_agents:
    if not radiation_group:
        # Guard before averaging: an empty group would divide by zero.
        continue
    rows = [obs[2][agent_idx][0] for agent_idx in radiation_group]
    cols = [obs[2][agent_idx][1] for agent_idx in radiation_group]
    average_position = (sum(rows) / len(rows), sum(cols) / len(cols))

    for radiation_agent in radiation_group:
        _apply_relayed_actions(
            push_recursive_radiation(obs, obs_agents, average_position, radiation_agent, fixed_agents),
            manual_actions, fixed_agents)

# 3) Agents without a deadlock keep their planned action where possible.
for no_deadlock_agent in sorted_no_deadlock_agents:
    planned_direction = reverse_directiondict[ml_planned_actions[no_deadlock_agent]]
    _apply_relayed_actions(
        push_recursive_not_deadlock(obs, obs_agents, no_deadlock_agent, planned_direction, fixed_agents),
        manual_actions, fixed_agents)