Skip to content

Commit

Permalink
Remove protobuf from the public api of sdk-common (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 21, 2024
1 parent 1a5817c commit cc52be1
Show file tree
Hide file tree
Showing 31 changed files with 270 additions and 199 deletions.
1 change: 1 addition & 0 deletions sdk-api-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation(testingLibs.junit.jupiter)
testImplementation(testingLibs.assertj)
testImplementation(coreLibs.log4j.core)
testImplementation(coreLibs.protobuf.java)

testImplementation(project(":sdk-core", "testArchive"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.syscalls.Deferred
import dev.restate.sdk.common.syscalls.Result
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine

Expand Down Expand Up @@ -79,14 +79,14 @@ internal abstract class BaseSingleMappedAwaitableImpl<T : Any, U : Any>(
internal open class SingleSerdeAwaitableImpl<T : Any>
internal constructor(
syscalls: Syscalls,
deferred: Deferred<ByteString>,
deferred: Deferred<ByteBuffer>,
private val serde: Serde<T>,
) :
BaseSingleMappedAwaitableImpl<ByteString, T>(
BaseSingleMappedAwaitableImpl<ByteBuffer, T>(
SingleAwaitableImpl(syscalls, deferred),
) {
@Suppress("UNCHECKED_CAST")
override suspend fun map(res: Result<ByteString>): Result<T> {
override suspend fun map(res: Result<ByteBuffer>): Result<T> {
return if (res.isSuccess) {
// This propagates exceptions as non-terminal
Result.success(serde.deserializeWrappingException(syscalls, res.value!!))
Expand Down Expand Up @@ -151,7 +151,7 @@ internal fun wrapAnyAwaitable(awaitables: List<Awaitable<*>>): AnyAwaitable {
internal class AwakeableImpl<T : Any>
internal constructor(
syscalls: Syscalls,
deferred: Deferred<ByteString>,
deferred: Deferred<ByteBuffer>,
serde: Serde<T>,
override val id: String
) : SingleSerdeAwaitableImpl<T>(syscalls, deferred, serde), Awakeable<T> {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.*
import dev.restate.sdk.common.Target
import dev.restate.sdk.common.syscalls.Deferred
import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback
import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlin.coroutines.resume
import kotlin.time.Duration
import kotlin.time.toJavaDuration
Expand All @@ -33,8 +33,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override suspend fun <T : Any> get(key: StateKey<T>): T? {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.get(key.name(), completingContinuation(cont))
}

Expand Down Expand Up @@ -109,8 +109,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
): Awaitable<R> {
val input = inputSerde.serializeWrappingException(syscalls, parameter)

val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.call(target, input, completingContinuation(cont))
}

Expand All @@ -136,19 +136,19 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteString?) {
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()
override fun onSuccess(t: ByteBuffer?) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.complete(t!!)
cont.resume(deferred)
}

override fun onFailure(t: TerminalException) {
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.completeExceptionally(t)
cont.resume(deferred)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)

val exitCallback =
object : ExitSideEffectSyscallCallback {
override fun onSuccess(t: ByteString?) {
override fun onSuccess(t: ByteBuffer?) {
exitResult.complete(t!!)
}

Expand All @@ -208,7 +208,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
override suspend fun <T : Any> awakeable(serde: Serde<T>): Awakeable<T> {
val (aid, deferredResult) =
suspendCancellableCoroutine {
cont: CancellableContinuation<Map.Entry<String, Deferred<ByteString>>> ->
cont: CancellableContinuation<Map.Entry<String, Deferred<ByteBuffer>>> ->
syscalls.awakeable(completingContinuation(cont))
}

Expand All @@ -234,17 +234,17 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
inner class DurablePromiseImpl<T : Any>(private val key: DurablePromiseKey<T>) :
DurablePromise<T> {
override suspend fun awaitable(): Awaitable<T> {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.promise(key.name(), completingContinuation(cont))
}

return SingleSerdeAwaitableImpl(syscalls, deferred, key.serde())
}

override suspend fun peek(): T? {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
}

Expand All @@ -265,8 +265,8 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}

override suspend fun isCompleted(): Boolean {
val deferred: Deferred<ByteString> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteString>> ->
val deferred: Deferred<ByteBuffer> =
suspendCancellableCoroutine { cont: CancellableContinuation<Deferred<ByteBuffer>> ->
syscalls.peekPromise(key.name(), completingContinuation(cont))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.TerminalException
import dev.restate.sdk.common.syscalls.HandlerSpecification
import dev.restate.sdk.common.syscalls.SyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import io.opentelemetry.extension.kotlin.asContextElement
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -45,7 +45,7 @@ internal constructor(
handlerSpecification: HandlerSpecification<REQ, RES>,
syscalls: Syscalls,
options: Options?,
callback: SyscallCallback<ByteString>
callback: SyscallCallback<ByteBuffer>
) {
val ctx: Context = ContextImpl(syscalls)

Expand All @@ -57,7 +57,7 @@ internal constructor(
.asContextElement(syscalls) +
syscalls.request().otelContext()!!.asContextElement())
scope.launch {
val serializedResult: ByteString
val serializedResult: ByteBuffer

try {
// Parse input
Expand All @@ -77,7 +77,7 @@ internal constructor(

// Serialize output
try {
serializedResult = handlerSpecification.responseSerde.serializeToByteString(res)
serializedResult = handlerSpecification.responseSerde.serializeToByteBuffer(res)
} catch (e: Error) {
throw e
} catch (e: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.DurablePromiseKey
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.StateKey
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import kotlin.reflect.typeOf
import kotlinx.serialization.KSerializer
Expand Down Expand Up @@ -51,15 +51,15 @@ object KtSerdes {
return ByteArray(0)
}

override fun serializeToByteString(value: Unit?): ByteString {
return ByteString.EMPTY
override fun serializeToByteBuffer(value: Unit?): ByteBuffer {
return ByteBuffer.allocate(0)
}

override fun deserialize(value: ByteArray) {
return
}

override fun deserialize(byteString: ByteString) {
override fun deserialize(byteBuffer: ByteBuffer) {
return
}

Expand All @@ -71,12 +71,12 @@ object KtSerdes {
/** Creates a [Serde] implementation using the `kotlinx.serialization` json module. */
fun <T> json(serializer: KSerializer<T>): Serde<T> {
return object : Serde<T> {
override fun serialize(value: T?): ByteArray {
override fun serialize(value: T): ByteArray {
return Json.encodeToString(serializer, value!!).encodeToByteArray()
}

override fun deserialize(value: ByteArray?): T {
return Json.decodeFromString(serializer, String(value!!, StandardCharsets.UTF_8))
override fun deserialize(value: ByteArray): T {
return Json.decodeFromString(serializer, String(value, StandardCharsets.UTF_8))
}

override fun contentType(): String {
Expand Down
10 changes: 5 additions & 5 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import com.google.protobuf.ByteString
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.syscalls.SyscallCallback
import dev.restate.sdk.common.syscalls.Syscalls
import java.nio.ByteBuffer
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
Expand All @@ -32,9 +32,9 @@ internal fun completingUnitContinuation(
internal fun <T : Any?> Serde<T>.serializeWrappingException(
syscalls: Syscalls,
value: T?
): ByteString? {
): ByteBuffer {
return try {
this.serializeToByteString(value)
this.serializeToByteBuffer(value)
} catch (e: Exception) {
syscalls.fail(e)
throw CancellationException("Failed serialization", e)
Expand All @@ -43,10 +43,10 @@ internal fun <T : Any?> Serde<T>.serializeWrappingException(

internal fun <T : Any?> Serde<T>.deserializeWrappingException(
syscalls: Syscalls,
byteString: ByteString
ByteBuffer: ByteBuffer
): T {
return try {
this.deserialize(byteString)
this.deserialize(ByteBuffer)
} catch (e: Exception) {
syscalls.fail(e)
throw CancellationException("Failed deserialization", e)
Expand Down
6 changes: 3 additions & 3 deletions sdk-api/src/main/java/dev/restate/sdk/Awakeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk;

import com.google.protobuf.ByteString;
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.syscalls.Deferred;
import dev.restate.sdk.common.syscalls.Result;
import dev.restate.sdk.common.syscalls.Syscalls;
import java.nio.ByteBuffer;

/**
* An {@link Awakeable} is a special type of {@link Awaitable} which can be arbitrarily completed by
Expand All @@ -28,11 +28,11 @@
* <p>NOTE: This interface MUST NOT be accessed concurrently since it can lead to different
* orderings of user actions, corrupting the execution of the invocation.
*/
public final class Awakeable<T> extends Awaitable.MappedAwaitable<ByteString, T> {
public final class Awakeable<T> extends Awaitable.MappedAwaitable<ByteBuffer, T> {

private final String identifier;

Awakeable(Syscalls syscalls, Deferred<ByteString> deferred, Serde<T> serde, String identifier) {
Awakeable(Syscalls syscalls, Deferred<ByteBuffer> deferred, Serde<T> serde, String identifier) {
super(
Awaitable.single(syscalls, deferred),
res -> {
Expand Down
Loading

0 comments on commit cc52be1

Please sign in to comment.