Skip to content

Commit

Permalink
Change Query.to_csv() to use new from_datatest() method
Browse files Browse the repository at this point in the history
and remove old _to_reader() method along with associated
tests.
  • Loading branch information
shawnbrown committed Jul 10, 2018
1 parent b6bf0d0 commit 28147a5
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 109 deletions.
22 changes: 1 addition & 21 deletions datatest/_query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,26 +976,6 @@ def __repr__(self):
return '{0}({1}{2}{3}){4}'.format(
class_repr, source_repr, args_repr, kwds_repr, query_steps_repr)

def _get_reader(self, fieldnames=None):
"""Return a csv.reader-like iterator of the query results."""
iterable = _flatten_data(self.execute())
if not nonstringiter(iterable):
iterable = [iterable]

if not fieldnames:
first_row, iterable = iterpeek(iterable)
if not nonstringiter(first_row):
first_row = [first_row]

if self.args:
(fieldnames,) = _flatten_data(self.args[0])
if len(first_row) != len(fieldnames):
fieldnames = None

if fieldnames:
iterable = itertools.chain([fieldnames], iterable)
return iterable

def to_csv(self, file, fieldnames=None, **fmtparams):
"""Execute the query and write the results as a CSV file
(dictionaries and other mappings will be seralized).
Expand All @@ -1009,7 +989,7 @@ def to_csv(self, file, fieldnames=None, **fmtparams):
original *columns* argument will be used if the number of
selected columns matches the number of resulting columns.
"""
reader = self._get_reader(fieldnames)
reader = get_reader.from_datatest(self)

if not isinstance(file, file_types):
if PY2:
Expand Down
88 changes: 0 additions & 88 deletions tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,94 +1530,6 @@ def test_call(self):
self.assertEqual(query.fetch(), expected)


class TestQueryGetReader(unittest.TestCase):
def test_iterable_of_iterables(self):
query = Query.from_object([['a', 1], ['b', 2]])

reader = query._get_reader()

self.assertEqual(list(reader), [['a', 1], ['b', 2]])

def test_iterable_of_noniterables(self):
query = Query.from_object([1, 2])

reader = query._get_reader()

self.assertEqual(list(reader), [1, 2])

def test_single_noniterable(self):
query = Query.from_object(1)

reader = query._get_reader()
self.assertEqual(list(reader), [1])

reader = query._get_reader('A')
self.assertEqual(list(reader), ['A', 1])

# QUESTION: Should this behavior be implemented?
#reader = query._get_reader(['A'])
#self.assertEqual(list(reader), ['A', 1])

def test_automatic_fieldnames(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])
query = select(['A', 'B'])

reader = query._get_reader() # <- Automatic field names.
self.assertEqual(list(reader), [['A', 'B'], ['a', 1], ['b', 2]])

# Because selection contains 2 items, but output has been
# mapped to 1 item, it should not output a header.
func = lambda x: x[0] + str(x[1])
reader = query.map(func)._get_reader()
self.assertEqual(list(reader), ['a1', 'b2']) # <- No header!

def test_explicit_fieldnames(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])

query = select(['A', 'B'])

reader = query._get_reader(['X', 'Y']) # <- Explicit field names.
self.assertEqual(list(reader), [['X', 'Y'], ['a', 1], ['b', 2]])

def test_mapping_of_single_values(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])
query = select({'A': 'B'}) # <- A mapping!

reader = query._get_reader()

self.assertEqual(list(reader), [('A', 'B'), ('a', 1), ('b', 2)])

def test_mapping_with_tup_keys_and_single_vals(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])
query = select({('A', 'A'): 'B'}) # <- Tuple keys and single value.

reader = query._get_reader()

self.assertEqual(
list(reader),
[('A', 'A', 'B'), ('a','a', 1), ('b', 'b', 2)]
)

def test_mapping_with_tup_keys_and_tup_vals(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])
query = select({('A', 'A'): ('B', 'B')}) # <- Tuple keys and values.

reader = query._get_reader()

self.assertEqual(
list(reader),
[('A', 'A', 'B', 'B'), ('a','a', 1, 1), ('b', 'b', 2, 2)]
)

def test_summed_single_value(self):
select = Selector([['A', 'B'], ['a', 1], ['b', 2]])
query = select('B').sum()

reader = query._get_reader()

self.assertEqual(list(reader), ['B', 3])


class TestQueryToCsv(unittest.TestCase):
def setUp(self):
self.select = Selector([['A', 'B'], ['x', 1], ['y', 2]])
Expand Down

0 comments on commit 28147a5

Please sign in to comment.