<img src="img/Ray.png" width="800">

<br/><br/>
<br/><br/>

## Bottleneck of Current ML 
### Goal: Distribute the job to as many as workers!
<img src="img/workers.png">



<br/><br/>
<br/><br/>

## Competitors
<img src="img/competitors.png" width="800">
<br/><br/>
<br/><br/>

## The RISELab
Previously UC berkeley AMPLab, develop and open-sourced Spark, Tachyon (now Alluxio), and Mesos.
<img src="img/spark.png">

# Ray Core

In [None]:
!pip install ray

In [1]:
import ray
import time

ray.init()

@ray.remote
def f(i):
    time.sleep(1)
    return i

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

2021-09-01 16:06:02,411	INFO services.py:1272 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


[0, 1, 2, 3]


In [3]:
ray.shutdown()

## Ray Init

<img src="img/architect.png">

<br></br>
<br></br>

In [4]:
ray.init(num_cpus=4)

2021-09-01 16:08:05,163	INFO services.py:1272 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.100.204',
 'raylet_ip_address': '192.168.100.204',
 'redis_address': '192.168.100.204:6379',
 'object_store_address': '/tmp/ray/session_2021-09-01_16-08-03_170786_2186/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-09-01_16-08-03_170786_2186/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-09-01_16-08-03_170786_2186',
 'metrics_export_port': 63850,
 'node_id': 'ce7db6210db290a98ab7f1e63adb52c103e230c24b53d40829bf8a94'}

## Ray Get/Put

### Expensive serialization and deserialization as well as data copying are a common performance bottleneck in distributed computing.
<br></br>
<img src="img/arrow.png" width="500">

<br></br>
<br></br>

In [13]:
x_id = ray.put("example")
x_id

ClientObjectRef(50bc1e9bf6927fe48e7a10f3f79930f8b913e4f702a09fe301000000)

In [14]:
ray.get(x_id)

'example'

## Remote Function

In [9]:
@ray.remote
def add2(a, b):
    return a, [b,b]

### Invoke a task and Object Ref

In [10]:
x_id = add2.remote(1, 2)
x_id

ObjectRef(480a853c2c4c6f27ffffffffffffffffffffffff0100000001000000)

In [11]:
ray.get(x_id)

(1, [2, 2])

In [12]:
@ray.remote(num_returns=3)
def return_multiple():
    return 1, 2, 3

a_id, b_id, c_id = return_multiple.remote()

In [13]:
a_id

ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000001000000)

In [14]:
b_id

ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000002000000)

In [15]:
c_id

ObjectRef(623b26bdd75b28e9ffffffffffffffffffffffff0100000003000000)

## For loop to assign tasks

### futures = [f.remote(i) for i in range(4)]

In [16]:
import ray
import time

ray.shutdown()
ray.init()

@ray.remote
def f(i):
    time.sleep(1)
    return i

futures = (f.remote(i) for i in range(4))
print(ray.get(futures))

2021-09-01 16:16:29,724	INFO services.py:1272 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


ValueError: 'object_refs' must either be an object ref or a list of object refs.

In [17]:
ray.shutdown()





<br/><br/>



<br/><br/>

# Ray Actor

In [34]:
import ray

ray.shutdown()
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
    def read(self):
        return self.n
    
counters = [Counter.remote() for i in range(4)]
print(counters)
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))

2021-09-01 15:09:26,151	INFO services.py:1272 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m
2021-09-01 15:09:28,324	INFO logservicer.py:102 -- New logs connection established. Total clients: 1


[ClientActorHandle(5b4c2817f0d5e3a74ceae6ae01000000), ClientActorHandle(70f9d44f3938ba6143103d1001000000), ClientActorHandle(0adf7b103b53f8486aa5f13a01000000), ClientActorHandle(7ab7b024c3b4ffb7ab785d8901000000)]
[1, 1, 1, 1]


## Parameter Server

<br></br>

<img src="img/parameter_server.png" width="700">

<br></br>

### Utils

In [18]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from filelock import FileLock
import numpy as np

import ray


def get_data_loader():
    """Safely downloads data. Returns training/validation set dataloader."""
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])

    # We add FileLock here because multiple workers will want to
    # download data, and this may cause overwrites since
    # DataLoader is not threadsafe.
    with FileLock(os.path.expanduser("~/data.lock")):
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST(
                "~/data",
                train=True,
                download=True,
                transform=mnist_transforms),
            batch_size=128,
            shuffle=True)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST("~/data", train=False, transform=mnist_transforms),
            batch_size=128,
            shuffle=True)
    return train_loader, test_loader


