From 28147a5355d13408c49075b43aedb84e168e8661 Mon Sep 17 00:00:00 2001 From: Shawn Brown Date: Mon, 9 Jul 2018 22:36:07 -0400 Subject: [PATCH] Change Query.to_csv() to use new from_datatest() method and remove old _to_reader() method along with associated tests. --- datatest/_query/query.py | 22 +--------- tests/test_query.py | 88 ---------------------------------------- 2 files changed, 1 insertion(+), 109 deletions(-) diff --git a/datatest/_query/query.py b/datatest/_query/query.py index 34479c5..7dde848 100644 --- a/datatest/_query/query.py +++ b/datatest/_query/query.py @@ -976,26 +976,6 @@ def __repr__(self): return '{0}({1}{2}{3}){4}'.format( class_repr, source_repr, args_repr, kwds_repr, query_steps_repr) - def _get_reader(self, fieldnames=None): - """Return a csv.reader-like iterator of the query results.""" - iterable = _flatten_data(self.execute()) - if not nonstringiter(iterable): - iterable = [iterable] - - if not fieldnames: - first_row, iterable = iterpeek(iterable) - if not nonstringiter(first_row): - first_row = [first_row] - - if self.args: - (fieldnames,) = _flatten_data(self.args[0]) - if len(first_row) != len(fieldnames): - fieldnames = None - - if fieldnames: - iterable = itertools.chain([fieldnames], iterable) - return iterable - def to_csv(self, file, fieldnames=None, **fmtparams): """Execute the query and write the results as a CSV file (dictionaries and other mappings will be seralized). @@ -1009,7 +989,7 @@ def to_csv(self, file, fieldnames=None, **fmtparams): original *columns* argument will be used if the number of selected columns matches the number of resulting columns. """ - reader = self._get_reader(fieldnames) + reader = get_reader.from_datatest(self) if not isinstance(file, file_types): if PY2: diff --git a/tests/test_query.py b/tests/test_query.py index 2ea876f..989b4d8 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1530,94 +1530,6 @@ def test_call(self): self.assertEqual(query.fetch(), expected) -class TestQueryGetReader(unittest.TestCase): - def test_iterable_of_iterables(self): - query = Query.from_object([['a', 1], ['b', 2]]) - - reader = query._get_reader() - - self.assertEqual(list(reader), [['a', 1], ['b', 2]]) - - def test_iterable_of_noniterables(self): - query = Query.from_object([1, 2]) - - reader = query._get_reader() - - self.assertEqual(list(reader), [1, 2]) - - def test_single_noniterable(self): - query = Query.from_object(1) - - reader = query._get_reader() - self.assertEqual(list(reader), [1]) - - reader = query._get_reader('A') - self.assertEqual(list(reader), ['A', 1]) - - # QUESTION: Should this behavior be implemented? - #reader = query._get_reader(['A']) - #self.assertEqual(list(reader), ['A', 1]) - - def test_automatic_fieldnames(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - query = select(['A', 'B']) - - reader = query._get_reader() # <- Automatic field names. - self.assertEqual(list(reader), [['A', 'B'], ['a', 1], ['b', 2]]) - - # Because selection contains 2 items, but output has been - # mapped to 1 item, it should not output a header. - func = lambda x: x[0] + str(x[1]) - reader = query.map(func)._get_reader() - self.assertEqual(list(reader), ['a1', 'b2']) # <- No header! - - def test_explicit_fieldnames(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - - query = select(['A', 'B']) - - reader = query._get_reader(['X', 'Y']) # <- Explicit field names. - self.assertEqual(list(reader), [['X', 'Y'], ['a', 1], ['b', 2]]) - - def test_mapping_of_single_values(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - query = select({'A': 'B'}) # <- A mapping! - - reader = query._get_reader() - - self.assertEqual(list(reader), [('A', 'B'), ('a', 1), ('b', 2)]) - - def test_mapping_with_tup_keys_and_single_vals(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - query = select({('A', 'A'): 'B'}) # <- Tuple keys and single value. - - reader = query._get_reader() - - self.assertEqual( - list(reader), - [('A', 'A', 'B'), ('a','a', 1), ('b', 'b', 2)] - ) - - def test_mapping_with_tup_keys_and_tup_vals(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - query = select({('A', 'A'): ('B', 'B')}) # <- Tuple keys and values. - - reader = query._get_reader() - - self.assertEqual( - list(reader), - [('A', 'A', 'B', 'B'), ('a','a', 1, 1), ('b', 'b', 2, 2)] - ) - - def test_summed_single_value(self): - select = Selector([['A', 'B'], ['a', 1], ['b', 2]]) - query = select('B').sum() - - reader = query._get_reader() - - self.assertEqual(list(reader), ['B', 3]) - - class TestQueryToCsv(unittest.TestCase): def setUp(self): self.select = Selector([['A', 'B'], ['x', 1], ['y', 2]])