transform: data path for multiple output topics #16440
Conversation
Force-pushed 4e48fe9 to ce714e8
Force-pushed ce714e8 to 59f0659
ducktape was retried in:
- https://buildkite.com/redpanda/redpanda/builds/44623#018d672e-0cd2-40c7-bb90-c6aa44182946
- https://buildkite.com/redpanda/redpanda/builds/44772#018d815d-141a-466b-ae45-30f980c04a36
- https://buildkite.com/redpanda/redpanda/builds/44861#018d86cd-b45f-495f-a8d9-393aa8fbd0c6
- https://buildkite.com/redpanda/redpanda/builds/44861#018d86f5-61a4-49aa-b8c4-812e2a449ce4
- https://buildkite.com/redpanda/redpanda/builds/45796#018e19d2-58da-4a11-b902-099bc03421d5
- https://buildkite.com/redpanda/redpanda/builds/45796#018e19d2-58e1-4c05-96f9-fd66372866e3
Force-pushed 827a9bf to 2be8cfa: rebase with dev (to pick up chunked_vector)
Force-pushed 2be8cfa to c18c0da: extend comment
Force-pushed c18c0da to acdc064
Force-pushed acdc064 to 92928b1: rebase with dev
New failures in https://buildkite.com/redpanda/redpanda/builds/44846#018d8591-c6f4-40dc-ab4a-0c7a3092e74e:
CI failure: #16535 Unrelated bad log lines in cloud storage
There is a very slight perf regression for the identity transform in this PR, but that's mostly driven by the buffer size. The buffer size should become dynamic based on how much memory the transform subsystem has (that will happen in a followup); if we bump the buffer size to 1 MiB, the performance regression is ~2% for the noop transform. I think that's acceptable since we really haven't done much performance tuning yet. My guess is that the small bump comes from the async mechanics in the VM. We could potentially have a more complex mechanism for applying backpressure (keep the write method sync, and have another method to suspend until we can retry the write). I'm going to hold off on that for now, as the same concern also applies to the other async methods (such as read_next_record, which always calls maybe_yield).
lgtm
// In most cases you should not need to specify a template parameter using this
// function over seastar's make_ready_future function.
template<typename T>
seastar::future<std::remove_cvref_t<T>> now(T&& v) noexcept {
why do we remove cvref as opposed to letting seastar enforce whatever it allows/disallows inside future?
Otherwise something like ssx::now(str) would have a return type of ss::future<ss::string&>, which is dangerous.
Also see: scylladb/seastar#2065
ss::future<ss::string&>
that would just be part of the fun lol
I'm glad you have fun tracking down crashes 😉
Anyways, do you think it's worth changing something here? I am open to suggestions.
[&transformed](
  std::optional<model::topic_view> topic,
  model::transformed_data data) {
    vassert(topic == std::nullopt, "not supported yet 🙂");
"not supported yet 🙂");
ha, i can't tell if github is rendering this from ascii or if it's actually some unicode thingy.
It's a real emoji :)
std::make_move_iterator(futures.begin()),
std::make_move_iterator(futures.end()));
fwiw, seastar already moves from begin/end
Thanks, I pushed a commit to clean this up: 07dc8ff
if (in.bytes_left() > h._bytes_left_limit) {
    in.skip(in.bytes_left() - h._bytes_left_limit);
}
hmm, i sorta thought this would be done automatically...
// In cases where we committed the start of the log without any
// records, then the log has added records, we will overflow
// computing small_offset - min_offset. Instead normalize last
// processed to -1 so that the computed lag is correct (these ranges
// are inclusive). For example: latest(1) - last_processed(-1) =
// lag(2)
thanks!
@@ -30,7 +30,8 @@ concept MemoryMeasurable = requires(const T v) {
constexpr size_t default_items_per_chunk = 128;
`ss::semaphore::wait` with an abort_source has some stack-use-after-return issue that we haven't yet been able to track down.

is this discussed somewhere else?
No. @ballard26 mentioned in the standup notes once that he was seeing segfaults in release mode, and we have both seen stack-use-after-return with `ss::semaphore::wait`. This commit removes the ASAN violation in my testing. We both took a look; it seems hard to repro in a unit test and it's not obvious where the bug is.
Force-pushed 07dc8ff to e770135: move local variable into lambda (there is no use-after-return here, but it looks like it could be, so remove the question).
gentle ping
LGTM. I had a handful of questions mostly around commit history & things I don't fully understand.
std::optional<model::topic_view> topic,
model::transformed_data data) {
    vassert(topic == std::nullopt, "not supported yet 🙂");
short lived emoji 😂
using ::testing::Contains;
using ::testing::Pair;

TEST_P(MultipleOutputsProcessorTestFixture, TracksProcessPerOutput) {
really nice test
Force-pushed e770135 to ae3f48b: rebase against dev
Force-pushed ae3f48b to b40204a
`make_ready_future` requires specifying the type of the future, since the default type is `void`. However, in the vast majority of cases we provide a fully formed `T` to the future, making the template parameter redundant. Add a special function for this in ssx. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
In order to support writing to multiple output topics, we need to know which output topic to send the emitted record to. Allow passing that data around (right now it's always nullopt because we need new ABI methods to support adding the topic). We make this a view so we don't have to copy bytes out of the VM. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
And a reader for string_view. These methods will be needed to read the write options struct that Wasm guest modules will pass back when writing records in the format specified in the RFC. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This is v2 of our ABI. We add a new method that supports also passing the name of the topic we want to write to. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
So that the processor can properly apply backoff to the VM if memory is limited. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force-pushed b40204a to 06763e7
Tests should not require the same batch strategy (i.e. that all input records in a single batch end up in the same output batch), and indeed this will not always be the case later on. Instead, make the test infrastructure work on a per-record basis instead of a per-batch one. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
The wasm transform callback can now return topic names, so we need to be able to handle an explicitly specified name. In order to do that, keep sinks in a map and do a lookup. Also due to this change, break up batching to happen at the sink instead of during the transform. This will allow transforms to do better batching, but we need to be careful not to over-batch; future work will plumb in max batch sizes so we will not over-batch. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
As we move to multiple output topics, each topic will need to track its own committed offset. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Support multiple output topics by having a producer per output and resuming at the minimum progress of all sinks. Individual sinks will have to suppress records they have already processed. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Report lag by individual output topic. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This required some reworking of the underlying testing infrastructure. All our existing tests pass if we add extra (unused) output topics. Additionally, we have some new tests specifically for multiple output topics. Probably the most interesting one is the last, which verifies that outputs are processed independently and that resuming with different committed records works as intended without duplicates. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
`ss::semaphore::wait` with an abort_source has some stack-use-after-return issue that we haven't yet been able to track down. Instead manually implement the semaphore pattern using a condition variable. Since this is only used in a SPSC queue context, we can reuse the existing condition_variable for this. If this was used with multiple producers or multiple consumers it would be prone to races between when waiters were unblocked and when `_used_memory` was mutated. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
In a previous commit I accidentally stopped emitting stats when writing to outputs. Ensure I don't do that again by adding tests that we are emitting throughput stats. I am not asserting on specific values because that feels brittle. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This saved about 2.5% of CPU time for a noop transform. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Simplify the code with `ss::parallel_for_each` instead of collecting the futures and calling `ss::when_all` Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Force-pushed 06763e7 to 1027568: respond to review feedback. The last two force pushes were just fixing up the history.
lgtm
Update the data path of transforms to support multiple output topics. Here is a summary of the changes:
Next up we need to support using these new methods in the SDKs, and remove validation in the deploy path that a single output is used.
Backports Required
Release Notes