Skip to content

Commit

Permalink
Merge pull request #45 from hartym/services_init
Browse files Browse the repository at this point in the history
Trying to fix unending transformations on start() error. (#38)
  • Loading branch information
hartym committed May 1, 2017
2 parents bd0b9a3 + bbd258c commit 3b44eb6
Show file tree
Hide file tree
Showing 17 changed files with 164 additions and 102 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/.idea
/.release
/bonobo.iml
/bonobo/examples/work_in_progress/
/bonobo/ext/jupyter/js/node_modules/
/build/
/coverage.xml
Expand Down
11 changes: 9 additions & 2 deletions bonobo/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,15 @@ def open_fs(fs_url, *args, **kwargs):


# bonobo.basics
register_api_group(Limit, PrettyPrint, Tee, count, identity, noop, pprint, )
register_api_group(
Limit,
PrettyPrint,
Tee,
count,
identity,
noop,
pprint,
)

# bonobo.io
register_api_group(CsvReader, CsvWriter, FileReader, FileWriter, JsonReader, JsonWriter)
Expand Down Expand Up @@ -116,4 +124,3 @@ def get_examples_path(*pathsegments):
@register_api
def open_examples_fs(*pathsegments):
return open_fs(get_examples_path(*pathsegments))

1 change: 1 addition & 0 deletions bonobo/basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def _count_counter(self, context):

def PrettyPrint(title_keys=('title', 'name', 'id'), print_values=True, sort=True):
from bonobo.constants import NOT_MODIFIED

def _pprint(*args, **kwargs):
nonlocal title_keys, sort, print_values

Expand Down
4 changes: 4 additions & 0 deletions bonobo/examples/.style.yapf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[style]
based_on_style = pep8
column_limit = 74
dedent_closing_brackets = true
39 changes: 31 additions & 8 deletions bonobo/examples/datasets/fablabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
try:
import pycountry
except ImportError as exc:
raise ImportError('You must install package "pycountry" to run this example.') from exc
raise ImportError(
'You must install package "pycountry" to run this example.'
) from exc

API_DATASET = 'fablabs-in-the-world'
API_NETLOC = 'datanova.laposte.fr'
Expand Down Expand Up @@ -57,20 +59,41 @@ def display(row):
address = list(
filter(
None, (
' '.join(filter(None, (row.get('postal_code', None), row.get('city', None)))), row.get('county', None),
row.get('country'),
' '.join(
filter(
None, (
row.get('postal_code', None),
row.get('city', None)
)
)
), row.get('county', None), row.get('country'),
)
)
)

print(' - {}address{}: {address}'.format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address)))
print(' - {}links{}: {links}'.format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links'])))
print(' - {}geometry{}: {geometry}'.format(Fore.BLUE, Style.RESET_ALL, **row))
print(' - {}source{}: {source}'.format(Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET))
print(
' - {}address{}: {address}'.
format(Fore.BLUE, Style.RESET_ALL, address=', '.join(address))
)
print(
' - {}links{}: {links}'.
format(Fore.BLUE, Style.RESET_ALL, links=', '.join(row['links']))
)
print(
' - {}geometry{}: {geometry}'.
format(Fore.BLUE, Style.RESET_ALL, **row)
)
print(
' - {}source{}: {source}'.format(
Fore.BLUE, Style.RESET_ALL, source='datanova/' + API_DATASET
)
)


graph = bonobo.Graph(
OpenDataSoftAPI(dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'),
OpenDataSoftAPI(
dataset=API_DATASET, netloc=API_NETLOC, timezone='Europe/Paris'
),
normalize,
filter_france,
bonobo.Tee(display),
Expand Down
2 changes: 2 additions & 0 deletions bonobo/examples/files/json_handlers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import bonobo
from bonobo.commands.run import get_default_services


def get_fields(row):
return row['fields']


graph = bonobo.Graph(
bonobo.JsonReader(path='datasets/theaters.json'),
get_fields,
Expand Down
6 changes: 3 additions & 3 deletions bonobo/examples/tutorials/tut02_01_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
)

if __name__ == '__main__':
bonobo.run(graph, services={
'fs': bonobo.open_examples_fs('datasets')
})
bonobo.run(
graph, services={'fs': bonobo.open_examples_fs('datasets')}
)
6 changes: 3 additions & 3 deletions bonobo/examples/tutorials/tut02_02_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ def split_one(line):
)

if __name__ == '__main__':
bonobo.run(graph, services={
'fs': bonobo.open_examples_fs('datasets')
})
bonobo.run(
graph, services={'fs': bonobo.open_examples_fs('datasets')}
)
6 changes: 3 additions & 3 deletions bonobo/examples/tutorials/tut02_03_writeasmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ def write(self, fs, file, lineno, row):
)

if __name__ == '__main__':
bonobo.run(graph, services={
'fs': bonobo.open_examples_fs('datasets')
})
bonobo.run(
graph, services={'fs': bonobo.open_examples_fs('datasets')}
)
70 changes: 24 additions & 46 deletions bonobo/execution/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import sys
import traceback
from time import sleep

from bonobo.config import Container
from bonobo.config.processors import resolve_processors
from bonobo.util.errors import print_error
from bonobo.util.iterators import ensure_tuple
from bonobo.util.objects import Wrapper

Expand Down Expand Up @@ -43,16 +43,13 @@ def start(self):
False), ('{}.start() can only be called on a new node.').format(type(self).__name__)
assert self._context is None
self._started = True
try:
if self.parent:
self._context = self.parent.services.args_for(self.wrapped)
elif self.services:
self._context = self.services.args_for(self.wrapped)
else:
self._context = ()
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise

