ENH: context-manager for chunksize/iterator-reader
twoertwein committed Dec 3, 2020
1 parent 142ca08 commit a0fb4f7
Showing 19 changed files with 256 additions and 189 deletions.
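
For orientation, a minimal sketch of the usage pattern this commit enables across the affected readers (assuming pandas >= 1.2; the file path is illustrative, not taken from the diff):

    import pandas as pd

    # Readers returned for chunksize=/iterator= now support the with statement;
    # the underlying file handle is closed when the block exits.
    with pd.read_csv("data.csv", chunksize=1000) as reader:  # illustrative path
        df = pd.concat(reader, ignore_index=True)

    # The same pattern applies to pd.read_json(..., lines=True, chunksize=...),
    # pd.read_stata(..., chunksize=...) and pd.read_sas(..., chunksize=...).
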
38 changes: 19 additions & 19 deletions doc/source/user_guide/io.rst
@@ -1577,19 +1577,19 @@ value will be an iterable object of type ``TextFileReader``:

.. ipython:: python
-    reader = pd.read_csv("tmp.sv", sep="|", chunksize=4)
-    reader
+    with pd.read_csv("tmp.sv", sep="|", chunksize=4) as reader:
+        reader
-    for chunk in reader:
-        print(chunk)
+        for chunk in reader:
+            print(chunk)
Specifying ``iterator=True`` will also return the ``TextFileReader`` object:

.. ipython:: python
-    reader = pd.read_csv("tmp.sv", sep="|", iterator=True)
-    reader.get_chunk(5)
+    with pd.read_csv("tmp.sv", sep="|", iterator=True) as reader:
+        reader.get_chunk(5)
.. ipython:: python
:suppress:
@@ -2238,10 +2238,10 @@ For line-delimited json files, pandas can also return an iterator which reads in
df.to_json(orient="records", lines=True)
# reader is an iterator that returns ``chunksize`` lines each iteration
-    reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
-    reader
-    for chunk in reader:
-        print(chunk)
+    with pd.read_json(StringIO(jsonl), lines=True, chunksize=1) as reader:
+        reader
+        for chunk in reader:
+            print(chunk)
.. _io.table_schema:

@@ -5471,19 +5471,19 @@ object can be used as an iterator.

.. ipython:: python
-    reader = pd.read_stata("stata.dta", chunksize=3)
-    for df in reader:
-        print(df.shape)
+    with pd.read_stata("stata.dta", chunksize=3) as reader:
+        for df in reader:
+            print(df.shape)
For more fine-grained control, use ``iterator=True`` and specify
``chunksize`` with each call to
:func:`~pandas.io.stata.StataReader.read`.

.. ipython:: python
-    reader = pd.read_stata("stata.dta", iterator=True)
-    chunk1 = reader.read(5)
-    chunk2 = reader.read(5)
+    with pd.read_stata("stata.dta", iterator=True) as reader:
+        chunk1 = reader.read(5)
+        chunk2 = reader.read(5)
Currently the ``index`` is retrieved as a column.

@@ -5595,9 +5595,9 @@ Obtain an iterator and read an XPORT file 100,000 lines at a time:
pass
-    rdr = pd.read_sas("sas_xport.xpt", chunk=100000)
-    for chunk in rdr:
-        do_something(chunk)
+    with pd.read_sas("sas_xport.xpt", chunk=100000) as rdr:
+        for chunk in rdr:
+            do_something(chunk)
The specification_ for the xport file format is available from the SAS
web site.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -291,6 +291,7 @@ Other enhancements
- Improve numerical stability for :meth:`.Rolling.skew`, :meth:`.Rolling.kurt`, :meth:`Expanding.skew` and :meth:`Expanding.kurt` through implementation of Kahan summation (:issue:`6929`)
- Improved error reporting for subsetting columns of a :class:`.DataFrameGroupBy` with ``axis=1`` (:issue:`37725`)
- Implement method ``cross`` for :meth:`DataFrame.merge` and :meth:`DataFrame.join` (:issue:`5401`)
- :func:`read_csv`/:func:`read_sas`/:func:`read_json` return a context manager when called with ``chunksize``/``iterator`` (:issue:`38225`)
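
In practical terms, the new behaviour replaces the manual close pattern; a sketch assuming pandas >= 1.2 ("data.csv" is a hypothetical path, not taken from the diff):

    import pandas as pd

    # Previously, callers had to release the file handle themselves:
    reader = pd.read_csv("data.csv", chunksize=1000)  # hypothetical path
    try:
        total_rows = sum(len(chunk) for chunk in reader)
    finally:
        reader.close()

    # With the context-manager support, the with statement calls close() for us:
    with pd.read_csv("data.csv", chunksize=1000) as reader:
        total_rows = sum(len(chunk) for chunk in reader)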

.. ---------------------------------------------------------------------------
5 changes: 2 additions & 3 deletions pandas/io/html.py
@@ -794,9 +794,8 @@ def _data_to_frame(**kwargs):

# fill out elements of body that are "ragged"
_expand_elements(body)
-    tp = TextParser(body, header=header, **kwargs)
-    df = tp.read()
-    return df
+    with TextParser(body, header=header, **kwargs) as tp:
+        return tp.read()


_valid_parsers = {
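A note on the with TextParser(...) as tp: return tp.read() pattern used above: returning from inside a with block still runs the manager's __exit__ before the function actually returns, so the parser is closed even on the early return. A small generic sketch (plain Python, not pandas code):

    class Resource:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, traceback):
            print("closed")  # runs even when the with body returns

    def load():
        with Resource():
            return "data"

    print(load())  # prints "closed" first, then "data"
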
12 changes: 11 additions & 1 deletion pandas/io/json/_json.py
@@ -437,6 +437,9 @@ def read_json(
This can only be passed if `lines=True`.
If this is None, the file will be read into memory all at once.
.. versionchanged:: 1.2
``JsonReader`` is a context manager.
compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default 'infer'
For on-the-fly decompression of on-disk data. If 'infer', then use
gzip, bz2, zip or xz if path_or_buf is a string ending in
@@ -555,7 +558,8 @@ def read_json(
if chunksize:
return json_reader

-    return json_reader.read()
+    with json_reader:
+        return json_reader.read()


class JsonReader(abc.Iterator):
@@ -747,6 +751,12 @@ def __next__(self):
self.close()
raise StopIteration

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


class Parser:
_split_keys: Tuple[str, ...]
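With __enter__/__exit__ added to JsonReader above, line-delimited JSON can be chunked inside a with block; a minimal sketch (the inline data is illustrative):

    from io import StringIO
    import pandas as pd

    jsonl = '{"a": 1, "b": 2}\n{"a": 3, "b": 4}\n'
    with pd.read_json(StringIO(jsonl), lines=True, chunksize=1) as reader:
        for chunk in reader:
            print(chunk.shape)  # each chunk is a one-row DataFrame here
    # Leaving the block calls reader.close(), releasing any handle read_json opened.
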
20 changes: 14 additions & 6 deletions pandas/io/parsers.py
Expand Up @@ -276,11 +276,17 @@
iterator : bool, default False
Return TextFileReader object for iteration or getting chunks with
``get_chunk()``.
.. versionchanged:: 1.2
``TextFileReader`` is a context manager.
chunksize : int, optional
Return TextFileReader object for iteration.
See the `IO Tools docs
<https://pandas.pydata.org/pandas-docs/stable/io.html#io-chunking>`_
for more information on ``iterator`` and ``chunksize``.
.. versionchanged:: 1.2
``TextFileReader`` is a context manager.
compression : {{'infer', 'gzip', 'bz2', 'zip', 'xz', None}}, default 'infer'
For on-the-fly decompression of on-disk data. If 'infer' and
`filepath_or_buffer` is path-like, then detect compression from the
@@ -451,12 +457,8 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):
if chunksize or iterator:
return parser

-    try:
-        data = parser.read(nrows)
-    finally:
-        parser.close()
-
-    return data
+    with parser:
+        return parser.read(nrows)


_parser_defaults = {
@@ -1074,6 +1076,12 @@ def get_chunk(self, size=None):
size = min(size, self.nrows - self._currow)
return self.read(nrows=size)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


def _is_index_col(col):
return col is not None and col is not False
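The same protocol covers iterator=True together with get_chunk() on the TextFileReader; a sketch assuming pandas >= 1.2 and an illustrative CSV path:

    import pandas as pd

    with pd.read_csv("data.csv", iterator=True) as reader:  # "data.csv" is illustrative
        first = reader.get_chunk(5)   # first 5 rows
        second = reader.get_chunk(5)  # next 5 rows
    # The handle is released here, even if get_chunk() raised partway through.
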
16 changes: 12 additions & 4 deletions pandas/io/sas/sasreader.py
@@ -6,7 +6,7 @@

from pandas._typing import FilePathOrBuffer, Label

-from pandas.io.common import IOHandles, stringify_path
+from pandas.io.common import stringify_path

if TYPE_CHECKING:
from pandas import DataFrame
@@ -18,8 +18,6 @@ class ReaderBase(metaclass=ABCMeta):
Protocol for XportReader and SAS7BDATReader classes.
"""

-    handles: IOHandles

@abstractmethod
def read(self, nrows=None):
pass
@@ -28,6 +26,12 @@ def read(self, nrows=None):
def close(self):
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


@overload
def read_sas(
@@ -87,8 +91,12 @@ def read_sas(
Encoding for text data. If None, text data are stored as raw bytes.
chunksize : int
Read file `chunksize` lines at a time, returns iterator.
.. versionchanged:: 1.2
``TextFileReader`` is a context manager.
iterator : bool, defaults to False
If True, returns an iterator for reading the file incrementally.
.. versionchanged:: 1.2
``TextFileReader`` is a context manager.
Returns
-------
@@ -136,5 +144,5 @@ def read_sas(
if iterator or chunksize:
return reader

-    with reader.handles:
+    with reader:
return reader.read()
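
Because ReaderBase now defines __enter__/__exit__, the concrete SAS readers (XportReader, SAS7BDATReader) inherit with-statement support by implementing only read() and close(). A hypothetical minimal subclass, purely to illustrate the inherited protocol (not part of this diff):

    from pandas.io.sas.sasreader import ReaderBase

    class DummyReader(ReaderBase):  # hypothetical, for illustration only
        def __init__(self):
            self.closed = False

        def read(self, nrows=None):
            return None  # a real reader would return a DataFrame chunk

        def close(self):
            self.closed = True

    reader = DummyReader()
    with reader:
        reader.read()
    assert reader.closed  # __exit__ called close()
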
6 changes: 4 additions & 2 deletions pandas/tests/io/json/test_compression.py
@@ -65,8 +65,10 @@ def test_chunksize_with_compression(compression):
df = pd.read_json('{"a": ["foo", "bar", "baz"], "b": [4, 5, 6]}')
df.to_json(path, orient="records", lines=True, compression=compression)

-        res = pd.read_json(path, lines=True, chunksize=1, compression=compression)
-        roundtripped_df = pd.concat(res)
+        with pd.read_json(
+            path, lines=True, chunksize=1, compression=compression
+        ) as res:
+            roundtripped_df = pd.concat(res)
tm.assert_frame_equal(df, roundtripped_df)


37 changes: 23 additions & 14 deletions pandas/tests/io/json/test_readlines.py
@@ -77,16 +77,17 @@ def test_readjson_chunks(lines_json_df, chunksize):
# GH17048: memory usage when lines=True

unchunked = read_json(StringIO(lines_json_df), lines=True)
-    reader = read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize)
-    chunked = pd.concat(reader)
+    with read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize) as reader:
+        chunked = pd.concat(reader)

tm.assert_frame_equal(chunked, unchunked)


def test_readjson_chunksize_requires_lines(lines_json_df):
msg = "chunksize can only be passed if lines=True"
with pytest.raises(ValueError, match=msg):
-        pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2)
+        with pd.read_json(StringIO(lines_json_df), lines=False, chunksize=2) as _:
+            pass


def test_readjson_chunks_series():
@@ -97,15 +98,17 @@ def test_readjson_chunks_series():
unchunked = pd.read_json(strio, lines=True, typ="Series")

strio = StringIO(s.to_json(lines=True, orient="records"))
-    chunked = pd.concat(pd.read_json(strio, lines=True, typ="Series", chunksize=1))
+    with pd.read_json(strio, lines=True, typ="Series", chunksize=1) as reader:
+        chunked = pd.concat(reader)

tm.assert_series_equal(chunked, unchunked)


def test_readjson_each_chunk(lines_json_df):
# Other tests check that the final result of read_json(chunksize=True)
# is correct. This checks the intermediate chunks.
-    chunks = list(pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2))
+    with pd.read_json(StringIO(lines_json_df), lines=True, chunksize=2) as reader:
+        chunks = list(reader)
assert chunks[0].shape == (2, 2)
assert chunks[1].shape == (1, 2)

@@ -114,7 +117,8 @@ def test_readjson_chunks_from_file():
with tm.ensure_clean("test.json") as path:
df = DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
df.to_json(path, lines=True, orient="records")
-        chunked = pd.concat(pd.read_json(path, lines=True, chunksize=1))
+        with pd.read_json(path, lines=True, chunksize=1) as reader:
+            chunked = pd.concat(reader)
unchunked = pd.read_json(path, lines=True)
tm.assert_frame_equal(unchunked, chunked)

@@ -141,7 +145,8 @@ def test_readjson_chunks_closes(chunksize):
compression=None,
nrows=None,
)
-        reader.read()
+        with reader:
+            reader.read()
assert (
reader.handles.handle.closed
), f"didn't close stream with chunksize = {chunksize}"
@@ -152,7 +157,10 @@ def test_readjson_invalid_chunksize(lines_json_df, chunksize):
msg = r"'chunksize' must be an integer >=1"

with pytest.raises(ValueError, match=msg):
-        pd.read_json(StringIO(lines_json_df), lines=True, chunksize=chunksize)
+        with pd.read_json(
+            StringIO(lines_json_df), lines=True, chunksize=chunksize
+        ) as _:
+            pass


@pytest.mark.parametrize("chunksize", [None, 1, 2])
@@ -176,7 +184,8 @@ def test_readjson_chunks_multiple_empty_lines(chunksize):
orig = DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
test = pd.read_json(j, lines=True, chunksize=chunksize)
if chunksize is not None:
-        test = pd.concat(test)
+        with test:
+            test = pd.concat(test)
tm.assert_frame_equal(orig, test, obj=f"chunksize: {chunksize}")


@@ -212,8 +221,8 @@ def test_readjson_nrows_chunks(nrows, chunksize):
{"a": 3, "b": 4}
{"a": 5, "b": 6}
{"a": 7, "b": 8}"""
-    reader = read_json(jsonl, lines=True, nrows=nrows, chunksize=chunksize)
-    chunked = pd.concat(reader)
+    with read_json(jsonl, lines=True, nrows=nrows, chunksize=chunksize) as reader:
+        chunked = pd.concat(reader)
expected = DataFrame({"a": [1, 3, 5, 7], "b": [2, 4, 6, 8]}).iloc[:nrows]
tm.assert_frame_equal(chunked, expected)

@@ -240,6 +249,6 @@ def test_readjson_lines_chunks_fileurl(datapath):
]
os_path = datapath("io", "json", "data", "line_delimited.json")
file_url = Path(os_path).as_uri()
-    url_reader = pd.read_json(file_url, lines=True, chunksize=1)
-    for index, chuck in enumerate(url_reader):
-        tm.assert_frame_equal(chuck, df_list_expected[index])
+    with pd.read_json(file_url, lines=True, chunksize=1) as url_reader:
+        for index, chuck in enumerate(url_reader):
+            tm.assert_frame_equal(chuck, df_list_expected[index])
12 changes: 6 additions & 6 deletions pandas/tests/io/parser/test_c_parser_only.py
@@ -376,25 +376,25 @@ def test_parse_trim_buffers(c_parser_only):
)

# Iterate over the CSV file in chunks of `chunksize` lines
-    chunks_ = parser.read_csv(
+    with parser.read_csv(
        StringIO(csv_data), header=None, dtype=object, chunksize=chunksize
-    )
-    result = concat(chunks_, axis=0, ignore_index=True)
+    ) as chunks_:
+        result = concat(chunks_, axis=0, ignore_index=True)

# Check for data corruption if there was no segfault
tm.assert_frame_equal(result, expected)

# This extra test was added to replicate the fault in gh-5291.
# Force 'utf-8' encoding, so that `_string_convert` would take
# a different execution branch.
-    chunks_ = parser.read_csv(
+    with parser.read_csv(
        StringIO(csv_data),
        header=None,
        dtype=object,
        chunksize=chunksize,
        encoding="utf_8",
-    )
-    result = concat(chunks_, axis=0, ignore_index=True)
+    ) as chunks_:
+        result = concat(chunks_, axis=0, ignore_index=True)
tm.assert_frame_equal(result, expected)

