diff --git a/frestq/decorators.py b/frestq/decorators.py index 1370e5a..774a925 100644 --- a/frestq/decorators.py +++ b/frestq/decorators.py @@ -54,11 +54,18 @@ def decorator(view_func): if view_func is not None and not isfunction(view_func): view_func.action = action view_func.queue_name = queue + wrapper_func = view_func + else: + def wrapper_func(*args, **kwargs): + print(f"task: Starting task") + ret = view_func(*args, **kwargs) + print(f"task: Finished task") + return ret - ActionHandlers.add_action_handler(action, queue, view_func, kwargs) + ActionHandlers.add_action_handler(action, queue, wrapper_func, kwargs) FScheduler.reserve_scheduler(queue) - return view_func + return wrapper_func return decorator @@ -91,7 +98,11 @@ def __call__(self, *args): local_ssl_cert = app.config['SSL_CERT_STRING'] if certs_differ(sender_ssl_cert, local_ssl_cert): raise SecurityException() - return self.func(*args) + + print(f"local_task: Starting task.id={task.task_model.id}") + ret = self.func(*args) + print(f"local_task: Finished task.id={task.task_model.id}") + return ret def internal_task(name, **kwargs): """ diff --git a/frestq/protocol.py b/frestq/protocol.py index e39df3c..7bd629b 100644 --- a/frestq/protocol.py +++ b/frestq/protocol.py @@ -63,7 +63,7 @@ def update_task(msg): # fixed broken FK bug, when taskid exists in a non local db # task = msg.task - task = db.session.query(ModelTask).filter(ModelTask.id == msg.task_id).first() + task = db.session.query(ModelTask).filter(ModelTask.id == msg.task_id).with_for_update(of=ModelTask).first() logging.debug("UPDATING TASK with id %s" % task.id) if not task or\ @@ -77,14 +77,32 @@ def update_task(msg): raise SecurityException() keys = ['output_data', 'status'] + for key in keys: - if key in msg.input_data: - if isinstance(msg.input_data[key], str): - logging.debug("SETTING TASK FIELD '%s' to '%s'" % (key, - msg.input_data[key])) - else: - logging.debug("SETTING TASK FIELD '%s' to: %s" % (key, - dumps(msg.input_data[key]))) + if key not in msg.input_data: + continue + str_data = ( + msg.input_data[key] + if isinstance(msg.input_data[key], str) + else dumps(msg.input_data[key]) + ) + if ( + key == 'status' and + hasattr(task, key) and + task.status == 'finished' + ): + logging.debug( + f"({task.id}) **NOT** SETTING TASK FIELD '{key}' to " + f"'{str_data}' because it's already 'finished'" + ) + # do next (it might be a task with a parent task) + receiver_task = BaseTask.instance_by_model(task) + receiver_task.execute() + return + else: + logging.debug( + f"({task.id}) SETTING TASK FIELD '{key}' to '{str_data}'" + ) setattr(task, key, msg.input_data[key]) task.last_modified_date = datetime.utcnow() db.session.add(task) diff --git a/frestq/utils.py b/frestq/utils.py index 6d12258..fb57520 100644 --- a/frestq/utils.py +++ b/frestq/utils.py @@ -157,7 +157,7 @@ def traverse_tasktree(task, visitor_func, visitor_kwargs): def show_task(args): from .app import db from .models import Task - task_id = unicode(args.show_task) + task_id = str(args.show_task) task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all() if not task_model: print("task %s not found" % task_id) @@ -260,7 +260,7 @@ def show_activity(args): def show_message(args): from .app import db from .models import Message - msg_id = unicode(args.show_message) + msg_id = str(args.show_message) msg_model = db.session.query(Message).filter(Message.id.startswith(msg_id)).all() if not msg_model: print("message %s not found" % msg_id) @@ -273,7 +273,7 @@ def get_external_task(args): from .app import db from .models import Task - task_id = unicode(args.show_external) + task_id = str(args.show_external) task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all() return task_model @@ -281,7 +281,7 @@ def get_external_task(args): # drb def show_external_task(args): - task_id = unicode(args.show_external) + task_id = str(args.show_external) task_model = get_external_task(args) if not task_model: @@ -302,9 +302,9 @@ def finish_task(args): from .models import Task from .tasks import ExternalTask - task_id = unicode(args.finish[0]) + task_id = str(args.finish[0]) try: - finish_data = loads(unicode(args.finish[1])) + finish_data = loads(str(args.finish[1])) except: print("error loading the json finish data") return @@ -331,7 +331,7 @@ def deny_task(args): def task_tree(args): from .app import db from .models import Task - task_id = unicode(args.tree) + task_id = str(args.tree) task_model = db.session.query(Task).filter(Task.id.startswith(task_id)).all() if not task_model: print("task %s not found" % task_id) @@ -356,7 +356,12 @@ class DecoratorBase(object): func = None def __init__(self, func): - self.func = func + def wrapper_func(*args, **kwargs): + print(f"DecoratorBase: Starting task") + ret = func(*args, **kwargs) + print(f"DecoratorBase: Finished task") + return ret + self.func = wrapper_func def __getattribute__(self, name): if name == "func":