In [1]:
!pip install --upgrade luxai-s3

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


In [2]:
from luxai_s3.wrappers import LuxAIS3GymEnv, RecordEpisode
import numpy as np

np.set_printoptions(linewidth=200)

In [3]:
env = RecordEpisode(
    LuxAIS3GymEnv(numpy_output=True)
)

In [4]:
type(env)

luxai_s3.wrappers.RecordEpisode

In [5]:
obs_all, info = env.reset()

In [6]:
env_cfg = info['params']

In [7]:
obs = obs_all['player_0']

In [8]:
env_cfg.keys()

dict_keys(['max_units', 'match_count_per_episode', 'max_steps_in_match', 'map_height', 'map_width', 'num_teams', 'unit_move_cost', 'unit_sap_cost', 'unit_sap_range', 'unit_sensor_range'])

In [9]:
obs.keys()

dict_keys(['units', 'units_mask', 'sensor_mask', 'map_features', 'relic_nodes', 'relic_nodes_mask', 'team_points', 'team_wins', 'steps', 'match_steps'])

In [10]:
def find_opposite_corner_coords(array, row, col):
    """
    Given a 2D array and a coordinate (row, col), this function returns the opposite corner coordinates.

    :param array: 2D list or NumPy array
    :param row: Row index of the given point
    :param col: Column index of the given point
    :return: (row', col') - Opposite corner coordinates
    """
    num_rows = len(array)
    num_cols = len(array[0]) if num_rows > 0 else 0

    # Opposite coordinates
    opp_row = num_rows - 1 - row
    opp_col = num_cols - 1 - col

    return (opp_row, opp_col)

