Skip to content

Commit

Permalink
Change API to be a global event registry that can handle multiple hoo…
Browse files Browse the repository at this point in the history
…ks per event
  • Loading branch information
jimjshields committed Jun 15, 2020
1 parent 0ce9364 commit d710205
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
10 changes: 0 additions & 10 deletions pyqs/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,6 @@ def __init__(self, queue=None, delay_seconds=None,
self.delay_seconds = delay_seconds
self.function_path = custom_function_path

@classmethod
def set_hooks(self, hooks):
self._hooks = hooks

@classmethod
def get_hooks(self):
return self._hooks

def __call__(self, *args, **kwargs):
func_to_wrap = args[0]
function = func_to_wrap
Expand All @@ -79,6 +71,4 @@ def __call__(self, *args, **kwargs):
function = self.function_path
func_to_wrap.delay = task_delayer(
function, self.queue_name, self.delay_seconds, override=override)
func_to_wrap.pre_process_hook = self.get_hooks().get("pre_process")
func_to_wrap.post_process_hook = self.get_hooks().get("post_process")
return func_to_wrap
19 changes: 19 additions & 0 deletions pyqs/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class Events:
def __init__(self):
self.pre_process = []
self.post_process = []


# Singleton
EVENTS = Events()


def register(name, callback):
if hasattr(EVENTS, name):
getattr(EVENTS, name).append(callback)
else:
raise Exception(f"{name} is not a valid pyqs event.")


def get_events():
return EVENTS
22 changes: 13 additions & 9 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import boto3

from pyqs.utils import get_aws_region_name, decode_message
from pyqs.events import get_events

MESSAGE_DOWNLOAD_BATCH_SIZE = 10
LONG_POLLING_INTERVAL = 20
Expand Down Expand Up @@ -211,8 +212,9 @@ def process_message(self):
return True
try:
start_time = time.time()
if task.pre_process_hook:
task.pre_process_hook(context)
pre_process_hooks = get_events().pre_process
for hook in pre_process_hooks:
hook(context)
task(*args, **kwargs)
except Exception:
end_time = time.time()
Expand All @@ -226,10 +228,11 @@ def process_message(self):
traceback.format_exc(),
)
)
if task.post_process_hook:
context["status"] = "exception"
context["exception"] = traceback.format_exc()
task.post_process_hook(context)
post_process_hooks = get_events().post_process
context["status"] = "exception"
context["exception"] = traceback.format_exc()
for hook in post_process_hooks:
hook(context)
return True
else:
end_time = time.time()
Expand All @@ -246,9 +249,10 @@ def process_message(self):
repr(kwargs),
)
)
if task.post_process_hook:
context["status"] = "success"
task.post_process_hook(context)
post_process_hooks = get_events().post_process
context["status"] = "success"
for hook in post_process_hooks:
hook(context)
return True


Expand Down

0 comments on commit d710205

Please sign in to comment.