if self.parent:
self._context = self.parent.services.args_for(self.wrapped)
elif self.services:
self._context = self.services.args_for(self.wrapped)
else:
self._context = ()

for processor in resolve_processors(self.wrapped):
try:
Expand Down Expand Up @@ -80,41 +77,22 @@ def stop(self):
if self._stopped:
return

assert self._context is not None

self._stopped = True
while len(self._stack):
processor = self._stack.pop()
try:
# todo yield from ? how to ?
next(processor)
except StopIteration as exc:
# This is normal, and wanted.
pass
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
else:
# No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')
if self._context is not None:
while len(self._stack):
processor = self._stack.pop()
try:
# todo yield from ? how to ?
next(processor)
except StopIteration as exc:
# This is normal, and wanted.
pass
except Exception as exc: # pylint: disable=broad-except
self.handle_error(exc, traceback.format_exc())
raise
else:
# No error ? We should have had StopIteration ...
raise RuntimeError('Context processors should not yield more than once.')

def handle_error(self, exc, trace):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""

from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {} in {}'.format(type(exc).__name__, self.wrapped),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)
return print_error(exc, trace, context=self.wrapped)
10 changes: 5 additions & 5 deletions bonobo/execution/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ def __init__(self, graph, plugins=None, services=None):
self.plugins = [PluginExecutionContext(plugin, parent=self) for plugin in plugins or ()]
self.services = Container(services) if services else Container()

for i, component_context in enumerate(self):
for i, node_context in enumerate(self):
try:
component_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
node_context.outputs = [self[j].input for j in self.graph.outputs_of(i)]
except KeyError:
continue

component_context.input.on_begin = partial(component_context.send, BEGIN, _control=True)
component_context.input.on_end = partial(component_context.send, END, _control=True)
component_context.input.on_finalize = partial(component_context.stop)
node_context.input.on_begin = partial(node_context.send, BEGIN, _control=True)
node_context.input.on_end = partial(node_context.send, END, _control=True)
node_context.input.on_finalize = partial(node_context.stop)

