Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

future tasks should not cause anything to happen if the instance is suspended #952

Merged
merged 8 commits into from
Feb 6, 2024
34 changes: 34 additions & 0 deletions spiffworkflow-backend/migrations/versions/acf20342181e_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""empty message

Revision ID: acf20342181e
Revises: 343b406f723d
Create Date: 2024-02-02 16:47:00.942504

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'acf20342181e'
down_revision = '343b406f723d'
branch_labels = None
depends_on = None


def upgrade():
    """Add the archived_for_process_instance_status flag to future_task.

    The background scheduler sets this flag so that timer tasks belonging to
    suspended/terminal process instances are excluded from future-task queries.
    """
    # NOTE(review): adding a NOT NULL boolean without a server_default can fail on
    # some backends if future_task already has rows — confirm against target DBs.
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('future_task', schema=None) as batch_op:
        batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), nullable=False))
        batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False)

    # ### end Alembic commands ###


def downgrade():
    """Drop the archived_for_process_instance_status column and its index.

    Index is dropped before the column, as required for a clean removal.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('future_task', schema=None) as batch_op:
        batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status'))
        batch_op.drop_column('archived_for_process_instance_status')

    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
Expand Down Expand Up @@ -57,24 +58,46 @@ def remove_stale_locks(self) -> None:
ProcessInstanceLockService.remove_stale_locks()

def process_future_tasks(self) -> None:
    """Timer related tasks go in the future_task table.

    Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task
    table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those
    that are 5 minutes away or less.
    """
    with self.app.app_context():
        # the lookahead window is app config so deployments can tune how early tasks are queued
        future_task_lookahead_in_seconds = self.app.config[
            "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
        ]
        # delegate to the classmethod so the queueing logic is callable (and testable) without an instance
        self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds)

@classmethod
def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None:
    """Queue imminent future tasks, archiving those whose instance cannot run.

    For each future task due within the lookahead window, queue it for its
    process instance if the instance is allowed to run; otherwise mark the
    future task archived so it is skipped until the instance is resumed.
    """
    future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds)
    for future_task in future_tasks:
        process_instance = (
            ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
            .filter(TaskModel.guid == future_task.guid)
            .first()
        )
        # guard: .first() returns None if no task row matches this guid (e.g. stale
        # future_task); without this check the attribute access below would raise
        # and abort processing of all remaining future tasks
        if process_instance is None:
            continue
        if process_instance.allowed_to_run():
            queue_future_task_if_appropriate(
                process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
            )
        else:
            # if we are not allowed to run the process instance, we should not keep processing the future task
            future_task.archived_for_process_instance_status = True
            db.session.add(future_task)
            db.session.commit()

@classmethod
def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]:
    """Return incomplete, unarchived future tasks due within the lookahead window."""
    run_at_cutoff = time.time() + future_task_lookahead_in_seconds
    imminent_query = FutureTaskModel.query.filter(
        and_(
            FutureTaskModel.completed == False,  # noqa: E712
            FutureTaskModel.archived_for_process_instance_status == False,  # noqa: E712
            FutureTaskModel.run_at_in_seconds < run_at_cutoff,
        )
    )
    results: list[FutureTaskModel] = imminent_query.all()
    return results
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
if queue_enabled_for_process_model(process_instance):
buffer = 1
countdown = eta_in_seconds - time.time() + buffer
args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid}
args_to_celery = {
"process_instance_id": process_instance.id,
"task_guid": task_guid,
# the producer_identifier is so we can know what is putting messages in the queue
"producer_identifier": "future_task",
}
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
# expecting the timer to be ready, but the library considered it ready a little after that time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
guid: str = db.Column(db.String(36), primary_key=True)
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
archived_for_process_instance_status: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)

updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations
from flask_sqlalchemy.query import Query
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.future_task import FutureTaskModel

from typing import Any

Expand Down Expand Up @@ -48,7 +51,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
bpmn_process_definition_id: int | None = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore
ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore
nullable=True,
index=True,
)
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore

Expand Down Expand Up @@ -117,6 +122,16 @@ def spiffworkflow_fully_initialized(self) -> bool:
"""
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None

def future_tasks_query(self) -> Query:
    """Build a query for this instance's incomplete future (timer) tasks."""
    query: Query = FutureTaskModel.query.join(
        TaskModel, TaskModel.guid == FutureTaskModel.guid
    ).filter(
        FutureTaskModel.completed == False,  # noqa: E712
        TaskModel.process_instance_id == self.id,
    )
    return query

def serialized(self) -> dict[str, Any]:
"""Return object data in serializeable format."""
return {
Expand Down Expand Up @@ -152,6 +167,10 @@ def validate_status(self, key: str, value: Any) -> Any:
def can_submit_task(self) -> bool:
    """True when tasks may be submitted: not in a terminal status and not suspended."""
    if self.has_terminal_status():
        return False
    return self.status != "suspended"

def allowed_to_run(self) -> bool:
    """If this process can currently move forward with things like do_engine_steps."""
    blocked = self.has_terminal_status() or self.status == "suspended"
    return not blocked

def can_receive_message(self) -> bool:
    """If this process can currently accept messages."""
    return not (self.has_terminal_status() or self.status == "suspended")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from spiffworkflow_backend.models.future_task import FutureTaskModel

# TODO: clean up this service for a clear distinction between it and the process_instance_service
# where this points to the pi service
import copy
Expand Down Expand Up @@ -609,9 +611,9 @@ def _set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
bpmn_subprocess_definition.bpmn_identifier
)
bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.id
] = bpmn_subprocess_definition.bpmn_identifier

