# PettingZoo inside Ray

In [1]:
from pettingzoo.mpe import simple_tag_v2
from pettingzoo.test import render_test
from pettingzoo.test import performance_benchmark
from pettingzoo.test import test_save_obs
import ray

import time
import random

In [2]:
@ray.remote
def test_run_env():
    """Run one random-action episode of ``simple_tag_v2`` and log every step.

    At the start of each cycle (one turn for each of the four agents) a random
    discrete action in ``0..4`` is drawn for every agent. Each agent then plays
    its pre-drawn action when the environment selects it.

    Returns:
        list[dict]: one entry per cycle. Each entry maps an agent name
        (``"adversary_0"``, ``"adversary_1"``, ``"adversary_2"``, ``"agent_0"``)
        to a dict that always holds ``"action"`` and, once that agent has
        acted in the cycle, ``"observation"``, ``"cumulative_rewards"``,
        ``"terminations"``, ``"truncations"`` and ``"infos"`` — the values
        returned by ``env.last()`` just before the agent stepped.
    """
    cycles = 20
    action_choices = [0, 1, 2, 3, 4]
    # Order matters only for the random draw sequence; actions are looked up
    # by name below, so they cannot get out of sync with the iteration order.
    agent_names = ("adversary_0", "adversary_1", "adversary_2", "agent_0")
    agent_count = len(agent_names)
    step_budget = cycles * agent_count

    full_report = []
    cycle_actions = {}
    env = simple_tag_v2.env(render_mode="rgb_array", max_cycles=cycles)
    try:
        env.reset()
        for current_step, agent in enumerate(env.agent_iter()):
            if current_step >= step_budget:
                break
            if current_step % agent_count == 0:
                # New cycle: draw one action per agent and open a report entry.
                cycle_actions = {
                    name: random.choice(action_choices) for name in agent_names
                }
                full_report.append(
                    {name: {"action": act} for name, act in cycle_actions.items()}
                )
            # The return value of render() is not used here.
            env.render()
            observation, reward, termination, truncation, info = env.last()
            env.step(cycle_actions[agent])

            agent_entry = full_report[-1][agent]
            agent_entry["observation"] = observation
            agent_entry["cumulative_rewards"] = reward
            agent_entry["terminations"] = termination
            agent_entry["truncations"] = truncation
            agent_entry["infos"] = info
    finally:
        # Close on every exit path. A ``for ... else`` only runs its ``else``
        # when the iterator is exhausted, so it is skipped on ``break`` and on
        # exceptions, which would leave the environment open in the worker.
        env.close()
    return full_report

In [3]:
batches_num = 10
# Submit every episode up front so the tasks can run concurrently, then block
# until all of them have returned their reports.
task_handles = [test_run_env.remote() for _ in range(batches_num)]
output = ray.get(task_handles)
print(output)

2023-03-21 17:26:02,165	INFO worker.py:1553 -- Started a local Ray instance.


