-
Notifications
You must be signed in to change notification settings - Fork 553
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
transform: list committed offsets #16185
Conversation
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Expose methods in our service to list what offsets are committed. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
0131062
to
f95a1a1
Compare
new failures in https://buildkite.com/redpanda/redpanda/builds/43995#018d23ca-f357-4ef3-8ecd-f2d28a44a4a5:
new failures in https://buildkite.com/redpanda/redpanda/builds/43995#018d2438-0858-40b3-87d9-e00451053e99:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. few questions, but nothing blocking I think
for (const auto& entry : _kvs) { | ||
copy.insert(entry); | ||
co_await ss::coroutine::maybe_yield(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the bare loop primarily so you can yield after each insert? Is the yield precautionary, or have you seen stalls w/o it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the yield precautionary,
This one :)
ret_t(model::transform_offsets_map{}), | ||
[](ret_t agg, ret_t one) { | ||
if (agg.has_error()) { | ||
return agg; | ||
} | ||
if (one.has_error()) { | ||
return one; | ||
} | ||
agg.value().merge(one.value()); | ||
return agg; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tangential: I've been thinking about this pattern lately. I wonder whether there will be an abstraction for this type of computation included with std::expected
at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it would be nice, although there are a few different policies I could see here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some of the low level bits are pretty composable, but yeah, I think they'll eventually run into a wall trying to shoehorn Haskell into C++ :P
src/v/transform/rpc/client.h
Outdated
/** | ||
* Find all the partitions in the cluster for offset tracking and list all | ||
* the transforms they have tracked. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Micro-nit: I don't really understand this comment. Doesn't this method just list the committed offset for each <transform-id, input partition-id>
pair?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this description is a bit mechanical. Will try to clarify.
src/v/redpanda/admin/transform.cc
Outdated
ss::json::json_list<ss::httpd::transform_json::transform_metadata> | ||
list_result; | ||
|
||
co_return ss::json::json_return_type( | ||
ss::json::stream_range_as_array(report.transforms, [](const auto& entry) { | ||
co_return ss::json::json_return_type(ss::json::stream_range_as_array( | ||
admin::lw_shared_container(std::move(report.transforms)), | ||
[](const auto& entry) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this related to listing committed offsets? just a bugfix? if the latter, no issue with including as part of this PR, but it might be a good idea to split into a separate commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it avoids a copy because json::stream_range_as_array
returns std::function
not ss::non_copyable_function
. I will split into another commit.
Add the ability to list committed offsets we will use this to expose a debug admin endpoint to list the committed offsets of all transforms. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This can be useful for debugging transforms. We'll also use it for some internal tests. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
This will be used by tests to ensure we're doing cleanup correctly. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Add a test that we cleanup offsets after deleting a transform. We currently do not do this, and the test assertion that is commentted out would fail. Followup work will be done to uncomment the assertion. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
`stream_range_as_array` takes a value, so we need to move it to prevent copies every time a list operation happens. Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
f95a1a1
to
e49d787
Compare
Force push:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44110#018d3319-8042-423a-87d8-1feb3451ac22 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've broken up that change into two parts, this one, which exposes the ability to view committed offsets, and another to do the cleanup. We'll use the infrastructure from this PR to enable the tests for ensuring that the offsets are cleaned up.
i really appreciate this approach of breaking things up into immediately usable and testable prs.
boost::irange(0, cfg->partition_count), | ||
[this](int32_t id) { | ||
auto partition = model::partition_id(id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's really unsatisfying to my brain that we are generating the partition ids here rather than having the set of partitions available to iterate over. is that information not available here, or is this code fundamentally acting like a client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only know how many partitions there are and we need to map reduce over all of them. This code is a client fwiw. Is there a way to get a materialized set of all partitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure. but it representing a client fundamentally makes much more sense to me!
We need to cleanup committed offsets when transforms are deleted. At the moment we do not do that. In an effort to do that and ensure we have tests for this, I've broken up that change into two parts, this one, which exposes the ability to view committed offsets, and another to do the cleanup. We'll use the infrastructure from this PR to enable the tests for ensuring that the offsets are cleaned up.
Backports Required
Release Notes
Features