In [11]:
def prep_llm_input(env_cfg, obs):

    ### env_cfg information
    max_units = f"Maximum possible number of units: {env_cfg['max_units']}."
    match_count_per_episode = f"Number of matches per game: {env_cfg['match_count_per_episode']}."
    max_steps_in_match = f"Number of steps per match: {env_cfg['max_steps_in_match']}."
    map_height = f"Map height: {env_cfg['map_height']}."
    map_width = f"Map width: {env_cfg['map_width']}."
    num_teams = f"Number of teams: {env_cfg['num_teams']}."
    unit_move_cost = f"Unit move energy cost: {env_cfg['unit_move_cost']}."
    unit_sap_cost = f"Unit sap energy cost: {env_cfg['unit_sap_cost']}."
    unit_sap_range = f"Unit sap range: {env_cfg['unit_sap_range']}."
    unit_sensor_range = f"Unit sensor range: {env_cfg['unit_sensor_range']}."

    ### obs information
    unit_position_warning = "Unit position: -1, -1 means the unit is not spawned yet or not visible."

    # unit positions
    obs_my_unit_positions = obs['units']['position'][self.team_id]
    my_unit_positions_list = []
    for i in range(my_unit_positions.shape[0]):
        pos = my_unit_positions[i]
        my_unit_positions_list.append(f"My unit {i} position: {pos[0]}, {pos[1]}.")
    my_unit_positions = " ".join(my_unit_positions_list)

    obs_enemy_unit_positions = obs['units']['position'][self.opp_team_id]
    enemy_unit_positions_list = []
    for i in range(enemy_unit_positions.shape[0]):
        pos = enemy_unit_positions[i]
        enemy_unit_positions_list.append(f"Enemy unit {i} position: {pos[0]}, {pos[1]}.")
    enemy_unit_positions = " ".join(enemy_unit_positions_list)

    # unit energys
    obs_my_unit_energys = obs['units']['energy'][self.team_id]
    my_unit_energys_list = []
    for i in range(obs_my_unit_energys.shape[0]):
        energy = obs_my_unit_energys[i]
        my_unit_energys_list.append(f"My unit {i} energy: {energy}.")
    my_unit_energys = " ".join(my_unit_energys_list)

    obs_enemy_unit_energys = obs['units']['energy'][self.opp_team_id]
    enemy_unit_energys_list = []
    for i in range(obs_enemy_unit_energys.shape[0]):
        energy = obs_enemy_unit_energys[i]
        enemy_unit_energys_list.append(f"Enemy unit {i} energy: {energy}.")
    enemy_unit_energys = " ".join(enemy_unit_energys_list)

    # unit masks
    obs_my_units_mask = obs['units_mask'][self.team_id]
    my_units_mask_list = []
    for i in range(obs_my_units_mask.shape[0]):
        mask = obs_my_units_mask[i]
        my_units_mask_list.append(f"My unit {i} visibility: {mask}.")
    my_units_mask = " ".join(my_units_mask_list)

    obs_enemy_units_mask = obs['units_mask'][self.opp_team_id]
    enemy_units_mask_list = []
    for i in range(obs_enemy_units_mask.shape[0]):
        mask = obs_enemy_units_mask[i]
        enemy_units_mask_list.append(f"Enemy unit {i} visibility: {mask}.")
    enemy_units_mask = " ".join(enemy_units_mask_list)

    # sensor mask
    obs_sensor_mask = obs['sensor_mask']
    sensor_mask_list = []
    for i in range(obs_sensor_mask.shape[0]):
        sensor_mask_list.append(f"Sensor mask row {i}: {str(obs_sensor_mask[i]).replace("[", "").replace("]", "")}.")
    sensor_mask = " ".join(sensor_mask_list)

    # map features - energy
    obs_map_features_energy = obs['map_features']['energy']
    map_features_energy_list = []
    for i in range(obs_map_features_energy.shape[0]):
        map_features_energy_list.append(f"Map energy row {i}: {str(obs_map_features_energy[i]).replace("[", "").replace("]", "")}.")
    map_features_energy = " ".join(map_features_energy_list)

    # map features - tile_type
    obs_map_features_tile_type = obs['map_features']['tile_type']
    map_features_tile_type_list = []
    for i in range(obs_map_features_tile_type.shape[0]):
        map_features_tile_type_list.append(f"Map node type row {i}: {str(obs_map_features_tile_type[i]).replace("[", "").replace("]", "")}.")
    map_features_tile_type = " ".join(map_features_tile_type_list)

    # relic nodes
    relic_node_warning = "Relic node position: -1, -1 means the relic node is not yet discoverd."
    obs_relic_nodes = obs['relic_nodes']
    relic_nodes_list = []
    for i in range(obs_relic_nodes.shape[0]):
        relic_nodes_list.append(f"Relic node {i} position: {obs_relic_nodes[i][0]}, {obs_relic_nodes[i][1]}.")
    relic_nodes = " ".join(relic_nodes_list)

    # relic nodes mask
    obs_relic_nodes_mask = obs['relic_nodes_mask']
    relic_nodes_mask_list = []
    for i in range(obs_relic_nodes_mask.shape[0]):
        relic_nodes_mask_list.append(f"Relic node {i} visibility: {obs_relic_nodes_mask[i]}.")
    relic_nodes_mask = " ".join(relic_nodes_mask_list)

    # team points
    my_team_points = f"My current point for this match is: {obs['team_points'][self.team_id]}."
    enemy_team_points = f"Enemy current point for this match is: {obs['team_points'][self.opp_team_id]}."

    # team wins
    my_team_wins = f"I have won {obs['team_wins'][self.team_id]} matches."
    enemy_team_wins = f"Enemy has won {obs['team_wins'][self.opp_team_id]} matches."

    # steps
    steps = f"This is step {obs['steps']} of the game."

    # match_steps
    match_steps = f"This is step {obs['match_steps']} of the match."

    '''
    # SET THESE AT AGENT INIT
    self.my_spawn_location = None
    self.enemy_spawn_location = None
    self.first_spawn = False

    # SET THESE AT AGENT ACTION
    if available_unit_ids.shape[0] == 0:
        pass
    else:
        if self.first_spawn == False:
            first_unit_id = available_unit_ids[0]
            first_unit_pos = unit_positions[first_unit_id]
            self.my_spawn_location = (first_unit_pos[0], first_unit_pos[1])
            self.enemy_spawn_location = find_opposite_corner_coords(, first_unit_pos[0], first_unit_pos[1])
            self.first_spawn = True
    '''
    
    all_variables = " ".join([
        max_units, match_count_per_episode, max_steps_in_match, map_height, map_width, num_teams, unit_move_cost, unit_sap_cost, unit_sap_range, unit_sensor_range, unit_position_warning, my_unit_positions,
        enemy_unit_positions, my_unit_energys, enemy_unit_energys, my_units_mask, enemy_units_mask, sensor_mask, map_features_energy, map_features_tile_type, relic_nodes, relic_nodes_mask,
        my_team_points, enemy_team_points, my_team_wins, enemy_team_wins, steps, match_steps
    ])

    return all_variables

In [12]:
llm_input = prep_llm_input(env_cfg, obs)

NameError: name 'self' is not defined