## Train on a Ray cluster

Now that we've discovered Ray Train, let's use it on a Ray cluster.
Let's start with a reminder of how to build a Ray cluster on Kubernetes using minikube.

### Install Ray cluster

Now that we've experimented a little with Ray locally, let's deploy it on a Kubernetes cluster.

#### Prerequisites

Make sure you have installed:

- `minikube`
- `helm`

#### Deploy Ray on Kubernetes

The first step is to set up a Kubernetes cluster:

```shell
minikube start
```

We are going to use the minikube dashboard in order to monitor our cluster's activity:

```shell
minikube dashboard
```

Then, using Helm, let's add the KubeRay repository, which provides the charts for running Ray on a Kubernetes cluster:

```shell
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
```

Next, install the KubeRay operator, which will serve as our Ray cluster management tool on Kubernetes.

```shell
helm install kuberay-operator kuberay/kuberay-operator --version 1.0.0
```

Once the KubeRay operator is running, we are ready to deploy a RayCluster. To do so, we create a RayCluster Custom Resource (CR) in the default namespace.

```shell
helm install raycluster kuberay/ray-cluster --version 1.0.0
```

We can monitor our Ray cluster, and connect to it, through the following service. Let's find out which ports it exposes.

```shell
kubectl get service raycluster-kuberay-head-svc
```

Now that we have the name and address of the service, we can use port-forwarding to access the Ray Dashboard port (8265 by default).

```shell
kubectl port-forward --address 0.0.0.0 service/raycluster-kuberay-head-svc 8265:8265
```

You may get a `port already in use` error. In that case, your cluster may already be available at [localhost:8265](http://localhost:8265).
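
To confirm that the port-forward is working before submitting jobs, a quick reachability check can help. The sketch below is only an illustration: the helper name `dashboard_is_reachable` is hypothetical, and it only verifies that some HTTP server answers on the given URL, not that the server is actually the Ray Dashboard.

```python
import http.client
import urllib.error
import urllib.request


def dashboard_is_reachable(url: str = "http://localhost:8265", timeout: float = 2.0) -> bool:
    """Return True if an HTTP server answers successfully at `url`, False otherwise.

    Hypothetical helper: it does not check that the server is a Ray Dashboard.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 400
    except (urllib.error.URLError, http.client.HTTPException, OSError, ValueError):
        # Connection refused, timeout, HTTP error status, or malformed URL.
        return False
```

Calling `dashboard_is_reachable()` while `kubectl port-forward` is running should return `True`; if it returns `False`, the port-forward is probably not active.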

Let's submit our first job to our brand new cluster. The job's logs will show the Ray cluster's total resource capacity, including 2 CPUs.

```shell
ray job submit --address http://localhost:8265 -- python -c "import ray; ray.init(); print(ray.cluster_resources())"
```
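
The dictionary printed by `ray.cluster_resources()` is useful when sizing a training job, because Ray Train reserves one CPU per worker by default. The sketch below estimates how many workers fit in the cluster. The helper name `max_train_workers` is hypothetical, and the estimate is deliberately rough: it ignores CPUs already used by other tasks and any resources reserved for the trainer itself.

```python
def max_train_workers(cluster_resources: dict, cpus_per_worker: float = 1.0) -> int:
    """Estimate how many training workers fit in the cluster's total CPUs.

    `cluster_resources` is expected to look like the output of
    `ray.cluster_resources()`, e.g. {"CPU": 2.0, "memory": ...}.
    Hypothetical helper: an upper bound, not a scheduling guarantee.
    """
    if cpus_per_worker <= 0:
        raise ValueError("cpus_per_worker must be positive")
    total_cpus = cluster_resources.get("CPU", 0.0)
    return int(total_cpus // cpus_per_worker)


# With the 2 CPUs reported by the job above, at most 2 one-CPU workers fit.
print(max_train_workers({"CPU": 2.0}))
```

If the number of workers requested in a `ScalingConfig` exceeds this estimate, the training job may stay pending while it waits for resources.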


## Main Ray Components

Ray is a vast library. At its heart is the concept of distributing tasks across a cluster of computers. The diagram below gives an overview of its main components:

![Ray components](https://lead-program-assets.s3.eu-west-3.amazonaws.com/M01-Distributed_machine_learning/Ray_components.png)



Let's take advantage of the cluster to launch a Ray Train job.

```python
import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

import ray.train.torch

def train_func():
    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    # [1] Prepare model.
    model = ray.train.torch.prepare_model(model)
    # model.to("cuda")  # This is done by `prepare_model`
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    # Data
    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
    data_dir = os.path.join(tempfile.gettempdir(), "data")
    train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    # [2] Prepare dataloader.
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(10):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            # This is done by `prepare_data_loader`!
            # images, labels = images.to("cuda"), labels.to("cuda")
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # [3] Report metrics and checkpoint.
        #metrics = {"loss": loss.item(), "epoch": epoch}
        #with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        #    torch.save(
        #        model.module.state_dict(),
        #        os.path.join(temp_checkpoint_dir, "model.pt")
        #    )
        #    ray.train.report(
        #        metrics,
        #        checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
        #    )
        #if ray.train.get_context().get_world_rank() == 0:
        #    print(metrics)
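

# [4] Configure scaling and resource requirements.
# Each worker reserves 1 CPU by default, so num_workers should not exceed the
# CPUs available in the cluster; otherwise the job may wait for resources.
scaling_config = ray.train.ScalingConfig(num_workers=3, use_gpu=False)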

# [5] Launch distributed training job.
trainer = ray.train.torch.TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    # [5a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
```

Copy the above script into a file named `ray_train_demo.py`, then run the following command in your terminal from the directory containing that file:

```shell
ray job submit --runtime-env-json='{"working_dir": "./", "pip": ["ray[train]", "torch", "torchvision", "numpy"]}' --address="http://127.0.0.1:8265" -- python ray_train_demo.py
```
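
The same submission can also be done from Python with Ray's job submission SDK. The following is a minimal sketch, assuming the dashboard is still port-forwarded to `http://127.0.0.1:8265` and that `ray_train_demo.py` sits in the current directory. The function names `build_runtime_env` and `submit_training_job` are hypothetical, and the 5-second polling interval is an arbitrary choice.

```python
import time


def build_runtime_env() -> dict:
    """Mirror the --runtime-env-json argument used in the CLI command."""
    return {
        "working_dir": "./",
        "pip": ["ray[train]", "torch", "torchvision", "numpy"],
    }


def submit_training_job(address: str = "http://127.0.0.1:8265") -> str:
    """Submit ray_train_demo.py, wait for it to finish, and print its logs."""
    # Imported lazily so the module can be loaded without Ray installed.
    from ray.job_submission import JobStatus, JobSubmissionClient

    client = JobSubmissionClient(address)
    job_id = client.submit_job(
        entrypoint="python ray_train_demo.py",
        runtime_env=build_runtime_env(),
    )

    # Poll until the job reaches a terminal state.
    terminal_states = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED}
    while client.get_job_status(job_id) not in terminal_states:
        time.sleep(5)

    print(client.get_job_logs(job_id))
    return job_id
```

Calling `submit_training_job()` from a Python session on your machine should behave like the `ray job submit` command above, with the advantage that the job ID and status are available to the rest of your script.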

## Resources 📚📚

[Ray Train](https://docs.ray.io/en/latest/train/distributed-tensorflow-keras.html)