In [1]:
from MuJoCo_Gym.mujoco_rl import MuJoCo_RL
import numpy as np
import random

### Implement a language channel
This language channel sends one token per timestep to the other agent. The token is determined by the index (0 to 2) of the largest value in the agent's action vector. As its observation, each agent receives the token the other agent uttered one timestep earlier.

In [2]:
class Language:
    """Environment dynamic implementing a three-token language channel.

    Each call to ``dynamic`` converts the calling agent's action vector into a
    one-hot token, stores that token in the environment's ``dataStore`` and
    returns the token currently stored for the other agent as the observation.
    """

    def __init__(self, environment):
        """Keep a handle to the environment and declare the channel's spaces.

        Args:
            environment: Object exposing ``agents`` (iterable of agent names)
                and ``dataStore`` (dict mapping each agent name to a dict).
        """
        self.environment = environment
        # One slot per token: observations and actions are 3-vectors bounded by [0, 1].
        self.observation_space = {"low": [0, 0, 0], "high": [1, 1, 1]}
        self.action_space = {"low": [0, 0, 0], "high": [1, 1, 1]}
        # Not read or written by this class; kept as part of the object's attributes.
        self.dataStore = {}

    def dynamic(self, agent, actions):
        """Record ``agent``'s utterance and return what the other agent last said.

        Args:
            agent: Name of the acting agent; must be a key of the environment's
                ``dataStore``.
            actions: Sequence of three values; the index of the largest one
                selects the token that is uttered.

        Returns:
            A tuple ``(0, observation)``. The first element is always 0
            (presumably a reward contribution -- confirm against MuJoCo_RL).
            ``observation`` is a length-3 array holding the one-hot token stored
            for the other agent, or all zeros if the other agent has not
            uttered anything yet or there is no other agent.
        """
        # One-hot encode the chosen token and publish it for the other agent.
        utterance = [0, 0, 0]
        utterance[np.argmax(actions)] = 1
        self.environment.dataStore[agent]["utterance"] = utterance

        # The first agent that is not the caller is treated as the conversation partner.
        other_agent = next(
            (other for other in self.environment.agents if other != agent), None
        )
        if other_agent is None:
            # No partner to listen to: report silence instead of raising IndexError.
            return 0, np.array([0, 0, 0])

        heard = self.environment.dataStore[other_agent].get("utterance", [0, 0, 0])
        return 0, np.array(heard)

### Implement a reward function
In version 3.0, the reward function is not a class but a plain callable function, which is handed over as an argument in the config dictionary. First, the data fields are created in the dataStore. This is done every time the environment has been reset, because the dataStore is cleared back to {agent: {}, agent2: {}, etc.} during reset.<br>
After that, the agent is rewarded for getting closer to the target. To achieve this, the reward function simply calculates the difference between the distance at the previous timestep and the current distance. If the agent has moved closer to the target, the difference is positive and therefore the reward is positive as well.

In [3]:
def reward_function(mujoco_gym, agent):
    """Reward ``agent`` for reducing the distance to its current target.

    On the first call for an agent (no ``"current_target"`` in its dataStore
    entry yet) a target is drawn at random from the objects tagged
    ``"target"``, the starting distance is recorded and the reward is 0.
    On every later call the reward is ten times the decrease in distance
    since the previous call, so moving closer yields a positive reward.

    Args:
        mujoco_gym: Environment exposing ``dataStore``, ``filterByTag`` and
            ``distance``.
        agent: Name of the agent; must be a key of ``mujoco_gym.dataStore``.

    Returns:
        The reward for this timestep.

    Raises:
        ValueError: If the environment contains no object tagged ``"target"``.
    """
    agent_store = mujoco_gym.dataStore[agent]

    # Initialise only while this agent has no target. The per-agent key is
    # checked because that is where the per-agent fields are written; checking
    # for "targets" here would never succeed and would redraw a target (and
    # return a zero reward) on every single step.
    if "current_target" not in agent_store:
        targets = mujoco_gym.filterByTag("target")
        if len(targets) == 0:
            raise ValueError('no objects tagged "target" found in the environment')
        mujoco_gym.dataStore["targets"] = targets
        agent_store["current_target"] = random.choice(targets)["name"]
        agent_store["distance"] = mujoco_gym.distance(agent, agent_store["current_target"])
        return 0

    # Progress is the previous distance minus the current one: positive when closer.
    distance = mujoco_gym.distance(agent, agent_store["current_target"])
    progress = agent_store["distance"] - distance
    agent_store["distance"] = distance
    return progress * 10

### Done function
The current simulation run is over once the agent is within one distance unit of the target (distance ≤ 1). Note that the data field for the distance does not have to be created again. This is because the reward functions are executed before the done functions inside the environment class, which means the distance field already exists, even at timestep 0.

In [4]:
def done_function(mujoco_gym, agent):
    """Report whether ``agent`` has reached its target.

    Reads the ``"distance"`` value stored for the agent in
    ``mujoco_gym.dataStore`` and returns True when it is at most 1,
    False otherwise.
    """
    remaining = mujoco_gym.dataStore[agent]["distance"]
    return bool(remaining <= 1)

### Starting the environment
The paths to the MuJoCo XML file and the additional JSON info file are handed over, as well as the agents' MuJoCo names, the reward and done functions, and the environment dynamics. The render mode is also set to true, meaning that the environment is rendered on screen while running. This should only be done for inference, though, as rendering is quite resource-intensive.

In [5]:
# Locations of the MuJoCo XML scene and the accompanying JSON info file.
environment_path = "Environment/SingleBoxEnv.xml"
info_path = "Environment/info_example.json"

# MuJoCo names of the agents taking part in the simulation.
agents = ["agent1_torso", "agent2_torso"]

# Everything the environment needs, one setting per line for readability.
config_dict = dict(
    xmlPath=environment_path,
    infoJson=info_path,
    agents=agents,
    rewardFunctions=[reward_function],
    doneFunctions=[done_function],
    environmentDynamics=[Language],
    freeJoint=True,
    renderMode=True,
    maxSteps=4096,
)
environment = MuJoCo_RL(config_dict)

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


### Running the simulation
The simulation is run with actions sampled at random from the action spaces of the individual agents. The environment is reset whenever the `__all__` entry of either the terminations or the truncations is true. Interrupt the kernel to stop the loop.

In [6]:
environment.reset()
try:
    while True:
        # Sample a random action for every agent from that agent's own action space.
        action = {name: environment._action_space[name].sample() for name in agents}
        observations, current_rewards, terminations, truncations, infos = environment.step(action)

        # Start over when the aggregate "__all__" flag of either dict is set.
        if terminations["__all__"] or truncations["__all__"]:
            environment.reset()
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to leave this endless loop.
    pass

[-1.0, 0.0, 1.0, 0.0] [0 0 0]
[1.4000000000000001, 0.0, 1.0, 0.0] [1 0 0]
[-1.0, 0.0, 1.0, 0.0] [1 0 0]
[1.4000000000000001, 0.0, 1.0, 0.0] [1 0 0]
[-1.0, 0.0, 1.0, 0.0] [0 0 1]
[1.4000000000000001, 0.0, 1.0, 0.0] [1 0 0]
[-1.0, 0.0, 1.0, 0.0] [0 1 0]
[1.4000000000000001, 0.0, 1.0, 0.0] [1 0 0]
