In [1]:
import numpy as np
import ray

@ray.remote
class HousingPricesParameterServer(object):
    """Ray actor that stores one weight per key and lets callers add to or read them."""

    def __init__(self, keys, values):
        """Build the key -> weight table.

        Args:
            keys: Iterable of hashable identifiers, one per weight.
            values: Iterable of initial weights, paired positionally with
                ``keys``. Surplus keys or values are dropped by ``zip``.
        """
        # push() updates the stored weights with ``+=``. Mutable values such as
        # numpy arrays are handed to actors as read-only views of Ray's object
        # store, so take a private copy of anything that offers ``.copy()``.
        # Values without a ``.copy()`` method (str, int, float, ...) are kept as-is.
        owned = [
            value.copy() if hasattr(value, "copy") else value
            for value in values
        ]
        self.weights = dict(zip(keys, owned))

    def push(self, keys, values):
        """Add each value onto the weight stored under the matching key.

        Args:
            keys: Keys to update; each must already exist in the table.
            values: Increments, paired positionally with ``keys``.

        Raises:
            KeyError: If a key was not registered at construction time.
        """
        for key, value in zip(keys, values):
            self.weights[key] += value

    def pull(self, keys):
        """Return the current weights for ``keys`` as a list, in the same order.

        Raises:
            KeyError: If a key was not registered at construction time.
        """
        return [self.weights[key] for key in keys]

In [2]:
@ray.remote
class Worker(object):
    """Ray actor that repeatedly pushes updates to a parameter server."""

    def __init__(self, weight_shard: np.ndarray, keys, values, config):
        """Store the worker's local state.

        Args:
            weight_shard: Initial local parameters; copied so the worker owns
                a writable array.
            keys: Keys sent to the parameter server on every push and pull.
            values: Values sent with ``keys`` on every push.
            config: Mapping that must contain ``"iterations"``, the number of
                update rounds run by ``async_update_loop``.
        """
        self.params = weight_shard.copy()
        self.config = config
        self.keys = keys
        self.values = values

    def compute_grad(self):
        """Return a placeholder gradient: ones shaped like ``self.params``."""
        return np.ones_like(self.params)

    def async_update_loop(self, parameter_server):
        """Updates the parameter server and updates own weights.

        Runs ``config["iterations"]`` rounds. Each round pushes
        ``self.values`` to ``parameter_server`` and then refreshes
        ``self.params`` with the weights currently stored there.

        Args:
            parameter_server: Actor handle exposing ``push(keys, values)`` and
                ``pull(keys)`` remote methods.
        """
        for _ in range(self.config["iterations"]):
            grads = self.compute_grad()
            # NOTE(review): if ``self.values`` is a plain list this extends it
            # with the gradient entries rather than adding element-wise --
            # confirm the intended type of ``values``.
            self.values += grads
            # Wait for the push so any error it raised surfaces here. push()
            # has no return value, so the refreshed weights must come from an
            # explicit pull rather than from the push result.
            ray.get(parameter_server.push.remote(self.keys, self.values))
            self.params = ray.get(parameter_server.pull.remote(self.keys))

In [8]:
import csv

# Stop any Ray runtime left over from an earlier run of this cell so that
# ray.init() below does not fail on re-execution.
ray.shutdown()

# Scan the CSV and keep columns 6 and 9 of the LAST non-blank row only; each
# earlier row is overwritten as the loop advances. The fields stay as strings.
# NOTE(review): going by the variable names, column 6 is the size and column 9
# the price -- confirm against the file's header.
size = price = None
with open('realestate.csv', mode='r', newline='') as infile:
    for rows in csv.reader(infile):
        if not rows:
            continue  # csv.reader yields an empty list for blank lines
        size = rows[6]
        price = rows[9]

if size is None:
    raise ValueError("realestate.csv contains no rows")

keys = [0, 1]
values = [size, price]

ray.init()
num_workers = 4

weight = np.random.rand(12)
# This launches our Housing parameter server process.
ps = HousingPricesParameterServer.remote(keys, values)

# This launches num_workers workers, each with a single update iteration.
workers = [
    Worker.remote(weight, keys, values, {"iterations": 1})
    for _ in range(num_workers)
]

# Here, each worker would update the Parameter Server independently in
# an asynchronous fashion. The update loop is currently disabled, so the
# weights printed below are the initial values.
#futures = [w.async_update_loop.remote(ps) for w in workers]

# This call blocks until all above workers have finished their loops.
#ray.get(futures)
# Print the weights
print(ray.get(ps.pull.remote(keys)))

2020-08-02 03:44:04,323	INFO resource_spec.py:212 -- Starting Ray with 5.13 GiB memory available for workers and up to 2.58 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-02 03:44:04,692	INFO services.py:1165 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


['1362', '235738']
