Fixes bug while reading json (iterator failed to be created twice) (#26)
sdpython committed Oct 26, 2021
1 parent e4f8575 commit d462ea8
Showing 5 changed files with 48 additions and 4 deletions.
2 changes: 2 additions & 0 deletions _unittests/ut_df/data/example.json
@@ -0,0 +1,2 @@
{"a": 1, "b": 2}
{"a": 3, "b": 4}
1 change: 1 addition & 0 deletions _unittests/ut_df/data/example2.json
@@ -0,0 +1 @@
[{"a":1,"b":2},{"a":3,"b":4}]
24 changes: 24 additions & 0 deletions _unittests/ut_df/test_dataframe_io_helpers.py
@@ -145,6 +145,8 @@ def test_read_json_raw_head(self):
h1 = it.head()
h2 = it.head()
self.assertEqualDataFrame(h1, h2)
self.assertGreater(h1.shape[0], 1)
self.assertGreater(h2.shape[0], 1)

def test_pandas_json_chunksize(self):
jsonl = '''{"a": 1, "b": 2}
@@ -186,6 +188,28 @@ def test_read_json_rows2_head(self):
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_rows_file_head(self):
data = self.abs_path_join(__file__, 'data', 'example2.json')
dfs = pandas.read_json(data, orient='records')
self.assertEqual(dfs.shape, (2, 2))
it = StreamingDataFrame.read_json(data)
h1 = it.head()
h2 = it.head()
self.assertNotEmpty(h1)
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_rows_file_lines_head(self):
data = self.abs_path_join(__file__, 'data', 'example.json')
dfs = pandas.read_json(data, orient='records', lines=True)
self.assertEqual(dfs.shape, (2, 2))
it = StreamingDataFrame.read_json(data, lines="stream")
h1 = it.head()
h2 = it.head()
self.assertNotEmpty(h1)
self.assertNotEmpty(h2)
self.assertEqualDataFrame(h1, h2)

def test_read_json_ijson(self):
it = StreamingDataFrame.read_json(
BytesIO(TestDataFrameIOHelpers.text_json))
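The new tests pin down the regression named in the commit title: head() is called twice on the same StreamingDataFrame and both calls must return the same non-empty frame, which requires the underlying iterator to be creatable more than once. A minimal sketch of that usage, assuming the usual import path and mirroring the docstring example added below:

from io import BytesIO
from pandas_streaming.df import StreamingDataFrame

# Trailing newline, since the docstring notes the parsed json must end with an empty line.
data = b'[{"a": 1, "b": 2}, {"a": 3, "b": 4}]\n'
sdf = StreamingDataFrame.read_json(BytesIO(data))

# Each call to head() rebuilds the iterator from the source, so the source
# must be re-readable; before this fix the second call could fail.
h1 = sdf.head()
h2 = sdf.head()
assert h1.equals(h2)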
16 changes: 13 additions & 3 deletions pandas_streaming/df/dataframe.py
@@ -178,7 +178,7 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
and it must be defined to return an iterator.
If *lines* is True, the function falls back into
:epkg:`pandas:read_json`, otherwise it uses
@see fn enumerate_json_items. If *lines is ``'stream'``,
@see fn enumerate_json_items. If *lines* is ``'stream'``,
*enumerate_json_items* is called with parameter
``lines=True``.
Parameter *flatten* uses the trick described at
@@ -212,6 +212,13 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
it = StreamingDataFrame.read_json(BytesIO(data))
dfs = list(it)
print(dfs)
.. index:: IncompleteJSONError
The parsed json must have an empty line at the end, otherwise
the following exception is raised:
`ijson.common.IncompleteJSONError: `
`parse error: unallowed token at this point in JSON text`.
"""
if not isinstance(chunksize, int) or chunksize <= 0:
raise ValueError( # pragma: no cover
@@ -228,7 +235,8 @@ def read_json(*args, chunksize=100000, flatten=False, **kwargs) -> 'StreamingDat
del kwargs['lines']

def localf(a0=args[0]):
a0.seek(0)
if hasattr(a0, 'seek'):
a0.seek(0)
return enumerate_json_items(
a0, encoding=kwargs.get('encoding', None), lines=True,
flatten=flatten)
@@ -280,6 +288,7 @@ def fct2(args=args, chunksize=chunksize, kw=kwargs.copy()):
**kwargs_create)

def fct3(st=st, args=args, chunksize=chunksize, kw=kwargs.copy()):
st.seek(0)
for r in pandas.read_json(
st, *args, chunksize=chunksize, nrows=chunksize,
lines=True, **kw):
@@ -920,8 +929,9 @@ def __getitem__(self, *args):

def iterate_col():
"iterate on one column"
one_col = [cols]
for df in iter_creation():
yield df[[cols]]
yield df[one_col]
return StreamingSeries(iterate_col, **self.get_kwargs())

if not isinstance(cols, list):
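The heart of the fix in dataframe.py is the rewind: the factories that recreate the iterator now seek back to the start of their source before re-reading it (localf only when the source actually has a seek method), so building the iterator a second time no longer hits an already-consumed stream. A library-independent sketch of the pattern:

from io import BytesIO

def make_factory(stream):
    """Return a callable that recreates an iterator over *stream* on every call."""
    def factory():
        # Rewind so each pass starts from the beginning; the library guards this
        # with hasattr(obj, "seek") so sources without seek() are left alone.
        if hasattr(stream, "seek"):
            stream.seek(0)
        return iter(stream.readlines())
    return factory

factory = make_factory(BytesIO(b'{"a": 1, "b": 2}\n{"a": 3, "b": 4}\n'))
assert list(factory()) == list(factory())  # second pass works because the stream was rewound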
9 changes: 8 additions & 1 deletion pandas_streaming/df/dataframe_io_helpers.py
@@ -113,11 +113,13 @@ def flatten_dictionary(dico, sep="_"):
"""
Flattens a dictionary with nested structure to a dictionary with no
hierarchy.
:param dico: dictionary to flatten
:param sep: string to separate dictionary keys by
:return: flattened dictionary
Inspired from `flatten_json <https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
Inspired from `flatten_json
<https://github.com/amirziai/flatten/blob/master/flatten_json.py>`_.
"""
flattened_dict = {}
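To make the docstring above concrete, a hedged example of the flattening, assuming flatten_dictionary follows the flatten_json convention it cites (nested keys joined with sep):

from pandas_streaming.df.dataframe_io_helpers import flatten_dictionary

nested = {"a": 1, "b": {"c": 2, "d": {"e": 3}}}
flat = flatten_dictionary(nested)   # default separator "_"
print(flat)                         # expected: {'a': 1, 'b_c': 2, 'b_d_e': 3}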

@@ -223,6 +225,11 @@ def enumerate_json_items(filename, encoding=None, lines=False, flatten=False, fL
for item in enumerate_json_items(text_json):
print(item)
The parsed json must have an empty line at the end, otherwise
the following exception is raised:
`ijson.common.IncompleteJSONError: `
`parse error: unallowed token at this point in JSON text`.
"""
if isinstance(filename, str):
if "{" not in filename and os.path.exists(filename):
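The IncompleteJSONError note added to both docstrings is easy to trip over; a minimal hedged sketch of the safeguard, appending a trailing newline before handing the bytes to the parser:

from io import BytesIO
from pandas_streaming.df.dataframe_io_helpers import enumerate_json_items

text = b'{"a": 1, "b": 2}\n{"a": 3, "b": 4}'

# Per the note above, the parsed json should end with an empty line, otherwise
# ijson may raise IncompleteJSONError; appending b"\n" is a cheap safeguard.
items = list(enumerate_json_items(BytesIO(text + b"\n"), lines=True))
print(items)   # expected: [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]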
