Skip to content

Commit

Permalink
Updates with renamings
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Apr 22, 2024
1 parent 4de7bfe commit bf3db07
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 115 deletions.
24 changes: 12 additions & 12 deletions sdk-core/src/main/java/dev/restate/sdk/core/DeploymentManifest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import dev.restate.sdk.common.HandlerType;
import dev.restate.sdk.common.ServiceType;
import dev.restate.sdk.common.syscalls.ServiceDefinition;
import dev.restate.sdk.core.manifest.Component;
import dev.restate.sdk.core.manifest.DeploymentManifestSchema;
import dev.restate.sdk.core.manifest.Handler;
import dev.restate.sdk.core.manifest.Service;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -28,19 +28,19 @@ public DeploymentManifest(
.withMinProtocolVersion(1)
.withMaxProtocolVersion(1)
.withProtocolMode(protocolMode)
.withComponents(
.withServices(
components
.map(
svc ->
new Component()
.withFullyQualifiedComponentName(svc.getServiceName())
.withComponentType(convertServiceType(svc.getServiceType()))
new Service()
.withName(svc.getServiceName())
.withTy(convertServiceType(svc.getServiceType()))
.withHandlers(
svc.getHandlers().stream()
.map(
method ->
new Handler()
.withHandlerType(
.withTy(
convertHandlerType(method.getHandlerType()))
.withName(method.getName()))
.collect(Collectors.toList())))
Expand All @@ -51,23 +51,23 @@ public DeploymentManifestSchema manifest() {
return this.manifest;
}

private static Component.ComponentType convertServiceType(ServiceType serviceType) {
private static Service.Ty convertServiceType(ServiceType serviceType) {
switch (serviceType) {
case WORKFLOW:
case SERVICE:
return Component.ComponentType.SERVICE;
return Service.Ty.SERVICE;
case VIRTUAL_OBJECT:
return Component.ComponentType.VIRTUAL_OBJECT;
return Service.Ty.VIRTUAL_OBJECT;
}
throw new IllegalStateException();
}

private static Handler.HandlerType convertHandlerType(HandlerType handlerType) {
private static Handler.Ty convertHandlerType(HandlerType handlerType) {
switch (handlerType) {
case EXCLUSIVE:
return Handler.HandlerType.EXCLUSIVE;
return Handler.Ty.EXCLUSIVE;
case SHARED:
return Handler.HandlerType.SHARED;
return Handler.Ty.SHARED;
}
throw new IllegalStateException();
}
Expand Down
33 changes: 16 additions & 17 deletions sdk-core/src/main/java/dev/restate/sdk/core/Entries.java
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public Result<Void> parseCompletionResult(CompletionMessage actual) {
}
}

static final class InvokeEntry<R> extends CompletableJournalEntry<InvokeEntryMessage, R> {
static final class InvokeEntry<R> extends CompletableJournalEntry<CallEntryMessage, R> {

private final Function<ByteString, Result<R>> valueParser;

Expand All @@ -384,43 +384,42 @@ static final class InvokeEntry<R> extends CompletableJournalEntry<InvokeEntryMes
}

@Override
void trace(InvokeEntryMessage expected, Span span) {
void trace(CallEntryMessage expected, Span span) {
span.addEvent(
"Invoke",
Attributes.of(
Tracing.RESTATE_COORDINATION_CALL_SERVICE,
expected.getServiceName(),
Tracing.RESTATE_COORDINATION_CALL_METHOD,
expected.getMethodName()));
expected.getHandlerName()));
}

@Override
public boolean hasResult(InvokeEntryMessage actual) {
return actual.getResultCase() != Protocol.InvokeEntryMessage.ResultCase.RESULT_NOT_SET;
public boolean hasResult(CallEntryMessage actual) {
return actual.getResultCase() != Protocol.CallEntryMessage.ResultCase.RESULT_NOT_SET;
}

@Override
String getName(InvokeEntryMessage expected) {
String getName(CallEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(InvokeEntryMessage expected, MessageLite actual)
throws ProtocolException {
if (!(actual instanceof InvokeEntryMessage)) {
void checkEntryHeader(CallEntryMessage expected, MessageLite actual) throws ProtocolException {
if (!(actual instanceof CallEntryMessage)) {
throw ProtocolException.entryDoesNotMatch(expected, actual);
}
InvokeEntryMessage actualInvoke = (InvokeEntryMessage) actual;
CallEntryMessage actualInvoke = (CallEntryMessage) actual;

if (!(expected.getServiceName().equals(actualInvoke.getServiceName())
&& expected.getMethodName().equals(actualInvoke.getMethodName())
&& expected.getHandlerName().equals(actualInvoke.getHandlerName())
&& expected.getParameter().equals(actualInvoke.getParameter()))) {
throw ProtocolException.entryDoesNotMatch(expected, actualInvoke);
}
}

@Override
public Result<R> parseEntryResult(InvokeEntryMessage actual) {
public Result<R> parseEntryResult(CallEntryMessage actual) {
if (actual.hasValue()) {
return valueParser.apply(actual.getValue());
}
Expand All @@ -439,30 +438,30 @@ public Result<R> parseCompletionResult(CompletionMessage actual) {
}
}

static final class BackgroundInvokeEntry extends JournalEntry<BackgroundInvokeEntryMessage> {
static final class BackgroundInvokeEntry extends JournalEntry<OneWayCallEntryMessage> {

static final BackgroundInvokeEntry INSTANCE = new BackgroundInvokeEntry();

private BackgroundInvokeEntry() {}

@Override
public void trace(BackgroundInvokeEntryMessage expected, Span span) {
public void trace(OneWayCallEntryMessage expected, Span span) {
span.addEvent(
"BackgroundInvoke",
Attributes.of(
Tracing.RESTATE_COORDINATION_CALL_SERVICE,
expected.getServiceName(),
Tracing.RESTATE_COORDINATION_CALL_METHOD,
expected.getMethodName()));
expected.getHandlerName()));
}

@Override
String getName(BackgroundInvokeEntryMessage expected) {
String getName(OneWayCallEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(BackgroundInvokeEntryMessage expected, MessageLite actual)
void checkEntryHeader(OneWayCallEntryMessage expected, MessageLite actual)
throws ProtocolException {
Util.assertEntryEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,18 +404,18 @@ <E extends MessageLite> void processJournalEntry(

void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback) {
checkInsideSideEffectGuard();
this.nextJournalEntry(name, MessageType.SideEffectEntryMessage);
this.nextJournalEntry(name, MessageType.RunEntryMessage);

if (this.invocationState == InvocationState.CLOSED) {
callback.onCancel(AbortedExecutionException.INSTANCE);
} else if (this.invocationState == InvocationState.REPLAYING) {
// Retrieve the entry
this.readEntry(
msg -> {
Util.assertEntryClass(Protocol.SideEffectEntryMessage.class, msg);
Util.assertEntryClass(Protocol.RunEntryMessage.class, msg);

// We have a result already, complete the callback
completeSideEffectCallbackWithEntry((Protocol.SideEffectEntryMessage) msg, callback);
completeSideEffectCallbackWithEntry((Protocol.RunEntryMessage) msg, callback);
},
callback::onCancel);
} else if (this.invocationState == InvocationState.PROCESSING) {
Expand All @@ -431,7 +431,7 @@ void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback)
}

void exitSideEffectBlock(
Protocol.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
Protocol.RunEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
this.insideSideEffect = false;
if (this.invocationState == InvocationState.CLOSED) {
callback.onCancel(AbortedExecutionException.INSTANCE);
Expand All @@ -454,7 +454,7 @@ void exitSideEffectBlock(
this.writeEntry(sideEffectEntry);

// Wait for entry to be acked
Protocol.SideEffectEntryMessage finalSideEffectEntry = sideEffectEntry;
Protocol.RunEntryMessage finalSideEffectEntry = sideEffectEntry;
this.sideEffectAckStateMachine.waitLastSideEffectAck(
new SideEffectAckStateMachine.SideEffectAckCallback() {
@Override
Expand All @@ -480,7 +480,7 @@ public void onError(Throwable e) {
}

void completeSideEffectCallbackWithEntry(
Protocol.SideEffectEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
Protocol.RunEntryMessage sideEffectEntry, ExitSideEffectSyscallCallback callback) {
if (sideEffectEntry.hasFailure()) {
callback.onFailure(Util.toRestateException(sideEffectEntry.getFailure()));
} else {
Expand Down
12 changes: 6 additions & 6 deletions sdk-core/src/main/java/dev/restate/sdk/core/MessageHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public static MessageHeader fromMessage(MessageLite msg) {
? DONE_FLAG
: 0,
msg.getSerializedSize());
} else if (msg instanceof Protocol.InvokeEntryMessage) {
} else if (msg instanceof Protocol.CallEntryMessage) {
return new MessageHeader(
MessageType.InvokeEntryMessage,
((Protocol.InvokeEntryMessage) msg).getResultCase()
!= Protocol.InvokeEntryMessage.ResultCase.RESULT_NOT_SET
MessageType.CallEntryMessage,
((Protocol.CallEntryMessage) msg).getResultCase()
!= Protocol.CallEntryMessage.ResultCase.RESULT_NOT_SET
? DONE_FLAG
: 0,
msg.getSerializedSize());
Expand All @@ -94,9 +94,9 @@ public static MessageHeader fromMessage(MessageLite msg) {
? DONE_FLAG
: 0,
msg.getSerializedSize());
} else if (msg instanceof Protocol.SideEffectEntryMessage) {
} else if (msg instanceof Protocol.RunEntryMessage) {
return new MessageHeader(
MessageType.SideEffectEntryMessage, REQUIRES_ACK_FLAG, msg.getSerializedSize());
MessageType.RunEntryMessage, REQUIRES_ACK_FLAG, msg.getSerializedSize());
}
// Messages with no flags
return new MessageHeader(MessageType.fromMessage(msg), 0, msg.getSerializedSize());
Expand Down
42 changes: 21 additions & 21 deletions sdk-core/src/main/java/dev/restate/sdk/core/MessageType.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public enum MessageType {

// Syscalls
SleepEntryMessage,
InvokeEntryMessage,
BackgroundInvokeEntryMessage,
CallEntryMessage,
OneWayCallEntryMessage,
AwakeableEntryMessage,
CompleteAwakeableEntryMessage,
SideEffectEntryMessage,
RunEntryMessage,

// SDK specific
CombinatorAwaitableEntryMessage;
Expand Down Expand Up @@ -94,18 +94,18 @@ public Parser<? extends MessageLite> messageParser() {
return Protocol.GetStateKeysEntryMessage.parser();
case SleepEntryMessage:
return Protocol.SleepEntryMessage.parser();
case InvokeEntryMessage:
return Protocol.InvokeEntryMessage.parser();
case BackgroundInvokeEntryMessage:
return Protocol.BackgroundInvokeEntryMessage.parser();
case CallEntryMessage:
return Protocol.CallEntryMessage.parser();
case OneWayCallEntryMessage:
return Protocol.OneWayCallEntryMessage.parser();
case AwakeableEntryMessage:
return Protocol.AwakeableEntryMessage.parser();
case CompleteAwakeableEntryMessage:
return Protocol.CompleteAwakeableEntryMessage.parser();
case CombinatorAwaitableEntryMessage:
return Java.CombinatorAwaitableEntryMessage.parser();
case SideEffectEntryMessage:
return Protocol.SideEffectEntryMessage.parser();
case RunEntryMessage:
return Protocol.RunEntryMessage.parser();
}
throw new IllegalStateException();
}
Expand Down Expand Up @@ -140,17 +140,17 @@ public short encode() {
return GET_STATE_KEYS_ENTRY_MESSAGE_TYPE;
case SleepEntryMessage:
return SLEEP_ENTRY_MESSAGE_TYPE;
case InvokeEntryMessage:
case CallEntryMessage:
return INVOKE_ENTRY_MESSAGE_TYPE;
case BackgroundInvokeEntryMessage:
case OneWayCallEntryMessage:
return BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE;
case AwakeableEntryMessage:
return AWAKEABLE_ENTRY_MESSAGE_TYPE;
case CompleteAwakeableEntryMessage:
return COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE;
case CombinatorAwaitableEntryMessage:
return COMBINATOR_AWAITABLE_ENTRY_MESSAGE_TYPE;
case SideEffectEntryMessage:
case RunEntryMessage:
return SIDE_EFFECT_ENTRY_MESSAGE_TYPE;
}
throw new IllegalStateException();
Expand Down Expand Up @@ -187,17 +187,17 @@ public static MessageType decode(short value) throws ProtocolException {
case SLEEP_ENTRY_MESSAGE_TYPE:
return SleepEntryMessage;
case INVOKE_ENTRY_MESSAGE_TYPE:
return InvokeEntryMessage;
return CallEntryMessage;
case BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE:
return BackgroundInvokeEntryMessage;
return OneWayCallEntryMessage;
case AWAKEABLE_ENTRY_MESSAGE_TYPE:
return AwakeableEntryMessage;
case COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE:
return CompleteAwakeableEntryMessage;
case COMBINATOR_AWAITABLE_ENTRY_MESSAGE_TYPE:
return CombinatorAwaitableEntryMessage;
case SIDE_EFFECT_ENTRY_MESSAGE_TYPE:
return SideEffectEntryMessage;
return RunEntryMessage;
}
throw ProtocolException.unknownMessageType(value);
}
Expand Down Expand Up @@ -227,18 +227,18 @@ public static MessageType fromMessage(MessageLite msg) {
return MessageType.GetStateKeysEntryMessage;
} else if (msg instanceof Protocol.SleepEntryMessage) {
return MessageType.SleepEntryMessage;
} else if (msg instanceof Protocol.InvokeEntryMessage) {
return MessageType.InvokeEntryMessage;
} else if (msg instanceof Protocol.BackgroundInvokeEntryMessage) {
return MessageType.BackgroundInvokeEntryMessage;
} else if (msg instanceof Protocol.CallEntryMessage) {
return MessageType.CallEntryMessage;
} else if (msg instanceof Protocol.OneWayCallEntryMessage) {
return MessageType.OneWayCallEntryMessage;
} else if (msg instanceof Protocol.AwakeableEntryMessage) {
return MessageType.AwakeableEntryMessage;
} else if (msg instanceof Protocol.CompleteAwakeableEntryMessage) {
return MessageType.CompleteAwakeableEntryMessage;
} else if (msg instanceof Java.CombinatorAwaitableEntryMessage) {
return MessageType.CombinatorAwaitableEntryMessage;
} else if (msg instanceof Protocol.SideEffectEntryMessage) {
return MessageType.SideEffectEntryMessage;
} else if (msg instanceof Protocol.RunEntryMessage) {
return MessageType.RunEntryMessage;
} else if (msg instanceof Protocol.CompletionMessage) {
throw new IllegalArgumentException("SDK should never send a CompletionMessage");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import dev.restate.sdk.common.BindableServiceFactory;
import dev.restate.sdk.common.syscalls.HandlerDefinition;
import dev.restate.sdk.common.syscalls.ServiceDefinition;
import dev.restate.sdk.core.manifest.Component;
import dev.restate.sdk.core.manifest.DeploymentManifestSchema;
import dev.restate.sdk.core.manifest.Service;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
Expand Down Expand Up @@ -94,9 +94,7 @@ public DeploymentManifestSchema handleDiscoveryRequest() {
DeploymentManifestSchema response = this.deploymentManifest.manifest();
LOG.info(
"Replying to discovery request with services [{}]",
response.getComponents().stream()
.map(Component::getFullyQualifiedComponentName)
.collect(Collectors.joining(",")));
response.getServices().stream().map(Service::getName).collect(Collectors.joining(",")));
return response;
}

Expand Down
Loading

0 comments on commit bf3db07

Please sign in to comment.