Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More implicitness in kotlin #277

Merged
merged 1 commit into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ package my.restate.sdk.examples

import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.KtSerdes
import dev.restate.sdk.kotlin.KtStateKey
import dev.restate.sdk.kotlin.ObjectContext
import kotlinx.serialization.Serializable
import org.apache.logging.log4j.LogManager
Expand All @@ -24,7 +23,7 @@ import org.apache.logging.log4j.Logger
class CounterKt {

companion object {
private val TOTAL = StateKey.of<Long>("total", KtSerdes.json())
private val TOTAL = KtStateKey.json<Long>("total")
private val LOG: Logger = LogManager.getLogger(CounterKt::class.java)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}
}

override suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T {
override suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
->
Expand Down Expand Up @@ -165,7 +165,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
var actionReturnValue: T? = null
var actionFailure: TerminalException? = null
try {
actionReturnValue = sideEffectAction()
actionReturnValue = block()
} catch (e: TerminalException) {
actionFailure = e
} catch (e: Error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,21 @@ package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import java.nio.charset.StandardCharsets
import kotlin.reflect.typeOf
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer

object KtStateKey {

/** Creates a json [StateKey]. */
inline fun <reified T> json(name: String): StateKey<T> {
return StateKey.of(name, KtSerdes.json())
}
}

object KtSerdes {

/** Creates a [Serde] implementation using the `kotlinx.serialization` json module. */
Expand Down
94 changes: 79 additions & 15 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ sealed interface Context {
* Errors occurring within this closure won't be propagated to the caller, unless they are
* [TerminalException]. Consider the following code:
* ```
* // Bad usage of try-catch outside the run
* // Bad usage of try-catch outside the runBlock
* try {
* ctx.run {
* ctx.runBlock {
* throw IllegalStateException();
* };
* } catch (e: IllegalStateException) {
Expand All @@ -125,9 +125,9 @@ sealed interface Context {
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the run
* // Good usage of try-catch outside the runBlock
* try {
* ctx.run {
* ctx.runBlock {
* throw TerminalException("my error");
* };
* } catch (e: TerminalException) {
Expand All @@ -138,16 +138,11 @@ sealed interface Context {
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param action closure to execute.
* @param block closure to execute.
* @param T type of the return value.
* @return value of the run operation.
* @return value of the runBlock operation.
*/
suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T

/** Like [run] without a return value. */
suspend fun run(sideEffectAction: suspend () -> Unit) {
run(KtSerdes.UNIT, sideEffectAction)
}
suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T

/**
* Create an [Awakeable], addressable through [Awakeable.id].
Expand Down Expand Up @@ -176,15 +171,74 @@ sealed interface Context {
*
* This instance is useful to generate identifiers, idempotency keys, and for uniform sampling
* from a set of options. If a cryptographically secure value is needed, please generate that
* externally using [run].
* externally using [runBlock].
*
* You MUST NOT use this [Random] instance inside a [run].
* You MUST NOT use this [Random] instance inside a [runBlock].
*
* @return the [Random] instance.
*/
fun random(): RestateRandom
}

/**
* Execute a non-deterministic closure, recording the result value in the journal using
* [KtSerdes.json]. The result value will be re-played in case of re-invocation (e.g. because of
* failure recovery or suspension point) without re-executing the closure. Use this feature if you
* want to perform <b>non-deterministic operations</b>.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
*
* <h2>Error handling</h2>
*
* Errors occurring within this closure won't be propagated to the caller, unless they are
* [TerminalException]. Consider the following code:
* ```
* // Bad usage of try-catch outside the runBlock
* try {
* ctx.runBlock {
* throw IllegalStateException();
* };
* } catch (e: IllegalStateException) {
* // This will never be executed,
* // but the error will be retried by Restate,
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the runBlock
* try {
* ctx.runBlock {
* throw TerminalException("my error");
* };
* } catch (e: TerminalException) {
* // This is invoked
* }
* ```
*
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param block closure to execute.
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend inline fun <reified T : Any> Context.runBlock(noinline block: suspend () -> T): T {
return this.runBlock(KtSerdes.json(), block)
}

/**
* Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id].
*
* You can use this feature to implement external asynchronous systems interactions, for example you
* can send a Kafka record including the [Awakeable.id], and then let another service consume from
* Kafka the responses of given external system interaction by using [awakeableHandle].
*
* @return the [Awakeable] to await on.
* @see Awakeable
*/
suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
return this.awakeable(KtSerdes.json())
}

/**
* This interface extends [Context] adding access to the virtual object instance key-value state
* storage.
Expand Down Expand Up @@ -233,7 +287,7 @@ class RestateRandom(seed: Long, private val syscalls: Syscalls) : Random() {
private val r = Random(seed)

override fun nextBits(bitCount: Int): Int {
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.run!" }
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.runBlock!" }
return r.nextBits(bitCount)
}

Expand Down Expand Up @@ -369,3 +423,13 @@ sealed interface AwakeableHandle {
*/
suspend fun reject(reason: String)
}

/**
* Complete with success the [Awakeable] using [KtSerdes.json] serializer.
*
* @param payload the result payload.
* @see Awakeable
*/
suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
return this.resolve(KtSerdes.json(), payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.AwakeableIdTestSuite
import dev.restate.sdk.core.TestDefinitions
import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService
Expand All @@ -17,7 +16,7 @@ class AwakeableIdTest : AwakeableIdTestSuite() {

override fun returnAwakeableId(): TestDefinitions.TestInvocationBuilder =
testDefinitionForService("ReturnAwakeableId") { ctx, _: Unit ->
val awakeable = ctx.awakeable(CoreSerdes.JSON_STRING)
val awakeable: Awakeable<String> = ctx.awakeable()
awakeable.id
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RandomTest : RandomTestSuite() {

override fun randomInsideSideEffect(): TestInvocationBuilder =
testDefinitionForService<Unit, Int>("RandomInsideSideEffect") { ctx, _: Unit ->
ctx.run { ctx.random().nextInt() }
ctx.runBlock { ctx.random().nextInt() }
throw IllegalStateException("This should not unreachable")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.ProtoUtils.GREETER_SERVICE_TARGET
import dev.restate.sdk.core.SideEffectTestSuite
import dev.restate.sdk.core.TestDefinitions
Expand All @@ -23,15 +22,14 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("SideEffect") { ctx, _: Unit ->
val result = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
val result = ctx.runBlock { sideEffectOutput }
"Hello $result"
}

override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
val firstResult = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
val secondResult =
ctx.run(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
val firstResult = ctx.runBlock { sideEffectOutput }
val secondResult = ctx.runBlock { firstResult.uppercase(Locale.getDefault()) }
"Hello $secondResult"
}

Expand All @@ -42,8 +40,7 @@ class SideEffectTest : SideEffectTestSuite() {
Component.Options(
Dispatchers.Unconfined + CoroutineName("CheckContextSwitchingTestCoroutine"))) {
handler("run") { ctx, _: Unit ->
val sideEffectCoroutine =
ctx.run(CoreSerdes.JSON_STRING) { coroutineContext[CoroutineName]!!.name }
val sideEffectCoroutine = ctx.runBlock { coroutineContext[CoroutineName]!!.name }
check(sideEffectCoroutine == "CheckContextSwitchingTestCoroutine") {
"Side effect thread is not running within the same coroutine context of the handler method: $sideEffectCoroutine"
}
Expand All @@ -54,7 +51,7 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffectGuard(): TestInvocationBuilder =
testDefinitionForService<Unit, String>("SideEffectGuard") { ctx, _: Unit ->
ctx.run { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
ctx.runBlock { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
throw IllegalStateException("This point should not be reached")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() {

override fun sideEffectFailure(serde: Serde<Int>): TestInvocationBuilder =
testDefinitionForService("SideEffectFailure") { ctx, _: Unit ->
ctx.run(serde) { 0 }
ctx.runBlock(serde) { 0 }
"Francesco"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UserFailuresTest : UserFailuresTestSuite() {
): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowIllegalStateException") { ctx, _: Unit ->
try {
ctx.run { throw IllegalStateException("Whatever") }
ctx.runBlock { throw IllegalStateException("Whatever") }
} catch (e: Throwable) {
if (e !is CancellationException && e !is TerminalException) {
nonTerminalExceptionsSeen.addAndGet(1)
Expand All @@ -44,7 +44,7 @@ class UserFailuresTest : UserFailuresTestSuite() {

override fun sideEffectThrowTerminalException(code: Int, message: String): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowTerminalException") { ctx, _: Unit ->
ctx.run { throw TerminalException(code, message) }
ctx.runBlock<Unit> { throw TerminalException(code, message) }
throw IllegalStateException("Not expected to reach this point")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import static dev.restate.sdk.core.AssertUtils.containsOnlyExactErrorMessage;
import static dev.restate.sdk.core.AssertUtils.*;
import static dev.restate.sdk.core.ProtoUtils.*;

import dev.restate.sdk.core.TestDefinitions.TestDefinition;
Expand Down Expand Up @@ -37,7 +37,7 @@ public Stream<TestDefinition> definitions() {
this.randomInsideSideEffect()
.withInput(startMessage(1).setDebugId(debugId), ProtoUtils.inputMessage())
.assertingOutput(
containsOnlyExactErrorMessage(
new IllegalStateException("You can't use RestateRandom inside ctx.run!"))));
containsOnly(
errorMessageStartingWith(IllegalStateException.class.getCanonicalName()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.ProtoUtils.*
import dev.restate.sdk.core.TestDefinitions
import dev.restate.sdk.core.TestDefinitions.testInvocation
import dev.restate.sdk.kotlin.Component
import dev.restate.sdk.kotlin.*
import io.vertx.core.Vertx
import java.util.stream.Stream
import kotlin.coroutines.coroutineContext
Expand All @@ -36,7 +36,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
LOG.info("I am on the thread I am before executing side effect")
check(Vertx.currentContext() == null)
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
ctx.run {
ctx.runBlock {
LOG.info("I am on the thread I am when executing side effect")
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
check(Vertx.currentContext() == null)
Expand Down Expand Up @@ -65,7 +65,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
return Stream.of(
testInvocation(
dev.restate.sdk.kotlin.Component.service(
"CheckBlockingComponentTrampolineExecutor",
"CheckNonBlockingComponentTrampolineExecutor",
Component.Options(Dispatchers.Default + nonBlockingCoroutineName)) {
handler("do") { ctx, _: Unit ->
checkNonBlockingComponentTrampolineExecutor(ctx)
Expand Down
Loading