In [1]:
import ray

# Build a Dataset whose rows are plain Python integers.
ds = ray.data.range(10000)
# repr: Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)

# Peek at the leading rows and tally the total. Neither value is rendered by
# the notebook, because a cell only displays its final expression.
ds.take(limit=5)  # evaluates to [0, 1, 2, 3, 4]
ds.count()        # evaluates to 10000

# Rebind `ds` to a Dataset of Arrow records: one two-column record per integer.
records = [{"col1": n, "col2": str(n)} for n in range(10000)]
ds = ray.data.from_items(records)
# repr: Dataset(num_blocks=200, num_rows=10000, schema={col1: int64, col2: string})

# Print the first five records, one per line:
#   {'col1': 0, 'col2': '0'}
#   {'col1': 1, 'col2': '1'}
#   {'col1': 2, 'col2': '2'}
#   {'col1': 3, 'col2': '3'}
#   {'col1': 4, 'col2': '4'}
ds.show(limit=5)

# Final expression of the cell, so its value is what the notebook displays:
#   col1: int64
#   col2: string
ds.schema()

2022-04-23 10:28:11,252	INFO services.py:1412 -- View the Ray dashboard at http://127.0.0.1:8265


{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
{'col1': 3, 'col2': '3'}
{'col1': 4, 'col2': '4'}


col1: int64
col2: string

# Reference
[Ray Datasets: Getting Started](https://docs.ray.io/en/latest/data/getting-started.html)
# Expected output of the cell above
```
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
{'col1': 3, 'col2': '3'}
{'col1': 4, 'col2': '4'}
col1: int64
col2: string
```

In [2]:
# Dump a 10000-row integer Dataset as CSV files under /tmp/output.
full_dataset = ray.data.range(10000)
full_dataset.write_csv("/tmp/output")
# produces /tmp/output/data0.csv, /tmp/output/data1.csv, ...

# The number of output files is controlled with repartition: here a second
# Dataset is repartitioned to 1 before being written.
single_partition = ray.data.range(10000).repartition(1)
single_partition.write_csv("/tmp/output2")
# produces /tmp/output2/data0.csv

Write Progress: 100%|████████████████████████████████████████████████| 200/200 [00:00<00:00, 405.00it/s]
Repartition: 100%|████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  5.84it/s]
Write Progress: 100%|█████████████████████████████████████████████████████| 1/1 [00:00<00:00,  7.08it/s]


# Expected output of the cell above
```
Write Progress: 100%|████████████████████████████████████████████████| 200/200 [00:00<00:00, 345.12it/s]
Repartition: 100%|████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  8.42it/s]
Write Progress: 100%|████████████████████████████████████████████████████| 1/1 [00:00<00:00, 131.62it/s]
```
# Write result (size on disk)
```
(ray_tf2d7) ziyu4huang@Ziyu-MBA-M1 /tmp % du -sh output*
800K    output
 48K    output2
```

In [5]:
import pandas as pd
import dask.dataframe as dd

# Source frame: three rows with a numeric column "one" and a string column "two".
columns = {"one": [1, 2, 3], "two": ["a", "b", "c"]}
pdf = pd.DataFrame(columns)

# from_pandas is handed a list of DataFrames; here the list holds just `pdf`.
frames = [pdf]
ds = ray.data.from_pandas(frames)

# Re-express the same frame as a Dask DataFrame (10 partitions requested),
# then rebind `ds` to a Dataset created from that Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
ds = ray.data.from_dask(dask_df)

In [4]:
import ray

# Double every integer of a fresh 10000-row Dataset and keep the result in `ds`.
# A "Map Progress" bar over 200 blocks is emitted while this runs.
ds = ray.data.range(10000).map(lambda value: value * 2)
# repr: Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
ds.take(limit=5)  # evaluates to [0, 2, 4, 6, 8]; not the last expression, so not displayed

# Keep only the doubled values greater than 5 (another "Map Progress" bar).
above_five = ds.filter(lambda value: value > 5)
above_five.take(limit=5)  # evaluates to [6, 8, 10, 12, 14]; not displayed

# Emit each value followed by its negation (a third "Map Progress" bar).
mirrored = ds.flat_map(lambda value: [value, -value])
# Final expression of the cell, so the notebook displays it: [0, 0, 2, -2, 4]
mirrored.take(limit=5)

Map Progress: 100%|█████████████████████████████████████████████████| 200/200 [00:00<00:00, 3013.05it/s]
Map Progress: 100%|█████████████████████████████████████████████████| 200/200 [00:00<00:00, 2706.29it/s]
Map Progress: 100%|█████████████████████████████████████████████████| 200/200 [00:00<00:00, 2766.33it/s]


[0, 0, 2, -2, 4]

### eee
# tht
[Using draw.io in JupyterLab](https://zhuanlan.zhihu.com/p/70908238)