# Ray Data (still in beta)
Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as map, filter, and repartition, and are compatible with a variety of file formats, datasources, and distributed frameworks.

In [1]:
import ray
ray.init()

2021-12-26 15:13:58,270	INFO services.py:1340 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.84.193',
 'raylet_ip_address': '192.168.84.193',
 'redis_address': '192.168.84.193:6379',
 'object_store_address': '/tmp/ray/session_2021-12-26_15-13-55_562074_2086/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-12-26_15-13-55_562074_2086/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-12-26_15-13-55_562074_2086',
 'metrics_export_port': 51466,
 'node_id': '622069df4572a1f75c22ab53831780238c4f84e569661c5c3ad0ffdb'}

In [2]:
ds = ray.data.range(10000)

In [3]:
print(type(ds))

<class 'ray.data.dataset.Dataset'>


In [8]:
!ray memory

Grouping by node address...        Sorting by object size...        Display allentries per group...


--- Summary for node address: 192.168.84.193 ---
Mem Used by Objects  Local References  Pinned        Pending Tasks  Captured in Objects  Actor Handles
70000.0 B            400, (69800.0 B)  0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           13, (-13.0 B)

--- Object references for node address: 192.168.84.193 ---
IP Address | PID | Type | Call Site | Size | Reference Type | Object Ref

192.168.84.193 | 1898 | Worker |           | ?    | ACTOR_HANDLE   | ffffffffffffffffc2864d56385a6324af4fff2e0100000001000000


192.168.84.193 | 1900 | Worker |           | ?    | ACTOR_HANDLE   | ffffffffffffffffc2864d56385a6324af4fff2e0100000001000000


192.168.84.193 | 1899 | Worker |           | ?    | ACTOR_HANDLE   | ffffffffffffffffc2864d56385a6324af4fff2e0100000001000000


192.168.84.193 | 1576 | Worker |           | ?    | ACTOR_HANDLE   | ffffffffffffffffc2864d56385a6324af4fff2e0100000001000000

In [6]:
ds

Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

In [6]:
print(ds.get_internal_block_refs()[:3])

[ObjectRef(69a6825d641b4613ffffffffffffffffffffffff0100000002000000), ObjectRef(ee4e90da584ab0ebffffffffffffffffffffffff0100000002000000), ObjectRef(4ee449587774c1f0ffffffffffffffffffffffff0100000002000000)]


In [7]:
print(ray.get(ds.get_internal_block_refs()[0]))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]


In [8]:
ds.mean()

GroupBy Map: 100%|██████████| 200/200 [00:00<00:00, 306.37it/s]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 182.65it/s]


4999.5

## read parquet

In [9]:
import pandas as pd
pd.DataFrame([(1,2,3)]* 1000000, columns=['col1', 'col2', 'col3']).to_parquet('/tmp/tmp_ex_07.parquet')
pd.DataFrame([(1,2,3)]* 1000000, columns=['col1', 'col2', 'col3']).to_csv('/tmp/tmp_ex_07.csv')

In [10]:
!ls -l --block-size M /tmp/

total 13M
drwxrwxrwx 18 root root  1M Dec 25 19:43 ray
-rw-------  1 root root  1M Dec  3 02:34 tmp7xk4pjtb
-rw-r--r--  1 root root 13M Dec 25 19:43 tmp_ex_07.csv
-rw-r--r--  1 root root  1M Dec 25 19:43 tmp_ex_07.parquet


### read parquet

In [11]:
df = ray.data.read_parquet(['/tmp/tmp_ex_07.parquet'] * 3)


In [12]:
df

Dataset(num_blocks=3, num_rows=3000000, schema={col1: int64, col2: int64, col3: int64})

##### check re-partition effects

In [25]:
df = ray.data.read_parquet(['/tmp/tmp_ex_07.parquet'] * 3)
df.mean()

GroupBy Map: 100%|██████████| 3/3 [00:21<00:00,  7.28s/it]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 482.77it/s]

CPU times: user 470 ms, sys: 71.5 ms, total: 541 ms
Wall time: 22 s





{'mean(col1)': 1.0, 'mean(col2)': 2.0, 'mean(col3)': 3.0}

In [27]:
%%time
# parquet
df = ray.data.read_parquet(['/tmp/tmp_ex_07.parquet'] * 3).repartition(8)
df.mean()

Repartition: 100%|██████████| 8/8 [00:00<00:00, 339.51it/s]
GroupBy Map: 100%|██████████| 8/8 [00:17<00:00,  2.22s/it]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 371.77it/s]

CPU times: user 480 ms, sys: 77.8 ms, total: 558 ms
Wall time: 18 s





{'mean(col1)': 1.0, 'mean(col2)': 2.0, 'mean(col3)': 3.0}

### read_csv

In [28]:
%%time
df = ray.data.read_csv(['/tmp/tmp_ex_07.csv'] * 3)
df.mean()

GroupBy Map: 100%|██████████| 3/3 [00:28<00:00,  9.60s/it]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 479.62it/s]

CPU times: user 548 ms, sys: 120 ms, total: 668 ms
Wall time: 29.2 s





{'mean()': 499999.5, 'mean(col1)': 1.0, 'mean(col2)': 2.0, 'mean(col3)': 3.0}

In [29]:
%%time
df = ray.data.read_csv(['/tmp/tmp_ex_07.csv'] * 3).repartition(8)
df.mean()

