Wire transform::logging::manager into transform API, rpc client
#16485
Conversation
Force-pushed 2bf0e5f to 262b54d
Force-pushed 93d623b to f489326
@rockwotj - Fair number of open-ended-ish TODOs/questions on here still, but I think the overall structure is worth reviewing if/when you have time.
Force-pushed f489326 to 80c47fc
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81be-4e06-40e8-97e9-070a4d4c8b9c:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81be-4dfc-4ad3-b32d-94260114f5e0:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81be-4e00-4f31-aa2e-970722e3ff59:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81be-4e03-49af-8164-bc0b64067eb4:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81d0-2abd-4d44-924f-b11d98f8ac73:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81d0-2ab7-4dfa-afa4-26c43a5b42a5:
new failures in https://buildkite.com/redpanda/redpanda/builds/44790#018d81d0-2aba-4468-92b6-c0346f96cf21:
new failures in https://buildkite.com/redpanda/redpanda/builds/44833#018d850e-573d-4a7a-ad56-883698ba81d7:
new failures in https://buildkite.com/redpanda/redpanda/builds/44833#018d850a-4b76-4213-960b-f2f0d8e55d18:
new failures in https://buildkite.com/redpanda/redpanda/builds/44921#018d8b3e-c49d-4f33-aeff-e2d4273735e3:
new failures in https://buildkite.com/redpanda/redpanda/builds/44921#018d8b3e-c497-4199-963c-c72f2a2d127b:
new failures in https://buildkite.com/redpanda/redpanda/builds/44953#018d8f72-6cc6-4997-ba6e-74ac3c443dbd:
new failures in https://buildkite.com/redpanda/redpanda/builds/44953#018d8f72-6cc5-4e5b-b4c5-440523fa56a4:
new failures in https://buildkite.com/redpanda/redpanda/builds/44953#018d8f72-6cc5-4d99-b675-087e8654b011:
new failures in https://buildkite.com/redpanda/redpanda/builds/44953#018d8f72-6cc6-4e41-9f25-241c2a274785:
new failures in https://buildkite.com/redpanda/redpanda/builds/44953#018d8f72-6cc5-4c24-bb12-0a8179fd358f:
Force-pushed 80c47fc to 5a807ac
force push to fix
First pass. Just a couple of small things, otherwise looks good!
inline const model::topic_namespace transform_log_internal_nt(
  model::kafka_namespace, model::transform_log_internal_topic);

inline bool is_user_topic(topic_namespace_view tp_ns) {
TODO: understand the ramifications of this...
yeah seriously...I'll write something up today or tomorrow
Sorry, this was for myself, but yeah I like @nvartolomei's suggestion to instead have topic properties for the stuff we want to enforce, so things like redpanda.canproduce, redpanda.candelete, etc.
I think the effects of this particular thing are even a bit more subtle, but yeah, I like that suggestion too.
Are we talking about the proliferation of special topic names, and collections of names with a growing list of special behaviors and traits?
Yes, I believe we are. This topic, the schemas topic, and the audit log topic all require special treatment. Is this a leading question?
Are we talking about the proliferation of special topic names, and collections of names with a growing list of special behaviors and traits?

Yes, and it's not clear what happens when one adds something to this list. Anyway, not for this PR, but something we should clean up at some point.
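For illustration, a minimal sketch of the pattern being discussed; aside from transform_log_internal_nt, the names below (schemas_internal_nt, audit_log_internal_nt) are hypothetical stand-ins, not identifiers from this PR:

```
// "User topic" is currently defined by exclusion from a growing list of
// special internal topics (illustrative only).
inline bool is_user_topic(model::topic_namespace_view tp_ns) {
    return tp_ns != transform_log_internal_nt // this PR's logging topic
           && tp_ns != schemas_internal_nt    // hypothetical
           && tp_ns != audit_log_internal_nt; // hypothetical
}
// The suggestion above flips this around: attach per-topic properties such as
// redpanda.canproduce / redpanda.candelete and have enforcement sites read
// those, so adding a new internal topic doesn't mean updating every list.
```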
size_t estimate_record_size(size_t ksize, size_t vsize) {
    // NOTE(oren): a size estimate for vint fields on an individual record:
    //   - key size (known)
    //   - value size (known)
    //   - offset delta (pessimize)
    //   - timestamp delta (always nil)
    //   - headers size (always nil)
    //   - record size in bytes (calculate based on sum of above)
    // NOTE(oren): this seems to overestimate the size of the final batch
    // by ~1%. That's probably acceptably conservative to avoid exceeding
    // configured limits without losing too much in terms of maximizing
    // batch size
    constexpr size_t base_overhead_est
      = sizeof(model::record_attributes::type) // attrs
        + vint::max_length    // offset delta, ultra-conservative
        + vint::vint_size(0)  // timestamp_delta
        + vint::vint_size(0); // headers size
    auto sz = vint::vint_size(static_cast<int64_t>(ksize)) // key size
              + ksize
              + vint::vint_size(static_cast<int64_t>(vsize)) // val size
              + vsize
              + base_overhead_est;
    return vint::vint_size(static_cast<int64_t>(sz)) + sz;
}
Not your fault, but we really need to centralize these sorts of calculations.
Can we either:
- Create record objects (even if only temporary) to compute the size, then release the key and value into the batch builder (or just hold a list of records until we roll)? I don't know if we want to over-index on performance here.
- Create a ticket to move all our batch building stuff into the model namespace so it can be shared. Feels weird to have to depend on storage anyways.

IIRC the kafka client just creates record objects and at the last second turns them into batches; I think that would be fine here.
Yeah, it's pretty awful... would be happy to remove, but I was kind of surprised not to find anything more reusable already existing anywhere.
kafka client just creates record objects and at the last second turns them into batches
I must have missed that, will check it out
Wound up with something like:
- in append, create a record_batch with a single record
- estimate the size of the record therein
- perform all the same checks and move the record's guts into the "real" batch_builder

I think this is more or less in the spirit of what the internal kafka client is doing, but I might be misreading.
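If it helps, a rough sketch of that flow, assuming a builder along the lines of storage::record_batch_builder; the helper name measure_one and the exact calls are illustrative, not the PR's code:

```
// Illustrative: measure a record's serialized size by building a throwaway
// single-record batch, then subtract the fixed batch header so only the
// record's own bytes remain. The caller can then run its size checks and
// move the key/value into the "real" builder.
size_t measure_one(const iobuf& key, const iobuf& value) {
    storage::record_batch_builder probe(
      model::record_batch_type::raft_data, model::offset{0});
    probe.add_raw_kv(key.copy(), value.copy());
    return std::move(probe).build().size_bytes()
           - model::packed_record_batch_header_size;
}
```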
for (size_t i = 0; i < static_cast<size_t>(config->partition_count);
     ++i) {
    model::partition_id candidate(
      (sstring_hash{}(name()) + i)
Oh, something I forgot to catch here: absl hashing is not a persistent hash. Another way to say that is that it is seeded differently for every process. Let's use murmur2 or something.
Consider:
redpanda/src/v/cluster/distributed_kv_stm.h, lines 204 to 205 in 510f459:
auto result = model::partition_id{
  murmur2(bytes.c_str(), bytes.length()) % num_partitions.value()};
nice one, thanks
Oh, something I forgot to catch here: absl hashing is not a persistent hash. Another way to say that is that it is seeded differently for every process. Let's use murmur2 or something.

Yikes. That's an easy and devastating mistake to make.
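For reference, a sketch of the swap being suggested: murmur2 (as used in distributed_kv_stm.h above) in place of the process-seeded absl hash. The surrounding loop mirrors the quoted snippet; the byte conversion is illustrative, not the PR's code.

```
// Illustrative: murmur2 gives the same value for the same bytes in every
// process, so each broker maps a given transform name to the same starting
// partition; absl's hash is seeded per process and does not.
auto bytes = ssx::sformat("{}", name());
for (size_t i = 0; i < static_cast<size_t>(config->partition_count); ++i) {
    model::partition_id candidate(
      (murmur2(bytes.c_str(), bytes.length()) + i)
      % static_cast<size_t>(config->partition_count));
    // ... probe `candidate` as before
}
```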
To create the transform logs topic on demand. Also adds code in log_manager.cc to perform this operation lazily at flush time. Additionally, update logging::client to return transform::logging::errc (or a result<> thereof). This becomes increasingly useful for error reporting as we start hooking things up to the rest of the cluster. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
A comment on model::packed_record_batch_header_size reflected a presumably outdated value for the constant. Was 57, should be 61. Replaces that comment with a static_assert to avoid similar errors in future. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
This is slightly easier to test and reason about, as well as being nominally less costly in terms of buffer capacity. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
```
record(
  record_attributes attributes,
  int64_t timestamp_delta,
  int32_t offset_delta,
  iobuf key,
  iobuf value,
  std::vector<record_header> hdrs)
```
Force-pushed acc4eff to d40b5a7
force push contents:
Encapsulates a storage::record_batch_builder and aims to build up batches of log records. Batches are kept under a specified maximum size to avoid data loss. If appending a given kvp would cause a batch to exceed the specified max, we roll to a new builder, appending the previous batch to a running collection of completed, size-limited record_batches. Includes a simple unit test for constructed record_batch sizes.
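Roughly, the rolling policy described there looks like this (member and helper names are illustrative, not the class's actual interface):

```
// Illustrative: if appending this key/value would push the in-flight batch
// past the configured maximum, close ("roll") the current builder into the
// list of completed batches and start a fresh one before appending.
void record_batcher::append(iobuf key, iobuf value) {
    size_t est = estimate_record_size(key.size_bytes(), value.size_bytes());
    if (_curr_size > 0 && _curr_size + est > _max_batch_size) {
        roll();
    }
    _builder.add_raw_kv(std::move(key), std::move(value));
    _curr_size += est;
}

void record_batcher::roll() {
    _batches.push_back(std::move(_builder).build());
    _builder = make_builder(); // fresh storage::record_batch_builder
    _curr_size = 0;
}
```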
Force-pushed d40b5a7 to 6277edb
force push to remove dead code
Looks really good. Just a couple of small things at this point.
One other question I had: are we going to have a followup to clean up some of the code here?
def consume_one_log_record(self, offset=0, timeout=10) -> LogRecord:
    return self.LogRecord(
        self._rpk.consume(self.logs_topic.name,
Question: do all these rpk commands need to be wrapped in retries?
I don't really think so. The timeout gets applied to the rpk shell-out under the hood of RpkTool, and consume should just sit there until we either receive a record back OR we kill the subprocess. In the latter case we get a nice RpkException. Did you have a particular failure mode in mind?
No, just wanted to make sure these tests are as robust as possible. For Admin requests that usually means wrapping stuff in retries.
Yeah, it's a fair point. If rpk fails to establish a connection that'll immediately fail the test.
It's probably fine, we can see if any issues crop up first
https://github.com/redpanda-data/core-internal/issues/1009 Doesn't seem urgent (unless I've missed something), but the ticket is queued up for sure. I'd like to at least draft a metrics probe first.
Adapts transform::rpc::client for transform logging purposes. Implements transform::logging::client. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Also adds a sharded<metadata_cache> field to transform::service in service of transform::logging::rpc_client. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Previously a static method on a SCRAM test. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Includes a slightly modified identity transform that also logs the incoming record to stderr. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Force-pushed 6277edb to a7f6ba0
force push: few nits
CI Failures:
/ci-repeat 5
/cdt
/ci-repeat 1
/backport v23.3.x
Failed to create a backport PR to v23.3.x branch. I tried:
if (fut.failed()) {
    vlog(
      tlog.error,
      "Failed to start transform::logging::manager: {}",
      fut.get_exception());
}
co_return co_await std::move(fut);
I think this will probably crash or hang. Once you eat the exception out of the future, it's put into an invalid state. When you co_await on it, at least from my reading of coroutine.hh, seastar will wait for the future to become available, but that will never happen because it was available (before get_exception) but now it is marked invalid.
Ack
Can we just remove the as_future thing here? I don't think it adds value.
should be something like this, right?
if (fut.failed()) {
    vlog(
      tlog.error,
      "Failed to start transform::logging::manager: {}",
      fut.get_exception());
    co_return;
}
co_return co_await std::move(fut);
@rockwotj - oh yeah, that's fine. will do later
What about just co_await _log_manager->start()? I don't think we want to silently fail to start the manager.
Yeah, that's what I'm committing right now.
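For completeness, the shape of the fix settled on above; the enclosing function is illustrative, while _log_manager matches the snippets quoted in this thread:

```
// Don't consume the exception with get_exception() and then re-await the
// now-invalid future; co_await the start() future directly so any failure
// propagates to the caller instead of being silently swallowed.
ss::future<> service::start_log_manager() {
    co_await _log_manager->start();
}
```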
This commit implements the end-to-end plumbing for data transform logging. This includes:
- a wasm::logger that encapsulates a manager instance
- wiring into transform::service
- a logging::client implementation to perform record batching, forward writes to rpc::client, create the transform logging topic, and compute output partition IDs for particular transforms

Closes https://github.com/redpanda-data/core-internal/issues/1057
Closes https://github.com/redpanda-data/core-internal/issues/1058
Backports Required
Release Notes
Features
Data transform logs are now written to an internal topic (_redpanda.transform_logs). Data transform logs will no longer appear in broker logs.