Skip to content

Commit

Permalink
Implement a grouped result as part of the pipeline API, as we're using this pattern commonly.
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalsh committed Jun 7, 2015
1 parent 1e8831a commit be372a2
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
9 changes: 6 additions & 3 deletions goodtables/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, data, processors=None, dialect=None, format='csv',
options=None, fail_fast=False, row_limit=20000,
report_limit=1000, report_stream=None, header_index=0,
break_on_invalid_processor=True, post_task=None,
report_post_task=None):
report_type='basic'):

if data is None:
_msg = '`data` must be a filepath, url or stream.'
Expand All @@ -71,10 +71,13 @@ def __init__(self, data, processors=None, dialect=None, format='csv',
self.header_index = header_index
self.break_on_invalid_processor = break_on_invalid_processor

helpers.validate_handler(report_post_task)
helpers.validate_handler(post_task)

self.report_post_task = report_post_task or helpers.pipeline_stats
if report_type == 'grouped':
self.report_post_task = helpers.grouped_report
else:
self.report_post_task = helpers.basic_report

self.post_task = post_task

if self.report_stream:
Expand Down
60 changes: 58 additions & 2 deletions goodtables/utilities/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,26 @@ def get_report_result_types():
return result_types


def pipeline_stats(report):
"""Generates high-level pipeline statistics from a report."""
def basic_report(report):
    """Attach run meta statistics to the report and return it."""
    return set_meta_stats(report)


def grouped_report(report):
    """Attach run meta statistics, then group the report results by row."""
    return group_results(set_meta_stats(report))


def set_meta_stats(report):

"""Set run statistics on the report meta object."""

def _bad_type_percent(column_name, results, row_count):
"""Return a percentage for amount of data with bad type in a column."""
Expand Down Expand Up @@ -90,6 +108,44 @@ def _bad_type_percent(column_name, results, row_count):
return report


def group_results(report):

    """Group report results by row.

    Rewrites ``report['results']`` in place: the flat list of result dicts
    becomes a list of single-key dicts ``{row_index: group}`` sorted by row
    index, where each group is ``{'row_index': ..., 'result_context': ...,
    'results': [...]}``. Results whose ``row_index`` is ``None`` are dropped
    from the grouped output, and ``result_context``/``row_index`` are removed
    from each individual result (they now live on the group).
    """

    _rows = {r['row_index'] for r in report['results']
             if r['row_index'] is not None}

    def make_groups(results, rows):
        groups = {row: {'row_index': row, 'results': []} for row in rows}

        for result in results:
            if result['row_index'] is None:
                continue

            group = groups[result['row_index']]

            # set the result context on the group, taken from the first
            # result seen for that row. (Bug fix: the previous `index == 0`
            # check attached a context only to the very first result's
            # group, leaving every other group without a 'result_context'.)
            if 'result_context' not in group:
                group['result_context'] = result['result_context']

            group['results'].append(result)

            # remove stuff we do not require per result
            del result['result_context']
            del result['row_index']

        return groups

    report['results'] = sorted(
        [{k: v} for k, v in make_groups(report['results'], _rows).items()],
        key=lambda result: list(result.keys())[0])
    return report


def load_json_source(source):

"""Load a JSON source, from string, URL or buffer, into a Python type."""
Expand Down
15 changes: 10 additions & 5 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io
import json
from goodtables.pipeline import Pipeline
from goodtables.utilities import helpers
from goodtables import exceptions
from tests import base

Expand Down Expand Up @@ -247,9 +248,13 @@ def test_bad_post_task_raises(self):
self.assertRaises(exceptions.InvalidHandlerError, Pipeline,
filepath, post_task=say_hi)

def test_bad_report_post_task_raises(self):
def test_report_results_grouped_by_rows(self):

filepath = os.path.join(self.data_dir, 'valid.csv')
say_hi = 'Say Hi!'
self.assertRaises(exceptions.InvalidHandlerError, Pipeline,
filepath, report_post_task=say_hi)
filepath = os.path.join(self.data_dir, 'fail_fast_two_schema_errors.csv')
schema = os.path.join(self.data_dir, 'test_schema.json')
options = {'schema': {'schema': schema}}
validator = Pipeline(filepath, processors=('schema',), options=options,
fail_fast=True, report_type='grouped')
result, report = validator.run()
generated = report.generate()
self.assertEqual(1, len(generated['results']))

0 comments on commit be372a2

Please sign in to comment.