In [1]:
import logging
import time
import ray
import random
from random import randint
import numpy as np

# Restart Ray from a clean state so re-running this cell is safe.
# `ray.is_initialized` is a function: it has to be *called* -- the bare
# attribute is a function object and therefore always truthy.
if ray.is_initialized():
    ray.shutdown()
ray.init()

2023-04-16 16:02:48,158	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.6
Ray version:,2.3.1
Dashboard:,http://127.0.0.1:8265


In [3]:
@ray.remote
class MethodStateCounter:
    """Ray actor that keeps a per-name invocation tally for "A", "B" and "C"."""

    def __init__(self):
        # Every known caller starts with a count of zero.
        self.invokers = dict.fromkeys(("A", "B", "C"), 0)

    def invoke(self, name):
        """Pause half a second, bump the tally for ``name`` and return the new value.

        Raises:
            KeyError: if ``name`` is not one of the tracked keys.
        """
        time.sleep(0.5)
        updated = self.invokers[name] + 1
        self.invokers[name] = updated
        return updated

    def get_invoker_state(self, name):
        """Return the current tally recorded for ``name``."""
        tally = self.invokers
        return tally[name]

    def get_all_invoker_state(self):
        """Return the whole name -> tally mapping."""
        return self.invokers

# Caller names used by the demo; they must match the keys that
# MethodStateCounter.__init__ sets up. Defined here so the cell runs on a
# fresh kernel (the name was previously used without ever being assigned).
CALLERS = ["A", "B", "C"]

worker_invoker = MethodStateCounter.remote()
print(worker_invoker)

# Submit ten invocations without waiting on their results.
for _ in range(10):
    name = random.choice(CALLERS)
    worker_invoker.invoke.remote(name)

print("------------------")
# Submit five more, this time blocking on each call to read back the count it returns.
for _ in range(5):
    random_name_invoker = random.choice(CALLERS)
    times_invoked = ray.get(worker_invoker.invoke.remote(random_name_invoker))
    print(f"Named caller: {random_name_invoker} called {times_invoked}")

# Final snapshot of every counter.
print(ray.get(worker_invoker.get_all_invoker_state.remote()))


Actor(MethodStateCounter, f591b3a917146e72976ccb9901000000)
Method callers: 
Named caller: A called 4
Named caller: C called 3
Named caller: B called 6
Named caller: C called 4
Named caller: B called 7
{'A': 4, 'B': 7, 'C': 4}


In [4]:
@ray.remote
class ParameterSever:
    """Ray actor that owns a 10-element parameter vector, initially all zeros."""

    def __init__(self):
        self.params = np.zeros(10)

    def get_params(self):
        """Return the current parameter vector."""
        current = self.params
        return current

    def update_params(self, grad):
        """Subtract ``grad`` from the stored parameters in place; returns None."""
        np.subtract(self.params, grad, out=self.params)

@ray.remote
def worker(ps):
    """Remote task that repeatedly pushes an all-ones gradient to ``ps``.

    Runs 25 rounds; each round sleeps 1.5 seconds and then submits a
    length-10 vector of ones to ``ps.update_params`` without waiting for it.
    """
    for _ in range(25):
        time.sleep(1.5)
        ps.update_params.remote(np.ones(10))

# Create the parameter-server actor and show its handle.
param_server = ParameterSever.remote()
print(param_server)

# Read the starting parameters before any worker has run.
initial_params = ray.get(param_server.get_params.remote())
print(f"Initial params: {initial_params}")

# Launch three worker tasks against the same actor and print what the launches return.
worker_refs = [worker.remote(param_server) for _ in range(3)]
print(worker_refs)

print("------------------")

# Poll the actor 20 times, pausing one second after each read.
for _poll in range(20):
    current_params = ray.get(param_server.get_params.remote())
    print(f"Updated params: {current_params}")
    time.sleep(1)


