Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions sdk-api/src/main/java/dev/restate/sdk/HandlerRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
package dev.restate.sdk;

import dev.restate.common.Slice;
import dev.restate.common.function.ThrowingBiConsumer;
import dev.restate.common.function.ThrowingBiFunction;
import dev.restate.common.function.ThrowingConsumer;
import dev.restate.common.function.ThrowingFunction;
import dev.restate.common.function.*;
import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.endpoint.definition.HandlerContext;
import dev.restate.sdk.internal.ContextThreadLocal;
Expand All @@ -22,6 +19,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -182,25 +180,43 @@ public static <CTX extends Context> HandlerRunner<Void, Void> of(
public static final class Options
implements dev.restate.sdk.endpoint.definition.HandlerRunner.Options {
/**
* Default options will use a {@link Executors#newCachedThreadPool()} shared among all the
* {@link HandlerRunner} instances.
* Default options will use virtual threads on Java 21+, or fallback to {@link
* Executors#newCachedThreadPool()} for Java &lt; 21. The bounded pool is shared among all
* {@link HandlerRunner} instances, and is used by {@link Restate#run}/{@link Context#run} as
* well.
*/
public static final Options DEFAULT = new Options(Executors.newCachedThreadPool());
public static final Options DEFAULT = new Options(createDefaultExecutor());

private final Executor executor;

/**
* You can run on virtual threads by using the executor {@code
* Executors.newVirtualThreadPerTaskExecutor()}.
*/
private Options(Executor executor) {
this.executor = executor;
}

/** Copy this options setting the given {@code executor}. */
/**
* Create an instance of {@link Options} with the given {@code executor}.
*
* <p>The given executor is used for running the handler code, and {@link Restate#run}/{@link
* Context#run} as well.
*/
public static Options withExecutor(Executor executor) {
return new Options(executor);
}

private static ExecutorService createDefaultExecutor() {
// Try to use virtual threads if available (Java 21+)
try {
return (ExecutorService)
Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null);
} catch (Exception e) {
LOG.debug(
"Virtual threads not available, using unbounded thread pool. "
+ "If you need to customize the thread pool used by your restate handlers, "
+ "use HandlerRunner.Options.withExecutor() with Endpoint.bind()");

return Executors.newCachedThreadPool();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm torn on this one. On one hand, we should use a bounded executor as described here #580 (comment)

On the other hand, i'm worried that this is gonna cause some breaking changes wrt to Java < 21 users: an undersized queue will certainly cause a flood of failing invocations for some existing users, because of this artificial bound we would introduce here.

}
}
}

static HandlerContext getHandlerContext() {
Expand Down