# Ray Data User Testing

In this notebook, you will learn how to use Ray Data to load data for distributed model training. Fill in the missing code to complete the following three tasks:

- Task 1: Read training data from S3
- Task 2: Preprocess training data
- Task 3: Run distributed training with 4 GPU workers

You can refer to the Ray Data documentation for user guides and API references: https://docs.ray.io/en/master/data/data.html

## Task 1: Read training data from S3

In this section, you will read training data from AWS S3 (https://aws.amazon.com/s3/). The training data consists of a list of files in Parquet format (https://parquet.apache.org/). The training data contains images and their labels.

Success Criteria for this section:
- Successfully create a Ray Dataset to read Parquet files from S3.
- Successfully inspect the Ray Dataset's metadata to get the column names and the number of images.

In [1]:
# Instruction: just run this cell.
# Import required dependencies.
import numpy as np
from torchvision.transforms import Compose, Normalize, ToTensor
from tqdm import tqdm
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from util import prepare_model

path = "s3://air-example-data/data-cuj/train"

In [None]:
# Instruction: add your code here.
# Read from S3 bucket above: s3://air-example-data/data-cuj/train





Please fill in the blanks below:

| Item                | Your answer          |
|---------------------|----------------------|
| Column names        | ___x___              |
| Number of images    | ___x___              |

## Task 2: Preprocess images

In this section, you'll preprocess the images by normalizing them for training. The code below normalizes a single image with TorchVision transforms. Use the Ray Data API to apply this preprocessing logic to all images in parallel.

Success Criteria for this section:
- Successfully use the correct Ray Data API to run the preprocessing on all images.

In [None]:
# Instruction: just run this cell.
# The preprocessing function to be applied to every image.
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])

# Example of running the preprocessing on a toy image.
example_image = np.array([[1, 0, 1], [1, 1, 0.5]], np.double)
transform(example_image)
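
To make the expected output concrete: for a float64 array, `ToTensor` adds a leading channel axis (H x W becomes 1 x H x W) without rescaling, because it only divides by 255 for `uint8` input. `Normalize((0.5,), (0.5,))` then computes (x - 0.5) / 0.5 per channel, mapping [0, 1] to [-1, 1]. The NumPy sketch below reproduces that arithmetic on the toy image without Torch. `numpy_transform` is a hypothetical helper, not part of TorchVision.

```python
import numpy as np

def numpy_transform(image, mean=0.5, std=0.5):
    # Mimic ToTensor on a 2-D float array: add a channel axis, no rescaling.
    chw = image[np.newaxis, :, :]
    # Mimic Normalize((mean,), (std,)): subtract the mean, divide by the std.
    return (chw - mean) / std

example_image = np.array([[1, 0, 1], [1, 1, 0.5]], np.double)
result = numpy_transform(example_image)
print(result.shape)  # (1, 2, 3)
print(result)
```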

In [6]:
# Instruction: add your code here.
# Use Ray Data to parallelize the preprocessing logic above.
# NOTE: Do not drop labels during preprocessing!






## Task 3: Run distributed training with 4 GPU workers

In this section, you'll run distributed model training on the preprocessed images. You are given a training script that already works, except that the data-loading code is missing. Fill in the data-loading code and start training!

Success Criteria for this section:
- Successfully use the correct Ray Data API to get this worker's shard of the Dataset.
- Successfully use the correct Ray Data API to iterate over the Dataset and pass batches of (images, labels) to the model.

In [7]:
# The function executed on each training worker.
def train_func_per_worker():
    # Prepare model.
    model, loss_fn, optimizer = prepare_model()

    # Model training for 2 epochs.
    for epoch in range(2):
        # Instruction: add your code here to get this worker's Dataset shard.
        # NOTE: each training data batch should have 32 images and labels.
        # ...
        # train_data_iterator = ...

        model.train()
        # Model training for each batch.
        for batch in tqdm(train_data_iterator, desc=f"Train Epoch {epoch}"):
            images = batch["image"]
            labels = batch["label"]
            pred = model(images)
            loss = loss_fn(pred, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()

In [None]:
# Initialize a Ray TorchTrainer
trainer = TorchTrainer(
    train_loop_per_worker=train_func_per_worker,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),  # 4 GPU workers, per the task title.
    # Instruction: add your code here to pass the Dataset variable.
    datasets={"train": ...}
)

# Start model training!
result = trainer.fit()
print(f"Training result: {result}")
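
To reason about how the work is divided, here is a back-of-the-envelope sketch of how many batches (optimizer steps) each worker runs per epoch. It assumes the training rows are split as evenly as possible across the 4 GPU workers named in the task title and that the final partial batch of 32 is kept. Ray Train's actual split can differ by a few rows, `steps_per_worker` is a hypothetical helper, and the row count is made up for illustration, not the size of this dataset.

```python
import math
from typing import List

def steps_per_worker(num_rows: int, num_workers: int, batch_size: int) -> List[int]:
    """Batches each worker runs in one epoch, assuming an as-even-as-possible
    row split and that the last partial batch is kept."""
    base, extra = divmod(num_rows, num_workers)
    # The first `extra` workers get one additional row each.
    shard_sizes = [base + (1 if i < extra else 0) for i in range(num_workers)]
    return [math.ceil(size / batch_size) for size in shard_sizes]

# Made-up example: 1,000 rows, 4 workers, 32 rows per batch.
print(steps_per_worker(1000, num_workers=4, batch_size=32))  # [8, 8, 8, 8]
```

With one worker the same 1,000 rows would take 32 steps per epoch, so splitting across 4 workers cuts each worker's share to roughly a quarter, before accounting for gradient-synchronization overhead.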

Please fill in the blanks below:

| Item                                 | Your answer          |
|--------------------------------------|----------------------|
| Number of epochs trained             | ___x___              |
| End-to-end training time (seconds)   | ___x___              |