# Ray Core: Quick Intro and Image Processing

In [None]:
import ray

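# Shut down any existing Ray session so that init starts fresh
if ray.is_initialized():
    ray.shutdown()

# Start Ray and ship the current directory (e.g. tasks_helper_utils.py) to the workers
ray.init(runtime_env={"working_dir": "."})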

### Ray core: primitives for fast, simple remote tasks, actors, and shared storage

In [None]:
@ray.remote
def sum(arr):
    # Runs as a Ray task on a worker; note that this shadows Python's built-in sum()
    return arr.sum()

In [None]:
import numpy as np

i = np.eye(10)

In [None]:
result = sum.remote(i)
result

In [None]:
ray.get(result)

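Tasks can take the results of other tasks (their ObjectRefs) as arguments, whether or not those tasks have finished. Ray waits for the dependencies to complete before running the downstream task.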

In [None]:
@ray.remote
def double(arr):
    return 2*arr

In [None]:
i = np.eye(10)
j = double.remote(i)
k = sum.remote(j)
k

We can block with `ray.get()` to fetch any values we need, although it's best to do so only when necessary, because it waits for the underlying tasks to finish.

In [None]:
ray.get(k)

Where are the objects before we "get" them? In the object store. We can also place data there directly with `ray.put()`, for example to share it across nodes and workers.

In [None]:
handle = ray.put(np.eye(5))

In [None]:
handle

Like the results of remote calls, these handles can be passed transparently to remote functions; Ray resolves each handle to its value before the task runs.

In [None]:
another_sum = sum.remote(handle)
another_sum

In [None]:
ray.get(another_sum)

## How to use Ray distributed tasks for image transformation and computation
For this example, we will simulate a compute-intensive workload by transforming and computing some operations on large, high-resolution images. Such tasks are common in image classification pipelines, where images are transformed, augmented, and transposed before being used to train a deep neural network (DNN).

The PyTorch `torchvision.transforms` module provides many image transformations. We will use a couple of them here, along with some `numpy` and `torch.tensor` operations. Our tasks will perform the following compute-intensive transformations:

 1. Use PIL APIs to [blur the image](https://pillow.readthedocs.io/en/stable/reference/ImageFilter.html) with a given filter intensity.
 2. Apply a random Torchvision [TrivialAugmentWide augmentation](https://pytorch.org/vision/stable/generated/torchvision.transforms.TrivialAugmentWide.html#torchvision.transforms.TrivialAugmentWide).
 3. Convert the images into NumPy arrays and tensors, and perform NumPy and torch tensor operations such as [transpose](https://pytorch.org/docs/stable/generated/torch.transpose.html) and element-wise [multiplication](https://pytorch.org/docs/stable/generated/torch.mul.html) with random integers.
 4. Perform further element-wise [tensor power](https://pytorch.org/docs/stable/generated/torch.pow.html) and [tensor multiplication](https://pytorch.org/docs/stable/generated/torch.mul.html) operations.

The goal is to compare the execution times of running these tasks serially vs. distributed as Ray tasks.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Core/images_for_transformation.png" width="80%" height="30%"> |
|:--|
|High resolution images for transformation and computation.|

In [None]:
import os
import time
import logging
import math
import random

from pathlib import Path
from typing import Tuple, List
from PIL import Image

import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from tqdm.notebook import tqdm
import tasks_helper_utils as t_utils

Define some constants that can be tweaked to experiment with different batch sizes as part of your exercise.

In [None]:
DATA_DIR = Path("/tmp/task_images")
BATCHES = [5, 10, 20] # , 30, 40, 50]
SERIAL_BATCH_TIMES = []
DISTRIBUTED_BATCH_TIMES = []

In [None]:
import torch

# Limit PyTorch to one thread in this process so the serial baseline runs on a single core
torch.set_num_threads(1)

In [None]:
# Define a function to run these transformation tasks serially, on a single node, single core
def run_serially(img_list: List) -> List[Tuple[int, float]]:
    return [t_utils.transform_image(Image.open(image)) for image in tqdm(img_list)]

Let's download 100 large images, each between 5 and 20+ MB, with resolutions greater than 4000 x 3500 pixels. The images are downloaded only once.

In [None]:
# If the directory already exists, skip the download and
# assume the images were fetched in a prior run
if not DATA_DIR.exists():
    DATA_DIR.mkdir(parents=True)
    print("downloading images ...")
    for url in tqdm(t_utils.URLS):
        t_utils.download_images(url, DATA_DIR)

In [None]:
# Fetch the entire image list
image_list = list(DATA_DIR.glob("*.jpg"))
image_list[:2]

In [None]:
# Look at five random images from the set we are working with. It's nice to be one with the data.
t_utils.display_random_images(image_list, n=5)

### Run serially: each image transformation with a Python function

For each batch size N in `BATCHES` (5, 10, and 20 by default; you can add 30, 40, 50, etc.), we process the first N images. To simulate a compute-intensive operation on images, each image goes through the tensor transformations and computations described above.

In [None]:
for idx in BATCHES:
    # Take the first N image paths from the list
    image_batch_list = image_list[:idx]
    print(f"\nRunning {len(image_batch_list)} tasks serially....")
    
    # Run each one serially
    start = time.perf_counter()
    serial_results = run_serially(image_batch_list)
    end = time.perf_counter()
    elapsed = end - start
    
    # Keep track of batches, execution times as a Tuple
    SERIAL_BATCH_TIMES.append((idx, round(elapsed, 2)))
    print(f"Serial transformations/computations of {len(image_batch_list)} images: {elapsed:.2f} sec")

### Run distributed: each image transformation with a Ray task

Let's create a Ray task for each image within each batch and process them. Since
our images are large, let's first put them in the [Ray distributed object store](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects).

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/object_store.png" width="70%" loading="lazy">|
|:--|
|Diagram of workers in worker nodes using `ray.put()` to store values and using `ray.get()` to retrieve them from each node's object store.|

In [None]:
# Put images in object store
object_refs_list = [ray.put(Image.open(img)) for img in tqdm(image_list)]

object_refs_list[:2], len(object_refs_list)

In [None]:
# Define a Ray task to transform, augment and do some compute intensive tasks on an image
@ray.remote
def augment_image_distributed(img: Image.Image) -> List[object]:
    return t_utils.transform_image(img)

In [None]:
# Define a function to run these transformation tasks distributed:
# launch all tasks first, then block once on the full list of results
def run_distributed(obj_ref_list: List) -> List[Tuple[int, float]]:
    return ray.get([augment_image_distributed.remote(img) for img in tqdm(obj_ref_list)])

In [None]:
# For each batch size N, launch one Ray task per image for the first N images
for idx in BATCHES:
    image_obj_ref_batch_list = object_refs_list[:idx]
    print(f"\nRunning {len(image_obj_ref_batch_list)} tasks distributed....")
    
    # Run the batch as distributed Ray tasks
    start = time.perf_counter()
    distributed_results = run_distributed(image_obj_ref_batch_list)
    end = time.perf_counter()
    elapsed = end - start
    
    # Keep track of batch sizes and execution times as tuples
    DISTRIBUTED_BATCH_TIMES.append((idx, round(elapsed, 2)))
    print(f"Distributed transformations/computations of {len(image_obj_ref_batch_list)} images: {elapsed:.2f} sec")

### Compare and plot the serial vs. distributed computational times

In [None]:
# Print times for each and plot them for comparison
print(f"Serial times & batches     : {SERIAL_BATCH_TIMES}")
print(f"Distributed times & batches: {DISTRIBUTED_BATCH_TIMES}")

In [None]:
t_utils.plot_times(BATCHES, SERIAL_BATCH_TIMES, DISTRIBUTED_BATCH_TIMES)
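To put a number on the comparison, a small helper can compute the per-batch speedup (serial time divided by distributed time) from the `(batch_size, seconds)` tuples collected above. The helper `speedups` is hypothetical, and the numbers in the example call are made up purely to show the arithmetic.

```python
from typing import List, Tuple


def speedups(serial: List[Tuple[int, float]],
             distributed: List[Tuple[int, float]]) -> List[Tuple[int, float]]:
    """Return (batch_size, serial_time / distributed_time) for each batch present in both lists."""
    dist_by_batch = dict(distributed)
    return [
        (batch, round(s_time / dist_by_batch[batch], 2))
        for batch, s_time in serial
        # Skip batches missing from the distributed run and guard against division by zero
        if batch in dist_by_batch and dist_by_batch[batch] > 0
    ]


# Illustrative numbers only, not measured results
example = speedups([(5, 10.0), (10, 20.0)], [(5, 4.0), (10, 5.0)])
print(example)  # [(5, 2.5), (10, 4.0)]
```

In the notebook, `speedups(SERIAL_BATCH_TIMES, DISTRIBUTED_BATCH_TIMES)` would report the measured ratios.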

### Recap

We can observe that the overall execution time with Ray tasks is roughly **3-4x** faster 🚅 than serial, although the exact speedup depends on the number of available CPU cores. Converting an existing serial, compute-intensive Python function into a Ray task is as simple as applying the `@ray.remote` decorator (or wrapping it with `ray.remote(...)`) and calling it with `.remote()`. Ray handles all the hard parts: scheduling, execution, scaling, memory management, etc.

As you can see, Ray tasks bring tangible reductions in execution time.
