[Data] Auto increasing block size for read_json #42357

Merged · 13 commits · Jan 22, 2024
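For context, a minimal sketch (not part of this PR) of the failure mode the change addresses: when a single JSON row spans two PyArrow read blocks, pyarrow.json.read_json raises the "straddling" error. The file path and the deliberately tiny block size below are illustrative.

import pyarrow.json as pajson

try:
    table = pajson.read_json(
        "large_rows.json",  # hypothetical JSON-lines file with very long rows
        read_options=pajson.ReadOptions(block_size=1 << 10),  # deliberately tiny 1 KiB blocks
    )
except Exception as e:
    # Expected failure: pyarrow.lib.ArrowInvalid: straddling object straddles
    # two block boundaries (try to increase block size?)
    print(type(e).__name__, e)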
56 changes: 54 additions & 2 deletions python/ray/data/datasource/json_datasource.py
@@ -1,11 +1,15 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from ray.data._internal.dataset_logger import DatasetLogger
from ray.data.context import DataContext
from ray.data.datasource.file_based_datasource import FileBasedDatasource
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
import pyarrow

logger = DatasetLogger(__name__)


@PublicAPI
class JSONDatasource(FileBasedDatasource):
@@ -34,6 +38,54 @@ def __init__(

# TODO(ekl) The PyArrow JSON reader doesn't support streaming reads.
def _read_stream(self, f: "pyarrow.NativeFile", path: str):
from pyarrow import json
from io import BytesIO

from pyarrow import ArrowInvalid, json

# When reading large files, the default block size configured in PyArrow can be
# too small, resulting in the following error: `pyarrow.lib.ArrowInvalid:
# straddling object straddles two block boundaries (try to increase block
# size?)`. The read will be retried with geometrically increasing block size
Review comment (Contributor): nit: can we also include the arrow issue link here?

# until the size reaches `DataContext.get_current().target_max_block_size`.
# The initial block size is the PyArrow default block size, or it can be set
# manually through the `read_options` parameter as follows.
#
# >>> import pyarrow.json as pajson
# >>> block_size = 10 << 20 # Set block size to 10MB
# >>> ray.data.read_json( # doctest: +SKIP
# ... "s3://anonymous@ray-example-data/log.json",
# ... read_options=pajson.ReadOptions(block_size=block_size)
# ... )

yield json.read_json(f, read_options=self.read_options, **self.arrow_json_args)
buffer = f.read_buffer()
init_block_size = self.read_options.block_size
max_block_size = DataContext.get_current().target_max_block_size
while True:
try:
yield json.read_json(
BytesIO(buffer),
read_options=self.read_options,
**self.arrow_json_args,
)
self.read_options.block_size = init_block_size
break
except ArrowInvalid as e:
if isinstance(e, ArrowInvalid) and "straddling" in str(e):
Review comment (Contributor): we don't need this check inside the except block, right? Let's also compare against a longer string in case other error messages use the word "straddling":

Suggested change:
- if isinstance(e, ArrowInvalid) and "straddling" in str(e):
+ if "straddling object straddles two block boundaries" in str(e):

if self.read_options.block_size < max_block_size:
# Increase the block size in case it was too small.
logger.get_logger(log_to_stdout=False).info(
Review comment (Contributor): let's use True for this since it is important, so that the user always sees this message in the stdout logs.

f"JSONDatasource read failed with "
f"block_size={self.read_options.block_size}. Retrying with "
f"block_size={self.read_options.block_size * 2}."
)
self.read_options.block_size *= 2
else:
raise ArrowInvalid(
f"{e} - Auto-increasing block size to "
f"{self.read_options.block_size}B failed. "
Review comment (Contributor): nit

Suggested change:
- f"{self.read_options.block_size}B failed. "
+ f"{self.read_options.block_size} bytes failed. "

f"More information on this issue can be found here: "
f"https://github.com/apache/arrow/issues/25674"
)
else:
# unrelated error, simply reraise
raise e
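For readers outside the Ray codebase, here is a self-contained sketch of the same retry pattern the datasource now uses. The function name, the local-file input, and the 128 MiB cap are illustrative stand-ins; the real code caps growth at DataContext.get_current().target_max_block_size.

from io import BytesIO

from pyarrow import ArrowInvalid, json


def read_json_with_block_size_retry(path, max_block_size=128 << 20):
    # Buffer the file once so every retry parses the same bytes from the start.
    with open(path, "rb") as f:
        buffer = f.read()
    read_options = json.ReadOptions()  # starts from PyArrow's default block size
    while True:
        try:
            return json.read_json(BytesIO(buffer), read_options=read_options)
        except ArrowInvalid as e:
            if "straddling object straddles two block boundaries" not in str(e):
                raise  # unrelated error, surface it unchanged
            if read_options.block_size >= max_block_size:
                raise  # cap reached; give up, as the datasource does
            # Geometric growth, mirroring the datasource's doubling strategy.
            read_options.block_size *= 2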
14 changes: 0 additions & 14 deletions python/ray/data/read_api.py
@@ -1042,20 +1042,6 @@ def read_json(
>>> ds.take(1)
[{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]

When reading large files, the default block size configured in PyArrow can be too small,
resulting in the following error:
``pyarrow.lib.ArrowInvalid: straddling object straddles two block boundaries
(try to increase block size?)``.

To resolve this, use the ``read_options`` parameter to set a larger block size:

>>> import pyarrow.json as pajson
>>> block_size = 10 << 20 # Set block size to 10MB
>>> ray.data.read_json( # doctest: +SKIP
... "s3://anonymous@ray-example-data/log.json",
... read_options=pajson.ReadOptions(block_size=block_size)
... )

Args:
paths: A single file or directory, or a list of file or directory paths.
A list of paths can contain both files and directories.
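Although the docstring example above is removed, passing read_options still matters: it supplies the starting block size that the datasource then doubles from (see init_block_size in the datasource change above). A usage sketch reusing the removed example's bucket path:

import pyarrow.json as pajson
import ray

# Begin the automatic doubling from a 10 MiB block size instead of PyArrow's default.
ds = ray.data.read_json(
    "s3://anonymous@ray-example-data/log.json",
    read_options=pajson.ReadOptions(block_size=10 << 20),
)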
69 changes: 69 additions & 0 deletions python/ray/data/tests/test_json.py
@@ -615,6 +615,75 @@ def test_json_write_block_path_provider(
assert df.equals(ds_df)


@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoint_url):
if endpoint_url is None:
storage_options = {}
else:
storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))

# Single small file, unit block_size
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(data_path, "test1.json")
df1.to_json(path1, orient="records", lines=True, storage_options=storage_options)
ds = ray.data.read_json(
path1, filesystem=fs, read_options=pajson.ReadOptions(block_size=1)
)
dsdf = ds.to_pandas()
assert df1.equals(dsdf)
# Test metadata ops.
assert ds.count() == 3
assert ds.input_files() == [_unwrap_protocol(path1)]
assert "{one: int64, two: string}" in str(ds), ds

# Single large file, default block_size
num_chars = 2500000
num_rows = 3
df2 = pd.DataFrame(
{
"one": ["a" * num_chars for _ in range(num_rows)],
"two": ["b" * num_chars for _ in range(num_rows)],
}
)
path2 = os.path.join(data_path, "test2.json")
df2.to_json(path2, orient="records", lines=True, storage_options=storage_options)
ds = ray.data.read_json(path2, filesystem=fs)
dsdf = ds.to_pandas()
assert df2.equals(dsdf)
# Test metadata ops.
assert ds.count() == num_rows
assert ds.input_files() == [_unwrap_protocol(path2)]
assert "{one: string, two: string}" in str(ds), ds

# Single file, negative and zero block_size (expect failure)
df3 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path3 = os.path.join(data_path, "test3.json")
df3.to_json(path3, orient="records", lines=True, storage_options=storage_options)
try:
# Negative Buffer Size
ds = ray.data.read_json(
path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=-1)
)
dsdf = ds.to_pandas()
except pa.ArrowInvalid as e:
assert "Negative buffer resize" in str(e.cause)
try:
# Zero Buffer Size
ds = ray.data.read_json(
path3, filesystem=fs, read_options=pajson.ReadOptions(block_size=0)
)
dsdf = ds.to_pandas()
except pa.ArrowInvalid as e:
assert "Empty JSON file" in str(e.cause)


if __name__ == "__main__":
import sys
