-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Let PartitionProcessor and Consensus module use restate_wal_protocol::Envelope #1204
Let PartitionProcessor and Consensus module use restate_wal_protocol::Envelope #1204
Conversation
5cdcdf5
to
9ca0864
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very impressive work @tillrohrmann. 🍾
LGTM.
sequence_number: msg_index, | ||
dedup_key: deduplication_source, | ||
// todo: Retrieve proper node config version | ||
nodes_config_version: Version::MIN, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it'd have been nice if access to metadata is global (static). Also, it's possible to use NodeID::my_node_id() here if you like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, we can remove node_id
from DispatcherLoopHandler
. On the other hand, it is currently wired up. Maybe something to do once we touch this part again.
One option for sharing the metdata could be to make it a task local key of the TaskCenter
.
} | ||
|
||
pub(super) async fn handle(&self, actuator_output: ActionEffect) { | ||
match actuator_output { | ||
ActionEffect::Invoker(invoker_output) => { | ||
let header = self.create_header(invoker_output.full_invocation_id.partition_key()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a future iteration, we could change the envelope such that it acts like a builder. Different layers in the system will populate various bits in the header as the envelope bubbles up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, that would be nice :-)
leader_epoch: self.leader_epoch, | ||
// todo: Add support for deduplicating self proposals | ||
sequence_number: None, | ||
node_id: NodeId::my_node_id().expect("NodeId should be set").id(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
@@ -63,7 +59,7 @@ impl<Codec: RawEntryCodec> StateMachine<Codec> { | |||
is_leader: bool, | |||
) -> Result<InterpretationResult<Transaction<TransactionType>, Collector>, Error> { | |||
// Handle the command, returns the span_relation to use to log effects | |||
let command_type = command.type_human(); | |||
let command_type = command.name(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️
|
||
fn create_header(partition_key: PartitionKey) -> Header { | ||
Header { | ||
source: Source::ControlPlane {}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice!
Thanks a lot for the quick review @AhmedSoliman. Merging this PR now. |
…:Envelope This commit removes the partition Command and replaces it with the restate_wal_protocol::Envelope. This is a preparational step for removing the Consensus module and replacing it with Bifrost.
9ca0864
to
ba7a9a4
Compare
This commit removes the partition Command and replaces it with the
restate_wal_protocol::Envelope. This is a preparational step for removing
the Consensus module and replacing it with Bifrost.