Skip to content

Commit

Permalink
Add chunksize param to read_json when lines=True (#17168)
Browse files Browse the repository at this point in the history
closes #17048
  • Loading branch information
louispotok authored and jreback committed Sep 28, 2017
1 parent cc58b84 commit 42adf7d
Show file tree
Hide file tree
Showing 6 changed files with 383 additions and 88 deletions.
30 changes: 30 additions & 0 deletions asv_bench/benchmarks/io_bench.py
@@ -1,3 +1,4 @@
import os
from .pandas_vb_common import *
from pandas import concat, Timestamp, compat
try:
Expand Down Expand Up @@ -192,3 +193,32 @@ def time_read_nrows(self, compression, engine):
ext = ".bz2"
pd.read_csv(self.big_fname + ext, nrows=10,
compression=compression, engine=engine)


class read_json_lines(object):
goal_time = 0.2
fname = "__test__.json"

def setup(self):
self.N = 100000
self.C = 5
self.df = DataFrame(dict([('float{0}'.format(i), randn(self.N)) for i in range(self.C)]))
self.df.to_json(self.fname,orient="records",lines=True)

def teardown(self):
try:
os.remove(self.fname)
except:
pass

def time_read_json_lines(self):
pd.read_json(self.fname, lines=True)

def time_read_json_lines_chunk(self):
pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N//4))

def peakmem_read_json_lines(self):
pd.read_json(self.fname, lines=True)

def peakmem_read_json_lines_chunk(self):
pd.concat(pd.read_json(self.fname, lines=True, chunksize=self.N//4))
10 changes: 10 additions & 0 deletions doc/source/io.rst
Expand Up @@ -1845,6 +1845,7 @@ is ``None``. To explicitly force ``Series`` parsing, pass ``typ=series``
seconds, milliseconds, microseconds or nanoseconds respectively.
- ``lines`` : reads file as one json object per line.
- ``encoding`` : The encoding to use to decode py3 bytes.
- ``chunksize`` : when used in combination with ``lines=True``, return a JsonReader which reads in ``chunksize`` lines per iteration.

The parser will raise one of ``ValueError/TypeError/AssertionError`` if the JSON is not parseable.

Expand Down Expand Up @@ -2049,6 +2050,10 @@ Line delimited json
pandas is able to read and write line-delimited json files that are common in data processing pipelines
using Hadoop or Spark.

.. versionadded:: 0.21.0

For line-delimited json files, pandas can also return an iterator which reads in ``chunksize`` lines at a time. This can be useful for large files or to read from a stream.

.. ipython:: python
jsonl = '''
Expand All @@ -2059,6 +2064,11 @@ using Hadoop or Spark.
df
df.to_json(orient='records', lines=True)
# reader is an iterator that returns `chunksize` lines each iteration
reader = pd.read_json(StringIO(jsonl), lines=True, chunksize=1)
reader
for chunk in reader:
print(chunk)
.. _io.table_schema:

Expand Down
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.21.0.txt
Expand Up @@ -162,6 +162,7 @@ Other Enhancements
- :func:`MultiIndex.is_monotonic_decreasing` has been implemented. Previously returned ``False`` in all cases. (:issue:`16554`)
- :func:`Categorical.rename_categories` now accepts a dict-like argument as `new_categories` and only updates the categories found in that dict. (:issue:`17336`)
- :func:`read_excel` raises ``ImportError`` with a better message if ``xlrd`` is not installed. (:issue:`17613`)
- :func:`read_json` now accepts a ``chunksize`` parameter that can be used when ``lines=True``. If ``chunksize`` is passed, read_json now returns an iterator which reads in ``chunksize`` lines with each iteration. (:issue:`17048`)
- :meth:`DataFrame.assign` will preserve the original order of ``**kwargs`` for Python 3.6+ users instead of sorting the column names


Expand Down
215 changes: 174 additions & 41 deletions pandas/io/json/json.py
@@ -1,4 +1,5 @@
# pylint: disable-msg=E1101,W0613,W0603
from itertools import islice
import os
import numpy as np

Expand All @@ -8,8 +9,10 @@
from pandas import compat, isna
from pandas import Series, DataFrame, to_datetime, MultiIndex
from pandas.io.common import (get_filepath_or_buffer, _get_handle,
_stringify_path)
_stringify_path, BaseIterator)
from pandas.io.parsers import _validate_integer
from pandas.core.common import AbstractMethodError
from pandas.core.reshape.concat import concat
from pandas.io.formats.printing import pprint_thing
from .normalize import _convert_to_line_delimits
from .table_schema import build_table_schema
Expand Down Expand Up @@ -175,7 +178,7 @@ def write(self):
def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
convert_axes=True, convert_dates=True, keep_default_dates=True,
numpy=False, precise_float=False, date_unit=None, encoding=None,
lines=False):
lines=False, chunksize=None):
"""
Convert a JSON string to pandas object
Expand Down Expand Up @@ -264,6 +267,16 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,
.. versionadded:: 0.19.0
chunksize: integer, default None
Return JsonReader object for iteration.
See the `line-delimted json docs
<http://pandas.pydata.org/pandas-docs/stable/io.html#io-jsonl>`_
for more information on ``chunksize``.
This can only be passed if `lines=True`.
If this is None, the file will be read into memory all at once.
.. versionadded:: 0.21.0
Returns
-------
result : Series or DataFrame, depending on the value of `typ`.
Expand Down Expand Up @@ -323,47 +336,167 @@ def read_json(path_or_buf=None, orient=None, typ='frame', dtype=True,

filepath_or_buffer, _, _ = get_filepath_or_buffer(path_or_buf,
encoding=encoding)
if isinstance(filepath_or_buffer, compat.string_types):
try:
exists = os.path.exists(filepath_or_buffer)

# if the filepath is too long will raise here
# 5874
except (TypeError, ValueError):
exists = False

if exists:
fh, handles = _get_handle(filepath_or_buffer, 'r',
encoding=encoding)
json = fh.read()
fh.close()

json_reader = JsonReader(
filepath_or_buffer, orient=orient, typ=typ, dtype=dtype,
convert_axes=convert_axes, convert_dates=convert_dates,
keep_default_dates=keep_default_dates, numpy=numpy,
precise_float=precise_float, date_unit=date_unit, encoding=encoding,
lines=lines, chunksize=chunksize
)

if chunksize:
return json_reader

return json_reader.read()


class JsonReader(BaseIterator):
"""
JsonReader provides an interface for reading in a JSON file.
If initialized with ``lines=True`` and ``chunksize``, can be iterated over
``chunksize`` lines at a time. Otherwise, calling ``read`` reads in the
whole document.
"""
def __init__(self, filepath_or_buffer, orient, typ, dtype, convert_axes,
convert_dates, keep_default_dates, numpy, precise_float,
date_unit, encoding, lines, chunksize):

self.path_or_buf = filepath_or_buffer
self.orient = orient
self.typ = typ
self.dtype = dtype
self.convert_axes = convert_axes
self.convert_dates = convert_dates
self.keep_default_dates = keep_default_dates
self.numpy = numpy
self.precise_float = precise_float
self.date_unit = date_unit
self.encoding = encoding
self.lines = lines
self.chunksize = chunksize
self.nrows_seen = 0
self.should_close = False

if self.chunksize is not None:
self.chunksize = _validate_integer("chunksize", self.chunksize, 1)
if not self.lines:
raise ValueError("chunksize can only be passed if lines=True")

data = self._get_data_from_filepath(filepath_or_buffer)
self.data = self._preprocess_data(data)

def _preprocess_data(self, data):
"""
At this point, the data either has a `read` attribute (e.g. a file
object or a StringIO) or is a string that is a JSON document.
If self.chunksize, we prepare the data for the `__next__` method.
Otherwise, we read it into memory for the `read` method.
"""
if hasattr(data, 'read') and not self.chunksize:
data = data.read()
if not hasattr(data, 'read') and self.chunksize:
data = StringIO(data)

return data

def _get_data_from_filepath(self, filepath_or_buffer):
"""
read_json accepts three input types:
1. filepath (string-like)
2. file-like object (e.g. open file object, StringIO)
3. JSON string
This method turns (1) into (2) to simplify the rest of the processing.
It returns input types (2) and (3) unchanged.
"""

data = filepath_or_buffer

if isinstance(data, compat.string_types):
try:
exists = os.path.exists(filepath_or_buffer)

# gh-5874: if the filepath is too long will raise here
except (TypeError, ValueError):
pass

else:
if exists:
data, _ = _get_handle(filepath_or_buffer, 'r',
encoding=self.encoding)
self.should_close = True
self.open_stream = data

return data

def _combine_lines(self, lines):
"""Combines a list of JSON objects into one JSON object"""
lines = filter(None, map(lambda x: x.strip(), lines))
return '[' + ','.join(lines) + ']'

def read(self):
"""Read the whole JSON input into a pandas object"""
if self.lines and self.chunksize:
obj = concat(self)
elif self.lines:
obj = self._get_object_parser(
self._combine_lines(self.data.split('\n'))
)
else:
json = filepath_or_buffer
elif hasattr(filepath_or_buffer, 'read'):
json = filepath_or_buffer.read()
else:
json = filepath_or_buffer
obj = self._get_object_parser(self.data)
self.close()
return obj

def _get_object_parser(self, json):
"""parses a json document into a pandas object"""
typ = self.typ
dtype = self.dtype
kwargs = {
"orient": self.orient, "dtype": self.dtype,
"convert_axes": self.convert_axes,
"convert_dates": self.convert_dates,
"keep_default_dates": self.keep_default_dates, "numpy": self.numpy,
"precise_float": self.precise_float, "date_unit": self.date_unit
}
obj = None
if typ == 'frame':
obj = FrameParser(json, **kwargs).parse()

if typ == 'series' or obj is None:
if not isinstance(dtype, bool):
dtype = dict(data=dtype)
obj = SeriesParser(json, **kwargs).parse()

return obj

def close(self):
"""
If we opened a stream earlier, in _get_data_from_filepath, we should
close it. If an open stream or file was passed, we leave it open.
"""
if self.should_close:
try:
self.open_stream.close()
except (IOError, AttributeError):
pass

if lines:
# If given a json lines file, we break the string into lines, add
# commas and put it in a json list to make a valid json object.
lines = list(StringIO(json.strip()))
json = '[' + ','.join(lines) + ']'

obj = None
if typ == 'frame':
obj = FrameParser(json, orient, dtype, convert_axes, convert_dates,
keep_default_dates, numpy, precise_float,
date_unit).parse()

if typ == 'series' or obj is None:
if not isinstance(dtype, bool):
dtype = dict(data=dtype)
obj = SeriesParser(json, orient, dtype, convert_axes, convert_dates,
keep_default_dates, numpy, precise_float,
date_unit).parse()

return obj
def __next__(self):
lines = list(islice(self.data, self.chunksize))
if lines:
lines_json = self._combine_lines(lines)
obj = self._get_object_parser(lines_json)

# Make sure that the returned objects have the right index.
obj.index = range(self.nrows_seen, self.nrows_seen + len(obj))
self.nrows_seen += len(obj)

return obj

self.close()
raise StopIteration


class Parser(object):
Expand Down
47 changes: 0 additions & 47 deletions pandas/tests/io/json/test_pandas.py
Expand Up @@ -985,53 +985,6 @@ def test_tz_range_is_utc(self):
df = DataFrame({'DT': dti})
assert dumps(df, iso_dates=True) == dfexp

def test_read_jsonl(self):
# GH9180
result = read_json('{"a": 1, "b": 2}\n{"b":2, "a" :1}\n', lines=True)
expected = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
assert_frame_equal(result, expected)

def test_read_jsonl_unicode_chars(self):
# GH15132: non-ascii unicode characters
# \u201d == RIGHT DOUBLE QUOTATION MARK

# simulate file handle
json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
json = StringIO(json)
result = read_json(json, lines=True)
expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
columns=['a', 'b'])
assert_frame_equal(result, expected)

# simulate string
json = '{"a": "foo”", "b": "bar"}\n{"a": "foo", "b": "bar"}\n'
result = read_json(json, lines=True)
expected = DataFrame([[u"foo\u201d", "bar"], ["foo", "bar"]],
columns=['a', 'b'])
assert_frame_equal(result, expected)

def test_to_jsonl(self):
# GH9180
df = DataFrame([[1, 2], [1, 2]], columns=['a', 'b'])
result = df.to_json(orient="records", lines=True)
expected = '{"a":1,"b":2}\n{"a":1,"b":2}'
assert result == expected

df = DataFrame([["foo}", "bar"], ['foo"', "bar"]], columns=['a', 'b'])
result = df.to_json(orient="records", lines=True)
expected = '{"a":"foo}","b":"bar"}\n{"a":"foo\\"","b":"bar"}'
assert result == expected
assert_frame_equal(pd.read_json(result, lines=True), df)

# GH15096: escaped characters in columns and data
df = DataFrame([["foo\\", "bar"], ['foo"', "bar"]],
columns=["a\\", 'b'])
result = df.to_json(orient="records", lines=True)
expected = ('{"a\\\\":"foo\\\\","b":"bar"}\n'
'{"a\\\\":"foo\\"","b":"bar"}')
assert result == expected
assert_frame_equal(pd.read_json(result, lines=True), df)

def test_latin_encoding(self):
if compat.PY2:
tm.assert_raises_regex(
Expand Down

0 comments on commit 42adf7d

Please sign in to comment.