-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Port key extraction logic from the PoC #95
Conversation
34c5eef
to
747e5a8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @slinkydeveloper. The code looks good to me. The only comment I had was concerning the allocations that happen during the key extraction. It would be awesome if we could avoid them since the key extraction will unfortunately be part of the hot path of the worker.
src/service_key_extractor/src/lib.rs
Outdated
(StartGroup, KeyStructure::Nested(expected_message_fields)) => { | ||
let mut message_fields = HashMap::new(); | ||
loop { | ||
let (next_field_number, next_wire_type) = decode_key(buf)?; | ||
if next_wire_type == EndGroup { | ||
break; | ||
} | ||
match expected_message_fields.get(&next_field_number) { | ||
None => { | ||
// Unknown field, just skip it | ||
skip_field( | ||
next_wire_type, | ||
next_field_number, | ||
buf, | ||
DecodeContext::default(), | ||
)?; | ||
continue; | ||
} | ||
Some(next_parser_directive) => { | ||
message_fields.insert( | ||
next_field_number, | ||
deep_extract(buf, next_wire_type, next_parser_directive)?, | ||
); | ||
} | ||
}; | ||
} | ||
expected_message_fields | ||
.keys() | ||
.map(|k| { | ||
message_fields | ||
.remove(k) | ||
// Ensure we have defaulting | ||
.unwrap_or_default() | ||
}) | ||
.for_each(|b| result_buf.put(b)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a lot of allocation going on (hash map and insertions into it, appending of buffers to a BytesMut
) and this key extraction will be on the hot path of the partition processor. Can we reuse structs and buffers to avoid the allocations for every extraction somehow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These BytesMut
are the reason why I wanted to give a try to SegmentedBuf
, or build something similar ourselves. Injecting the BytesMut
from the input, pooling it, helps with the allocation but still requires copying back and forth from the input buffer. A solution with SegmentedBuf
skips the copy, as we just keep around the slices of the original buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like a worthwhile approach to explore at some point.
src/service_key_extractor/src/lib.rs
Outdated
(LengthDelimited, KeyStructure::Nested(expected_message_fields)) => { | ||
let mut message_fields = HashMap::new(); | ||
let inner_message_len = decode_varint(buf)? as usize; | ||
let mut current_buf = buf.split_to(inner_message_len); | ||
while current_buf.has_remaining() { | ||
let (next_field_number, next_wire_type) = decode_key(&mut current_buf)?; | ||
match expected_message_fields.get(&next_field_number) { | ||
None => { | ||
// Unknown field, just skip it | ||
skip_field( | ||
next_wire_type, | ||
next_field_number, | ||
&mut current_buf, | ||
DecodeContext::default(), | ||
)?; | ||
} | ||
Some(next_parser_directive) => { | ||
message_fields.insert( | ||
next_field_number, | ||
deep_extract( | ||
&mut current_buf, | ||
next_wire_type, | ||
next_parser_directive, | ||
)?, | ||
); | ||
} | ||
}; | ||
} | ||
|
||
expected_message_fields | ||
.keys() | ||
.map(|k| { | ||
message_fields | ||
.remove(k) | ||
// Ensure we have defaulting | ||
.unwrap_or_default() | ||
}) | ||
.for_each(|b| result_buf.put(b)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. It would be kick ass if we could pass in something like a ByteMut
or Vec<u8>
that we allocate once and reuse for subsequent key extractions (if we only need the key to write directly to RocksDB, then we could use a single buffer for different messages).
14ddce8
to
3f6ba51
Compare
Fix #87. The code is more or less 1:1 with https://github.com/restatedev/runtime/blob/main/src/worker/key_extractor.rs. The only difference is that now we use
ArcSwap
to bake the internal map of services, and we useServiceInstanceType
as data structure representing the key extraction structure/schema