Skip to content

Commit

Permalink
ARROW-16413: [Python] Certain dataset APIs hang with a python filesystem
Browse files Browse the repository at this point in the history
Closes apache#13033 from jorisvandenbossche/ARROW-16413

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
  • Loading branch information
jorisvandenbossche authored and kszucs committed May 3, 2022
1 parent ffd8074 commit 0d30a05
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 5 deletions.
8 changes: 6 additions & 2 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,12 @@ cdef class FileFormat(_Weakrefable):
schema : Schema
The schema inferred from the file
"""
c_source = _make_file_source(file, filesystem)
c_schema = GetResultValue(self.format.Inspect(c_source))
cdef:
CFileSource c_source = _make_file_source(file, filesystem)
CResult[shared_ptr[CSchema]] c_result
with nogil:
c_result = self.format.Inspect(c_source)
c_schema = GetResultValue(c_result)
return pyarrow_wrap_schema(move(c_schema))

def make_fragment(self, file, filesystem=None,
Expand Down
7 changes: 4 additions & 3 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ cdef class ParquetDatasetFactory(DatasetFactory):
FileFormat format not None,
ParquetFactoryOptions options=None):
cdef:
c_string path
c_string c_path
shared_ptr[CFileSystem] c_filesystem
shared_ptr[CParquetFileFormat] c_format
CResult[shared_ptr[CDatasetFactory]] result
Expand All @@ -801,8 +801,9 @@ cdef class ParquetDatasetFactory(DatasetFactory):
options = options or ParquetFactoryOptions()
c_options = options.unwrap()

result = CParquetDatasetFactory.MakeFromMetaDataPath(
c_path, c_filesystem, c_format, c_options)
with nogil:
result = CParquetDatasetFactory.MakeFromMetaDataPath(
c_path, c_filesystem, c_format, c_options)
self.init(GetResultValue(result))

cdef init(self, shared_ptr[CDatasetFactory]& sp):
Expand Down
51 changes: 51 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import datetime
import pathlib
import pickle
import sys
import textwrap
import tempfile
import threading
Expand Down Expand Up @@ -2582,6 +2583,32 @@ def test_open_dataset_from_fsspec(tempdir):
assert dataset.schema.equals(table.schema)


@pytest.mark.parquet
def test_file_format_inspect_fsspec(tempdir):
    """FileFormat.inspect / make_fragment must not hang with a Python filesystem.

    Regression test for https://issues.apache.org/jira/browse/ARROW-16413:
    these APIs call back into Python, so the GIL must be released around
    the C++ call.
    """
    fsspec = pytest.importorskip("fsspec")

    # create a parquet file with native pyarrow
    table = pa.table({'a': [1, 2, 3]})
    path = tempdir / "data.parquet"
    pq.write_table(table, path)

    # read using fsspec filesystem
    fsspec_fs = fsspec.filesystem("file")
    assert fsspec_fs.ls(tempdir)[0].endswith("data.parquet")

    # inspect using dataset file format
    # (renamed from `format` to avoid shadowing the builtin)
    file_format = ds.ParquetFileFormat()
    # manually creating a PyFileSystem instead of using fs._ensure_filesystem
    # which would convert an fsspec local filesystem to a native one
    filesystem = fs.PyFileSystem(fs.FSSpecHandler(fsspec_fs))
    schema = file_format.inspect(path, filesystem)
    assert schema.equals(table.schema)

    fragment = file_format.make_fragment(path, filesystem)
    assert fragment.physical_schema.equals(table.schema)


@pytest.mark.pandas
def test_filter_timestamp(tempdir, dataset_reader):
# ARROW-11379
Expand Down Expand Up @@ -3094,6 +3121,30 @@ def test_parquet_dataset_factory(tempdir):
assert result.num_rows == 40


@pytest.mark.parquet
@pytest.mark.pandas  # write_to_dataset currently requires pandas
@pytest.mark.skipif(sys.platform == 'win32',
                    reason="Results in FileNotFoundError on Windows")
def test_parquet_dataset_factory_fsspec(tempdir):
    """ds.parquet_dataset must work (and not hang) with a Python filesystem.

    See https://issues.apache.org/jira/browse/ARROW-16413
    """
    fsspec = pytest.importorskip("fsspec")

    # write a small partitioned dataset with native pyarrow first
    dataset_dir = tempdir / "test_parquet_dataset"
    metadata_path, table = _create_parquet_dataset_simple(dataset_dir)

    # read it back through an fsspec-backed filesystem; build the
    # PyFileSystem by hand, because passing the local fsspec filesystem
    # directly would internally be converted to a native LocalFileSystem
    wrapped_fs = fs.PyFileSystem(fs.FSSpecHandler(fsspec.filesystem("file")))
    dataset = ds.parquet_dataset(metadata_path, filesystem=wrapped_fs)

    assert dataset.schema.equals(table.schema)
    assert len(dataset.files) == 4
    assert dataset.to_table().num_rows == 40


@pytest.mark.parquet
@pytest.mark.pandas # write_to_dataset currently requires pandas
@pytest.mark.parametrize('use_legacy_dataset', [False, True])
Expand Down

0 comments on commit 0d30a05

Please sign in to comment.