
wasm: transform callback #16253

Merged — merged 6 commits into redpanda-data:dev from the callback branch on Jan 30, 2024

Conversation

rockwotj
Contributor

Instead of buffering in the wasm layer, we will have a callback that is invoked when data is emitted from the VM.

This will make it easier to account for memory and will allow us to receive additional data in the callback, such as which topic the output should be written to.

Additional benefits/bug fixes:

  • Fixes the timing of execution for a single transform.
  • Will allow us to batch independently of the transform, which will be important in the future as we need to handle output batches being over the size limit.
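
For illustration, a rough sketch of what a callback-driven transform interface could look like; the names below (`record`, `record_callback`, `engine::transform`) are assumptions for this sketch, not the actual Redpanda API:

```cpp
#include <functional>
#include <string>
#include <vector>

namespace sketch {

// Stand-in for the real record type; illustrative only.
struct record {
    std::string key;
    std::string value;
};

// Invoked once per record emitted by the VM; a later variant could also
// carry the destination topic alongside the record.
using record_callback = std::function<void(record)>;

// Hypothetical engine interface: output is pushed through the callback as
// it is produced, instead of being buffered inside the wasm layer.
class engine {
public:
    virtual ~engine() = default;
    virtual void transform(std::vector<record> input, record_callback cb) = 0;
};

} // namespace sketch
```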

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x
  • v23.1.x

Release Notes

  • none

@rockwotj rockwotj marked this pull request as ready for review January 24, 2024 21:20
@rockwotj rockwotj self-assigned this Jan 25, 2024
@oleiman
Member

oleiman commented Jan 25, 2024

```
/var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-07d887f081b52b211-1/redpanda/redpanda/src/v/wasm/tests/wasm_transform_bench.cc:100:60: error: too few arguments to function call, expected 3, have 2 [clang-diagnostic-error]
        return _engine->transform(std::move(batch), &_probe).then([](auto) {
```

Glad I'm not the only one getting spanked by this...presumably we don't build the benchmark on debug builds because the resulting binary is not useful, but it seems like the compiler exit code would be a good signal here.

@rockwotj
Contributor Author

> Glad I'm not the only one getting spanked by this...presumably we don't build the benchmark on debug builds because the resulting binary is not useful, but it seems like the compiler exit code would be a good signal here.

Ugh, yes. Because we don't build it in debug and I use that compilation_database.json to power LSP, my editor is unaware that file even exists.

@oleiman
Member

oleiman commented Jan 25, 2024

> Glad I'm not the only one getting spanked by this...presumably we don't build the benchmark on debug builds because the resulting binary is not useful, but it seems like the compiler exit code would be a good signal here.
>
> Ugh, yes. Because we don't build it in debug and I use that compilation_database.json to power LSP, my editor is unaware that file even exists.

yeah exactly. if it's not in my compile_commands it might as well not exist 😂

@rockwotj
Contributor Author

Force push: fix broken benchmark 🤦

@oleiman
Member

oleiman commented Jan 25, 2024

FYI - I think your build-debug is going to time out anyway https://redpandadata.slack.com/archives/C044RD18NMV/p1706217702537079

@vbotbuildovich
Collaborator

vbotbuildovich commented Jan 25, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42c0-3688-4b97-b313-b41b2d1c0eaa:

"rptest.tests.data_transforms_test.DataTransformsLeadershipChangingTest.test_leadership_changing_randomly"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42c0-3692-4190-96e0-b872288b5c3d:

"rptest.tests.data_transforms_test.DataTransformsChainingTest.test_multiple_transforms_chained_together"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42c0-368f-43a4-8b74-02b2e0d787a3:

"rptest.tests.data_transforms_test.DataTransformsTest.test_identity.transactional=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42c0-368c-4698-ba13-2716b611794e:

"rptest.tests.data_transforms_test.DataTransformsTest.test_identity.transactional=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42d0-3d9a-4cf8-af52-40fb12e71dbf:

"rptest.tests.data_transforms_test.DataTransformsChainingTest.test_multiple_transforms_chained_together"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42d0-3d9d-4385-89cb-a37d15b446ba:

"rptest.tests.data_transforms_test.DataTransformsLeadershipChangingTest.test_leadership_changing_randomly"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42d0-3d96-430a-9bfa-802ca28db8e9:

"rptest.tests.data_transforms_test.DataTransformsTest.test_identity.transactional=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/44313#018d42d0-3d92-4de3-9a02-b2b6ee6a5289:

"rptest.tests.data_transforms_test.DataTransformsTest.test_identity.transactional=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/44363#018d4767-9455-4464-848b-30f242f2080c:

"rptest.tests.cloud_storage_timing_stress_test.CloudStorageTimingStressTest.test_cloud_storage_with_partition_moves.cleanup_policy=delete"

Member

@oleiman oleiman left a comment

Looks reasonable to me.

With respect to the transfer_queue, do we have a way of testing the cost of growing/shrinking chunked_fifo compared to the circular_buffer-ness of ss::queue?

src/v/transform/transform_processor.cc
```cpp
model::timestamp::now(), std::move(transformed));
ss::future<> invoke_transform(
model::record_batch batch, transform_probe* p, transform_callback cb) {
class callback_impl final : public record_callback {
```
Member

the most limited scope 😅

Comment on lines +39 to +69
```cpp
// We want to be able to always insert at least one item, so cap memory
// usage by _max_memory so we don't overload our semaphore and so we
// can't get stuck.
size_t mem = std::min(entry.memory_usage(), _max_memory);
```
Member

So the idea here is that, at worst, we exhaust the semaphore with a single entry and exceed the preset memory limit but still make progress, or, at best, we can enqueue a bunch of entries where memory_usage() << _max_memory?

And, regardless of whether we're "lying" about the size of an incoming entry, if there's already something in the queue and we wind up having to wait, that's fine because we're making progress?

Contributor Author

Yeah, we have to make progress, so being able to consume the entire semaphore with a single entry is important. Overall the memory limits are still in place because I think somewhere else needs to have already accounted for the memory here...

Member

> somewhere else needs to have already accounted for the memory here

You mean when the event was constructed? That seems like an easy precondition to break. This is sort of confusing.

Contributor Author

> You mean when the event was constructed? That seems like an easy precondition to break. This is sort of confusing.

Yeah, the transform subsystem needs a limit on the total memory it uses out of what is available. It doesn't have one right now: we have a limit defined, but we don't really respect it. At the end of the day batches are variably sized, and it feels odd to have a fixed buffer size when each batch could have a wildly different actual memory cost.

However, we can't fail to make progress just because we get a batch that is over our limit; we just need to go slower. This is sort of how our log reader uses the max_bytes limit, FWIW: it's a soft limit, and making progress always wins over adhering to the limit.

Let's continue this in the parent conversation.

Member

Ok, yeah that makes good sense 👍

Contributor Author

Let me expand more on this in the comments for this class

@rockwotj
Contributor Author

> With respect to the transfer_queue, do we have a way of testing the cost of growing/shrinking chunked_fifo compared to the circular_buffer-ness of ss::queue?

I want to prevent oversized allocs, hence the usage of chunked_fifo, but I don't want to force a fixed number of elements because batch sizes vary. So the thought was to bound these buffers based on size. I still need to address keeping the entire transform subsystem under its limit.

I dropped #16257 because I think that will be a better alternative to chunked_fifo here, but it feels wrong to limit the buffer based on the number of variable-sized objects.
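
For illustration only, here is a rough sketch of a queue bounded by the memory footprint of its entries rather than by element count, using a seastar semaphore over a chunked_fifo. The names and details are assumptions for this sketch and do not match the PR's actual transfer_queue:

```cpp
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

#include <algorithm>
#include <cstddef>
#include <utility>

// Sketch of a memory-bounded queue. T is assumed to expose memory_usage().
template<typename T>
class memory_bounded_queue {
public:
    explicit memory_bounded_queue(size_t max_memory)
      : _max_memory(max_memory)
      , _memory(max_memory) {}

    // Wait until enough of the memory budget is free, then enqueue. An entry
    // larger than the whole budget is clamped to _max_memory so a single
    // oversized entry can still be enqueued (a soft limit: always make
    // progress, just go slower).
    seastar::future<> push(T entry) {
        size_t mem = std::min(entry.memory_usage(), _max_memory);
        return _memory.wait(mem).then(
          [this, mem, entry = std::move(entry)]() mutable {
              _entries.push_back(charged_entry{std::move(entry), mem});
          });
    }

    // Pop the oldest entry and return its charged memory to the budget.
    T pop() {
        charged_entry ce = std::move(_entries.front());
        _entries.pop_front();
        _memory.signal(ce.charged);
        return std::move(ce.entry);
    }

    bool empty() const { return _entries.empty(); }

private:
    // Remember how much memory was charged at push time, so pop() returns
    // exactly that amount even if memory_usage() later changes.
    struct charged_entry {
        T entry;
        size_t charged;
    };

    size_t _max_memory;
    seastar::semaphore _memory;
    seastar::chunked_fifo<charged_entry> _entries;
};
```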

@rockwotj
Contributor Author

Force push: Fix tests and a lazy bug in transfer queue

Create a nice way to simulate `zip`, similar to the Python function, for
two different containers.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
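
A minimal sketch of the idea behind such a helper; the PR's actual helper (excerpted later in this thread) is variadic, while this two-container version is only an illustrative assumption:

```cpp
#include <utility>
#include <vector>

// Hypothetical two-container zip: pair up elements until the shorter
// container is exhausted, mirroring Python's zip() for two inputs.
template<typename A, typename B>
auto zip2(const A& a, const B& b) {
    std::vector<std::pair<typename A::value_type, typename B::value_type>> out;
    auto ia = a.begin();
    auto ib = b.begin();
    for (; ia != a.end() && ib != b.end(); ++ia, ++ib) {
        out.emplace_back(*ia, *ib);
    }
    return out;
}
```

In a test this lets you walk an input batch and the transformed output side by side when comparing records.
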
There is no guarantee that the batches returned are the same in terms of
offset/etc., and this will change soon. So introduce a test helper to
ensure that the returned batch records are the same, excluding metadata.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
@rockwotj
Contributor Author

force push: reorder commits so each one compiles independently

@rockwotj
Contributor Author

Force push: Document transfer queue, add abort source to push.

@oleiman call me out please if I forget to document a class 😅

@rockwotj
Contributor Author

Force push: remove a promise we didn't keep

@rockwotj
Contributor Author

Force push: remove a file that shouldn't be there

* ```
*/
template<typename... Args>
auto zip(Args&&... args) {
Member

nice 🐍 🕳️

oleiman
oleiman previously approved these changes Jan 26, 2024
Member

@oleiman oleiman left a comment

lgtm honestly. might want to push for a second set of eyes, particularly on the large-ish mechanical change: 2d97fc1. your call.

@rockwotj
Contributor Author

/ci-repeat

@rockwotj
Contributor Author

force push: fix empty batches not committing

Instead of creating the batch in the wasm layer, we will create the
batch upstream. We'll need this when we're writing these records to
different topics.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
`int32_t` is not the right type. Even though we shouldn't be dealing
with more than 4GiB anyway, all the users of this function were already
using `size_t` as storage.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Add a queue we can use in the processor to enforce memory limits and
tracking while transforms are in flight.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
Instead of the fixed-size ss::queue, use a queue that is bounded by
actual memory usage. The other benefit of this queue is that it has
first-class support for an abort source: instead of having to abort and
reconstruct the queue, we can just clear it and use our existing
abort source in the processor.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
@rockwotj
Contributor Author

Force push: Fix post_record being called before pre_record for the first record in a batch.

@rockwotj rockwotj requested a review from oleiman January 27, 2024 18:08
Member

@oleiman oleiman left a comment

new stuff lgtm

@rockwotj rockwotj merged commit e7c31c7 into redpanda-data:dev Jan 30, 2024
17 checks passed
@rockwotj rockwotj deleted the callback branch January 31, 2024 01:58
Labels
area/redpanda, area/wasm (WASM Data Transforms)

3 participants