[2m[36m(test_run_env pid=16476)[0m None
[2m[36m(test_run_env pid=16712)[0m None
[2m[36m(test_run_env pid=2656)[0m None
[2m[36m(test_run_env pid=17304)[0m None
[2m[36m(test_run_env pid=17624)[0m None
[[], [], [], [], [], [], [], [], [], []]
[2m[36m(test_run_env pid=9740)[0m None
[2m[36m(test_run_env pid=19172)[0m None
[2m[36m(test_run_env pid=9004)[0m None
[2m[36m(test_run_env pid=6088)[0m None
[2m[36m(test_run_env pid=16748)[0m None


## info

`observation, cumulative_rewards, terminations, truncations, infos = env.last()`

Examples of the values returned above:

`observation = [ 0.          0.         -0.81671894 -0.35830274  0.7397504   0.87728226  0.24053054 -0.3338143   0.4263175 -0.13701786  1.0103662  -0.5890981 0.5039304   0.7280103   0.          0.        ]`

observations are ordered like this:

`[self_vel, self_pos, landmark_rel_positions[..], other_agent_rel_positions[..], other_agent_velocities[..]]`

`cumulative_rewards = 0.0`

`terminations = False `

`truncations = False `

`infos = {}`


The agent names and the available actions are:


`agent names: adversary_0, adversary_1 ..., agent_0`

`actions: 0, 1, 2, 3 or 4 (meaning no_action, move_left, move_right, move_down, move_up)`

# Original Ray testing code

In [None]:
# Approximate pi by random sampling: draw x and y uniformly between 0 and 1.
# If x^2 + y^2 <= 1 the point lies inside the quarter circle; the fraction of such points, multiplied by 4, approximates pi.
import ray
import random

In [None]:

# Start Ray. Earlier cells may already have started a local instance
# implicitly (the first `.remote()` call does that), and a plain `ray.init()`
# would then raise; `ignore_reinit_error=True` makes this cell safe to run
# both on a fresh kernel and after the cells above.
ray.init(ignore_reinit_error=True)

# Number of random points drawn by each `pi4_sample` task.
SAMPLES = 1_000_000
# Decorating a regular Python function with `@ray.remote` turns it into a
# Ray remote function.
@ray.remote
def pi4_sample():
    """Count how many of ``SAMPLES`` random points land inside the quarter circle.

    Each point is drawn with two ``random.random()`` calls (x first, then y)
    and counts as a hit when ``x*x + y*y <= 1``.

    Returns:
        int: the number of hits.
    """
    def _is_hit():
        px = random.random()
        py = random.random()
        return px * px + py * py <= 1

    return sum(1 for _ in range(SAMPLES) if _is_hit())

# Invoke the remote function through its `remote` method. The call returns an
# object ref (a future) immediately and creates a task that is executed on a
# worker process; `ray.get` retrieves the result.
future = pi4_sample.remote()
pi = ray.get(future) * 4.0 / SAMPLES
print(f'{pi} is an approximation of pi')

# Now let's do this 100,000 times.
# With regular python this would take 11 hours
# Ray on a modern laptop, roughly 2 hours
# On a 10-node Ray cluster, roughly 10 minutes
BATCHES = 100000
results = [pi4_sample.remote() for _ in range(BATCHES)]
output = ray.get(results)
pi = sum(output) * 4.0 / BATCHES / SAMPLES
print(f'{pi} is a way better approximation of pi')

In [None]:
# Ask Ray to cancel every task that was submitted in `results`.
for task_ref in results:
    ray.cancel(task_ref)

# Test with nested function calls

In [12]:
import ray
import random 
# Let's start Ray
# ray.init()

def get_random_numbers_that_are_distant(how_much):
    """Draw two random numbers in ``[0, 1)`` that differ by at least ``how_much``.

    Pairs are drawn with ``random.random()`` (x first, then y) and rejected
    until ``abs(y - x)`` is no longer smaller than ``how_much``.

    Args:
        how_much: minimum absolute difference required between the two
            numbers. Must be smaller than 1.

    Returns:
        tuple[float, float]: the accepted ``(x, y)`` pair.

    Raises:
        ValueError: if ``how_much >= 1``. Two values in ``[0, 1)`` always
            differ by less than 1, so the rejection loop would never end.
    """
    if how_much >= 1:
        raise ValueError(
            f"how_much must be < 1 for samples drawn from [0, 1), got {how_much!r}"
        )
    # Always draw at least once, so a non-positive threshold still yields a
    # random pair instead of the (0, 0) starting values.
    while True:
        x = random.random()
        y = random.random()
        if not abs(y - x) < how_much:
            return x, y

# Number of random point pairs evaluated by each `pi4_sample` task.
SAMPLES = 1_000_000
@ray.remote
def pi4_sample():
    """Count quarter-circle hits among ``SAMPLES`` deliberately "distant" pairs.

    Each pair comes from ``get_random_numbers_that_are_distant(0.5)`` and
    counts as a hit when ``x*x + y*y <= 1``.

    Returns:
        int: the number of hits.
    """
    distant_pairs = (
        get_random_numbers_that_are_distant(0.5) for _ in range(SAMPLES)
    )
    # Summing booleans counts the True values.
    return sum(px * px + py * py <= 1 for px, py in distant_pairs)

BATCHES = 100
# Submit all batches up front, then block until every hit count is back.
results = [pi4_sample.remote() for _ in range(BATCHES)]
output = ray.get(results)
pi = sum(output) * 4.0 / BATCHES / SAMPLES
print(f'{pi} is a way WORSE approximation of pi')

3.49298832 is a way worse approximation of pi


In [11]:
# Ask Ray to cancel every task that was submitted in `results`.
for task_ref in results:
    ray.cancel(task_ref)

2023-03-11 12:58:54,426	ERROR worker.py:399 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::pi4_sample()[39m (pid=27256, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 1072, in ray._raylet.execute_task_with_cancellation_handler
  File "python\ray\_raylet.pyx", line 805, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 850, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 857, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 861, in ray._raylet.execute_task
  File "C:\Users\rober\AppData\Local\Temp\ipykernel_18124\1607450648.py", line 18, in pi4_sample
ray.exceptions.TaskCancelledError: Task: TaskID(7486c9c5cb2b345effffffffffffffffffffffff01000000) was cancelled
2023-03-11 12:58:54,428	ERROR worker.py:399 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::pi4_sample()[39m (pid=19192, ip=127.0.0.1)
  File "python\ray\_raylet.pyx", line 1072, in ray._raylet.execute_task_with_