Repartition: 100%|██████████| 8/8 [00:00<00:00, 318.12it/s]
GroupBy Map: 100%|██████████| 8/8 [00:27<00:00,  3.44s/it]
GroupBy Reduce: 100%|██████████| 1/1 [00:00<00:00, 347.24it/s]

CPU times: user 692 ms, sys: 170 ms, total: 861 ms
Wall time: 28.1 s





{'mean()': 499999.5, 'mean(col1)': 1.0, 'mean(col2)': 2.0, 'mean(col3)': 3.0}

# Modin

In [2]:
!ray memory

Grouping by node address...        Sorting by object size...        Display allentries per group...


To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1

--- Aggregate object store stats across all nodes ---
Plasma memory usage 0 MiB, 0 objects, 0.0% full, 0.0% needed

[0m

In [3]:
import modin.pandas as mod_pd
import pandas as pd

In [10]:
pd.DataFrame([(1,2,3)]* 1000000, columns=['col1', 'col2', 'col3']).to_csv('/tmp/tmp_ex_07.csv', index=False)
# pd.DataFrame([(1,2,3)]* 10000000, columns=['col1', 'col2', 'col3']).to_csv('/tmp/tmp_ex_07_10x.csv', index=False)
# pd.DataFrame([(1,2,3)]* 30000000, columns=['col1', 'col2', 'col3']).to_csv('/tmp/tmp_ex_07_30x.csv', index=False)

## 1000000 rows, 3 col csv

In [29]:
#### modin

In [36]:
file = '/tmp/tmp_ex_07.csv'
mod_df = mod_pd.read_csv(file)

In [31]:
!ray memory

Grouping by node address...        Sorting by object size...        Display allentries per group...


--- Summary for node address: 192.168.84.193 ---
Mem Used by Objects  Local References  Pinned        Pending Tasks  Captured in Objects  Actor Handles
24007064.0 B         8, (24007064.0 B)  0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           0, (0.0 B)   

--- Object references for node address: 192.168.84.193 ---
IP Address | PID | Type | Call Site | Size | Reference Type | Object Ref

192.168.84.193 | 2086 | Driver |           | 3000715.0 B | LOCAL_REFERENCE | f6d59484161292e1ffffffffffffffffffffffff0100000001000000


192.168.84.193 | 2086 | Driver |           | 3000907.0 B | LOCAL_REFERENCE | 780ab616ab31e1bfffffffffffffffffffffffff0100000001000000


192.168.84.193 | 2086 | Driver |           | 3000907.0 B | LOCAL_REFERENCE | 46801c25ff7108dcffffffffffffffffffffffff0100000001000000


192.168.84.193 | 2086 | Driver |           | 3000907.0 B | LOCAL_REFERENCE | f0da08b1cd55d36dfffffff

In [37]:
%%timeit -n 100
mod_df.mean()

[2m[36m(deploy_ray_func pid=16587)[0m 
30.5 ms ± 2.83 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [23]:
%%timeit -n 10
file = '/tmp/tmp_ex_07.csv'
mod_df = mod_pd.read_csv(file)
mod_df.mean()

93.2 ms ± 1.68 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [30]:
#### pure pandas

In [24]:
%%timeit -n 10
file = '/tmp/tmp_ex_07.csv'
pd_df = pd.read_csv(file)
pd_df.mean()

95.2 ms ± 2.22 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [27]:
file = '/tmp/tmp_ex_07.csv'
pd_df = pd.read_csv(file)

In [28]:
%%timeit -n 100
pd_df.mean()

3.35 ms ± 383 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


## 10x rows

In [49]:
%%timeit -n 10
file = '/tmp/tmp_ex_07_10x.csv'
mod_df = mod_pd.read_csv(file)
mod_df.mean()

[2m[36m(deploy_ray_func pid=13674)[0m 
[2m[36m(apply_list_of_funcs pid=13675)[0m 
533 ms ± 22.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [51]:
%%timeit -n 10
file = '/tmp/tmp_ex_07_10x.csv'
pd_df = pd.read_csv(file)
pd_df.mean()

1.12 s ± 47.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


## 30x rows

In [62]:
%%timeit -n 3
file = '/tmp/tmp_ex_07_30x.csv'
mod_df = mod_pd.read_csv(file)
mod_df.mean()

1.99 s ± 248 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


In [63]:
%%timeit -n 3
file = '/tmp/tmp_ex_07_30x.csv'
pd_df = pd.read_csv(file)
pd_df.mean()

3.69 s ± 609 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


In [None]:
# 30x rows- mean()

In [32]:
file = '/tmp/tmp_ex_07_30x.csv'
mod_df = mod_pd.read_csv(file)

In [33]:
%%timeit -n 100
mod_df.mean()

[2m[36m(apply_list_of_funcs pid=16642)[0m 


60.4 ms ± 13.7 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
[2m[36m(apply_list_of_funcs pid=2231)[0m 
[2m[36m(apply_list_of_funcs pid=16642)[0m 


In [34]:
file = '/tmp/tmp_ex_07_30x.csv'
pd_df = pd.read_csv(file)

[2m[36m(apply_list_of_funcs pid=2237)[0m 


In [35]:
%%timeit -n 100
pd_df.mean()

93.6 ms ± 12.2 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
