Introduce transform::logging::manager #16301
Force-pushed from 7d4d08c to d329120.
/dt
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44335#018d441a-b421-495d-98f2-2d2b5a2c7b80
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44344#018d44e8-9c10-4d81-ad43-d3590cbc0891
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44344#018d44f8-c1ab-4d8d-98ba-2f898ceffb4f
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44412#018d52f0-6bc6-4d86-9230-2b37c1bbd33e
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44511#018d5ecd-6675-4267-a9e2-b538a9704dc2
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44511#018d5ecd-6679-4873-8e42-09638df305df
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44511#018d60bd-9df6-436c-bc3f-c22ec5b771d1
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/44561#018d6220-ed1d-4d10-aa48-18b6d009f697
Force-pushed from d329120 to 59370b3.
/dt
Force-pushed from 59370b3 to 9124aa5.
Nice tests. A good first stab - I think the only substantial feedback is that I think we should do more aggressive batching in the manager.
  "Flush interval for transform logs. When a timer expires, pending logs "
  "are collected and published to the transform_logs topic.",
  {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
  500ms)
We may need to do some quick math on at what scale this overwhelms a single partition. One of the reasons for the 3s commit interval for data transform progress is so that we don't have to worry too much about additional partitions at that commit interval (we would need a lot of cores for that). I agree with the faster publish/flush rate here, but let's derive some limits for this internally and figure out what partition counts we need for our cloud tiers based on this.
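The "quick math" could start from a worst-case bound like the sketch below. It assumes (the thread does not state this) that every shard on every broker flushes once per interval into the same partition. The helper name and the broker and shard counts in the usage note are hypothetical, chosen only to show the arithmetic.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper: worst-case number of flushes per second that land on
// ONE partition, assuming every shard on every broker flushes to that same
// partition once per flush interval.
inline double max_flushes_per_second(
  std::uint64_t brokers,
  std::uint64_t shards_per_broker,
  std::chrono::milliseconds flush_interval) {
    assert(flush_interval.count() > 0);
    // Each shard flushes (1000 / interval_ms) times per second.
    const double per_shard
      = 1000.0 / static_cast<double>(flush_interval.count());
    return static_cast<double>(brokers * shards_per_broker) * per_shard;
}
```

For example, 3 brokers with 8 shards each at the 500ms default gives 24 shards x 2 flushes/s = 48 flushes per second against one partition under this assumption; at a 3s interval the same cluster produces one sixth of that.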
ya, strongly agree. this value is totally arbitrary and frankly an artifact of a previous iteration (before I wired in manual_clock).
src/v/config/configuration.h (Outdated)
@@ -74,6 +74,11 @@ struct configuration final : public config_store {
    bounded_property<size_t> data_transforms_per_function_memory_limit;
    property<std::chrono::milliseconds> data_transforms_runtime_limit_ms;
    bounded_property<size_t> data_transforms_binary_max_size;
    bounded_property<size_t> data_transforms_logging_buffer_capacity_bytes;
    // TODO(oren): bounded?
I don't think we need to be that prescriptive personally - it's a tunable after all.
    event event;
    ssx::semaphore_units units;
};
using queue_t = ss::chunked_fifo<log_event>;
I think chunked_vector will probably be a better structure once #16257 is merged
Force-pushed from 9124aa5 to 97fffa3.
force push contents:
Have not looked super close at the tests yet, but this is looking really good!
if constexpr (std::is_same_v<ClockType, ss::manual_clock>) {
    co_await _wakeup_signal.wait(
      ClockType::now() + _jitter.base_duration());
} else {
    co_await _wakeup_signal.wait(_jitter());
}
Does it simplify things if we always do:
co_await _wakeup_signal.wait<ClockType>(ClockType::now() + _jitter.base_duration());
Because I think the else version you're using here uses timer::clock::now(), which defaults to steady_clock; lowres should be fine here.
oh, yeah you're absolutely right. I got mixed up associating the Clock template param with the class rather than the wait methods.
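The idea in this exchange, computing the wakeup deadline against the templated clock so that tests can drive time by hand, can be illustrated without Seastar. The sketch below is only an analogy: `fake_manual_clock`, `next_wakeup`, and `wakeup_due` are hypothetical names, and the fake clock merely imitates the general shape of a manually advanced test clock.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical test clock: time only moves when advance() is called.
struct fake_manual_clock {
    using duration = std::chrono::milliseconds;
    using rep = duration::rep;
    using period = duration::period;
    using time_point = std::chrono::time_point<fake_manual_clock, duration>;
    static constexpr bool is_steady = true;

    static time_point now() noexcept { return time_point{current()}; }
    static void advance(duration d) noexcept { current() += d; }

private:
    static duration& current() noexcept {
        static duration ticks{0};
        return ticks;
    }
};

// Deadline for the next flush, measured on whichever clock the caller picks.
template<typename Clock>
typename Clock::time_point
next_wakeup(typename Clock::duration flush_interval) {
    assert(flush_interval >= Clock::duration::zero());
    return Clock::now() + flush_interval;
}

// True once the chosen clock has reached the deadline.
template<typename Clock>
bool wakeup_due(typename Clock::time_point deadline) {
    return Clock::now() >= deadline;
}
```

With `fake_manual_clock`, a test can take a deadline 500ms out, advance the clock by exactly 500ms, and observe that the wakeup is due without sleeping. Substituting `std::chrono::steady_clock` for the template argument gives the wall-time behavior.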
template<typename ClockType>
ss::future<> manager<ClockType>::stop() {
    _as.request_abort();
    co_await _flusher->stop();
Should we be closing the gate first?
I think we need to break the semaphore first because the flush fiber is holding the gate open, right?
incidentally, the fact that this looks strange is probably a bit of an API smell for the flusher thing...
Why does this class even need the gate?
I think the suggestion is "push the gate down into flusher", which sounds right to me, I've done that.
It was a question, but a reasonable answer is that it doesn't and it can be pushed into the flusher :)
Force-pushed from 97fffa3 to 2c4ba7e.
Force-pushed from 2c4ba7e to 817dfe5.
force push contents:
🔥
Just some documentation, and some possible code cleanup. Otherwise this LGTM.
} // namespace io

class client {
nit: please add documentation for the semantics of this interface.
auto validate_msg =
  [&msg_len](std::string_view message) -> std::optional<ss::sstring> {
      auto sub_view = message.substr(0, msg_len(message));
Technically this can leave invalid UTF-8 due to a multi-byte character being cut in the middle, but personally I don't think we need to worry about it.
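For reference, if the truncation ever did need to respect character boundaries, one common approach is to back up past UTF-8 continuation bytes. This is a sketch only, not what the PR does; `truncate_utf8` is a hypothetical name and the function assumes its input is already valid UTF-8.

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

// Hypothetical helper: longest prefix of `message` that is at most
// `max_bytes` long and does not end inside a multi-byte UTF-8 sequence.
// Assumes `message` is valid UTF-8.
inline std::string_view
truncate_utf8(std::string_view message, std::size_t max_bytes) {
    if (message.size() <= max_bytes) {
        return message;
    }
    std::size_t end = max_bytes;
    // Continuation bytes have the form 0b10xxxxxx. If the first byte we
    // would drop is a continuation byte, the cut falls inside a character,
    // so move the cut back until it sits on a lead byte.
    while (end > 0
           && (static_cast<unsigned char>(message[end]) & 0xC0) == 0x80) {
        --end;
    }
    assert(end <= max_bytes);
    return message.substr(0, end);
}
```

The cost is that a truncated line may come out a few bytes shorter than the limit, which is usually an acceptable trade for always emitting valid UTF-8.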
      event{_self, event::clock_type::now(), level, std::move(*b)},
      std::move(*units));

    if (check_lwm() && _flusher != nullptr) {
When can _flusher be nullptr?
If enqueue_log were called before manager::start. I think this shouldn't happen, but without all the wiring in place I'm not 100% sure. Maybe an assertion would express this better.
Can't we create the flusher in the constructor and wait to call flusher.start until start?
Probably? I know for things that initialize metrics there's a good reason to construct in start (global side effects?). Not an issue here, but I've seen this pattern on a couple other classes. TBH in this case I'm just cargo-culting a bit.
Oh, yeah transform::service does this because other dependent sharded services are not initialized yet - kind of annoying. I think in this case, we don't have this concern because the memory is owned by the manager, so we can simplify.
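The simplification discussed here (construct the flusher with the manager, defer only `start`) might look roughly like the following. This is a deliberately synchronous toy, not the PR's code: `sketch_flusher` and `sketch_manager` are hypothetical stand-ins with no Seastar futures, and ignoring wakeups before `start` is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the flusher: it only counts accepted wakeups.
class sketch_flusher {
public:
    void start() {
        assert(!_running); // start is expected to be called once
        _running = true;
    }
    void stop() { _running = false; }
    // Wakeups that arrive before start() or after stop() are ignored.
    void wakeup() {
        if (_running) {
            ++_wakeups;
        }
    }
    std::size_t wakeups() const { return _wakeups; }

private:
    bool _running = false;
    std::size_t _wakeups = 0;
};

// Hypothetical stand-in for the manager. The flusher is a plain member, so
// it exists for the manager's whole lifetime and can never be null.
class sketch_manager {
public:
    void start() { _flusher.start(); }
    void stop() { _flusher.stop(); }

    void enqueue_log(std::string line) {
        _pending.push_back(std::move(line));
        _flusher.wakeup(); // no nullptr check needed
    }

    std::size_t pending() const { return _pending.size(); }
    std::size_t wakeups() const { return _flusher.wakeups(); }

private:
    std::vector<std::string> _pending;
    sketch_flusher _flusher;
};
```

In this shape an `enqueue_log` that races ahead of `start` still buffers the line; it just does not trigger a flush until the flusher is running.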
}

template<typename ClockType = ss::lowres_clock>
class manager {
Can we add some documentation for the purpose and semantics of this class? Also details that it's expected to be an instance per core, etc.
// timepoint overload template has a clocktype parameter, so we
// use that one to get the behavior we want for testing
co_await _wakeup_signal.wait<ClockType>(
  ClockType::now() + _jitter.base_duration());
Why base_duration?
🤦 for the manual clock specialization. got a bit overzealous removing code...we still need that constexpr conditional here.
    });
}

template<typename BuffersT>
Why is BuffersT templated?
To limit the scope of buffer_entry, buffer type, map type. I want to keep the details internal to manager and flusher out of the header. Maybe there's an angle I'm not seeing?
+1 to keeping the flusher out of the header. This is fine, just mostly curious.
});

vlog(tlg_log.trace, "Processed {} log events", n_events);
co_return;
nit: co_return isn't needed.
yeah, cruft. i had a status code there in a previous iteration, then removed it, added it back, removed it.
// released (i.e. buffer capacity freed) only once those records have
// been produced.
co_await _client->write(pid, std::move(events));
co_return;
nit: remove this co_return; it's not needed.
// This will cause a reactor stall in Debug mode but NOT
// in Release
Where is this from? Is that because enqueue_log is called multiple times synchronously? There isn't anything inherently in the manager that should cause stalls right? This is just an artifact of the test and Debug mode slowness?
> isn't anything inherently in the manager that should cause stalls right?
correct. doesn't look like it to me, anyway.
> just an artifact of the test and debug mode slowness
I think so. It definitely stalls in enqueue_log, but it's a sparse backtrace with some asan symbols and some fmtlib (???) symbols in. I was more confident about this yesterday...backtrace decoding seems like it could be wrong.
Eh, seems like the cause is doing a bunch of slow asan mallocs in a tight loop? I threw an ss::maybe_yield().get() in the test function and the stall went away. Would explain the garbled backtrace, I think.
Force-pushed from 817dfe5 to 0df59c4.
force push: minor cleanups and some light documentation on client, manager, manager::enqueue_log
- data_transforms_logging_buffer_capacity_bytes : integer (default 100_KiB)
- data_transforms_logging_flush_interval_ms : integer (default 500ms)
- data_transforms_logging_line_max_bytes : integer (default 1_KiB)
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Initial API for determining the partition ID for some transform's logs and writing those logs to the transform_logs topic. Abstract interface for testing purposes. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
`struct sstring_less` Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
For convenience. Call straight into the underlying property. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Force-pushed from 0df59c4 to 5b8f00a.
empty force push for commit signoff
LGTM
/backport v23.3.x
/backport v23.3.x
This PR introduces transform::logging::manager, whose primary responsibility is to buffer transform logs and periodically flush them to some transform::logging::sink as a chunked_fifo of JSON-serialized, OpenTelemetry-compatible log events.
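As a rough mental model of the buffering described above, the sketch below keeps a byte budget that is spent on enqueue and returned only after a batch has been written, mirroring the review snippet's note that capacity is freed once records have been produced. Everything here is hypothetical: `bounded_log_buffer` is not a type from the PR, the real manager uses semaphore units and asynchronous writes, and dropping a line when the budget is exhausted is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// Hypothetical byte-budgeted log buffer.
class bounded_log_buffer {
public:
    explicit bounded_log_buffer(std::size_t capacity_bytes)
      : _capacity(capacity_bytes)
      , _available(capacity_bytes) {}

    // Buffers the line if the remaining budget covers it. Returns false
    // (and drops the line) otherwise. Dropping is an assumption here.
    bool enqueue(std::string line) {
        if (line.size() > _available) {
            return false;
        }
        _available -= line.size();
        _pending.push_back(std::move(line));
        return true;
    }

    // Hands every pending line to the caller for writing. The budget stays
    // reserved until release() is called for the batch.
    std::vector<std::string> take_pending() {
        std::vector<std::string> batch(
          std::make_move_iterator(_pending.begin()),
          std::make_move_iterator(_pending.end()));
        _pending.clear();
        return batch;
    }

    // Returns the budget held by a batch once it has been written.
    void release(const std::vector<std::string>& written) {
        for (const auto& line : written) {
            _available += line.size();
        }
        assert(_available <= _capacity);
    }

    std::size_t available() const { return _available; }

private:
    std::size_t _capacity;
    std::size_t _available;
    std::deque<std::string> _pending;
};
```

A flush in this model is `take_pending()`, a write of the batch, then `release(batch)`; until the release, new lines compete for whatever budget is left.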
Closes https://github.com/redpanda-data/core-internal/issues/1035
Closes https://github.com/redpanda-data/core-internal/issues/998
Backports Required
Release Notes