# Ray Data Test Notebook

Most of this notebook implements the examples from the Ray Data [getting started](https://docs.ray.io/en/latest/data/getting-started.html#datasets-getting-started) docs.



### What kinds of things should I use Ray Datasets for? 

According to the docs, Ray Datasets is "designed to load and pre-process data for distributed ML training pipelines...Ray Datasets is not intended as a replacement for more general data processing systems"[[1]]. Its purpose is to serve as a "last mile" distributed data processing tool, so it is designed with the following 3 use cases in mind.

* Last Mile Processing
* Parallel Batch Inference
* ML Training Ingest (Distributed training)

Below we will attempt to evaluate Ray for these different types of use cases. 

_Note: the current testing and evaluation was done on a local PC with 32GB of memory. It will likely need to be scaled down to work on ODH with the current pod resource sizes._

[1]: https://docs.ray.io/en/master/data/faq.html#what-should-i-use-ray-datasets-for

In [1]:
import ray
from ray.data.aggregate import Mean, Std

import os
import gc
import pandas as pd
import numpy as np

%load_ext memory_profiler

Connect to our remote ray cluster if we're on an ODH notebook image. 

# Part 1: Ray Dataset 

If we are going to test the capabilities of this Ray data tool, we are going to need a reasonably sized example data set. Let's create a CSV file that's almost 1GB and save it to our current file system.    

In [2]:
if not os.path.exists("tmp/output"):
    print("creating dataset")
    %memit \
    ds = ray.data.range(1000)
    print("writing file")
    ds.repartition(1).write_csv("tmp/output")
    del ds
    gc.collect()
else:
    print("file exists")

file exists


Now that we've got our "BIG" dataset, let's read it in with Ray and with vanilla pandas, run some basic data transformations, and compare their memory footprints.

In [3]:
ray.util.disconnect()

In [4]:
ray.init('ray://{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']), runtime_env={"working_dir": "tmp/output/"})

2022-07-26 19:20:43,627	INFO packaging.py:388 -- Creating a file package for local directory 'tmp/output/'.
2022-07-26 19:20:43,629	INFO packaging.py:241 -- Pushing file package 'gcs://_ray_pkg_a8f1e79c71aa49ef.zip' (0.00MiB) to Ray cluster...
2022-07-26 19:20:43,631	INFO packaging.py:243 -- Successfully pushed file package 'gcs://_ray_pkg_a8f1e79c71aa49ef.zip'.


ClientContext(dashboard_url='10.128.3.105:8265', python_version='3.8.12', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7fead5935bb0>)

In [5]:
file = os.listdir("tmp/output/")[0]
file

'47873bc951f247189d8aa97e11d13ed7_000000.csv'

## pandas

In [6]:
%%time
%memit ds_line = pd.read_csv(f"tmp/output/{file}")
ds_line.shape

peak memory: 170.59 MiB, increment: 1.30 MiB
CPU times: user 75.6 ms, sys: 23.3 ms, total: 98.9 ms
Wall time: 219 ms


(1000, 1)

In [7]:
%%time
%memit ds_line = pd.read_csv(f"tmp/output/{file}")
ds_line.shape

peak memory: 170.90 MiB, increment: 0.06 MiB
CPU times: user 85.5 ms, sys: 21.5 ms, total: 107 ms
Wall time: 225 ms


(1000, 1)

In [8]:
%%time
%memit ds_line[:1000000]

peak memory: 171.15 MiB, increment: 0.00 MiB
CPU times: user 78.5 ms, sys: 48.6 ms, total: 127 ms
Wall time: 257 ms


In [9]:
%%time
%memit ds_line.applymap(lambda x: x *2) 

peak memory: 171.40 MiB, increment: 0.21 MiB
CPU times: user 80.3 ms, sys: 22.5 ms, total: 103 ms
Wall time: 220 ms


In [10]:
%%time
%memit \
ds_line = ds_line[ds_line["value"] > 5]
ds_line.head(5)

peak memory: 172.02 MiB, increment: 0.58 MiB
CPU times: user 86.4 ms, sys: 38.9 ms, total: 125 ms
Wall time: 255 ms


Unnamed: 0,value
6,6
7,7
8,8
9,9
10,10


In [11]:
del ds_line

## Ray Data

In [12]:
# Workers don't have PVC access, so this won't work the way it does locally.
#%%time
ds_dst = ray.data.read_csv(file) #f"tmp/output/{file}")
print(ds_dst)

Dataset(num_blocks=1, num_rows=None, schema={value: int64})


In [13]:
%%time
%memit ds_dst.take(10)

peak memory: 180.79 MiB, increment: 0.57 MiB
CPU times: user 77 ms, sys: 26.7 ms, total: 104 ms
Wall time: 230 ms


In [14]:
%%time
%memit ds_dst.map_batches(lambda df:  df.applymap(lambda x: x *2), batch_format='pandas') 

Map Progress: 100%|██████████| 1/1 [00:01<00:00,  1.75s/it]

peak memory: 181.09 MiB, increment: 0.29 MiB
CPU times: user 114 ms, sys: 25 ms, total: 139 ms
Wall time: 1.95 s





In [15]:
%%time
%memit ds_dst = ds_dst.map_batches(lambda df: df[df["value"] > 5], batch_format="pandas")
ds_dst.take(10)

Map Progress: 100%|██████████| 1/1 [00:01<00:00,  1.48s/it]

peak memory: 181.12 MiB, increment: 0.02 MiB
CPU times: user 103 ms, sys: 28.8 ms, total: 132 ms
Wall time: 1.68 s





[{'value': 6},
 {'value': 7},
 {'value': 8},
 {'value': 9},
 {'value': 10},
 {'value': 11},
 {'value': 12},
 {'value': 13},
 {'value': 14},
 {'value': 15}]

Running all of the above cells appears to leave about 20GB of memory in use, so you may have to restart the kernel before moving forward.

What if we have a distributed dataset, that is, more than one file and partition?

In [16]:
ray.util.disconnect()

In [17]:
if not os.path.exists("tmp/output_dist"):
    print("creating dataset")
    %memit \
    ds = ray.data.range(1000)
    print("writing file")
    ds.write_csv("tmp/output_dist")
    del ds
    gc.collect()
else:
    print("files exists")

files exists


In [18]:
ray.util.disconnect()

In [19]:
ray.init('ray://{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']), runtime_env={"working_dir": "tmp/output_dist/"})

2022-07-26 19:21:34,404	INFO packaging.py:388 -- Creating a file package for local directory 'tmp/output_dist/'.
2022-07-26 19:21:34,443	INFO packaging.py:241 -- Pushing file package 'gcs://_ray_pkg_316e8530ea543222.zip' (0.04MiB) to Ray cluster...
2022-07-26 19:21:34,446	INFO packaging.py:243 -- Successfully pushed file package 'gcs://_ray_pkg_316e8530ea543222.zip'.


ClientContext(dashboard_url='10.128.3.105:8265', python_version='3.8.12', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7fead5935bb0>)

In [20]:
%%time
%memit ds_dst = ray.data.read_csv(f"./")
print(ds_dst)

peak memory: 182.55 MiB, increment: 0.87 MiB
Dataset(num_blocks=200, num_rows=None, schema={value: int64})
CPU times: user 110 ms, sys: 19 ms, total: 129 ms
Wall time: 4.52 s


In [21]:
%%time
%memit ds_dst.take(10)

peak memory: 182.55 MiB, increment: 0.00 MiB
CPU times: user 81.7 ms, sys: 16.4 ms, total: 98.1 ms
Wall time: 1.6 s


In [22]:
%%time
%memit ds_dst.map_batches(lambda df:  df.applymap(lambda x: x *2), batch_format='pandas') 

Map Progress: 100%|██████████| 200/200 [00:05<00:00, 37.93it/s]


peak memory: 186.73 MiB, increment: 4.18 MiB
CPU times: user 1.02 s, sys: 155 ms, total: 1.18 s
Wall time: 6.18 s


In [23]:
%%time
%memit ds_dst = ds_dst.map_batches(lambda df: df[df["value"] > 5], batch_format="pandas")
ds_dst.take(10)

Map Progress: 100%|██████████| 200/200 [00:04<00:00, 41.51it/s]

peak memory: 186.81 MiB, increment: 0.02 MiB
CPU times: user 1.14 s, sys: 217 ms, total: 1.36 s
Wall time: 6.45 s





[{'value': 6},
 {'value': 7},
 {'value': 8},
 {'value': 9},
 {'value': 10},
 {'value': 11},
 {'value': 12},
 {'value': 13},
 {'value': 14},
 {'value': 15}]

Above we evaluated 3 scenarios across a number of different operations: vanilla pandas with a single file, Ray with a single file (one block), and Ray with a distributed dataset (many blocks).

The results below come from the local experiment, in which the generated datasets were 100,000,000 rows long and 1 column wide, consisting only of integers (the cells above use a scaled-down 1,000-row dataset). For each scenario we recorded the time and memory for loading the data, retrieving a slice (subset), applying an element-wise function to each element (the code above doubles each value; the table labels this column "Square"), and applying a filter, along with the total time taken and the memory still in use after the entire set of operations ran.

#### Ray vs Pandas performance results

_These are the experimental results on an 8-core laptop with 32GB of memory and should be repeated on an OPF cluster._

_TODO: Redo assessment once we get it working on the cluster._

|                    | Load       | Slice     | Square       | Filter    | Total Change |
|--------------------|------------|-----------|--------------|-----------|--------------|
| Pandas             | 10s, 800MB | 1s, 0MB   | 43s, 0MB     | 2s, 500MB | 56s, 1600MB  |
| Ray (single block) | 9s, 1600MB | 5s, 400MB | 128s, 2400MB | 86s, 0MB  | 228s, 4400MB |
| Ray (multi block)  | 1s, 1000MB | 5s, 400MB | 35s, 1100MB  | 14s, 0MB  | 55s, 2500MB  |


<br/><br/>
From the table above we can see that using Ray data without dividing our dataset object into a reasonable number of blocks performs quite poorly. It's by far the slowest approach for the operations above and uses the most memory overall.
For a smallish dataset like the one we are using here (~1GB), vanilla pandas still works fairly well. However, it runs as a single process and does not take full advantage of the available resources.
With the Ray Dataset divided into 200 blocks we get (in some cases) faster times than pandas with only about 1GB more memory required. Furthermore, this approach maximizes use of the available resources on the machine.

We are also able to convert Ray (single block) to Ray (multi block) and get the same increased performance by running a `ds.repartition(200)` command on our dataset. However, it is a somewhat expensive operation and should be avoided if possible.  
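
To make the effect of block count concrete, here is a minimal plain-Python sketch of why more blocks allow more parallelism: the data is cut into blocks and a function is mapped over each block by a pool of workers. This is an analogy, not Ray's implementation. The helper names `split_into_blocks`, `map_blocks`, and `double_block` are hypothetical, and a thread pool stands in for Ray's worker processes.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_blocks(rows, num_blocks):
    """Cut `rows` into `num_blocks` contiguous blocks of near-equal size."""
    size, extra = divmod(len(rows), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        end = start + size + (1 if i < extra else 0)
        blocks.append(rows[start:end])
        start = end
    return blocks


def map_blocks(blocks, fn, max_workers=4):
    """Apply `fn` to every block concurrently, preserving block order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, blocks))


def double_block(block):
    # Stand-in for the per-batch function passed to map_batches.
    return [x * 2 for x in block]


rows = list(range(1000))
# One block: a single task, so nothing can run in parallel.
single = map_blocks(split_into_blocks(rows, 1), double_block)
# 200 blocks: 200 independent tasks that the pool can spread across workers.
multi = map_blocks(split_into_blocks(rows, 200), double_block)

flat_single = [x for block in single for x in block]
flat_multi = [x for block in multi for x in block]
```

Both layouts produce the same rows; only the number of independent tasks differs, which is the property the multi-block results in the table rely on.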


_Note: The memory values recorded by `%memit` did not seem to accurately capture the amount of memory used by the multiple Ray processes, so the table reflects total machine usage observed while running the code above rather than the `%memit` values._



# Part 2: ML Preprocessing 

In this section we will mostly follow the ["dataset ml preprocessing"](https://docs.ray.io/en/latest/data/examples/big_data_ingestion.html) section of the Ray data docs to evaluate some of the "last mile" type of processing we'd want to use Ray for in a machine learning pipeline. Specifically we will perform the following 3 types of operations:

1. Data Cleaning
2. Aggregation and scaling
3. Random Shuffle

The first thing we need to do is create a slightly more complex data set, one that has 3 columns with proper column names. 

In [24]:
ray.util.disconnect()

In [25]:
# make a multi-column data set
if not os.path.exists("tmp/output_multi_col"):
    print("creating dataset")
    %memit \
    ds = ray.data.from_items([{"A":i%3,"B":i * 2,"C":i * 3} for i in range(200)])
    print("writing file")
    ds.write_csv("tmp/output_multi_col")
    del ds
    gc.collect()
else:
    print("files exists")

files exists


In [26]:
ray.util.disconnect()

In [27]:
ray.init('ray://{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']), runtime_env={"working_dir": "tmp/output_multi_col/"})

2022-07-26 19:22:16,582	INFO packaging.py:388 -- Creating a file package for local directory 'tmp/output_multi_col/'.
2022-07-26 19:22:16,614	INFO packaging.py:241 -- Pushing file package 'gcs://_ray_pkg_8027f6b348e358d6.zip' (0.04MiB) to Ray cluster...
2022-07-26 19:22:16,616	INFO packaging.py:243 -- Successfully pushed file package 'gcs://_ray_pkg_8027f6b348e358d6.zip'.


ClientContext(dashboard_url='10.128.3.105:8265', python_version='3.8.12', ray_version='1.12.1', ray_commit='4863e33856b54ccf8add5cbe75e41558850a1b75', protocol_version='2022-03-16', _num_clients=1, _context_to_restore=<ray.util.client._ClientContext object at 0x7fead5935bb0>)

#### Data Cleaning

Cool, let's encapsulate all the data cleaning steps we want to perform into a single function. This is good practice in general, and it also lets us pass the function to Ray to run in parallel on our dataset.

All the moves below are arbitrary and selected just to show what's possible :)   

In [28]:
# A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
def transform_batch(df: pd.DataFrame):
    # Drop nulls.
    df = df.dropna(subset=["A"])
    # Add new column.
    df["new_col"] = df["A"] - 2 * df["B"] + df["C"] / 3
    # Transform existing column.
    df["A"] = 2 * df["A"] + 1
    # Drop column.
    df.drop(columns="B", inplace=True)
    # Re-add column 
    df["B"] = df["C"]
    return df

Read in our new dataset 

In [29]:
%%time
%memit ds = ray.data.read_csv(f"./")
print(ds)

peak memory: 187.26 MiB, increment: 0.36 MiB
Dataset(num_blocks=200, num_rows=None, schema={A: int64, B: int64, C: int64})
CPU times: user 102 ms, sys: 26.1 ms, total: 128 ms
Wall time: 4.69 s


Apply the transformations to our dataset in parallel on each block.  

In [30]:
%%time
ds = ds.map_batches(transform_batch, batch_format="pandas")
ds.take(5)

Map Progress: 100%|██████████| 200/200 [00:05<00:00, 36.01it/s]

CPU times: user 1.07 s, sys: 142 ms, total: 1.21 s
Wall time: 6.28 s





[{'A': 1, 'C': 0, 'new_col': 0.0, 'B': 0},
 {'A': 3, 'C': 3, 'new_col': -2.0, 'B': 3},
 {'A': 5, 'C': 6, 'new_col': -4.0, 'B': 6},
 {'A': 1, 'C': 9, 'new_col': -9.0, 'B': 9},
 {'A': 3, 'C': 12, 'new_col': -11.0, 'B': 12}]
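
As a sanity check on the output above, the same arithmetic can be reproduced row by row in plain Python. This sketch assumes the rows were generated as in the dataset-creation cell (`A = i % 3`, `B = 2 * i`, `C = 3 * i`); `transform_row` is a hypothetical helper that mirrors `transform_batch` for a single row.

```python
def transform_row(i):
    """Apply the same steps as transform_batch to the i-th generated row."""
    a, b, c = i % 3, i * 2, i * 3        # original columns A, B, C
    new_col = a - 2 * b + c / 3          # add new column
    a = 2 * a + 1                        # transform existing column A
    # Column B is dropped and then re-added as a copy of C.
    return {"A": a, "C": c, "new_col": new_col, "B": c}


rows = [transform_row(i) for i in range(5)]
for row in rows:
    print(row)
```

For the first five rows this should reproduce the five records returned by `ds.take(5)`.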

AND, for good measure, let's compare timing of loading our dataset and running our cleaning function using regular old pandas.

In [31]:
%%time
files = os.listdir("tmp/output_multi_col")
files = [f"tmp/output_multi_col/{file}" for file in files]
%memit ds_panda = pd.concat(map(pd.read_csv, files))
ds_panda.shape

peak memory: 190.50 MiB, increment: 2.32 MiB
CPU times: user 286 ms, sys: 15 ms, total: 301 ms
Wall time: 410 ms


(200, 3)

In [32]:
%%time
ds_panda = transform_batch(ds_panda)
ds_panda.head(5)

CPU times: user 5.39 ms, sys: 1.08 ms, total: 6.46 ms
Wall time: 5.49 ms


Unnamed: 0,A,C,new_col,B
0,1,531,-531.0,531
0,5,159,-157.0,159
0,5,177,-175.0,177
0,3,516,-515.0,516
0,1,189,-189.0,189


#### Aggregations and Scaling

Now let's look at a few operations, such as computing the mean and standard deviation and scaling our dataset, that require knowledge of the whole dataset, which makes them a little more difficult to parallelize.

In [33]:
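# Note: a bare `%time` line times an empty statement, which is why the timings
# reported for this and the later `%time` cells are only a few microseconds.
# Use `%%time` to time the whole cell.
%time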
ds.mean("B")

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.72 µs


GroupBy Map: 100%|██████████| 200/200 [00:02<00:00, 75.92it/s] 
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  9.19it/s]


298.5

In [34]:
%time
ds.mean(["B", "C"])

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.96 µs


GroupBy Map: 100%|██████████| 200/200 [00:02<00:00, 77.09it/s] 
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  8.53it/s]


{'mean(B)': 298.5, 'mean(C)': 298.5}

As always, we'll run the same operations with pandas so we have something to compare our results to.  

In [35]:
%time
ds_panda[["B","C"]].mean()

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.96 µs


B    298.5
C    298.5
dtype: float64

In [36]:
%%time
stats = ds.aggregate(Mean("B"), Std("B"), Mean("C"), Std("C"), Mean("new_col"), Std("new_col") )
stats

GroupBy Map: 100%|██████████| 200/200 [00:02<00:00, 78.85it/s] 
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00,  4.95it/s]

CPU times: user 836 ms, sys: 74.3 ms, total: 910 ms
Wall time: 3.34 s





{'mean(B)': 298.5, 'std(B)': 173.63755354185338, 'mean(C)': 298.5, 'std(C)': 173.63755354185338, 'mean(new_col)': -297.505, 'std(new_col)': 173.63656949597816}
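
The aggregate values above can be cross-checked with the standard library. This is a sketch under the assumption that the 200 rows come from the dataset-creation cell, so that after `transform_batch` we have `B == C == 3 * i` and `new_col == i % 3 - 2 * (2 * i) + (3 * i) / 3`. The variable names are illustrative only.

```python
import statistics

# Assumption: rows are generated for i in range(200), as in the creation cell.
c_values = [3 * i for i in range(200)]                      # B and C after cleaning
new_col_values = [i % 3 - 2 * (2 * i) + (3 * i) / 3 for i in range(200)]

expected = {
    "mean(B)": statistics.mean(c_values),
    "std(B)": statistics.stdev(c_values),                   # sample std (ddof=1)
    "mean(new_col)": statistics.mean(new_col_values),
    "std(new_col)": statistics.stdev(new_col_values),
}
print(expected)
```

The values should agree with the Ray output to within floating-point tolerance, which suggests that `Std` reports the sample standard deviation here.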

In [37]:
def batch_standard_scaler(df: pd.DataFrame):
    def column_standard_scaler(s: pd.Series):
        s_mean = stats[f"mean({s.name})"]
        s_std = stats[f"std({s.name})"]
        return (s - s_mean) / s_std

    cols = df.columns.difference(["A"])
    df.loc[:, cols] = df.loc[:, cols].transform(column_standard_scaler)
    return df

In [38]:
%time
ds = ds.map_batches(batch_standard_scaler, batch_format="pandas")
ds.take(5)

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.96 µs


Map Progress: 100%|██████████| 200/200 [00:03<00:00, 52.67it/s] 


[{'A': 1, 'C': -1.7190981669069068, 'new_col': 1.713377549807507, 'B': -1.7190981669069068},
 {'A': 3, 'C': -1.7018207983952796, 'new_col': 1.701859238856044, 'B': -1.7018207983952796},
 {'A': 5, 'C': -1.6845434298836524, 'new_col': 1.690340927904581, 'B': -1.6845434298836524},
 {'A': 1, 'C': -1.6672660613720252, 'new_col': 1.6615451505259233, 'B': -1.6672660613720252},
 {'A': 3, 'C': -1.6499886928603977, 'new_col': 1.6500268395744604, 'B': -1.6499886928603977}]
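
The scaled values above follow the usual standard-score formula, `(x - mean) / std`. A small worked check in plain Python, using the statistics from the aggregate output and assuming the first three raw `C` values are 0, 3 and 6 (as generated by `C = 3 * i`):

```python
# Statistics copied from the aggregate output earlier in this section.
mean_c = 298.5
std_c = 173.63755354185338

# Assumption: the first three raw C values, from C = 3 * i for i = 0, 1, 2.
raw_c = [0, 3, 6]

# Standard scaling: subtract the mean, divide by the standard deviation.
scaled_c = [(x - mean_c) / std_c for x in raw_c]
print(scaled_c)
```

These should match the `C` values of the first three records returned by `ds.take(5)`.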

#### Shuffle

When running ML training pipelines it is considered good practice to shuffle our training set at the beginning of each epoch. Let's look at a couple different ways we can shuffle our data with Ray.  

* First we will shuffle the whole dataset once
* Then we will shuffle it N times
* Finally, we create a DatasetPipeline object that will shuffle each block when called in an iteration loop (like we would do for training) 
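
Before looking at the Ray calls, the idea of reshuffling once per epoch can be sketched in plain Python. This is only an illustration of the concept, not Ray's implementation; `iter_epochs` and its `seed` parameter are hypothetical.

```python
import random


def iter_epochs(rows, num_epochs, seed=0):
    """Yield a freshly shuffled copy of `rows` for each epoch."""
    rng = random.Random(seed)
    for _ in range(num_epochs):
        epoch_rows = list(rows)      # copy so the source order is untouched
        rng.shuffle(epoch_rows)
        yield epoch_rows


rows = list(range(200))
epochs = list(iter_epochs(rows, num_epochs=20))

# Every epoch sees all 200 rows, so 20 epochs yield 20 * 200 = 4000 rows.
total_rows = sum(len(epoch) for epoch in epochs)
print(total_rows)
```

Each epoch contains the same rows in a different order, which is the property we want from a per-epoch shuffle.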


In [39]:
%%time
# Shuffle once
ds = ds.random_shuffle()
print(ds)
ds.take(5)

Shuffle Map: 100%|██████████| 200/200 [00:05<00:00, 35.01it/s]
Shuffle Reduce: 100%|██████████| 200/200 [00:09<00:00, 20.12it/s]


Dataset(num_blocks=200, num_rows=200, schema={A: int64, C: float64, new_col: float64, B: float64})
CPU times: user 8.03 s, sys: 3.09 s, total: 11.1 s
Wall time: 15.8 s


[{'A': 5, 'C': 0.23324447490696723, 'new_col': -0.22745784551401657, 'B': 0.23324447490696723},
 {'A': 5, 'C': -1.062558163465073, 'new_col': 1.0683521365255764, 'B': -1.062558163465073},
 {'A': 3, 'C': -1.0280034264418185, 'new_col': 1.0280380481954556, 'B': -1.0280034264418185},
 {'A': 5, 'C': 1.2180544800697177, 'new_col': -1.2122734318641073, 'B': 1.2180544800697177},
 {'A': 5, 'C': -0.8552297413255465, 'new_col': 0.8610225393992416, 'B': -0.8552297413255465}]

In [40]:
num_epochs = 20

In [41]:
%%time
#Shuffle N times
ds.random_shuffle().repeat(num_epochs)

Shuffle Map: 100%|██████████| 200/200 [00:34<00:00,  5.82it/s]
Shuffle Reduce: 100%|██████████| 200/200 [00:12<00:00, 16.05it/s] 


CPU times: user 9.84 s, sys: 4.24 s, total: 14.1 s
Wall time: 46.9 s


DatasetPipeline(num_windows=20, num_stages=1)

In [44]:
%%time
# Create a pipeline that triggers a random shuffle before each epoch.
ds = ds.repeat(num_epochs).random_shuffle_each_window()

n = 0
for i in ds.iter_batches():
    n += len(i)
n 

Stage 0:   0%|          | 0/20 [00:00<?, ?it/s]
  0%|          | 0/20 [00:00<?, ?it/s]
Stage 0:   5%|▌         | 1/20 [00:00<00:11,  1.61it/s]
Stage 0:  10%|█         | 2/20 [01:02<10:57, 36.53s/it]
Stage 0:  15%|█▌        | 3/20 [02:24<16:18, 57.57s/it]
Stage 0:  20%|██        | 4/20 [03:45<17:47, 66.73s/it]
Stage 0:  25%|██▌       | 5/20 [05:07<18:01, 72.13s/it]
Stage 0:  30%|███       | 6/20 [06:27<17:30, 75.01s/it]
Stage 0:  35%|███▌      | 7/20 [07:48<16:40, 76.94s/it]
Stage 0:  40%|████      | 8/20 [09:10<15:39, 78.33s/it]
Stage 0:  45%|████▌     | 9/20 [10:32<14:33, 79.44s/it]
Stage 0:  50%|█████     | 10/20 [11:53<13:19, 79.92s/it]
Stage 0:  55%|█████▌    | 11/20 [13:15<12:05, 80.56s/it]
Stage 0:  60%|██████    | 12/20 [14:36<10:45, 80.69s/it]
Stage 0:  65%|██████▌   | 13/20 [15:57<09:26, 80.96s/it]
Stage 0:  70%|███████   | 14/20 [17:18<08:05, 80.95s/it]
Stage 0:  75%|███████▌  | 15/20 [18:40<06:46, 81.24s/it]
Stage 0:  80%|████████ 

CPU times: user 3min 51s, sys: 1min 38s, total: 5min 30s
Wall time: 26min 38s





4000

Great, so from the above, we can see how to use Ray to apply some common "last mile" data processing types of transformations to our dataset in a parallel fashion.  

# Part 3: Data Pipelines 

In this section we will mostly follow the examples from ["pipelining-compute"](https://docs.ray.io/en/latest/data/pipelining-compute.html) and ["advanced-pipelines"](https://docs.ray.io/en/latest/data/advanced-pipelines.html) from the Ray docs to demonstrate how and when to use "DatasetPipelines". 

According to the docs, "Unlike Datasets, which execute all transformations synchronously, DatasetPipelines implement pipelined execution. This allows for the overlapped execution of data input (e.g., reading files), computation (e.g. feature preprocessing), and output (e.g., distributed ML training)."

We saw DatasetPipelines briefly in the earlier section on shuffling our data. Here we will look into constructing slightly more complex pipelines.

First things first: let's build a small dataset we can convert into a DatasetPipeline.

In [42]:
base = ray.data.range(100000)
print(base)

Dataset(num_blocks=200, num_rows=100000, schema=<class 'int'>)


Now we use `.window()` to convert our Dataset into a DatasetPipeline with 10 blocks per window (20 windows for 200 blocks) 

In [43]:
pipe = base.window(blocks_per_window=10)
print(pipe)

2022-07-26 19:26:27,404	INFO dataset.py:2643 -- Created DatasetPipeline with 20 windows: 0.04MiB min, 0.04MiB max, 0.04MiB mean


DatasetPipeline(num_windows=20, num_stages=2)
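
The window count reported above follows from simple chunking: 200 blocks grouped 10 at a time give 20 windows. A minimal plain-Python sketch of that grouping, where `make_windows` is a hypothetical helper and not part of Ray:

```python
def make_windows(block_ids, blocks_per_window):
    """Group consecutive blocks into windows of at most `blocks_per_window`."""
    return [
        block_ids[start:start + blocks_per_window]
        for start in range(0, len(block_ids), blocks_per_window)
    ]


block_ids = list(range(200))      # the dataset above reports 200 blocks
windows = make_windows(block_ids, blocks_per_window=10)
num_windows = len(windows)
print(num_windows)
```

A smaller `blocks_per_window` produces more, smaller windows, so less data is in flight at any one time.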


Next, we define some functions to apply to our data through the DatasetPipeline. We then use `pipe.map(func_N)` to add them to our pipeline.

In [44]:
def func1(i):
    return i+1

def func2(i):
    return i *2

def func3(i):
    return i%3

In [45]:
pipe = pipe.map(func1)
pipe = pipe.map(func2)
pipe = pipe.map(func3)
print(pipe)

DatasetPipeline(num_windows=20, num_stages=5)


Once the DatasetPipeline is defined, we have to iterate over it for it to trigger the computations we've defined on it. To do that let's just run a quick for loop over data batches.

In [46]:
num_rows = 0
# Each item yielded by iter_batches() is a batch of rows, not a single row.
for batch in pipe.iter_batches():
    num_rows += len(batch)
print(num_rows)

Stage 0:   0%|          | 0/20 [00:00<?, ?it/s]
  0%|          | 0/20 [00:00<?, ?it/s]
Stage 1:   0%|          | 0/20 [00:00<?, ?it/s]
Stage 0:  10%|█         | 2/20 [00:01<00:15,  1.16it/s]
Stage 0:  15%|█▌        | 3/20 [00:01<00:09,  1.72it/s]
Stage 0:  20%|██        | 4/20 [00:02<00:07,  2.15it/s]
Stage 0:  25%|██▌       | 5/20 [00:02<00:05,  2.64it/s]
Stage 0:  30%|███       | 6/20 [00:02<00:04,  3.09it/s]
Stage 0:  35%|███▌      | 7/20 [00:02<00:03,  3.50it/s]
Stage 0:  40%|████      | 8/20 [00:03<00:03,  3.56it/s]
Stage 0:  45%|████▌     | 9/20 [00:03<00:02,  3.80it/s]
Stage 0:  50%|█████     | 10/20 [00:03<00:02,  4.08it/s]
Stage 0:  55%|█████▌    | 11/20 [00:03<00:02,  3.99it/s]
Stage 0:  60%|██████    | 12/20 [00:04<00:01,  4.10it/s]
Stage 0:  65%|██████▌   | 13/20 [00:04<00:01,  4.29it/s]
Stage 0:  70%|███████   | 14/20 [00:04<00:01,  4.47it/s]
Stage 0:  75%|███████▌  | 15/20 [00:04<00:01,  4.19it/s]
Stage 0:  80%|████████  | 16

100000
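
The deferred execution seen above, where stages only run once the pipeline is iterated, can be mimicked in plain Python with lazy `map` objects. This is an analogy rather than how DatasetPipeline works internally; `call_log` is a hypothetical helper used only to observe when the first stage runs.

```python
call_log = []


def func1(i):
    call_log.append(i)    # record that the first stage actually ran
    return i + 1


def func2(i):
    return i * 2


def func3(i):
    return i % 3


# Chaining map() defines the stages but executes nothing yet.
stage = map(func3, map(func2, map(func1, range(10))))
calls_before = len(call_log)

# Iterating pulls every item through all three stages.
results = list(stage)
calls_after = len(call_log)

print(calls_before, calls_after, results)
```

No stage runs until the chained object is consumed, which mirrors why the pipeline has to be iterated before any work happens.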





Great! Now we know how to create, define and run DatasetPipelines with Ray!

# Part 4: Large Scale ML Ingest Example

Here we will follow the ["Big Data Ingestion"](https://docs.ray.io/en/latest/data/examples/big_data_ingestion.html) example from the Ray docs.

The goal here is to tie together everything above into a single demo that reflects a more _realistic_ scenario on how we would apply the Ray Data toolkit to a parallel and distributed machine learning use case.  

First thing we will do is define a function called `create_shuffle_pipeline` that will turn our Dataset into a DatasetPipeline that will read in our data for each epoch, shuffle it and split it into equally sized shards for distributed training on multiple workers. 

In [47]:
@ray.remote
def create_shuffle_pipeline(training_data_dir: str, num_epochs: int, num_shards: int):

    return (
        ray.data.read_csv(training_data_dir)
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )
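
The shuffle-then-split behaviour of the function above can be pictured with a plain-Python sketch. It is a simplification: the Ray version returns one pipeline per shard, while this hypothetical `shuffled_equal_shards` helper yields a list of shards per epoch. It also assumes that an equal split drops any rows that do not divide evenly.

```python
import random


def shuffled_equal_shards(rows, num_epochs, num_shards, seed=0):
    """For each epoch, shuffle `rows` and cut them into equal-sized shards.

    Assumption: leftover rows that do not divide evenly are dropped so
    that every shard has the same length.
    """
    rng = random.Random(seed)
    shard_size = len(rows) // num_shards
    for _ in range(num_epochs):
        epoch_rows = list(rows)
        rng.shuffle(epoch_rows)
        yield [
            epoch_rows[k * shard_size:(k + 1) * shard_size]
            for k in range(num_shards)
        ]


# 201 rows and 2 shards: one row is dropped so both shards hold 100 rows.
epochs = list(shuffled_equal_shards(list(range(201)), num_epochs=3, num_shards=2))
shard_sizes = [[len(shard) for shard in shards] for shards in epochs]
print(shard_sizes)
```

Equal shard sizes matter for distributed training because every worker then performs the same number of steps per epoch.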

Then we will define our own remote `TrainingWorker` class that iterates over our shards during training. To keep things simple, we will just `pass` in place of the training step, since we are focused only on the distributed data processing steps here.

In [48]:
@ray.remote
class TrainingWorker:
    def __init__(self, rank, shard):
        self.rank = rank
        self.shard = shard

    def train(self):
        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):
            # Following code emulates epoch based SGD training.
            print(f"Training... worker: {self.rank}, epoch: {epoch}")
            for i, batch in enumerate(training_dataset.iter_batches()):
                # TODO: replace the code for real training.
                pass

Here we will define the two key variables for this example, the number of Ray workers we'll use and the number of epochs to run. With the appropriate cluster resources, we can scale up our data ingest here by increasing the number of workers.

According to the docs, this whole process can be scaled linearly to arbitrarily large datasets (their example is 500GB) by adding more nodes to our cluster and increasing `NUM_TRAINING_WORKERS`.

In [49]:
NUM_TRAINING_WORKERS = 2
NUM_EPOCHS = 3

Now we create our DatasetPipeline called `splits` and instantiate our list of `TrainingWorkers`. 

In [50]:
%%time
splits = create_shuffle_pipeline.remote(f"./", NUM_EPOCHS, NUM_TRAINING_WORKERS)

CPU times: user 1.87 ms, sys: 2.32 ms, total: 4.19 ms
Wall time: 5.2 ms


In [51]:
splits = ray.get(splits)

In [52]:
%%time
training_workers = [TrainingWorker.options(name=f"{rank}-{shard}").remote(rank, shard) for rank, shard in enumerate(splits)]

CPU times: user 8.35 ms, sys: 304 µs, total: 8.66 ms
Wall time: 11.1 ms


Stage 0:   0%|          | 0/3 [00:00<?, ?it/s]
  0%|          | 0/3 [00:00<?, ?it/s]
Stage 1:   0%|          | 0/3 [00:00<?, ?it/s]


Finally, we use `ray.get` to train our remote training_workers in parallel!

In [53]:
%%time
ray.get([worker.train.remote() for worker in training_workers])

Stage 0:  33%|███▎      | 1/3 [00:04<00:09,  4.74s/it]
(PipelineSplitExecutorCoordinator pid=6776)
Stage 0:  67%|██████▋   | 2/3 [00:16<00:08,  8.83s/it]
(PipelineSplitExecutorCoordinator pid=6776)
Stage 0: 100%|██████████| 3/3 [00:27<00:00,  9.78s/it]


(TrainingWorker pid=448, ip=10.128.3.109) Training... worker: 0, epoch: 0


Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


(TrainingWorker pid=289, ip=10.131.2.37) Training... worker: 1, epoch: 0


Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
(PipelineSplitExecutorCoordinator pid=6776)
Stage 1: 100%|██████████| 3/3 [00:37<00:00, 11.91s/it]


(TrainingWorker pid=289, ip=10.131.2.37) Training... worker: 1, epoch: 1
(TrainingWorker pid=448, ip=10.128.3.109) Training... worker: 0, epoch: 1
(TrainingWorker pid=448, ip=10.128.3.109) Training... worker: 0, epoch: 2
(TrainingWorker pid=289, ip=10.131.2.37) Training... worker: 1, epoch: 2
CPU times: user 97.6 ms, sys: 28.2 ms, total: 126 ms
Wall time: 33.7 s


Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.01s/it]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:10<00:00, 10.60s/it] 
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 13.37it/s] 
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 12.40it/s]
Stage 0:   0%|          | 0/1 [00:00<?, ?it/s]


[None, None]

Stage 0: 100%|██████████| 1/1 [00:00<00:00, 12.67it/s] 
Stage 0: 100%|██████████| 1/1 [00:00<00:00, 10.82it/s]


In [54]:
ray.util.disconnect()

### Congrats!

If you are looking at this cell and there are no errors above, you know that Ray Data is working!