Skip to content

Commit

Permalink
Revert "future tasks should not cause anything to happen if the instance is suspended"

Browse files Browse the repository at this point in the history

This reverts commit b627567.
  • Loading branch information
burnettk committed Feb 2, 2024
1 parent b627567 commit 05b50df
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 195 deletions.
34 changes: 0 additions & 34 deletions spiffworkflow-backend/migrations/versions/acf20342181e_.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
Expand Down Expand Up @@ -58,46 +57,24 @@ def remove_stale_locks(self) -> None:
ProcessInstanceLockService.remove_stale_locks()

def process_future_tasks(self) -> None:
"""Timer related tasks go in the future_task table.
Celery is not great at scheduling things in the distant future. So this function periodically checks the future_task
table and puts tasks into the queue that are imminently ready to run. Imminently is configurable and defaults to those
that are 5 minutes away or less.
"""

"""If something has been locked for a certain amount of time it is probably stale so unlock it."""
with self.app.app_context():
future_task_lookahead_in_seconds = self.app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_FUTURE_TASK_LOOKAHEAD_IN_SECONDS"
]
self.__class__.do_process_future_tasks(future_task_lookahead_in_seconds)

@classmethod
def do_process_future_tasks(cls, future_task_lookahead_in_seconds: int) -> None:
future_tasks = cls.imminent_future_tasks(future_task_lookahead_in_seconds)
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
if process_instance.allowed_to_run():
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
for future_task in future_tasks:
process_instance = (
ProcessInstanceModel.query.join(TaskModel, TaskModel.process_instance_id == ProcessInstanceModel.id)
.filter(TaskModel.guid == future_task.guid)
.first()
)
queue_future_task_if_appropriate(
process_instance, eta_in_seconds=future_task.run_at_in_seconds, task_guid=future_task.guid
)
else:
# if we are not allowed to run the process instance, we should not keep processing the future task
future_task.archived_for_process_instance_status = True
db.session.add(future_task)
db.session.commit()

@classmethod
def imminent_future_tasks(cls, future_task_lookahead_in_seconds: int) -> list[FutureTaskModel]:
lookahead = time.time() + future_task_lookahead_in_seconds
future_tasks: list[FutureTaskModel] = FutureTaskModel.query.filter(
and_(
FutureTaskModel.completed == False, # noqa: E712
FutureTaskModel.archived_for_process_instance_status == False, # noqa: E712
FutureTaskModel.run_at_in_seconds < lookahead,
)
).all()
return future_tasks
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
if queue_enabled_for_process_model(process_instance):
buffer = 1
countdown = eta_in_seconds - time.time() + buffer
args_to_celery = {
"process_instance_id": process_instance.id,
"task_guid": task_guid,
# the producer_identifier is so we can know what is putting messages in the queue
"producer_identifier": "future_task",
}
args_to_celery = {"process_instance_id": process_instance.id, "task_guid": task_guid}
# add buffer to countdown to avoid rounding issues and race conditions with spiff. the situation we want to avoid is where
# we think the timer said to run it at 6:34:11, and we initialize the SpiffWorkflow library,
# expecting the timer to be ready, but the library considered it ready a little after that time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class FutureTaskModel(SpiffworkflowBaseDBModel):
guid: str = db.Column(db.String(36), primary_key=True)
run_at_in_seconds: int = db.Column(db.Integer, nullable=False, index=True)
completed: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)
archived_for_process_instance_status: bool = db.Column(db.Boolean, default=False, nullable=False, index=True)

updated_at_in_seconds: int = db.Column(db.Integer, nullable=False)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import annotations
from flask_sqlalchemy.query import Query
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.future_task import FutureTaskModel

from typing import Any

Expand Down Expand Up @@ -51,9 +48,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_model_display_name: str = db.Column(db.String(255), nullable=False, index=True)
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False, index=True) # type: ignore
bpmn_process_definition_id: int | None = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), # type: ignore
nullable=True,
index=True,
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True, index=True # type: ignore
)
bpmn_process_id: int | None = db.Column(ForeignKey(BpmnProcessModel.id), nullable=True, index=True) # type: ignore

Expand Down Expand Up @@ -122,16 +117,6 @@ def spiffworkflow_fully_initialized(self) -> bool:
"""
return self.bpmn_process_definition_id is not None and self.bpmn_process_id is not None

def future_tasks_query(self) -> Query:
future_tasks: Query = (
FutureTaskModel.query.filter(
FutureTaskModel.completed == False, # noqa: E712
)
.join(TaskModel, TaskModel.guid == FutureTaskModel.guid)
.filter(TaskModel.process_instance_id == self.id)
)
return future_tasks

def serialized(self) -> dict[str, Any]:
"""Return object data in serializeable format."""
return {
Expand Down Expand Up @@ -167,10 +152,6 @@ def validate_status(self, key: str, value: Any) -> Any:
def can_submit_task(self) -> bool:
return not self.has_terminal_status() and self.status != "suspended"

def allowed_to_run(self) -> bool:
"""If this process can currently move forward with things like do_engine_steps."""
return not self.has_terminal_status() and self.status != "suspended"

def can_receive_message(self) -> bool:
"""If this process can currently accept messages."""
return not self.has_terminal_status() and self.status != "suspended"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from spiffworkflow_backend.models.future_task import FutureTaskModel

# TODO: clean up this service for a clear distinction between it and the process_instance_service
# where this points to the pi service
import copy
Expand Down Expand Up @@ -611,9 +609,9 @@ def _set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.id
] = bpmn_subprocess_definition.bpmn_identifier
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
bpmn_subprocess_definition.bpmn_identifier
)

task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_(bpmn_subprocess_definition_bpmn_identifiers.keys()) # type: ignore
Expand Down Expand Up @@ -1812,20 +1810,9 @@ def suspend(self) -> None:
)
db.session.commit()

def bring_archived_future_tasks_back_to_life(self) -> None:
archived_future_tasks = (
self.process_instance_model.future_tasks_query()
.filter(FutureTaskModel.archived_for_process_instance_status == True) # noqa: E712
.all()
)
for archived_future_task in archived_future_tasks:
archived_future_task.archived_for_process_instance_status = False
db.session.add(archived_future_task)

def resume(self) -> None:
self.process_instance_model.status = ProcessInstanceStatus.waiting.value
db.session.add(self.process_instance_model)
self.bring_archived_future_tasks_back_to_life()
ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model_cycle import ProcessModelCycleModel
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.git_service import GitCommandError
Expand Down

This file was deleted.

0 comments on commit 05b50df

Please sign in to comment.