Skip to content

Commit

Permalink
future tasks should not cause anything to happen if the instance is s…
Browse files Browse the repository at this point in the history
…uspended (#952)

* Reapply "future tasks should not cause anything to happen if the instance is suspended"

This reverts commit 05b50df.

* lint

* add required approval and merge instead of mark as auto merge

* lint

* added server default to new future_task column so default is actually added to the table schema w/ burnettk

* pyl w/ burnettk

---------

Co-authored-by: burnettk <burnettk@users.noreply.github.com>
Co-authored-by: jasquat <jasquat@users.noreply.github.com>
  • Loading branch information
3 people committed Feb 6, 2024
1 parent 64d8805 commit a61f2d6
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 17 deletions.
34 changes: 34 additions & 0 deletions spiffworkflow-backend/migrations/versions/29b261f5edf4_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""empty message
Revision ID: 29b261f5edf4
Revises: 343b406f723d
Create Date: 2024-02-06 13:52:18.973974
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '29b261f5edf4'
down_revision = '343b406f723d'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), server_default=sa.text('false'), nullable=False))
batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status'))
batch_op.drop_column('archived_for_process_instance_status')

# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
Expand Down Expand Up @@ -57,24 +58,46 @@ def remove_stale_locks(self) -> None:
ProcessInstanceLockService.remove_stale_locks()

def process_future_tasks(self) -> None:
"""If something has been locked for a certain amount of time it is probably stale so unlock it."""
"""Timer related tasks go in the future_task table.
Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task
table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those
that are 5 minutes away or less.
"""

with self.app.app_context():
future_task_lookahead_in_seconds = self.app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
]
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds)

@classmethod
def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None:
future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds)
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
if process_instance.allowed_to_run():
queue_future_task_if_appropriate(
process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
)
else:
# if we are not allowed to run the process instance, we should not keep processing the future task
future_task.archived_for_process_instance_status = True
db.session.add(future_task)
db.session.commit()

@classmethod
def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]:
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks: list[FutureTaskModel] = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
return future_tasks
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
if queue_enabled_for_process_model(process_instance):
buffer = 1
countdown = eta_in_seconds - time.time() + buffer
args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid}
args_to_celery = {
"process_instance_id": process_instance.id,
"task_guid": task_guid,
# the producer_identifier is so we can know what is putting messages in the queue
"producer_identifier": "future_task",
}
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
# expecting the timer to be ready, but the library considered it ready a little after that time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.sql import false

from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
Expand All @@ -17,6 +18,14 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
guid: str = db.Column(db.String(36), primary_key=True)
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
archived_for_process_instance_status: bool = db.Column(
# db.Boolean, default=False, server_default=db.sql.False_(), nullable=False, index=True
db.Boolean,
default=False,
server_default=false(),
nullable=False,
index=True,
)

updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any

import marshmallow
from flask_sqlalchemy.query import Query
from marshmallow import INCLUDE
from marshmallow import Schema
from sqlalchemy import ForeignKey
Expand All @@ -14,6 +15,8 @@
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel


Expand Down Expand Up @@ -48,7 +51,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
bpmn_process_definition_id: int | None = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore
ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore
nullable=True,
index=True,
)
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore

Expand All @@ -63,7 +68,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
) # type: ignore

bpmn_process = relationship(BpmnProcessModel, cascade="delete")
tasks = relationship("TaskModel", cascade="delete") # type: ignore
tasks = relationship("TaskModel", cascade="delete")
task_draft_data = relationship("TaskDraftDataModel", cascade="delete") # type: ignore
process_instance_events = relationship("ProcessInstanceEventModel", cascade="delete") # type: ignore
process_instance_file_data = relationship("ProcessInstanceFileDataModel", cascade="delete") # type: ignore
Expand Down Expand Up @@ -117,6 +122,16 @@ def spiffworkflow_fully_initialized(self) -> bool:
"""
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None

def future_tasks_query(self) -> Query:
future_tasks: Query = (
FutureTaskModel.query.filter(
FutureTaskModel.completed == False, # noqa: E712
)
.join(TaskModel, TaskModel.guid == FutureTaskModel.guid)
.filter(TaskModel.process_instance_id == self.id)
)
return future_tasks

def serialized(self) -> dict[str, Any]:
"""Return object data in serializeable format."""
return {
Expand Down Expand Up @@ -152,6 +167,10 @@ def validate_status(self, key: str, value: Any) -> Any:
def can_submit_task(self) -> bool:
return not self.has_terminal_status() and self.status != "suspended"

def allowed_to_run(self) -> bool:
"""If this process can currently move forward with things like do_engine_steps."""
return not self.has_terminal_status() and self.status != "suspended"

def can_receive_message(self) -> bool:
"""If this process can currently accept messages."""
return not self.has_terminal_status() and self.status != "suspended"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
Expand Down Expand Up @@ -1810,9 +1811,20 @@ def suspend(self) -> None:
)
db.session.commit()

def bring_archived_future_tasks_back_to_life(self) -> None:
archived_future_tasks = (
self.process_instance_model.future_tasks_query()
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
.all()
)
for archived_future_task in archived_future_tasks:
archived_future_task.archived_for_process_instance_status = False
db.session.add(archived_future_task)

def resume(self) -> None:
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
db.session.add(self.process_instance_model)
self.bring_archived_future_tasks_back_to_life()
ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.git_service import GitCommandError
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from flask import Flask
from pytest_mock.plugin import MockerFixture
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor

from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec


class TestBackgroundProcessingService(BaseTest):
def test_process_future_tasks_with_no_future_tasks(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
BackgroundProcessingService(app).process_future_tasks()

def test_do_process_future_tasks_with_processable_future_task(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
self._load_up_a_future_task_and_return_instance()
assert mock.call_count == 0
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
assert mock.call_count == 1
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is False

def test_do_process_future_tasks_with_unprocessable_future_task(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
process_instance = self._load_up_a_future_task_and_return_instance()
assert mock.call_count == 0
process_instance.status = "suspended"
db.session.add(process_instance)
db.session.commit()
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 1
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
# should not process anything, so nothing goes to queue
assert mock.call_count == 0
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is True

# the next time do_process_future_tasks runs, it will not consider this task, which is nice
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 0
processor = ProcessInstanceProcessor(process_instance)
processor.resume()
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 1

def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel:
process_model = load_test_spec(
process_model_id="test_group/user-task-with-timer",
process_model_source_directory="user-task-with-timer",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)

assert process_instance.status == "user_input_required"

future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
return process_instance

0 comments on commit a61f2d6

Please sign in to comment.