Skip to content

Commit

Permalink
Move all the encoding/decoding of messages within sdk-core (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Jun 4, 2024
1 parent 8788369 commit 4b2431d
Show file tree
Hide file tree
Showing 37 changed files with 735 additions and 855 deletions.
1 change: 1 addition & 0 deletions sdk-api-gen/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation(platform(jacksonLibs.jackson.bom))
testImplementation(jacksonLibs.jackson.databind)
testImplementation(project(":sdk-serde-jackson"))
testImplementation("io.smallrye.reactive:mutiny:2.6.0")

// Import test suites from sdk-core
testImplementation(project(":sdk-core", "testArchive"))
Expand Down
1 change: 1 addition & 0 deletions sdk-api-kotlin-gen/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
testImplementation(coreLibs.log4j.core)
testImplementation(kotlinLibs.kotlinx.coroutines)
testImplementation(kotlinLibs.kotlinx.serialization.core)
testImplementation("io.smallrye.reactive:mutiny:2.6.0")

// Import test suites from sdk-core
testImplementation(project(":sdk-core", "testArchive"))
Expand Down
1 change: 1 addition & 0 deletions sdk-api-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
testImplementation(testingLibs.assertj)
testImplementation(coreLibs.log4j.core)
testImplementation(coreLibs.protobuf.java)
testImplementation("io.smallrye.reactive:mutiny:2.6.0")

testImplementation(project(":sdk-core", "testArchive"))
}
Expand Down
1 change: 1 addition & 0 deletions sdk-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
testImplementation(testingLibs.assertj)
testImplementation(coreLibs.protobuf.java)
testImplementation(coreLibs.log4j.core)
testImplementation("io.smallrye.reactive:mutiny:2.6.0")

// Import test suites from sdk-core
testImplementation(project(":sdk-core", "testArchive"))
Expand Down
1 change: 1 addition & 0 deletions sdk-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
implementation(coreLibs.opentelemetry.api)