def evaluate(model, test_loader):
    """Evaluates the accuracy of the model on a validation dataset."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            # This is only set to finish evaluation faster.
            if batch_idx * len(data) > 1024:
                break
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    return 100. * correct / total

### Define Network

In [19]:
class ConvNet(nn.Module):
    """Small ConvNet for MNIST."""

    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

    def get_weights(self):
        return {k: v.cpu() for k, v in self.state_dict().items()}

    def set_weights(self, weights):
        self.load_state_dict(weights)

    def get_gradients(self):
        grads = []
        for p in self.parameters():
            grad = None if p.grad is None else p.grad.data.cpu().numpy()
            grads.append(grad)
        return grads

    def set_gradients(self, gradients):
        for g, p in zip(gradients, self.parameters()):
            if g is not None:
                p.grad = torch.from_numpy(g)

## Define Sever

In [52]:
# @ray.remote
class ParameterServer(object):
    def __init__(self, lr):
        self.model = ConvNet()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)

    def apply_gradients(self, gradients):
#         gradients = ray.get(*gradients)
        print("gradeints in ps",gradients)
        summed_gradients = [
            np.stack(gradient_zip).sum(axis=0)
            for gradient_zip in zip(gradients)
        ]
        self.optimizer.zero_grad()
        self.model.set_gradients(summed_gradients) # gradient = summend_gradient
        self.optimizer.step() # w -= lr * (gradient + optimizer_gradient)
        return self.model.get_weights()

    def get_weights(self):
        return self.model.get_weights()

In [34]:
gradeint_1 = np.array([[1,1,1],[3,3,3]])
gradeint_2 = np.array([[2,2,2],[4,4,4]])

gradients = [gradeint_1, gradeint_2]

for gradient_zip in zip(*gradients):
    temp = np.array(gradient_zip).sum(axis=0)
    print("temp",temp)
#     stack = np.stack(gradient_zip).sum(axis=0)
#     print("stack",stack) 
#     print("gradient_zip",gradient_zip) 
    


temp [3 3 3]
temp [7 7 7]


## Define Worker

In [21]:
@ray.remote
class DataWorker(object):
    def __init__(self):
        self.model = ConvNet()
        self.data_iterator = iter(get_data_loader()[0])

    def compute_gradients(self, weights):
        self.model.set_weights(weights)
        try:
            data, target = next(self.data_iterator)
        except StopIteration:  # When the epoch ends, start a new epoch.
            self.data_iterator = iter(get_data_loader()[0])
            data, target = next(self.data_iterator)
        self.model.zero_grad()
        output = self.model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        return self.model.get_gradients()

In [41]:
workers = [DataWorker.remote() for i in range(2)]

[2m[36m(pid=587)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
[2m[36m(pid=587)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
[2m[36m(pid=582)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
[2m[36m(pid=582)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [42]:
workers

[ClientActorHandle(a7a9ef38db64f85aa11dffee01000000),
 ClientActorHandle(44c79a7b4144f6c4919722dc01000000)]

In [53]:
iterations = 200
num_workers = 2

ray.shutdown()
ray.init(ignore_reinit_error=True)
# ps = ParameterServer.remote(1e-2)
ps = ParameterServer(1e-2)
workers = [DataWorker.remote() for i in range(num_workers)]

model = ConvNet()
test_loader = get_data_loader()[1]

print("Running synchronous parameter server training.")
# current_weights = ps.get_weights.remote()
current_weights = ps.get_weights()
# current_weights = ray.get()
for i in range(iterations):
    gradients = [
        worker.compute_gradients.remote(current_weights) for worker in workers
    ]
    print()
    # Calculate update after all gradients are available.
#     current_weights = ps.apply_gradients.remote(*gradients)
    gradients = ray.get(gradients)
#     print("gradients",gradients)
#     current_weights = ps.apply_gradients(*gradients)
    current_weights = ps.apply_gradients(gradients)

    if i % 10 == 0:
        # Evaluate the current model.
        model.set_weights(ray.get(current_weights))
        accuracy = evaluate(model, test_loader)
        print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))

print("Final accuracy is {:.1f}.".format(accuracy))
# Clean up Ray resources and processes before the next example.
ray.shutdown()

2021-09-01 17:28:45,290	INFO services.py:1272 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


Running synchronous parameter server training.



[2m[36m(pid=2876)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
[2m[36m(pid=2872)[0m   return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


gradeints in ps [[array([[[[ 0.35065064,  0.3205852 ,  0.2195741 ],
         [ 0.26225773,  0.20988001,  0.08071565],
         [ 0.1556414 ,  0.06346474,  0.02011846]]],


       [[[ 0.04883697,  0.03587123,  0.06884987],
         [ 0.13784017,  0.10605384,  0.06080383],
         [ 0.137815  ,  0.11534718,  0.05148438]]],


       [[[-0.04346804, -0.02496378, -0.01449243],
         [-0.02751596, -0.02145926,  0.00279573],
         [-0.02471375, -0.01859206,  0.00035154]]]], dtype=float32), array([0.03885732, 0.09216283, 0.06102588], dtype=float32), array([[ 0.00110306,  0.00133766,  0.00110306, ...,  0.03064501,
         0.02688601,  0.02755568],
       [ 0.00423381,  0.00460703,  0.00423381, ...,  0.0808224 ,
         0.10532589,  0.10709234],
       [-0.00047816, -0.00033962, -0.00047816, ..., -0.0156629 ,
        -0.01017844, -0.00732313],
       ...,
       [-0.00283323, -0.00272066, -0.00283323, ..., -0.08077073,
        -0.07326441, -0.07270986],
       [-0.00137135, -0.00127145,

[2m[36m(pid=2876)[0m   return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
  arrays = [asanyarray(arr) for arr in arrays]
[2m[36m(pid=2872)[0m   return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


TypeError: can't convert np.ndarray of type numpy.object_. The only supported types are: float64, float32, float16, complex64, complex128, int64, int32, int16, int8, uint8, and bool.



<br/><br/>

<br/><br/>