def __getitem__(self, item):
return self.nodes[item]
Expand Down
15 changes: 9 additions & 6 deletions bonobo/execution/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from bonobo.core.statistics import WithStatistics
from bonobo.errors import InactiveReadableError
from bonobo.execution.base import LoopingExecutionContext
from bonobo.structs.bags import Bag, ErrorBag
from bonobo.structs.bags import Bag
from bonobo.util.errors import is_error
from bonobo.util.iterators import iter_if_not_sequence


Expand All @@ -32,7 +33,13 @@ def __str__(self):
return (('+' if self.alive else '-') + ' ' + self.__name__ + ' ' + self.get_statistics_as_string()).strip()

def __repr__(self):
return '<' + self.__str__() + '>'
stats = self.get_statistics_as_string().strip()
return '<{}({}{}){}>'.format(
type(self).__name__,
'+' if self.alive else '',
self.__name__,
(' ' + stats) if stats else '',
)

def recv(self, *messages):
"""
Expand Down Expand Up @@ -116,10 +123,6 @@ def handle_results(self, input_bag, results):
self.push(_resolve(input_bag, result))


def is_error(bag):
return isinstance(bag, ErrorBag)


def _resolve(input_bag, output):
# NotModified means to send the input unmodified to output.
if output is NOT_MODIFIED:
Expand Down
2 changes: 0 additions & 2 deletions bonobo/ext/jupyter/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,3 @@ def run(self):
self.widget.value = [repr(node) for node in self.context.parent.nodes]

finalize = run


28 changes: 22 additions & 6 deletions bonobo/strategies/executor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import time
import traceback

from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

from bonobo.constants import BEGIN, END
from bonobo.strategies.base import Strategy
from bonobo.structs.bags import Bag
from bonobo.util.errors import print_error


class ExecutorStrategy(Strategy):
Expand All @@ -29,18 +31,32 @@ def execute(self, graph, *args, plugins=None, services=None, **kwargs):
for plugin_context in context.plugins:

def _runner(plugin_context=plugin_context):
plugin_context.start()
plugin_context.loop()
plugin_context.stop()
try:
plugin_context.start()
plugin_context.loop()
plugin_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Error in plugin context', context=plugin_context)

futures.append(executor.submit(_runner))

for node_context in context.nodes:

def _runner(node_context=node_context):
node_context.start()
node_context.loop()
node_context.stop()
try:
node_context.start()
except Exception as exc:
print_error(
exc, traceback.format_exc(), prefix='Could not start node context', context=node_context
)
node_context.input.on_end()
else:
node_context.loop()

try:
node_context.stop()
except Exception as exc:
print_error(exc, traceback.format_exc(), prefix='Could not stop node context', context=node_context)

futures.append(executor.submit(_runner))

Expand Down
2 changes: 1 addition & 1 deletion bonobo/structs/bags.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Bag:
foo notbaz
"""

def __init__(self, *args, _flags=None, _parent=None, **kwargs):
self._flags = _flags or ()
self._parent = _parent
Expand Down
31 changes: 31 additions & 0 deletions bonobo/util/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import sys

from bonobo.structs.bags import ErrorBag


def is_error(bag):
return isinstance(bag, ErrorBag)


def print_error(exc, trace, context=None, prefix=''):
"""
Error handler. Whatever happens in a plugin or component, if it looks like an exception, taste like an exception
or somehow make me think it is an exception, I'll handle it.
:param exc: the culprit
:param trace: Hercule Poirot's logbook.
:return: to hell
"""

from colorama import Fore, Style
print(
Style.BRIGHT,
Fore.RED,
'\U0001F4A3 {}{}{}'.format(
(prefix + ': ') if prefix else '', type(exc).__name__, ' in {!r}'.format(context) if context else ''
),
Style.RESET_ALL,
sep='',
file=sys.stderr,
)
print(trace)
Loading

0 comments on commit 3b44eb6

Please sign in to comment.