
Reapply "future tasks should not cause anything to happen if the inst…
Browse files Browse the repository at this point in the history
…ance is suspended"

This reverts commit 05b50df.
burnettk committed Feb 2, 2024
1 parent 05b50df commit c291aea
Showing 8 changed files with 195 additions and 19 deletions.
34 changes: 34 additions & 0 deletions spiffworkflow-backend/migrations/versions/acf20342181e_.py
@@ -0,0 +1,34 @@
"""empty message
Revision ID: acf20342181e
Revises: 343b406f723d
Create Date: 2024-02-02 16:47:00.942504
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'acf20342181e'
down_revision = '343b406f723d'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), nullable=False))
batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('future_task', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status'))
batch_op.drop_column('archived_for_process_instance_status')

# ### end Alembic commands ###
@@ -6,6 +6,7 @@
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
@@ -57,24 +58,46 @@ def remove_stale_locks(self) -> None:
ProcessInstanceLockService.remove_stale_locks()

def process_future_tasks(self) -> None:
"""If something has been locked for a certain amount of time it is probably stale so unlock it."""
"""Timer related tasks go in the future_task table.
Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task
table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those
that are 5 minutes away or less.
"""

with self.app.app_context():
future_task_lookahead_in_seconds = self.app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
]
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds)

@classmethod
def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None:
future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds)
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
if process_instance.allowed_to_run():
queue_future_task_if_appropriate(
process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
)
else:
# if we are not allowed to run the process instance, we should not keep processing the future task
future_task.archived_for_process_instance_status = True
db.session.add(future_task)
db.session.commit()

@classmethod
def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]:
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks: list[FutureTaskModel] = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
return future_tasks
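
For clarity, a minimal standalone sketch of the lookahead filter the docstring above describes, assuming the documented default of a 5-minute (300-second) window; the function name and dict shape below are illustrative only, not the backend's actual models or query.

import time

def imminent(tasks: list[dict], lookahead_in_seconds: int = 300) -> list[dict]:
    # Keep only rows that are not completed, not archived because of the
    # instance's status, and due to run within the lookahead window from "now".
    cutoff = time.time() + lookahead_in_seconds
    return [
        task
        for task in tasks
        if not task["completed"]
        and not task["archived_for_process_instance_status"]
        and task["run_at_in_seconds"] < cutoff
    ]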
@@ -16,7 +16,12 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
if queue_enabled_for_process_model(process_instance):
buffer = 1
countdown = eta_in_seconds - time.time() + buffer
args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid}
args_to_celery = {
"process_instance_id": process_instance.id,
"task_guid": task_guid,
# the producer_identifier is so we can know what is putting messages in the queue
"producer_identifier": "future_task",
}
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
# expecting the timer to be ready, but the library considered it ready a little after that time
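As a worked example of the buffer described in the comment above (the numbers are made up for illustration):

# Suppose the timer's eta works out to 1_000_000_011.0 seconds and
# time.time() currently returns 1_000_000_010.4 (0.6 seconds before the eta).
eta_in_seconds = 1_000_000_011.0
now = 1_000_000_010.4
buffer = 1
countdown = eta_in_seconds - now + buffer  # roughly 1.6 seconds rather than 0.6
# Without the extra second, rounding could let the Celery task start a hair
# before SpiffWorkflow considers the timer ready; the buffer keeps the start
# strictly after the eta, at the cost of a one-second delay.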
@@ -17,6 +17,7 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
guid: str = db.Column(db.String(36), primary_key=True)
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
archived_for_process_instance_status: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)

updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)

@@ -1,4 +1,7 @@
from __future__ import annotations
from flask_sqlalchemy.query import Query
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.future_task import FutureTaskModel

from typing import Any

@@ -48,7 +51,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
bpmn_process_definition_id: int | None = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore
ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore
nullable=True,
index=True,
)
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore

@@ -117,6 +122,16 @@ def spiffworkflow_fully_initialized(self) -> bool:
"""
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None

def future_tasks_query(self) -> Query:
future_tasks: Query = (
FutureTaskModel.query.filter(
FutureTaskModel.completed == False, # noqa: E712
)
.join(TaskModel, TaskModel.guid == FutureTaskModel.guid)
.filter(TaskModel.process_instance_id == self.id)
)
return future_tasks

def serialized(self) -> dict[str, Any]:
"""Return object data in serializeable format."""
return {
@@ -152,6 +167,10 @@ def validate_status(self, key: str, value: Any) -> Any:
def can_submit_task(self) -> bool:
return not self.has_terminal_status() and self.status != "suspended"

def allowed_to_run(self) -> bool:
"""If this process can currently move forward with things like do_engine_steps."""
return not self.has_terminal_status() and self.status != "suspended"

def can_receive_message(self) -> bool:
"""If this process can currently accept messages."""
return not self.has_terminal_status() and self.status != "suspended"
@@ -1,3 +1,5 @@
from spiffworkflow_backend.models.future_task import FutureTaskModel

# TODO: clean up this service for a clear distinction between it and the process_instance_service
# where this points to the pi service
import copy
@@ -609,9 +611,9 @@ def _set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
bpmn_subprocess_definition.bpmn_identifier
)
bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.id
] = bpmn_subprocess_definition.bpmn_identifier

task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore
@@ -1810,9 +1812,20 @@ def suspend(self) -> None:
)
db.session.commit()

def bring_archived_future_tasks_back_to_life(self) -> None:
archived_future_tasks = (
self.process_instance_model.future_tasks_query()
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
.all()
)
for archived_future_task in archived_future_tasks:
archived_future_task.archived_for_process_instance_status = False
db.session.add(archived_future_task)

def resume(self) -> None:
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
db.session.add(self.process_instance_model)
self.bring_archived_future_tasks_back_to_life()
ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
)
@@ -42,6 +42,7 @@
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.git_service import GitCommandError
@@ -0,0 +1,80 @@
from flask import Flask
from pytest_mock.plugin import MockerFixture
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor

from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec


class TestBackgroundProcessingService(BaseTest):
def test_process_future_tasks_with_no_future_tasks(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
BackgroundProcessingService(app).process_future_tasks()

def test_do_process_future_tasks_with_processable_future_task(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
process_instance = self._load_up_a_future_task_and_return_instance()
assert mock.call_count == 0
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
assert mock.call_count == 1
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is False

def test_do_process_future_tasks_with_unprocessable_future_task(
self,
app: Flask,
mocker: MockerFixture,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
mock = mocker.patch("celery.current_app.send_task")
process_instance = self._load_up_a_future_task_and_return_instance()
assert mock.call_count == 0
process_instance.status = "suspended"
db.session.add(process_instance)
db.session.commit()
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 1
BackgroundProcessingService.do_process_future_tasks(99999999999999999)
# should not process anything, so nothing goes to queue
assert mock.call_count == 0
future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
assert future_tasks[0].archived_for_process_instance_status is True

# the next time do_process_future_tasks runs, it will not consider this task, which is nice
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 0
processor = ProcessInstanceProcessor(process_instance)
processor.resume()
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
assert len(future_tasks) == 1

def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel:
process_model = load_test_spec(
process_model_id="test_group/user-task-with-timer",
process_model_source_directory="user-task-with-timer",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)

assert process_instance.status == "user_input_required"

future_tasks = FutureTaskModel.query.all()
assert len(future_tasks) == 1
return process_instance
