Skip to content

Commit

Permalink
Fix GetOutput and Promise#peek, which now returns the Output type (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Jun 3, 2024
1 parent 5bba500 commit 52227b9
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 77 deletions.
3 changes: 2 additions & 1 deletion sdk-api-gen/src/test/java/dev/restate/sdk/CodegenTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public String submit(SharedWorkflowContext context, String request) {
CodegenTestWorkflowCornerCasesClient.connect("invalid", request).submit("my_send");
return CodegenTestWorkflowCornerCasesClient.connect("invalid", request)
.workflowHandle()
.getOutput();
.getOutput()
.getValue();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class CodegenTest : TestDefinitions.TestSuite {
return CodegenTestWorkflowCornerCasesClient.connect("invalid", request)
.workflowHandle()
.output
.value
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
return SingleSerdeAwaitableImpl(syscalls, deferred, key.serde())
}

override suspend fun peek(): T? {
override suspend fun peek(): Output<T> {
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
Expand All @@ -259,24 +259,9 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
throw readyResult.failure!!
}
if (readyResult.isEmpty) {
return null
return Output.notReady()
}
return key.serde().deserializeWrappingException(syscalls, readyResult.value!!)!!
}

override suspend fun isCompleted(): Boolean {
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
}

if (!deferred.isCompleted) {
suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.resolveDeferred(deferred, completingUnitContinuation(cont))
}
}

return !deferred.toResult()!!.isEmpty
return Output.ready(key.serde().deserializeWrappingException(syscalls, readyResult.value!!))
}
}

Expand Down
5 changes: 1 addition & 4 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,7 @@ sealed interface DurablePromise<T> {
suspend fun awaitable(): Awaitable<T>

/** @return the value, if already present, otherwise returns an empty optional. */
suspend fun peek(): T?

/** @return true if the promise is already completed. */
suspend fun isCompleted(): Boolean
suspend fun peek(): Output<T>
}

/** This class represents a handle to a [DurablePromise] created in another service. */
Expand Down
21 changes: 11 additions & 10 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package dev.restate.sdk.kotlin
import dev.restate.sdk.client.Client
import dev.restate.sdk.client.RequestOptions
import dev.restate.sdk.client.SendResponse
import dev.restate.sdk.common.Output
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.Target
import kotlin.time.Duration
Expand Down Expand Up @@ -39,7 +40,7 @@ suspend fun <Req> Client.sendSuspend(
return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await()
}

suspend fun <T> Client.AwakeableHandle.resolveSuspend(
suspend fun <T : Any> Client.AwakeableHandle.resolveSuspend(
serde: Serde<T>,
payload: T,
options: RequestOptions = RequestOptions.DEFAULT
Expand All @@ -56,24 +57,24 @@ suspend fun Client.AwakeableHandle.rejectSuspend(

suspend fun <T> Client.InvocationHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.attachAsync(options).await()
): T {
return this.attachAsync(options).await()
}

suspend fun <T> Client.InvocationHandle<T>.getOutputSuspend(
suspend fun <T : Any?> Client.InvocationHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.getOutputAsync(options).await()
): Output<T> {
return this.getOutputAsync(options).await()
}

suspend fun <T> Client.WorkflowHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.attachAsync(options).await()
): T {
return this.attachAsync(options).await()
}

suspend fun <T> Client.WorkflowHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.getOutputAsync(options).await()
): Output<T> {
return this.getOutputAsync(options).await()
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ class PromiseTest : PromiseTestSuite() {
emptyCaseReturnValue: String
): TestDefinitions.TestInvocationBuilder =
testDefinitionForWorkflow("AwaitPeekPromise") { ctx, _: Unit ->
ctx.promise(KtDurablePromiseKey.json<String>(promiseKey)).peek() ?: emptyCaseReturnValue
ctx.promise(KtDurablePromiseKey.json<String>(promiseKey))
.peek()
.orElse(emptyCaseReturnValue)
}

override fun awaitIsPromiseCompleted(promiseKey: String): TestDefinitions.TestInvocationBuilder =
testDefinitionForWorkflow("IsCompletedPromise") { ctx, _: Unit ->
ctx.promise(KtDurablePromiseKey.json<String>(promiseKey)).isCompleted()
ctx.promise(KtDurablePromiseKey.json<String>(promiseKey)).peek().isReady
}

override fun awaitResolvePromise(
Expand Down
16 changes: 2 additions & 14 deletions sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,29 +227,17 @@ public Awaitable<T> awaitable() {
}

@Override
public Optional<T> peek() {
public Output<T> peek() {
Deferred<ByteBuffer> deferred =
Util.blockOnSyscall(cb -> syscalls.peekPromise(key.name(), cb));

if (!deferred.isCompleted()) {
Util.<Void>blockOnSyscall(cb -> syscalls.resolveDeferred(deferred, cb));
}

return Util.unwrapOptionalReadyResult(deferred.toResult())
return Util.unwrapOutputReadyResult(deferred.toResult())
.map(bs -> Util.deserializeWrappingException(syscalls, key.serde(), bs));
}

@Override
public boolean isCompleted() {
Deferred<ByteBuffer> deferred =
Util.blockOnSyscall(cb -> syscalls.peekPromise(key.name(), cb));

if (!deferred.isCompleted()) {
Util.<Void>blockOnSyscall(cb -> syscalls.resolveDeferred(deferred, cb));
}

return !deferred.toResult().isEmpty();
}
};
}

Expand Down
9 changes: 2 additions & 7 deletions sdk-api/src/main/java/dev/restate/sdk/DurablePromise.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package dev.restate.sdk;

import dev.restate.sdk.common.DurablePromiseKey;
import java.util.Optional;
import dev.restate.sdk.common.Output;

/**
* A {@link DurablePromise} is a durable, distributed version of a {@link
Expand Down Expand Up @@ -39,10 +39,5 @@ public interface DurablePromise<T> {
/**
* @return the value, if already present, otherwise returns an empty optional.
*/
Optional<T> peek();

/**
* @return true if the promise is already completed.
*/
boolean isCompleted();
Output<T> peek();
}
11 changes: 11 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package dev.restate.sdk;

