class DictsGeneratorView(DictsView):
    """Table view over a one-shot iterable (e.g. a generator) of dicts.

    The source can only be consumed once, so the first pass that gets beyond
    the header spools every row into a pickle-based temporary file. That
    pass, and every later one, then reads its rows back from the file, which
    is what makes the view iterable more than once.
    """

    def __init__(self, dicts, header=None, sample=1000, missing=None):
        super(DictsGeneratorView, self).__init__(dicts, header, sample, missing)
        # NamedTemporaryFile holding the pickled rows; stays None until the
        # source has been drained completely and successfully.
        self._filecache = None

    def __iter__(self):
        """Yield the header, then each data row as a tuple."""
        if not self._header:
            self._determine_header()
        yield self._header

        if self._filecache is None:
            self._build_filecache()

        # iterchunk reopens the file by name, so every iterator gets its own
        # independent read position.
        for row in iterchunk(self._filecache.name):
            yield row

    def _build_filecache(self):
        """Drain ``self.dicts`` into a temporary file of pickled row tuples.

        The cache is published on ``self._filecache`` only after every row
        has been written.  If a source item or the pickling raises, the file
        handle is closed and the partial file removed before the exception
        propagates, so a truncated cache is never replayed by later passes.
        """
        header = self._header
        missing = self.missing
        cache = NamedTemporaryFile(delete=False, mode='wb')
        complete = False
        try:
            for o in self.dicts:
                row = tuple(o[f] if f in o else missing for f in header)
                pickle.dump(row, cache, protocol=-1)
            cache.flush()
            complete = True
        finally:
            cache.close()
            if not complete:
                self._remove_cache_file(cache.name)
        self._filecache = cache

    def _determine_header(self):
        """Derive ``self._header`` from the first ``self.sample`` items.

        Field names are collected in first-seen order from the keys of the
        peeked items; items without a ``keys`` attribute are skipped.
        ``self.dicts`` is rebound to the iterator handed back by ``iterpeek``
        (presumably so the peeked items are replayed rather than lost).

        Returns that same iterator.
        """
        header = list()
        peek, it = iterpeek(iter(self.dicts), self.sample)
        self.dicts = it
        # iterpeek may hand back one bare item instead of a list of items
        # (presumably when sample == 1); normalise to a list.
        if isinstance(peek, dict):
            peek = [peek]
        for o in peek:
            if hasattr(o, 'keys'):
                header += [k for k in o.keys() if k not in header]
        self._header = tuple(header)
        return it

    @staticmethod
    def _remove_cache_file(path):
        """Delete ``path``, ignoring the case where it cannot be removed."""
        try:
            unlink(path)
        except OSError:
            pass

    def __del__(self):
        # getattr: __del__ also runs for instances whose __init__ raised
        # before _filecache was assigned.
        cache = getattr(self, '_filecache', None)
        if cache is not None:
            self._remove_cache_file(cache.name)
def test_fromdicts_generator_random_access():
    """Two interleaved iterators over a generator-backed table must agree.

    Rows are pulled alternately from two independent iterators of the same
    table, first in one order and then in the opposite order, and each pair
    of corresponding rows is compared.  The header is checked before and
    after, and the table length is expected to be 6 (presumably the header
    row plus the five generated rows).
    """
    def generator():
        for i in range(5):
            yield OrderedDict([('n', i), ('foo', 100*i), ('bar', 200*i)])

    expected_header = ('n', 'foo', 'bar')
    actual = fromdicts(generator(), sample=3)
    assert actual.header() == expected_header

    # pull two items from the first iterator ...
    pass_one = iter(actual)
    one_a = next(pass_one)
    one_b = next(pass_one)
    # ... then two from a second, independent iterator
    pass_two = iter(actual)
    two_a = next(pass_two)
    two_b = next(pass_two)
    assert one_a == two_a
    assert one_b == two_b

    # now advance them in the opposite order
    two_c = next(pass_two)
    one_c = next(pass_one)
    assert two_c == one_c

    ieq(actual, actual)
    assert actual.header() == expected_header
    assert len(actual) == 6
logger = logging.getLogger(__name__)
debug = logger.debug


def iterchunk(fn):
    """Yield, in order, every object pickled into the file named ``fn``.

    The file is opened afresh on each call, so several iterators over the
    same cache file are independent of one another.  Reading stops at the
    first ``EOFError`` raised by ``pickle.load``.

    NOTE(security): this unpickles whatever the file contains; only use it
    on cache files written by this process, never on untrusted input.
    """
    # reopen so iterators from file cache are independent
    debug('iterchunk, opening %s', fn)  # lazy args: formatted only if DEBUG is on
    with open(fn, 'rb') as f:
        try:
            while True:
                yield pickle.load(f)
        except EOFError:
            # end of the pickle stream reached
            pass
    debug('end of iterchunk, closed %s', fn)