Skip to content

Commit

Permalink
Orbit stage timer prone to terminating. (#317)
Browse files Browse the repository at this point in the history
Motivation:

If an exception is thrown inside a timer task the timer will shut down and subsequent executions suppressed.

Modification:

Catch exceptions in the timer tasks.
Return exceptionally completed task in WaitFreeExceptionSerializer instead of throwing exception.

Result:

Stage timer should continue running until the stage is shutdown.
  • Loading branch information
johnou authored and JoeHegarty committed Jan 2, 2019
1 parent 9cb8d76 commit e7d7a2b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
62 changes: 38 additions & 24 deletions actors/runtime/src/main/java/cloud/orbit/actors/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -933,9 +933,16 @@ else if (extension.getAfterHandlerName() != null)
@Override
public void run()
{
if (state == NodeCapabilities.NodeState.RUNNING)
try
{
ForkJoinTask.adapt(() -> pulse().join()).fork();
if (state == NodeCapabilities.NodeState.RUNNING)
{
ForkJoinTask.adapt(() -> pulse().join()).fork();
}
}
catch (Exception e)
{
logger.error("Failed executing timer task", e);
}
}
}, pulseIntervalMillis, pulseIntervalMillis);
Expand Down Expand Up @@ -1273,34 +1280,41 @@ public void dispose()
@Override
public void run()
{
if (localActor.isDeactivated() || state == NodeCapabilities.NodeState.STOPPED)
try
{
cancel();
return;
}
if (localActor.isDeactivated() || state == NodeCapabilities.NodeState.STOPPED)
{
cancel();
return;
}

executionSerializer.offerJob(key,
() -> {
if (localActor.isDeactivated() || state == NodeCapabilities.NodeState.STOPPED)
{
cancel();
}
else
{
try
executionSerializer.offerJob(key,
() -> {
if (localActor.isDeactivated() || state == NodeCapabilities.NodeState.STOPPED)
{
if (!canceled)
{
return (Task) taskCallable.call();
}
cancel();
}
catch (final Exception ex)
else
{
logger.warn("Error calling timer", ex);
try
{
if (!canceled)
{
return (Task) taskCallable.call();
}
}
catch (final Exception ex)
{
logger.warn("Error calling timer", ex);
}
}
}
return (Task) Task.done();
}, 10000);
return (Task) Task.done();
}, 10000);
}
catch (Exception e)
{
logger.error("Failed executing timer task", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public <R> Task<R> executeSerialized(Supplier<Task<R>> taskSupplier, int maxQueu
return source;
}))
{
throw new IllegalStateException(String.format("Queue full for %s (%d > %d)", key, queue.size(), maxQueueSize));
return Task.fromException(new IllegalStateException(String.format("Queue full for %s (%d > %d)", key, queue.size(), maxQueueSize)));
}

// managing the size like this to avoid using ConcurrentLinkedQueue.size()
Expand Down

0 comments on commit e7d7a2b

Please sign in to comment.