testCompileOnly(coreLibs.jspecify)
testImplementation("io.smallrye.reactive:mutiny:2.6.0")
testImplementation(testingLibs.junit.jupiter)
testImplementation(testingLibs.assertj)
testImplementation(coreLibs.log4j.core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@

import java.util.concurrent.Flow;

class ExceptionCatchingInvocationInputSubscriber
implements InvocationFlow.InvocationInputSubscriber {
class ExceptionCatchingSubscriber<T> implements Flow.Subscriber<T> {

InvocationFlow.InvocationInputSubscriber invocationInputSubscriber;
Flow.Subscriber<T> invocationInputSubscriber;

public ExceptionCatchingInvocationInputSubscriber(
InvocationFlow.InvocationInputSubscriber invocationInputSubscriber) {
public ExceptionCatchingSubscriber(Flow.Subscriber<T> invocationInputSubscriber) {
this.invocationInputSubscriber = invocationInputSubscriber;
}

Expand All @@ -31,9 +29,9 @@ public void onSubscribe(Flow.Subscription subscription) {
}

@Override
public void onNext(InvocationFlow.InvocationInput invocationInput) {
public void onNext(T t) {
try {
invocationInputSubscriber.onNext(invocationInput);
invocationInputSubscriber.onNext(t);
} catch (Throwable throwable) {
invocationInputSubscriber.onError(throwable);
throw throwable;
Expand Down
40 changes: 5 additions & 35 deletions sdk-core/src/main/java/dev/restate/sdk/core/InvocationFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,16 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import com.google.protobuf.MessageLite;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;

public interface InvocationFlow {

interface InvocationInput {
MessageHeader header();
interface InvocationInputPublisher extends Flow.Publisher<ByteBuffer> {}

MessageLite message();
interface InvocationOutputPublisher extends Flow.Publisher<ByteBuffer> {}

static InvocationInput of(MessageHeader header, MessageLite message) {
return new InvocationInput() {
@Override
public MessageHeader header() {
return header;
}
interface InvocationInputSubscriber extends Flow.Subscriber<ByteBuffer> {}

@Override
public MessageLite message() {
return message;
}

@Override
public String toString() {
return header.toString() + " " + message.toString();
}
};
}
}

interface InvocationInputPublisher extends Flow.Publisher<InvocationInput> {}

interface InvocationOutputPublisher extends Flow.Publisher<MessageLite> {}

interface InvocationInputSubscriber extends Flow.Subscriber<InvocationInput> {}

interface InvocationOutputSubscriber extends Flow.Subscriber<MessageLite> {}

interface InvocationProcessor
extends Flow.Processor<InvocationInput, MessageLite>,
InvocationInputSubscriber,
InvocationOutputPublisher {}
interface InvocationOutputSubscriber extends Flow.Subscriber<ByteBuffer> {}
}
36 changes: 36 additions & 0 deletions sdk-core/src/main/java/dev/restate/sdk/core/InvocationInput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import com.google.protobuf.MessageLite;

public interface InvocationInput {
MessageHeader header();

MessageLite message();

static InvocationInput of(MessageHeader header, MessageLite message) {
return new InvocationInput() {
@Override
public MessageHeader header() {
return header;
}

@Override
public MessageLite message() {
return message;
}

@Override
public String toString() {
return header.toString() + " " + message.toString();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class InvocationStateMachine implements InvocationFlow.InvocationProcessor {
class InvocationStateMachine implements Flow.Processor<InvocationInput, MessageLite> {

private static final Logger LOG = LogManager.getLogger(InvocationStateMachine.class);

Expand Down Expand Up @@ -131,7 +131,7 @@ public void onSubscribe(Flow.Subscription subscription) {
}

@Override
public void onNext(InvocationFlow.InvocationInput invocationInput) {
public void onNext(InvocationInput invocationInput) {
MessageLite msg = invocationInput.message();
LOG.trace("Received input message {} {}", msg.getClass(), msg);
if (this.invocationState == InvocationState.WAITING_START) {
Expand Down
196 changes: 196 additions & 0 deletions sdk-core/src/main/java/dev/restate/sdk/core/MessageDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.core;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UnsafeByteOperations;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

class MessageDecoder implements InvocationFlow.InvocationInputSubscriber {

private enum State {
WAITING_HEADER,
WAITING_PAYLOAD,
FAILED
}

private final Flow.Subscriber<InvocationInput> inner;
private Flow.Subscription inputSubscription;
private long invocationInputRequests = 0;

private final Queue<InvocationInput> parsedMessages;
private ByteString internalBuffer;

private State state;
private MessageHeader lastParsedMessageHeader;
private RuntimeException lastParsingFailure;

MessageDecoder(Flow.Subscriber<InvocationInput> inner) {
this.inner = inner;
this.parsedMessages = new ArrayDeque<>();
this.internalBuffer = ByteString.EMPTY;

this.state = State.WAITING_HEADER;
this.lastParsedMessageHeader = null;
this.lastParsingFailure = null;
}

// -- Subscriber methods

@Override
public void onSubscribe(Flow.Subscription byteBufferSubscription) {
this.inputSubscription = byteBufferSubscription;
this.inner.onSubscribe(
new Flow.Subscription() {
@Override
public void request(long n) {
// We ask for MAX VALUE, then we buffer in this class.
// This class could be implemented with more backpressure in mind, but for now this is
// fine.
byteBufferSubscription.request(Long.MAX_VALUE);
handleSubscriptionRequest(n);
}

@Override
public void cancel() {
byteBufferSubscription.cancel();
}
});
}

@Override
public void onNext(ByteBuffer item) {
this.offer(UnsafeByteOperations.unsafeWrap(item));
tryProgress();
}

@Override
public void onError(Throwable throwable) {
if (this.inputSubscription == null) {
return;
}
this.inner.onError(throwable);
}

@Override
public void onComplete() {
if (this.inputSubscription == null) {
return;
}
this.inner.onComplete();
}

private void handleSubscriptionRequest(long l) {
if (l == Long.MAX_VALUE) {
this.invocationInputRequests = l;
} else {
this.invocationInputRequests += l;
// Overflow check
if (this.invocationInputRequests < 0) {
this.invocationInputRequests = Long.MAX_VALUE;
}
}

tryProgress();
}

private void tryProgress() {
if (this.inputSubscription == null) {
return;
}
if (this.state == State.FAILED) {
this.inner.onError(lastParsingFailure);
this.inputSubscription.cancel();
this.inputSubscription = null;
}
while (this.invocationInputRequests > 0) {
InvocationInput input = this.parsedMessages.poll();
if (input == null) {
return;
}
this.invocationInputRequests--;
this.inner.onNext(input);
}
}

// -- Internal methods to handle decoding

private void offer(ByteString buffer) {
if (this.state != State.FAILED) {
this.internalBuffer = this.internalBuffer.concat(buffer);
this.tryConsumeInternalBuffer();
}
}

private void tryConsumeInternalBuffer() {
while (this.state != State.FAILED && this.internalBuffer.size() >= wantBytes()) {
if (state == State.WAITING_HEADER) {
try {
this.lastParsedMessageHeader = MessageHeader.parse(readLongAtBeginning());
this.state = State.WAITING_PAYLOAD;
this.sliceInternalBuffer(8);
} catch (RuntimeException e) {
this.lastParsingFailure = e;
this.state = State.FAILED;
}
} else {
try {
this.parsedMessages.offer(
InvocationInput.of(
this.lastParsedMessageHeader,
this.lastParsedMessageHeader
.getType()
.messageParser()
.parseFrom(
this.internalBuffer.substring(
0, this.lastParsedMessageHeader.getLength()))));
this.state = State.WAITING_HEADER;
this.sliceInternalBuffer(this.lastParsedMessageHeader.getLength());
} catch (InvalidProtocolBufferException e) {
this.lastParsingFailure = new RuntimeException("Cannot parse the protobuf message", e);
this.state = State.FAILED;
} catch (RuntimeException e) {
this.lastParsingFailure = e;
this.state = State.FAILED;
}
}
}
}

private int wantBytes() {
if (state == State.WAITING_HEADER) {
return 8;
} else {
return lastParsedMessageHeader.getLength();
}
}

private void sliceInternalBuffer(int substring) {
if (this.internalBuffer.size() == substring) {
this.internalBuffer = ByteString.EMPTY;
} else {
this.internalBuffer = this.internalBuffer.substring(substring);
}
}

private long readLongAtBeginning() {
return ((this.internalBuffer.byteAt(7) & 0xffL)
| ((this.internalBuffer.byteAt(6) & 0xffL) << 8)
| ((this.internalBuffer.byteAt(5) & 0xffL) << 16)
| ((this.internalBuffer.byteAt(4) & 0xffL) << 24)
| ((this.internalBuffer.byteAt(3) & 0xffL) << 32)
| ((this.internalBuffer.byteAt(2) & 0xffL) << 40)
| ((this.internalBuffer.byteAt(1) & 0xffL) << 48)
| ((this.internalBuffer.byteAt(0) & 0xffL) << 56));
}
}
Loading

0 comments on commit 4b2431d

Please sign in to comment.