Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

* Updated package version to v1.0.0 - to be updated
* update DataConverterException with detail error message ([#78](https://github.com/microsoft/durabletask-java/issues/78))
* update OrchestratorBlockedEvent and TaskFailedException to be unchecked exceptions ([#88](https://github.com/microsoft/durabletask-java/issues/88))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
* Control flow {@code Throwable} class for orchestrator functions. This {@code Throwable} must never be caught by user
* code.
* <p>
* {@code OrchestratorBlockedEvent} is thrown when an orchestrator calls {@link Task#await} on an uncompleted task. The
* {@code OrchestratorBlockedException} is thrown when an orchestrator calls {@link Task#await} on an uncompleted task. The
* purpose of throwing in this way is to halt execution of the orchestrator to save the current state and commit any
* side effects. Catching {@code OrchestratorBlockedEvent} in user code could prevent the orchestration from saving
* side effects. Catching {@code OrchestratorBlockedException} in user code could prevent the orchestration from saving
* state and scheduling new tasks, resulting in the orchestration getting stuck.
*/
public final class OrchestratorBlockedEvent extends Throwable {
OrchestratorBlockedEvent(String message) {
public final class OrchestratorBlockedException extends RuntimeException {
OrchestratorBlockedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ public interface OrchestratorFunction<R> {
* @param ctx the orchestration context, which provides access to additional context for the current orchestration
* execution
* @return the serializable output of the orchestrator function
* @throws OrchestratorBlockedEvent when the orchestrator blocks on an uncompleted task, which is a normal occurrence
* @throws TaskFailedException when a task fails with an unhandled exception
*/
R apply(TaskOrchestrationContext ctx) throws OrchestratorBlockedEvent, TaskFailedException;
R apply(TaskOrchestrationContext ctx);
}
11 changes: 3 additions & 8 deletions client/src/main/java/com/microsoft/durabletask/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package com.microsoft.durabletask;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Represents an asynchronous operation in a durable orchestration.
Expand All @@ -16,13 +14,13 @@
* </pre>
* <p>
* Orchestrator code uses the {@link #await()} method to block on the completion of the task and retrieve the result.
* If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedEvent}, which
* If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedException}, which
* pauses the orchestrator's execution so that it can save its progress into durable storage and schedule any
* outstanding work. When the task is complete, the orchestrator will run again from the beginning and the next time
* the task's {@code await()} method is called, the result will be returned, or a {@link TaskFailedException} will be
* thrown if the result of the task was an unhandled exception.
* <p>
* Note that orchestrator code must never catch {@code OrchestratorBlockedEvent} because doing so can cause the
* Note that orchestrator code must never catch {@code OrchestratorBlockedException} because doing so can cause the
* orchestration instance to get permanently stuck.
*
* @param <V> the return type of the task
Expand Down Expand Up @@ -54,9 +52,6 @@ public boolean isCancelled() {
* Blocks the orchestrator until this task to complete, and then returns its result.
*
* @return the result of the task
* @throws TaskFailedException if the task failed with an unhandled exception
* @throws OrchestratorBlockedEvent if the task has not yet been scheduled, which is a normal occurrence.
* This {@code Throwable} must never be caught in user code.
*/
public abstract V await() throws TaskFailedException, OrchestratorBlockedEvent;
public abstract V await();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* Detailed information associated with a particular task failure can be retrieved using the {@link #getErrorDetails()}
* method.
*/
public class TaskFailedException extends Exception {
public class TaskFailedException extends RuntimeException {
private final FailureDetails details;
private final String taskName;
private final int taskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public interface TaskOrchestration {
*
* @param ctx provides access to methods for scheduling durable tasks and getting information about the current
* orchestration instance.
* @throws TaskFailedException when an orchestrator fails with an unhandled exception
* @throws OrchestratorBlockedEvent when the orchestrator blocks on an uncompleted task, which is a normal occurrence
*/
void run(TaskOrchestrationContext ctx) throws TaskFailedException, OrchestratorBlockedEvent;
void run(TaskOrchestrationContext ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<Histor
// or we receive a yield signal
while (context.processNextEvent()) { /* no method body */ }
completed = true;
} catch (OrchestratorBlockedException orchestratorBlockedException) {
logger.fine("The orchestrator has yielded and will await for new events.");
} catch (Exception e) {
// The orchestrator threw an unhandled exception - fail it
// TODO: What's the right way to log this?
logger.warning("The orchestrator failed with an unhandled exception: " + e.toString());
context.fail(new FailureDetails(e));
} catch (OrchestratorBlockedEvent orchestratorBlockedEvent) {
logger.fine("The orchestrator has yielded and will await for new events.");
}

if (context.continuedAsNew || (completed && context.pendingActions.isEmpty() && !context.waitingForEvents())) {
Expand Down Expand Up @@ -712,11 +712,11 @@ private boolean waitingForEvents() {
return this.outstandingEvents.size() > 0;
}

private boolean processNextEvent() throws TaskFailedException, OrchestratorBlockedEvent {
private boolean processNextEvent() {
return this.historyEventPlayer.moveNext();
}

private void processEvent(HistoryEvent e) throws TaskFailedException, OrchestratorBlockedEvent {
private void processEvent(HistoryEvent e) {
switch (e.getEventTypeCase()) {
case ORCHESTRATORSTARTED:
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
Expand Down Expand Up @@ -826,7 +826,7 @@ public OrchestrationHistoryIterator(List<HistoryEvent> pastEvents, List<HistoryE
this.currentHistoryList = pastEvents;
}

public boolean moveNext() throws TaskFailedException, OrchestratorBlockedEvent {
public boolean moveNext() {
if (this.currentHistoryList == pastEvents && this.currentHistoryIndex >= pastEvents.size()) {
// Move forward to the next list
this.currentHistoryList = this.newEvents;
Expand Down Expand Up @@ -860,7 +860,7 @@ public ExternalEventTask(String eventName, int taskId, Duration timeout) {

// TODO: Shouldn't this be throws TaskCanceledException?
@Override
protected void handleException(Throwable e) throws TaskFailedException {
protected void handleException(Throwable e) {
// Cancellation is caused by user-specified timeouts
if (e instanceof CancellationException) {
String message = String.format(
Expand Down Expand Up @@ -910,7 +910,7 @@ private RetriableTask(
}

@Override
public V await() throws TaskFailedException, OrchestratorBlockedEvent {
public V await() {
Instant startTime = this.context.getCurrentInstant();
while (true) {
Task<V> currentTask = this.taskFactory.create();
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public CompletableTask() {
}

@Override
public V await() throws TaskFailedException, OrchestratorBlockedEvent {
public V await() {
do {
// If the future is done, return its value right away
if (this.future.isDone()) {
Expand All @@ -1039,14 +1039,14 @@ public V await() throws TaskFailedException, OrchestratorBlockedEvent {
} while (ContextImplTask.this.processNextEvent());

// There's no more history left to replay and the current task is still not completed. This is normal.
// The OrchestratorBlockedEvent throwable allows us to yield the current thread back to the executor so
// The OrchestratorBlockedException exception allows us to yield the current thread back to the executor so
// that we can send the current set of actions back to the worker and wait for new events to come in.
// This is *not* an exception - it's a normal part of orchestrator control flow.
throw new OrchestratorBlockedEvent(
throw new OrchestratorBlockedException(
"The orchestrator is blocked and waiting for new inputs. This Throwable should never be caught by user code.");
}

protected void handleException(Throwable e) throws TaskFailedException {
protected void handleException(Throwable e) {
if (e instanceof TaskFailedException) {
throw (TaskFailedException)e;
}
Expand Down