Skip to content

Commit

Permalink
Release WF slot on UnableToAcquireLockException (#1871)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Sep 26, 2023
1 parent aa5cb41 commit 717ee05
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,37 +281,38 @@ public void handle(WorkflowTask task) throws Exception {
MDC.put(LoggerTag.RUN_ID, runId);

boolean locked = false;
if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
// Serialize workflow task processing for a particular workflow run.
// This is used to make sure that query tasks and real workflow tasks
// are serialized when sticky is on.
//
// Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run
// id waiting for a lock and consuming threads in case if lock is unavailable.
//
// Throws interrupted exception which is propagated. It's a correct way to handle it here.
//
// TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
// This value should be dynamically configured.
// TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
// any sense?
// This MAYBE makes sense only if a previous workflow task timed out, it's still in
// progress on the worker and the next workflow task got picked up by the same exact
// worker from the general non-sticky task queue.
// Even in this case, this advice looks misleading, something else is going on
// (like an extreme network latency).
locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

if (!locked) {
throw new UnableToAcquireLockException(
"Workflow lock for the run id hasn't been released by one of previous execution attempts, "
+ "consider increasing workflow task timeout.");
}
}

Stopwatch swTotal =
workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
try {
if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
// Serialize workflow task processing for a particular workflow run.
// This is used to make sure that query tasks and real workflow tasks
// are serialized when sticky is on.
//
// Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run
// id waiting for a lock and consuming threads in case if lock is unavailable.
//
// Throws interrupted exception which is propagated. It's a correct way to handle it here.
//
// TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
// This value should be dynamically configured.
// TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
// any sense?
// This MAYBE makes sense only if a previous workflow task timed out, it's still in
// progress on the worker and the next workflow task got picked up by the same exact
// worker from the general non-sticky task queue.
// Even in this case, this advice looks misleading, something else is going on
// (like an extreme network latency).
locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

if (!locked) {
throw new UnableToAcquireLockException(
"Workflow lock for the run id hasn't been released by one of previous execution attempts, "
+ "consider increasing workflow task timeout.");
}
}

Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
do {
PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void concurrentPollRequestLockTest() throws Exception {
ImmutableMap.of("worker_type", "WorkflowWorker"),
100.0);
// Cleanup
worker.shutdown(new ShutdownManager(), true).get();
worker.shutdown(new ShutdownManager(), false).get();
// Verify we only handled two tasks
verify(taskHandler, times(2)).handleWorkflowTask(any());
}
Expand Down

0 comments on commit 717ee05

Please sign in to comment.