-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce invoker::message module #80
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @slinkydeveloper. The additions make sense to me. The one question I do have is whether we want to replace the macro for generating the MessageTypes
and their implementations with something simpler and more explicit. I believe that this would help long term maintainability.
src/invoker/src/message/encoding.rs
Outdated
fn generate_header(msg: &ProtocolMessage, protocol_version: u16) -> MessageHeader { | ||
match msg { | ||
ProtocolMessage::Start(m) => { | ||
MessageHeader::new_start(protocol_version, m.encoded_len().try_into().unwrap()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it ok to unwrap
here? Maybe use expect
and state why it is ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually an interesting point. Right now the protocol assumes max length of u32 for messages (which if i'm not mistaken is what the protobuf java apis impose us). I guess this is fine. Perhaps should we check this somewhere?
src/invoker/src/message/header.rs
Outdated
// This macro generates: | ||
// * MessageType enum | ||
// * MessageType#kind(&self) -> MessageKind | ||
// * From<MessageType> for MessageTypeId (used for serializing message header) | ||
// * TryFrom<MessageTypeId> for MessageType (used for deserializing message header) | ||
// * From<EntryType> for MessageType (used to convert from journal entry to message header) | ||
// * TryFrom<MessageType> for EntryType (used to convert from message header to journal entry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the docs could benefit from an expanded example (end result of the macro).
src/invoker/src/message/header.rs
Outdated
(@gen_kind_impl [] -> [$($variant:ident, $kind:ident,)*]) => { | ||
impl MessageType { | ||
pub fn kind(&self) -> MessageKind { | ||
match self { | ||
$(MessageType::$variant => MessageKind::$kind,)* | ||
MessageType::Unknown(_) => MessageKind::Unknown | ||
} | ||
} | ||
} | ||
}; | ||
(@gen_kind_impl [$variant:ident Entry $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
paste::paste! { gen_message_type_enum!(@gen_kind_impl [$($tail)*] -> [[<$variant Entry>], $kind, $($body)*]); } | ||
}; | ||
(@gen_kind_impl [$variant:ident $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_kind_impl [$($tail)*] -> [$variant, $kind, $($body)*]); | ||
}; | ||
|
||
(@gen_to_id [] -> [$($variant:ident, $id:literal,)*]) => { | ||
impl From<MessageType> for MessageTypeId { | ||
fn from(mt: MessageType) -> Self { | ||
match mt { | ||
$(MessageType::$variant => $id,)* | ||
MessageType::Unknown(id) => id | ||
} | ||
} | ||
} | ||
}; | ||
(@gen_to_id [$variant:ident Entry $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
paste::paste! { gen_message_type_enum!(@gen_to_id [$($tail)*] -> [[<$variant Entry>], $id, $($body)*]); } | ||
}; | ||
(@gen_to_id [$variant:ident $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_to_id [$($tail)*] -> [$variant, $id, $($body)*]); | ||
}; | ||
|
||
(@gen_from_id [] -> [$($variant:ident, $id:literal,)*]) => { | ||
impl TryFrom<MessageTypeId> for MessageType { | ||
type Error = UnknownMessageType; | ||
|
||
fn try_from(value: MessageTypeId) -> Result<Self, Self::Error> { | ||
match value { | ||
$($id => Ok(MessageType::$variant),)* | ||
v if ((v & UNKNOWN_MESSAGE_MASK) != 0) => Ok(MessageType::Unknown(v)), | ||
v => Err(UnknownMessageType(v)) | ||
} | ||
} | ||
} | ||
}; | ||
(@gen_from_id [$variant:ident Entry $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
paste::paste! { gen_message_type_enum!(@gen_from_id [$($tail)*] -> [[<$variant Entry>], $id, $($body)*]); } | ||
}; | ||
(@gen_from_id [$variant:ident $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_from_id [$($tail)*] -> [$variant, $id, $($body)*]); | ||
}; | ||
|
||
(@gen_to_entry_type [] -> [$($variant:ident, $res:expr,)*]) => { | ||
impl TryFrom<MessageType> for EntryType { | ||
type Error = &'static str; | ||
|
||
fn try_from(mt: MessageType) -> Result<Self, Self::Error> { | ||
match mt { | ||
$(MessageType::$variant => $res,)* | ||
MessageType::Unknown(_) => Err("Unknown is not an entry message") | ||
} | ||
} | ||
} | ||
}; | ||
(@gen_to_entry_type [$variant:ident Entry $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
paste::paste! { gen_message_type_enum!(@gen_to_entry_type [$($tail)*] -> [[<$variant Entry>], Ok(EntryType::$variant), $($body)*]); } | ||
}; | ||
(@gen_to_entry_type [$variant:ident $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_to_entry_type [$($tail)*] -> [$variant, Err(concat!(stringify!($variant), " is not an entry message")), $($body)*]); | ||
}; | ||
|
||
(@gen_from_entry_type [] -> [$($variant:ident,)*]) => { | ||
impl From<EntryType> for MessageType { | ||
fn from(et: EntryType) -> Self { | ||
match et { | ||
$(EntryType::$variant => paste::paste! { MessageType::[<$variant Entry >] },)* | ||
EntryType::Unknown(id) => MessageType::Unknown(id) | ||
} | ||
} | ||
} | ||
}; | ||
(@gen_from_entry_type [$variant:ident Entry $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_from_entry_type [$($tail)*] -> [$variant, $($body)*]); | ||
}; | ||
(@gen_from_entry_type [$variant:ident $kind:ident = $id:literal; $($tail:tt)*] -> [$($body:tt)*]) => { | ||
gen_message_type_enum!(@gen_from_entry_type [$($tail)*] -> [$($body)*]); | ||
}; | ||
|
||
// Entrypoint of the macro | ||
($($tokens:tt)*) => { | ||
gen_message_type_enum!(@gen_enum [$($tokens)*] -> []); | ||
gen_message_type_enum!(@gen_kind_impl [$($tokens)*] -> []); | ||
gen_message_type_enum!(@gen_to_id [$($tokens)*] -> []); | ||
gen_message_type_enum!(@gen_from_id [$($tokens)*] -> []); | ||
gen_message_type_enum!(@gen_to_entry_type [$($tokens)*] -> []); | ||
gen_message_type_enum!(@gen_from_entry_type [$($tokens)*] -> []); | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not certain that this is the most maintainable form. I know that this looks insanely clever but I am afraid that you are the only person that understands what is happening here. I admit that by writing the different enums out we have to do a bit more typing and there is a chance that one mixes the codes up. Maybe this can be circumvented by assigning discriminants that are equal to the serialized form and then having a simpler macro that converts from i32 to MessageType
.
For the other implementations I feel that having an explicit implementation would not hurt since the compiler will notify us whenever we add a new variant and forget about extending them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this can be circumvented by assigning discriminants that are equal to the serialized form and then having a simpler macro that converts from i32 to MessageType.
This cannot be done because of the Unknown
:(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that this looks insanely clever but I am afraid that you are the only person that understands what is happening here
I understand it, but I can argue that this is standard business for macros, and i'm not really doing anything particularly special than following commonly used practices, as documented here https://veykril.github.io/tlborm/decl-macros/patterns/push-down-acc.html. The value of the macro here is to not repeat myself, and even more to avoid mistakes when adding new entries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your point about not repeating yourself and trying to avoid mistakes. It might also perfectly be the case that this is standard macro usage for every decent Rust programmer. For me it is certainly not the case. To me it is hard because I need to look at the macro to understand the custom DSL you have introduced which is non-obvious (e.g. what are the valid values, where to put things, etc.). Sure I can dig through it but why making it harder for others to understand and contribute by not spelling things explicitly out? How many places would there be for introducing a mistake? I guess one or two places where we map the MessageType
to integer and back. The other impls should all be straight forward to extend when we add new variants.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is about avoiding the problem with the mapping between variants and integer code, then we could have a smaller and simpler macro for this (only implementing the mapping from MessageType
-> u32
and u32
-> MessageType
w/o defining the enum, the other impls, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simpler macro
But this macro would look the same though, just smaller. It will still be a pushdown accumulator macro. And the other non macro-generated definitions will just repeat the input of this "simpler macro"
8be10f5
to
c0f879c
Compare
I added the requires ack flag, used for acking side effects. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @slinkydeveloper, really liked the way you structured this!
I a comment about the expected byte endianness while parsing, and a sanity check for the expected payload size that is coming from the header.
wdyt?
let mut res = None; | ||
|
||
*self = match mem::take(self) { | ||
DecoderState::WaitingHeader => DecoderState::WaitingPayload(buf.get_u64().try_into()?), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should have here a sanity check, perhaps even a debug assertion.
- the body size is reasonable. Let's say less than
100MB
. - the version is within some expected range.
- Also, should we make the byte endiannes explicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the body size is reasonable. Let's say less than 100MB.
Right now the size of the body is constrained to u32
by the header.length
field (if i'm not mistaken is the same limit used by grpc). Not sure which criteria to use to limit more the body size. Perhaps let's get to this when we have such a criteria, and let's write it down in the spec, so we could implement the body length check also in the sdk, to avoid altogether to send the large body.
the version is within some expected range.
For the version check what I had in mind is that the invocation/stream state machine will take care of this, eventually negotiating a new version.
Also, should we make the byte endiannes explicit?
IIRC The byte endianess is defined somewhere in the protocol spec as big endian.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now the size of the body is constrained to u32 by the header.length field (if i'm not mistaken is the same limit used by grpc). Not sure which criteria to use to limit more the body size. Perhaps let's get to this when we have such a criteria, and let's write it down in the spec, so we could implement the body length check also in the sdk, to avoid altogether to send the large body.
I think that putting something reasonable right now, is a good first step, even a value like 10 MB. We can always bump it later.
The reason is exactly to protect agisnt an SDK bug (for example wrong endiness)
IIRC The byte endianess is defined somewhere in the protocol spec as big endian.
Thats good! and is the read here is defined to use big endiness or is it up to the platform?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buf.get_u64
reads a long in big-endian byte order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is exactly to protect agisnt an SDK bug (for example wrong endiness)
An SDK bug like this should pop up the first time we do an integration test.
I think that putting something reasonable right now, is a good first step, even a value like 10 MB. We can always bump it later.
I don't really agree on putting an arbitrary limit like this for all messages. We should spend some time to define a limit per message type, depending on other constraints as well. e.g. before the message limit we should define a limit on state key/state value size, on grpc req/res payloads, etc... This arbitrary limit feels artificial and a source of unexpected errors during the testing phase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to defer this discussion about message size limits to a later point in time, and make it configurable, with the default value depending on other constraints. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To unblock this PR, I've opened this issue #90 to implement size limits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating the issue, and unblocking this PR.
The limits should definitely be there, otherwise what we currently have is a plain denial of service vulnerability.
@@ -0,0 +1,296 @@ | |||
use super::header::UnknownMessageType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very well done, I really like the way you have structured the encoding and decoding!
src/invoker/src/message/header.rs
Outdated
impl From<MessageType> for MessageTypeId { | ||
fn from(mt: MessageType) -> Self { | ||
match mt { | ||
MessageType::Start => 0x0000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can these be a constants somewhere in the beginning of this file?
They are referenced in multiple places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could also be a macro that generates these impls and make sure we always correctly associate these ids with the respective variants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Starting with something simple like constant should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like constants as well. +1 for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Starting with something simple like constant should be enough.
I wouldn't say constants are simple, sure they improve the first time you read the code, but they're more error prone for maintainers on the long term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the PR @slinkydeveloper. The changes look good to me. +1 for merging after resolving the last open comments.
let mut res = None; | ||
|
||
*self = match mem::take(self) { | ||
DecoderState::WaitingHeader => DecoderState::WaitingPayload(buf.get_u64().try_into()?), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The buf.get_u64
reads a long in big-endian byte order.
src/invoker/src/message/header.rs
Outdated
impl From<MessageType> for MessageTypeId { | ||
fn from(mt: MessageType) -> Self { | ||
match mt { | ||
MessageType::Start => 0x0000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like constants as well. +1 for it.
impl TryFrom<u64> for MessageHeader { | ||
type Error = UnknownMessageType; | ||
|
||
fn try_from(value: u64) -> Result<Self, Self::Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a short comment about how the message header materializes into a u64
could help illustrate the implementation of this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For these details of the protocol, I would like to link directly to the document defining the protocol (which at the moment is wip), rather than putting it in every comment of every implementation. will add a todo for linking to the proto spec doc.
…8d559..29b28f98 29b28f98 Modify Input/Output schema in deployment_manifest_schema.json (restatedev#80) 26d91e69 Replace protocol Empty message with custom one, so we remove the dependency on protobuf built-in messages (restatedev#79) 61ae44b2 Update documentation on error codes (restatedev#77) 576a1b26 Add HandlerType. Fix restatedev#1092 (restatedev#76) 0624d092 Payload schema for input and output. This provides basic format awareness. (restatedev#74) git-subtree-dir: crates/service-protocol/service-protocol git-subtree-split: 29b28f9867734bc01dd47c4666d9d56c90b626f5
…8d559..29b28f98 29b28f98 Modify Input/Output schema in deployment_manifest_schema.json (restatedev#80) 26d91e69 Replace protocol Empty message with custom one, so we remove the dependency on protobuf built-in messages (restatedev#79) 61ae44b2 Update documentation on error codes (restatedev#77) 576a1b26 Add HandlerType. Fix restatedev#1092 (restatedev#76) 0624d092 Payload schema for input and output. This provides basic format awareness. (restatedev#74) git-subtree-dir: crates/service-protocol/service-protocol git-subtree-split: 29b28f9867734bc01dd47c4666d9d56c90b626f5
…8d559..29b28f98 29b28f98 Modify Input/Output schema in deployment_manifest_schema.json (#80) 26d91e69 Replace protocol Empty message with custom one, so we remove the dependency on protobuf built-in messages (#79) 61ae44b2 Update documentation on error codes (#77) 576a1b26 Add HandlerType. Fix #1092 (#76) 0624d092 Payload schema for input and output. This provides basic format awareness. (#74) git-subtree-dir: crates/service-protocol/service-protocol git-subtree-split: 29b28f9867734bc01dd47c4666d9d56c90b626f5
Depends on #78, Fix #79
This module contains:
ProtocolMessage
andMessageHeader
Encoder
/Decoder
ofProtocolMessage