Ray Data loads data from various sources. This guide shows you how to:
- Read files like images
- Load in-memory data like pandas DataFrames
- Read databases like MySQL
Ray Data reads files from local disk or cloud storage in a variety of file formats. To view the full list of supported file formats, see the Input/Output reference <input-output>.
Parquet
To read Parquet files, call ~ray.data.read_parquet.
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
Images
To read raw images, call ~ray.data.read_images. Ray Data represents images as NumPy ndarrays.
import ray
ds = ray.data.read_images("local:///tmp/batoidea/JPEGImages/")
print(ds.schema())
Column  Type
------  ----
image   numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
Text
To read lines of text, call ~ray.data.read_text.
import ray
ds = ray.data.read_text("local:///tmp/this.txt")
print(ds.schema())
Column  Type
------  ----
text    string
CSV
To read CSV files, call ~ray.data.read_csv.
import ray
ds = ray.data.read_csv("local:///tmp/iris.csv")
print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64
Binary
To read raw binary files, call ~ray.data.read_binary_files.
import ray
ds = ray.data.read_binary_files("local:///tmp/file.dat")
print(ds.schema())
Column  Type
------  ----
bytes   binary
TFRecords
To read TFRecords files, call ~ray.data.read_tfrecords.
import ray
ds = ray.data.read_tfrecords("local:///tmp/iris.tfrecords")
print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64
To read files from local disk, call a function like ~ray.data.read_parquet and specify paths with the local:// scheme. Paths can point to files or directories.
To read formats other than Parquet, see the Input/Output reference <input-output>.
Tip
If your files are accessible on every node, exclude local:// to parallelize the read tasks across the cluster, as in the second example below.
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
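Conversely, here is a minimal sketch of the parallel case. It assumes /tmp/iris.parquet exists at the same path on every node, so you can drop the local:// scheme and let Ray Data spread the read tasks across the cluster.

import ray

# No local:// scheme: the path is assumed to be readable on every node,
# so Ray Data can schedule read tasks anywhere in the cluster.
ds = ray.data.read_parquet("/tmp/iris.parquet")
print(ds.schema())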
To read files in cloud storage, authenticate all nodes with your cloud service provider. Then, call a method like ~ray.data.read_parquet and specify URIs with the appropriate scheme. URIs can point to buckets, folders, or objects.
To read formats other than Parquet, see the Input/Output reference <input-output>.
S3
To read files from Amazon S3, specify URIs with the s3:// scheme.
import ray
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
GCS
To read files from Google Cloud Storage, install the Filesystem interface to Google Cloud Storage.
pip install gcsfs
Then, create a GCSFileSystem and specify URIs with the gcs:// scheme.
import gcsfs
import ray

# The project and bucket names below are placeholders; replace them with your own.
filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds = ray.data.read_parquet("gcs://your-bucket/iris.parquet", filesystem=filesystem)
print(ds.schema())

Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
ABS
To read files from Azure Blob Storage, install the Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage.
pip install adlfs
Then, create an AzureBlobFileSystem and specify URIs with the az:// scheme.
import adlfs
import ray

ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    filesystem=adlfs.AzureBlobFileSystem(account_name="azureopendatastorage"),
)
print(ds.schema())

Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
To read files from NFS filesystems, call a function like ~ray.data.read_parquet and specify files on the mounted filesystem. Paths can point to files or directories.
To read formats other than Parquet, see the Input/Output reference <input-output>.
import ray
ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")
print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
To read a compressed file, specify compression in arrow_open_stream_args. You can use any codec supported by Arrow.
import ray

ds = ray.data.read_csv(
    "s3://anonymous@ray-example-data/iris.csv.gz",
    arrow_open_stream_args={"compression": "gzip"},
)
Ray Data interoperates with libraries like pandas, NumPy, and Arrow.
Python objects
To create a ~ray.data.dataset.Dataset from Python objects, call ~ray.data.from_items and pass in a list of Dict. Ray Data treats each Dict as a row.
import ray
ds = ray.data.from_items([
    {"food": "spam", "price": 9.34},
    {"food": "ham", "price": 5.37},
    {"food": "eggs", "price": 0.94}
])

print(ds)

MaterializedDataset(
   num_blocks=3,
   num_rows=3,
   schema={food: string, price: double}
)
You can also create a ~ray.data.dataset.Dataset from a list of regular Python objects.
import ray
ds = ray.data.from_items([1, 2, 3, 4, 5])
print(ds)
MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64})
NumPy
To create a ~ray.data.dataset.Dataset from a NumPy array, call ~ray.data.from_numpy. Ray Data treats the outer axis as the row dimension.
import numpy as np
import ray

array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)

print(ds)

MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={data: numpy.ndarray(shape=(2, 2), dtype=double)}
)
pandas
To create a ~ray.data.dataset.Dataset from a pandas DataFrame, call ~ray.data.from_pandas.
import pandas as pd
import ray

df = pd.DataFrame({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)

print(ds)

MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: object, price: float64}
)
PyArrow
To create a ~ray.data.dataset.Dataset from an Arrow table, call ~ray.data.from_arrow.
import pyarrow as pa
import ray

table = pa.table({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)

print(ds)

MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: string, price: double}
)
Ray Data interoperates with distributed data processing frameworks like Dask <dask-on-ray>, Spark <spark-on-ray>, Modin <modin-on-ray>, and Mars <mars-on-ray>.
Dask
To create a ~ray.data.dataset.Dataset from a Dask DataFrame, call ~ray.data.from_dask. This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Dask DataFrame.
import dask.dataframe as dd
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)

ds.show(3)

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
Spark
To create a ~ray.data.dataset.Dataset from a Spark DataFrame, call ~ray.data.from_spark. This function creates a Dataset backed by the distributed Spark DataFrame partitions that underlie the Spark DataFrame.
import ray
import raydp

spark = raydp.init_spark(
    app_name="Spark -> Datasets Example",
    num_executors=2,
    executor_cores=2,
    executor_memory="500MB"
)
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)

ds.show(3)

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
Modin
To create a ~ray.data.dataset.Dataset from a Modin DataFrame, call ~ray.data.from_modin. This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.
import modin.pandas as md
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)

ds.show(3)

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
Mars
To create a ~ray.data.dataset.Dataset from a Mars DataFrame, call ~ray.data.from_mars. This function constructs a Dataset backed by the distributed Pandas DataFrame partitions that underlie the Mars DataFrame.
import mars
import mars.dataframe as md
import pandas as pd
import ray

cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)

ds.show(3)

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
Ray Data interoperates with HuggingFace and TensorFlow datasets.
HuggingFace
To convert a 🤗 Dataset to a Ray Dataset, call ~ray.data.from_huggingface. This function accesses the underlying Arrow table and converts it to a Dataset directly.
Warning
~ray.data.from_huggingface doesn't support parallel reads. This isn't an issue with in-memory 🤗 Datasets, but may fail with large memory-mapped 🤗 Datasets. Also, 🤗 IterableDataset objects aren't supported.
import ray.data
from datasets import load_dataset

hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)

[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]
TensorFlow
To convert a TensorFlow dataset to a Ray Dataset, call ~ray.data.from_tf.
Warning
~ray.data.from_tf doesn't support parallel reads. Only use this function with small datasets like MNIST or CIFAR.
import ray
import tensorflow_datasets as tfds

tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)

print(ds)

MaterializedDataset(
   num_blocks=...,
   num_rows=50000,
   schema={
      id: binary,
      image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8),
      label: int64
   }
)
Ray Data reads from databases like MySQL, PostgreSQL, and MongoDB.
Call ~ray.data.read_sql to read data from a database that provides a Python DB API2-compliant connector.
MySQL
To read from MySQL, install MySQL Connector/Python. It's the first-party MySQL database connector.
pip install mysql-connector-python
Then, define your connection logic and query the database.
import mysql.connector
import ray
def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)

# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)

# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
PostgreSQL
To read from PostgreSQL, install Psycopg 2. It's the most popular PostgreSQL database connector.
pip install psycopg2-binary
Then, define your connection logic and query the database.
import psycopg2
import ray
def create_connection():
    return psycopg2.connect(
        user="postgres",
        password=...,
        host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        dbname="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)

# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)

# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
Snowflake
To read from Snowflake, install the Snowflake Connector for Python.
pip install snowflake-connector-python
Then, define your connection logic and query the database.
import snowflake.connector
import ray
def create_connection():
    return snowflake.connector.connect(
        user=...,
        password=...,
        account="ZZKXUVH-IPB52023",
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)

# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)

# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
Databricks
To read from Databricks, set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token.
export DATABRICKS_TOKEN=...
If you're running your program on the Databricks runtime, also set the DATABRICKS_HOST environment variable.
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
Then, call ray.data.read_databricks_tables to read from the Databricks SQL warehouse.
import ray
dataset = ray.data.read_databricks_tables(
    warehouse_id='a885ad08b64951ad',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)
BigQuery
To read from BigQuery, install the Python Client for Google BigQuery and the Python Client for Google BigQueryStorage.
pip install google-cloud-bigquery
pip install google-cloud-bigquery-storage
To read data from BigQuery, call ~ray.data.read_bigquery and specify the project id, dataset, and query (if applicable).
import ray
# Read the entire dataset (do not specify query)
ds = ray.data.read_bigquery(
    project_id="my_gcloud_project_id",
    dataset="bigquery-public-data.ml_datasets.iris",
)

# Read from a SQL query of the dataset (do not specify dataset)
ds = ray.data.read_bigquery(
    project_id="my_gcloud_project_id",
    query="SELECT * FROM bigquery-public-data.ml_datasets.iris LIMIT 50",
)

# Write back to BigQuery
ds.write_bigquery(
    project_id="my_gcloud_project_id",
    dataset="destination_dataset.destination_table",
)
MongoDB
To read data from MongoDB, call ~ray.data.read_mongo and specify the source URI, database, and collection. You also need to specify a pipeline to run against the collection.
import ray
# Read a local MongoDB.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)

# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": {"sort_col": 1}}],
)

# Write back to MongoDB.
ds.write_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
)
Synthetic datasets can be useful for testing and benchmarking.
Int Range
To create a synthetic ~ray.data.Dataset from a range of integers, call ~ray.data.range. Ray Data stores the integer range in a single column.
import ray
ds = ray.data.range(10000)
print(ds.schema())
Column  Type
------  ----
id      int64
Tensor Range
To create a synthetic ~ray.data.Dataset containing arrays, call ~ray.data.range_tensor. Ray Data packs an integer range into ndarrays of the provided shape.
import ray
ds = ray.data.range_tensor(10, shape=(64, 64))
print(ds.schema())
Column  Type
------  ----
data    numpy.ndarray(shape=(64, 64), dtype=int64)
If Ray Data can't load your data, subclass ~ray.data.datasource.Datasource. Then, construct an instance of your custom datasource and pass it to ~ray.data.read_datasource.
# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)
The dataset parallelism determines the number of blocks the base data is split into for parallel reads. Ray Data decides internally how many read tasks to run concurrently to best utilize the cluster, ranging from 1 to parallelism tasks. In other words, the higher the parallelism, the smaller the data blocks in the Dataset and hence the more opportunity for parallel execution.
You can override the default parallelism by setting the parallelism argument. For more information on how to tune the read parallelism, see Advanced: Performance Tips and Tuning <data_performance_tips>.
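For example, here is a minimal sketch of overriding the default by passing parallelism to a read function. The requested block count is illustrative; the number of blocks Ray Data actually produces depends on the data and the cluster.

import ray

# Request up to 100 read tasks / output blocks for this read.
# The iris file is tiny, so Ray Data may end up using fewer blocks.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    parallelism=100,
)
print(ds.materialize().num_blocks())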