-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
future tasks should not cause anything to happen if the instance is suspended #952
Changes from all commits
c291aea
e037365
347ef3e
9b8d1b2
80355db
5d081ec
af1eb6f
5bf7a9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""empty message | ||
|
||
Revision ID: 29b261f5edf4 | ||
Revises: 343b406f723d | ||
Create Date: 2024-02-06 13:52:18.973974 | ||
|
||
""" | ||
from alembic import op | ||
import sqlalchemy as sa | ||
|
||
|
||
# revision identifiers, used by Alembic. | ||
revision = '29b261f5edf4' | ||
down_revision = '343b406f723d' | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
with op.batch_alter_table('future_task', schema=None) as batch_op: | ||
batch_op.add_column(sa.Column('archived_for_process_instance_status', sa.Boolean(), server_default=sa.text('false'), nullable=False)) | ||
batch_op.create_index(batch_op.f('ix_future_task_archived_for_process_instance_status'), ['archived_for_process_instance_status'], unique=False) | ||
|
||
# ### end Alembic commands ### | ||
|
||
|
||
def downgrade(): | ||
# ### commands auto generated by Alembic - please adjust! ### | ||
with op.batch_alter_table('future_task', schema=None) as batch_op: | ||
batch_op.drop_index(batch_op.f('ix_future_task_archived_for_process_instance_status')) | ||
batch_op.drop_column('archived_for_process_instance_status') | ||
|
||
# ### end Alembic commands ### |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,7 @@ | |
from spiffworkflow_backend.models.db import db | ||
from spiffworkflow_backend.models.file import File | ||
from spiffworkflow_backend.models.file import FileType | ||
from spiffworkflow_backend.models.future_task import FutureTaskModel | ||
from spiffworkflow_backend.models.group import GroupModel | ||
from spiffworkflow_backend.models.human_task import HumanTaskModel | ||
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel | ||
|
@@ -1810,9 +1811,20 @@ def suspend(self) -> None: | |
) | ||
db.session.commit() | ||
|
||
def bring_archived_future_tasks_back_to_life(self) -> None: | ||
archived_future_tasks = ( | ||
self.process_instance_model.future_tasks_query() | ||
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712 | ||
.all() | ||
) | ||
for archived_future_task in archived_future_tasks: | ||
archived_future_task.archived_for_process_instance_status = False | ||
db.session.add(archived_future_task) | ||
|
||
def resume(self) -> None: | ||
self.process_instance_model.status = ProcessInstanceStatus.waiting.value | ||
db.session.add(self.process_instance_model) | ||
self.bring_archived_future_tasks_back_to_life() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Consider wrapping the operations in a try-except block and commit the session in a finally block if no exceptions occur. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since it is committing at the end of the resume method, it should be logically equivalent to what you're suggesting. it will only commit if all operations succeed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @burnettk You're absolutely right. The commit at the end of the |
||
ProcessInstanceTmpService.add_event_to_process_instance( | ||
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
from flask import Flask | ||
from pytest_mock.plugin import MockerFixture | ||
from spiffworkflow_backend.background_processing.background_processing_service import BackgroundProcessingService | ||
from spiffworkflow_backend.models.db import db | ||
from spiffworkflow_backend.models.future_task import FutureTaskModel | ||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel | ||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor | ||
|
||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest | ||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec | ||
|
||
|
||
class TestBackgroundProcessingService(BaseTest): | ||
def test_process_future_tasks_with_no_future_tasks( | ||
self, | ||
app: Flask, | ||
with_db_and_bpmn_file_cleanup: None, | ||
) -> None: | ||
BackgroundProcessingService(app).process_future_tasks() | ||
|
||
def test_do_process_future_tasks_with_processable_future_task( | ||
self, | ||
app: Flask, | ||
mocker: MockerFixture, | ||
with_db_and_bpmn_file_cleanup: None, | ||
) -> None: | ||
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): | ||
mock = mocker.patch("celery.current_app.send_task") | ||
self._load_up_a_future_task_and_return_instance() | ||
assert mock.call_count == 0 | ||
BackgroundProcessingService.do_process_future_tasks(99999999999999999) | ||
assert mock.call_count == 1 | ||
future_tasks = FutureTaskModel.query.all() | ||
assert len(future_tasks) == 1 | ||
assert future_tasks[0].archived_for_process_instance_status is False | ||
|
||
def test_do_process_future_tasks_with_unprocessable_future_task( | ||
self, | ||
app: Flask, | ||
mocker: MockerFixture, | ||
with_db_and_bpmn_file_cleanup: None, | ||
) -> None: | ||
with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_CELERY_ENABLED", True): | ||
mock = mocker.patch("celery.current_app.send_task") | ||
process_instance = self._load_up_a_future_task_and_return_instance() | ||
assert mock.call_count == 0 | ||
process_instance.status = "suspended" | ||
db.session.add(process_instance) | ||
db.session.commit() | ||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) | ||
assert len(future_tasks) == 1 | ||
BackgroundProcessingService.do_process_future_tasks(99999999999999999) | ||
# should not process anything, so nothing goes to queue | ||
assert mock.call_count == 0 | ||
future_tasks = FutureTaskModel.query.all() | ||
assert len(future_tasks) == 1 | ||
assert future_tasks[0].archived_for_process_instance_status is True | ||
|
||
# the next time do_process_future_tasks runs, it will not consider this task, which is nice | ||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) | ||
assert len(future_tasks) == 0 | ||
processor = ProcessInstanceProcessor(process_instance) | ||
processor.resume() | ||
future_tasks = BackgroundProcessingService.imminent_future_tasks(99999999999999999) | ||
assert len(future_tasks) == 1 | ||
|
||
def _load_up_a_future_task_and_return_instance(self) -> ProcessInstanceModel: | ||
process_model = load_test_spec( | ||
process_model_id="test_group/user-task-with-timer", | ||
process_model_source_directory="user-task-with-timer", | ||
) | ||
process_instance = self.create_process_instance_from_process_model(process_model=process_model) | ||
processor = ProcessInstanceProcessor(process_instance) | ||
processor.do_engine_steps(save=True) | ||
|
||
assert process_instance.status == "user_input_required" | ||
|
||
future_tasks = FutureTaskModel.query.all() | ||
assert len(future_tasks) == 1 | ||
return process_instance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method
bring_archived_future_tasks_back_to_life
correctly updates thearchived_for_process_instance_status
flag for archived future tasks. However, it's important to commit the session changes within the method to ensure data consistency and handle potential exceptions gracefully.+ db.session.commit()
Committable suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't want this logical transaction to partially succeed. the commit at the end of the resume method is when the logical transaction is complete and when we want to commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@burnettk, understood. It makes sense to manage the transaction commit at the end of the resume method to ensure the entire logical transaction completes successfully as a whole. Thank you for clarifying your approach to transaction management.