Skip to content

Commit

Permalink
Wrap layer run() to ensure we fetch ActorFuture results
Browse files Browse the repository at this point in the history
  • Loading branch information
pfnsec committed Sep 11, 2020
1 parent 8b7f15c commit 9f4b08e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion batik/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.0.3'
__version__ = '0.0.4'

from .env import env
from .layer import Layer, Actor
Expand Down
25 changes: 21 additions & 4 deletions batik/layer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
from dask.distributed import ActorFuture
import logging


class Layer:
def __init__(self, client=None, **kwargs):
super().__init__(**kwargs)
def __init__(self, layer_args={}, **kwargs):
self.layer_args = layer_args
def _run(self, payload):
if isinstance(payload, ActorFuture):
return self.run(payload.result())
else:
return self.run(payload)



class Actor:
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __init__(self, layer_args={}, **kwargs):
self.layer_args = layer_args
def _run(self, payload):
try:
if isinstance(payload, ActorFuture):
return self.run(payload.result())
else:
return self.run(payload)
except Exception as e:
logging.exception('')

0 comments on commit 9f4b08e

Please sign in to comment.