task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore
Expand Down Expand Up @@ -1810,9 +1812,20 @@ def suspend(self) -> None:
)
db.session.commit()

def bring_archived_future_tasks_back_to_life(self) -> None:
    """Un-archive future tasks that were parked while the instance could not run.

    Intentionally does NOT commit: the caller (resume) commits once the whole
    logical transaction is complete, so these updates either all persist or
    none do.
    """
    archived_future_tasks = (
        self.process_instance_model.future_tasks_query()
        .filter(FutureTaskModel.archived_for_process_instance_status == True)  # noqa: E712
        .all()
    )
    for archived_future_task in archived_future_tasks:
        archived_future_task.archived_for_process_instance_status = False
        db.session.add(archived_future_task)
Comment on lines +1814 to +1822
Copy link
Contributor

@coderabbitai coderabbitai bot Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method bring_archived_future_tasks_back_to_life correctly updates the archived_for_process_instance_status flag for archived future tasks. However, it's important to commit the session changes within the method to ensure data consistency and handle potential exceptions gracefully.

+ db.session.commit()

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
def bring_archived_future_tasks_back_to_life(self) -> None:
archived_future_tasks = (
self.process_instance_model.future_tasks_query()
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
.all()
)
for archived_future_task in archived_future_tasks:
archived_future_task.archived_for_process_instance_status = False
db.session.add(archived_future_task)
def bring_archived_future_tasks_back_to_life(self) -> None:
archived_future_tasks = (
self.process_instance_model.future_tasks_query()
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
.all()
)
for archived_future_task in archived_future_tasks:
archived_future_task.archived_for_process_instance_status = False
db.session.add(archived_future_task)
db.session.commit()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't want this logical transaction to partially succeed. the commit at the end of the resume method is when the logical transaction is complete and when we want to commit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@burnettk, understood. It makes sense to manage the transaction commit at the end of the resume method to ensure the entire logical transaction completes successfully as a whole. Thank you for clarifying your approach to transaction management.


def resume(self) -> None:
    """Mark the process instance as waiting again and revive its future tasks.

    Un-archives the future tasks that were parked while the instance was
    suspended so the scheduler will consider them again. Per the PR discussion,
    the commit for this logical transaction happens at the end of this method.
    """
    self.process_instance_model.status = ProcessInstanceStatus.waiting.value
    db.session.add(self.process_instance_model)
    self.bring_archived_future_tasks_back_to_life()
    ProcessInstanceTmpService.add_event_to_process_instance(
        self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
    )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.git_service import GitCommandError
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from flask import Flask
from pytest_mock.plugin import MockerFixture
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor

from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec


class TestBackgroundProcessingService(BaseTest):
    """Tests for future-task processing, including archiving for suspended instances."""

    def test_process_future_tasks_with_no_future_tasks(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
    ) -> None:
        """Smoke test: an empty future_task table should be a harmless no-op."""
        BackgroundProcessingService(app).process_future_tasks()

    def test_do_process_future_tasks_with_processable_future_task(
        self,
        app: Flask,
        mocker: MockerFixture,
        with_db_and_bpmn_file_cleanup: None,
    ) -> None:
        """A runnable instance's future task is queued and not archived."""
        with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
            mock = mocker.patch("celery.current_app.send_task")
            # fix: the returned instance was previously bound to an unused local (F841)
            self._load_up_a_future_task_and_return_instance()
            assert mock.call_count == 0
            BackgroundProcessingService.do_process_future_tasks(99999999999999999)
            assert mock.call_count == 1
            future_tasks = FutureTaskModel.query.all()
            assert len(future_tasks) == 1
            assert future_tasks[0].archived_for_process_instance_status is False

    def test_do_process_future_tasks_with_unprocessable_future_task(
        self,
        app: Flask,
        mocker: MockerFixture,
        with_db_and_bpmn_file_cleanup: None,
    ) -> None:
        """A suspended instance's future task is archived, then revived on resume."""
        with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True):
            mock = mocker.patch("celery.current_app.send_task")
            process_instance = self._load_up_a_future_task_and_return_instance()
            assert mock.call_count == 0
            process_instance.status = "suspended"
            db.session.add(process_instance)
            db.session.commit()
            future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
            assert len(future_tasks) == 1
            BackgroundProcessingService.do_process_future_tasks(99999999999999999)
            # should not process anything, so nothing goes to queue
            assert mock.call_count == 0
            future_tasks = FutureTaskModel.query.all()
            assert len(future_tasks) == 1
            assert future_tasks[0].archived_for_process_instance_status is True

            # the next time do_process_future_tasks runs, it will not consider this task, which is nice
            future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
            assert len(future_tasks) == 0
            processor = ProcessInstanceProcessor(process_instance)
            processor.resume()
            future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999)
            assert len(future_tasks) == 1

    def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel:
        """Run a user-task-with-timer model far enough to create one future task."""
        process_model = load_test_spec(
            process_model_id="test_group/user-task-with-timer",
            process_model_source_directory="user-task-with-timer",
        )
        process_instance = self.create_process_instance_from_process_model(process_model=process_model)
        processor = ProcessInstanceProcessor(process_instance)
        processor.do_engine_steps(save=True)

        assert process_instance.status == "user_input_required"

        future_tasks = FutureTaskModel.query.all()
        assert len(future_tasks) == 1
        return process_instance
Loading