import dev.restate.sdk.common.AbortedExecutionException;
import dev.restate.sdk.common.Output;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.function.ThrowingFunction;
import dev.restate.sdk.common.syscalls.Deferred;
Expand Down Expand Up @@ -68,6 +69,16 @@ static <T> Optional<T> unwrapOptionalReadyResult(Result<T> res) {
return Optional.of(res.getValue());
}

static <T> Output<T> unwrapOutputReadyResult(Result<T> res) {
if (!res.isSuccess()) {
throw res.getFailure();
}
if (res.isEmpty()) {
return Output.notReady();
}
return Output.ready(res.getValue());
}

static <T, R> R executeMappingException(Syscalls syscalls, ThrowingFunction<T, R> fn, T t) {
try {
return fn.apply(t);
Expand Down
2 changes: 1 addition & 1 deletion sdk-api/src/test/java/dev/restate/sdk/PromiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected TestDefinitions.TestInvocationBuilder awaitIsPromiseCompleted(String p
Serde.VOID,
JsonSerdes.BOOLEAN,
(context, unused) ->
context.promise(DurablePromiseKey.of(promiseKey, TestSerdes.STRING)).isCompleted());
context.promise(DurablePromiseKey.of(promiseKey, TestSerdes.STRING)).peek().isReady());
}

@Override
Expand Down
17 changes: 9 additions & 8 deletions sdk-common/src/main/java/dev/restate/sdk/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.client;

import dev.restate.sdk.common.Output;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.Target;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -188,13 +189,13 @@ default Res attach() throws IngressException {
return attach(RequestOptions.DEFAULT);
}

CompletableFuture<Res> getOutputAsync(RequestOptions options);
CompletableFuture<Output<Res>> getOutputAsync(RequestOptions options);

default CompletableFuture<Res> getOutputAsync() {
default CompletableFuture<Output<Res>> getOutputAsync() {
return getOutputAsync(RequestOptions.DEFAULT);
}

default Res getOutput(RequestOptions options) throws IngressException {
default Output<Res> getOutput(RequestOptions options) throws IngressException {
try {
return getOutputAsync(options).join();
} catch (CompletionException e) {
Expand All @@ -205,7 +206,7 @@ default Res getOutput(RequestOptions options) throws IngressException {
}
}

default Res getOutput() throws IngressException {
default Output<Res> getOutput() throws IngressException {
return getOutput(RequestOptions.DEFAULT);
}
}
Expand Down Expand Up @@ -235,13 +236,13 @@ default Res attach() throws IngressException {
return attach(RequestOptions.DEFAULT);
}

CompletableFuture<Res> getOutputAsync(RequestOptions options);
CompletableFuture<Output<Res>> getOutputAsync(RequestOptions options);

default CompletableFuture<Res> getOutputAsync() {
default CompletableFuture<Output<Res>> getOutputAsync() {
return getOutputAsync(RequestOptions.DEFAULT);
}

default Res getOutput(RequestOptions options) throws IngressException {
default Output<Res> getOutput(RequestOptions options) throws IngressException {
try {
return getOutputAsync(options).join();
} catch (CompletionException e) {
Expand All @@ -252,7 +253,7 @@ default Res getOutput(RequestOptions options) throws IngressException {
}
}

default Res getOutput() throws IngressException {
default Output<Res> getOutput() throws IngressException {
return getOutput(RequestOptions.DEFAULT);
}
}
Expand Down
17 changes: 13 additions & 4 deletions sdk-common/src/main/java/dev/restate/sdk/client/DefaultClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import dev.restate.sdk.common.Output;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.Target;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -230,7 +231,7 @@ public CompletableFuture<Res> attachAsync(RequestOptions options) {
}

@Override
public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
public CompletableFuture<Output<Res>> getOutputAsync(RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
Expand All @@ -254,8 +255,12 @@ public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
handleNonSuccessResponse(response);
}

if (response.statusCode() == 470) {
return Output.notReady();
}

try {
return resSerde.deserialize(response.body());
return Output.ready(resSerde.deserialize(response.body()));
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
Expand Down Expand Up @@ -316,7 +321,7 @@ public CompletableFuture<Res> attachAsync(RequestOptions options) {
}

@Override
public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
public CompletableFuture<Output<Res>> getOutputAsync(RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
Expand Down Expand Up @@ -346,8 +351,12 @@ public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
handleNonSuccessResponse(response);
}

if (response.statusCode() == 470) {
return Output.notReady();
}

try {
return resSerde.deserialize(response.body());
return Output.ready(resSerde.deserialize(response.body()));
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
Expand Down
Loading

0 comments on commit 52227b9

Please sign in to comment.