Skip to content

Commit

Permalink
More implicitness in kotlin (#277)
Browse files Browse the repository at this point in the history
We had to rename ctx.run to ctx.runBlock in kotlin to avoid clashing with stdlib run extension method.
  • Loading branch information
slinkydeveloper committed Apr 9, 2024
1 parent f8244e2 commit cca0f96
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 40 deletions.
5 changes: 2 additions & 3 deletions examples/src/main/kotlin/my/restate/sdk/examples/CounterKt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ package my.restate.sdk.examples

import dev.restate.sdk.annotation.Handler
import dev.restate.sdk.annotation.VirtualObject
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.KtSerdes
import dev.restate.sdk.kotlin.KtStateKey
import dev.restate.sdk.kotlin.ObjectContext
import kotlinx.serialization.Serializable
import org.apache.logging.log4j.LogManager
Expand All @@ -24,7 +23,7 @@ import org.apache.logging.log4j.Logger
class CounterKt {

companion object {
private val TOTAL = StateKey.of<Long>("total", KtSerdes.json())
private val TOTAL = KtStateKey.json<Long>("total")
private val LOG: Logger = LogManager.getLogger(CounterKt::class.java)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}
}

override suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T {
override suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
->
Expand Down Expand Up @@ -165,7 +165,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
var actionReturnValue: T? = null
var actionFailure: TerminalException? = null
try {
actionReturnValue = sideEffectAction()
actionReturnValue = block()
} catch (e: TerminalException) {
actionFailure = e
} catch (e: Error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,21 @@ package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import java.nio.charset.StandardCharsets
import kotlin.reflect.typeOf
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer

object KtStateKey {

/** Creates a json [StateKey]. */
inline fun <reified T> json(name: String): StateKey<T> {
return StateKey.of(name, KtSerdes.json())
}
}

object KtSerdes {

/** Creates a [Serde] implementation using the `kotlinx.serialization` json module. */
Expand Down
94 changes: 79 additions & 15 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ sealed interface Context {
* Errors occurring within this closure won't be propagated to the caller, unless they are
* [TerminalException]. Consider the following code:
* ```
* // Bad usage of try-catch outside the run
* // Bad usage of try-catch outside the runBlock
* try {
* ctx.run {
* ctx.runBlock {
* throw IllegalStateException();
* };
* } catch (e: IllegalStateException) {
Expand All @@ -125,9 +125,9 @@ sealed interface Context {
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the run
* // Good usage of try-catch outside the runBlock
* try {
* ctx.run {
* ctx.runBlock {
* throw TerminalException("my error");
* };
* } catch (e: TerminalException) {
Expand All @@ -138,16 +138,11 @@ sealed interface Context {
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param action closure to execute.
* @param block closure to execute.
* @param T type of the return value.
* @return value of the run operation.
* @return value of the runBlock operation.
*/
suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T

/** Like [run] without a return value. */
suspend fun run(sideEffectAction: suspend () -> Unit) {
run(KtSerdes.UNIT, sideEffectAction)
}
suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T

/**
* Create an [Awakeable], addressable through [Awakeable.id].
Expand Down Expand Up @@ -176,15 +171,74 @@ sealed interface Context {
*
* This instance is useful to generate identifiers, idempotency keys, and for uniform sampling
* from a set of options. If a cryptographically secure value is needed, please generate that
* externally using [run].
* externally using [runBlock].
*
* You MUST NOT use this [Random] instance inside a [run].
* You MUST NOT use this [Random] instance inside a [runBlock].
*
* @return the [Random] instance.
*/
fun random(): RestateRandom
}

/**
* Execute a non-deterministic closure, recording the result value in the journal using
* [KtSerdes.json]. The result value will be re-played in case of re-invocation (e.g. because of
* failure recovery or suspension point) without re-executing the closure. Use this feature if you
* want to perform <b>non-deterministic operations</b>.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
*
* <h2>Error handling</h2>
*
* Errors occurring within this closure won't be propagated to the caller, unless they are
* [TerminalException]. Consider the following code:
* ```
* // Bad usage of try-catch outside the runBlock
* try {
* ctx.runBlock {
* throw IllegalStateException();
* };
* } catch (e: IllegalStateException) {
* // This will never be executed,
* // but the error will be retried by Restate,
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the runBlock
* try {
* ctx.runBlock {
* throw TerminalException("my error");
* };
* } catch (e: TerminalException) {
* // This is invoked
* }
* ```
*
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param block closure to execute.
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend inline fun <reified T : Any> Context.runBlock(noinline block: suspend () -> T): T {
return this.runBlock(KtSerdes.json(), block)
}

/**
* Create an [Awakeable] using [KtSerdes.json] deserializer, addressable through [Awakeable.id].
*
* You can use this feature to implement external asynchronous systems interactions, for example you
* can send a Kafka record including the [Awakeable.id], and then let another service consume from
* Kafka the responses of given external system interaction by using [awakeableHandle].
*
* @return the [Awakeable] to await on.
* @see Awakeable
*/
suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
return this.awakeable(KtSerdes.json())
}

/**
* This interface extends [Context] adding access to the virtual object instance key-value state
* storage.
Expand Down Expand Up @@ -233,7 +287,7 @@ class RestateRandom(seed: Long, private val syscalls: Syscalls) : Random() {
private val r = Random(seed)

override fun nextBits(bitCount: Int): Int {
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.run!" }
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.runBlock!" }
return r.nextBits(bitCount)
}

Expand Down Expand Up @@ -369,3 +423,13 @@ sealed interface AwakeableHandle {
*/
suspend fun reject(reason: String)
}

/**
* Complete with success the [Awakeable] using [KtSerdes.json] serializer.
*
* @param payload the result payload.
* @see Awakeable
*/
suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
return this.resolve(KtSerdes.json(), payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.AwakeableIdTestSuite
import dev.restate.sdk.core.TestDefinitions
import dev.restate.sdk.kotlin.KotlinCoroutinesTests.Companion.testDefinitionForService
Expand All @@ -17,7 +16,7 @@ class AwakeableIdTest : AwakeableIdTestSuite() {

override fun returnAwakeableId(): TestDefinitions.TestInvocationBuilder =
testDefinitionForService("ReturnAwakeableId") { ctx, _: Unit ->
val awakeable = ctx.awakeable(CoreSerdes.JSON_STRING)
val awakeable: Awakeable<String> = ctx.awakeable()
awakeable.id
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RandomTest : RandomTestSuite() {

override fun randomInsideSideEffect(): TestInvocationBuilder =
testDefinitionForService<Unit, Int>("RandomInsideSideEffect") { ctx, _: Unit ->
ctx.run { ctx.random().nextInt() }
ctx.runBlock { ctx.random().nextInt() }
throw IllegalStateException("This should not unreachable")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.ProtoUtils.GREETER_SERVICE_TARGET
import dev.restate.sdk.core.SideEffectTestSuite
import dev.restate.sdk.core.TestDefinitions
Expand All @@ -23,15 +22,14 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("SideEffect") { ctx, _: Unit ->
val result = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
val result = ctx.runBlock { sideEffectOutput }
"Hello $result"
}

override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
val firstResult = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
val secondResult =
ctx.run(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
val firstResult = ctx.runBlock { sideEffectOutput }
val secondResult = ctx.runBlock { firstResult.uppercase(Locale.getDefault()) }
"Hello $secondResult"
}

Expand All @@ -42,8 +40,7 @@ class SideEffectTest : SideEffectTestSuite() {
Component.Options(
Dispatchers.Unconfined + CoroutineName("CheckContextSwitchingTestCoroutine"))) {
handler("run") { ctx, _: Unit ->
val sideEffectCoroutine =
ctx.run(CoreSerdes.JSON_STRING) { coroutineContext[CoroutineName]!!.name }
val sideEffectCoroutine = ctx.runBlock { coroutineContext[CoroutineName]!!.name }
check(sideEffectCoroutine == "CheckContextSwitchingTestCoroutine") {
"Side effect thread is not running within the same coroutine context of the handler method: $sideEffectCoroutine"
}
Expand All @@ -54,7 +51,7 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffectGuard(): TestInvocationBuilder =
testDefinitionForService<Unit, String>("SideEffectGuard") { ctx, _: Unit ->
ctx.run { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
ctx.runBlock { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
throw IllegalStateException("This point should not be reached")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() {

override fun sideEffectFailure(serde: Serde<Int>): TestInvocationBuilder =
testDefinitionForService("SideEffectFailure") { ctx, _: Unit ->
ctx.run(serde) { 0 }
ctx.runBlock(serde) { 0 }
"Francesco"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UserFailuresTest : UserFailuresTestSuite() {
): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowIllegalStateException") { ctx, _: Unit ->
try {
ctx.run { throw IllegalStateException("Whatever") }
ctx.runBlock { throw IllegalStateException("Whatever") }
} catch (e: Throwable) {
if (e !is CancellationException && e !is TerminalException) {
nonTerminalExceptionsSeen.addAndGet(1)
Expand All @@ -44,7 +44,7 @@ class UserFailuresTest : UserFailuresTestSuite() {

override fun sideEffectThrowTerminalException(code: Int, message: String): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowTerminalException") { ctx, _: Unit ->
ctx.run { throw TerminalException(code, message) }
ctx.runBlock<Unit> { throw TerminalException(code, message) }
throw IllegalStateException("Not expected to reach this point")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import static dev.restate.sdk.core.AssertUtils.containsOnlyExactErrorMessage;
import static dev.restate.sdk.core.AssertUtils.*;
import static dev.restate.sdk.core.ProtoUtils.*;

import dev.restate.sdk.core.TestDefinitions.TestDefinition;
Expand Down Expand Up @@ -37,7 +37,7 @@ public Stream<TestDefinition> definitions() {
this.randomInsideSideEffect()
.withInput(startMessage(1).setDebugId(debugId), ProtoUtils.inputMessage())
.assertingOutput(
containsOnlyExactErrorMessage(
new IllegalStateException("You can't use RestateRandom inside ctx.run!"))));
containsOnly(
errorMessageStartingWith(IllegalStateException.class.getCanonicalName()))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import dev.restate.sdk.common.CoreSerdes
import dev.restate.sdk.core.ProtoUtils.*
import dev.restate.sdk.core.TestDefinitions
import dev.restate.sdk.core.TestDefinitions.testInvocation
import dev.restate.sdk.kotlin.Component
import dev.restate.sdk.kotlin.*
import io.vertx.core.Vertx
import java.util.stream.Stream
import kotlin.coroutines.coroutineContext
Expand All @@ -36,7 +36,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
LOG.info("I am on the thread I am before executing side effect")
check(Vertx.currentContext() == null)
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
ctx.run {
ctx.runBlock {
LOG.info("I am on the thread I am when executing side effect")
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
check(Vertx.currentContext() == null)
Expand Down Expand Up @@ -65,7 +65,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
return Stream.of(
testInvocation(
dev.restate.sdk.kotlin.Component.service(
"CheckBlockingComponentTrampolineExecutor",
"CheckNonBlockingComponentTrampolineExecutor",
Component.Options(Dispatchers.Default + nonBlockingCoroutineName)) {
handler("do") { ctx, _: Unit ->
checkNonBlockingComponentTrampolineExecutor(ctx)
Expand Down

0 comments on commit cca0f96

Please sign in to comment.