Actor(ParameterSever, 90e0a676311d6eaa6e8515dc01000000)
Initial params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[ObjectRef(aa3d5d11e415fe88ffffffffffffffffffffffff0100000001000000), ObjectRef(a6d6d59239756144ffffffffffffffffffffffff0100000001000000), ObjectRef(c7528efcb2fd36edffffffffffffffffffffffff0100000001000000)]
Updated params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Updated params: [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
Updated params: [-3. -3. -3. -3. -3. -3. -3. -3. -3. -3.]
Updated params: [-3. -3. -3. -3. -3. -3. -3. -3. -3. -3.]
Updated params: [-6. -6. -6. -6. -6. -6. -6. -6. -6. -6.]
Updated params: [-9. -9. -9. -9. -9. -9. -9. -9. -9. -9.]
Updated params: [-9. -9. -9. -9. -9. -9. -9. -9. -9. -9.]
Updated params: [-12. -12. -12. -12. -12. -12. -12. -12. -12. -12.]
Updated params: [-15. -15. -15. -15. -15. -15. -15. -15. -15. -15.]
Updated params: [-15. -15. -15. -15. -15. -15. -15. -15. -15. -15.]
Updated params: [-18. -18. -18. -18. -18. -18. -18. -18. -18. -18.]
Updated params: [-21. -21. -21

In [None]:
def model_factory(m: str, func: object):
    """Build and return a ``Model`` from the label ``m`` and the callable ``func``."""
    model = Model(m, func)
    return model

class Model:
    """Pairs a model label with the callable that is run to train it."""

    def __init__(self, m: str, func: object):
        self._model, self._func = m, func

    def train(self):
        """Invoke the stored callable with no arguments; its result is discarded."""
        trainer = self._func
        trainer()

@ray.remote
class Worker:
    """Ray actor that trains one model built from a label and a callable."""

    def __init__(self, m: str, func: object):
        self._model, self._func = m, func

    def state(self) -> str:
        """Return one of "RUNNING", "PENDING" or "DONE", picked at random.

        The value does not depend on whether ``work`` has run.
        """
        possible_states = ["RUNNING", "PENDING", "DONE"]
        return random.choice(possible_states)

    def work(self) -> None:
        """Build a model through ``model_factory`` and call its ``train`` method."""
        model = model_factory(self._model, self._func)
        model.train()

def lf_func():
    """Placeholder workload: sleep for one second, then return 0."""
    status = 0
    time.sleep(1)
    return status

def cl_func():
    """Placeholder workload: sleep for one second, then return 0."""
    status = 0
    time.sleep(1)
    return status

def nn_func():
    """Placeholder workload: sleep for one second, then return 0."""
    status = 0
    time.sleep(1)
    return status


In [None]:

@ray.remote
class Supervisor:
    """Ray actor that creates three ``Worker`` actors and fans calls out to them."""

    def __init__(self):
        # One Worker actor per (label, callable) pair.
        assignments = [("lr", lf_func), ("cl", cl_func), ("nn", nn_func)]
        self.workers = [Worker.remote(label, fn) for label, fn in assignments]

    def work(self):
        """Submit ``work`` to every worker without waiting for completion; returns None."""
        for actor in self.workers:
            actor.work.remote()

    def terminate(self):
        """Call ``ray.kill`` on every worker; returns None."""
        for actor in self.workers:
            ray.kill(actor)

    def state(self):
        """Request ``state`` from every worker and return the results from one ``ray.get``."""
        pending = [actor.state.remote() for actor in self.workers]
        return ray.get(pending)

sup = Supervisor.remote()

# Kick off the workers; this prints what the remote submission returns,
# not the outcome of the work itself.
print(sup.work.remote())

# Poll the supervisor until every worker reports "DONE".
while True:
    states = ray.get(sup.state.remote())
    print(states)
    result = all("DONE" == e for e in states)
    if result:
        # Block until terminate() has actually run. The call is asynchronous,
        # and ray.kill(sup) issued right behind it could interrupt or drop
        # the still-queued terminate request.
        ray.get(sup.terminate.remote())
        ray.kill(sup)
        break

