
Implement KIP-496 OffsetDelete #8669

Merged · 10 commits · Feb 10, 2023

Conversation

@graphcareful (Contributor) commented Feb 7, 2023

This pull request adds support for the OffsetDelete API to redpanda. The API lets users manually remove offsets that redpanda tracks in the __consumer_offsets topic. A requested offset is removed only if the group in question is in the dead state, or if there are no active subscriptions for the topic/partitions to be deleted. If there is an active subscription for a requested offset, the new Kafka error code group_subscribed_to_topic is returned.
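As a rough sketch of that acceptance rule (can_delete, group_state, and error_code here are illustrative placeholders rather than the actual redpanda API; the real check is the one quoted from group.cc further down in this review):

enum class error_code { none, group_subscribed_to_topic };
enum class group_state { dead, empty, stable };

// Sketch only: an offset may be deleted when the group no longer uses it,
// i.e. the group is dead/empty or no member subscribes to its topic;
// otherwise that partition is reported back with group_subscribed_to_topic.
error_code can_delete(group_state state, bool subscribed_to_topic) {
    if (state == group_state::dead || state == group_state::empty
        || !subscribed_to_topic) {
        return error_code::none;
    }
    return error_code::group_subscribed_to_topic;
}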

Backports Required

  • none - not a bug fix
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v22.3.x
  • v22.2.x
  • v22.1.x

UX Changes

  • none

Release Notes

Features

  • Support for OffsetDelete introduced.

- Returned specifically in offset_delete requests when a deletion is
attempted for an offset whose topic has an active group subscription.
@graphcareful (Contributor, Author):

Changes in force-push

  • Adding more test coverage for untested cases
  • Asserting on non-expected KCL output
  • Removed unused variable in OffsetDelete handler

@michael-redpanda (Contributor):

nit: I think this is KIP-496 not KIP-498

@dotnwat (Member) left a comment:

looks real good. just a few questions about how the group is changing state in various situations.

src/v/kafka/server/server.cc (outdated, resolved)
if (in_state(group_state::empty) || !subscribed(offset.topic)) {
vlog(_ctxlog.debug, "Deleting group offset {}", offset);
_offsets.erase(offset);
_pending_offset_commits.erase(offset);
Member commented:
what about transactional committed offsets? /cc @bharathv @rystsov

Member commented:
thread started on slack to discuss

src/v/kafka/server/group_manager.cc (resolved)
src/v/kafka/server/group.cc (outdated, resolved)
src/v/kafka/server/group.cc (resolved)
src/v/kafka/server/group_manager.cc (resolved)
@graphcareful changed the title from "Implement KIP-498 OffsetDelete" to "Implement KIP-496 OffsetDelete" on Feb 7, 2023
- This public method can be invoked to allow the caller to manually
delete desired offsets.

- The internal `_offsets` and `_pending_offset_commits` caches will have
the respective offsets removed only if the group is in an `empty` state
or if there are no active subscriptions for the given offset.
- Genericising the delete_expired_offsets method into a private method
called delete_offsets which will delete the requested offsets of a
group.

- `delete_expired_offsets` then calls `delete_offsets` with the
`expired_offsets` it has obtained and designated for deletion (a sketch of
this shape follows below).
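A minimal sketch of the shape described in this commit message, using hypothetical simplified types (topic_partition, group_sketch) in place of the real redpanda ones and eliding the empty-state/subscription guard:

#include <map>
#include <string>
#include <tuple>
#include <vector>

struct topic_partition {
    std::string topic;
    int partition;
    bool operator<(const topic_partition& other) const {
        return std::tie(topic, partition)
               < std::tie(other.topic, other.partition);
    }
};

class group_sketch {
public:
    // public entry point used by the OffsetDelete request path
    void delete_requested_offsets(const std::vector<topic_partition>& tps) {
        delete_offsets(tps);
    }

    // the pre-existing expiration path reuses the same private helper
    void delete_expired_offsets() {
        delete_offsets(collect_expired()); // expiry selection elided
    }

private:
    // erase the given keys from both per-group caches
    void delete_offsets(const std::vector<topic_partition>& tps) {
        for (const auto& tp : tps) {
            _offsets.erase(tp);
            _pending_offset_commits.erase(tp);
        }
    }

    std::vector<topic_partition> collect_expired() const { return {}; }

    std::map<topic_partition, long> _offsets;
    std::map<topic_partition, long> _pending_offset_commits;
};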
@dotnwat (Member) commented Feb 8, 2023

Seeing a timeout in CI for the offset deletion test.

[INFO  - 2023-02-08 02:04:09,214 - runner_client - log - lineno:278]: RunnerClient: rptest.tests.offset_retention_test.OffsetDeletionTest.test_offset_deletion: Summary: TimeoutError('')
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/ducktape/tests/runner_client.py", line 135, in run
    data = self.run_test()
  File "/usr/local/lib/python3.10/dist-packages/ducktape/tests/runner_client.py", line 227, in run_test
    return self.test_context.function(self.test)
  File "/root/tests/rptest/services/cluster.py", line 35, in wrapped
    r = f(self, *args, **kwargs)
  File "/root/tests/rptest/tests/offset_retention_test.py", line 367, in test_offset_deletion
    wait_until(wait_for_unsubscription, timeout_sec=5, backoff_sec=1)
  File "/usr/local/lib/python3.10/dist-packages/ducktape/utils/util.py", line 57, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError

@graphcareful (Contributor, Author):

Changes in force-push

  • Fixed failing DT test

@dotnwat (Member) commented Feb 9, 2023

/ci-repeat 10
dt-repeat=25
tests/rptest/tests/offset_retention_test.py

@dotnwat (Member) commented Feb 9, 2023

One failure. It looks like when we delete the group in kafka::group_manager::offset_delete(kafka::offset_delete_request&&) [clone .resume] at group_manager.cc:? there are still active futures under group::gate. The actual delete, I guess, probably happens in the shared delete_offsets method?

It didn't actually occur to me that group had a gate, so a similar bug may also be present in offset retention; it is just far more likely to surface with offset delete and your extensive testing.

TRACE 2023-02-09 04:39:08,708 [shard 0] raft - [group_id:7, {kafka/__consumer_offsets/2}] replicate_entries_stm.cc:196 - Leader append result: {append_time:9344926, base_offset:{10}, last_offset:{13}, byte_size:223}
DEBUG 2023-02-09 04:39:08,709 [shard 0] kafka - group_manager.cc:322 - Wrote 3 tombstone records for group id={hey_hey_group} state=Dead gen=4 proto_type={{consumer}} proto={nullopt} leader={nullopt} empty=true ntp={kafka/__consumer_offsets/2} num_members_joining=0 new_member_added=false join_timer={nullopt} pending members [] full members [] expired offsets
redpanda: /vectorized/include/seastar/core/gate.hh:56: seastar::gate::~gate(): Assertion `!_count && "gate destroyed with outstanding requests"' failed.

seastar::gate::~gate() at application.cc:?
kafka::group::~group() at ??:?
seastar::internal::lw_shared_ptr_accessors_esft<kafka::group>::dispose(kafka::group*) at ??:?
seastar::internal::lw_shared_ptr_accessors_esft<kafka::group>::dispose(seastar::lw_shared_ptr_counter_base*) at ??:?
kafka::group_manager::offset_delete(kafka::offset_delete_request&&) [clone .resume] at group_manager.cc:?
std::__1::coroutine_handle<seastar::internal::coroutine_traits_base<kafka::offset_delete_response>::promise_type>::resume() const at group_manager.cc:?
seastar::internal::coroutine_traits_base<kafka::offset_delete_response>::promise_type::run_and_dispose() at ??:?
seastar::reactor::run_tasks(seastar::reactor::task_queue&) at reactor.cc:?
seastar::reactor::run_some_tasks() at reactor.cc:?
seastar::reactor::do_run() at reactor.cc:?
seastar::reactor::run() at reactor.cc:?
seastar::app_template::run_deprecated(int, char**, std::__1::function<void ()>&&) at app-template.cc:?
seastar::app_template::run(int, char**, std::__1::function<seastar::future<int> ()>&&) at app-template.cc:?
application::run(int, char**) at ??:?
main at ??:?



Aborting on shard 0.
Backtrace:
  0x393157da
  0x4c5367f4
  0x4c536413
  0x4c3128c9
  0x4c33eed5
  0x4c4a2a19
  0x4c4a2c4e
  0x4c4a2a9a
  /opt/redpanda_installs/ci/lib/libc.so.6+0x42abf
  /opt/redpanda_installs/ci/lib/libc.so.6+0x92e3b
  /opt/redpanda_installs/ci/lib/libc.so.6+0x42a15
  /opt/redpanda_installs/ci/lib/libc.so.6+0x2c82e
  /opt/redpanda_installs/ci/lib/libc.so.6+0x2c75a
  /opt/redpanda_installs/ci/lib/libc.so.6+0x3b595
  0x39924f23
  0x3a39d5ff
  0x3a39d58e
  0x3a39d525
  0x3d0495a9
  0x3d1d23c0
  0x3d1d1d75
  0x4c332a7d
  0x4c3388de
  0x4c33d43f
  0x4c33afc8
  0x4c05166c
  0x4c04ed25
  0x393dcb44
  0x3939a176
  /opt/redpanda_installs/ci/lib/libc.so.6+0x2d58f
  /opt/redpanda_installs/ci/lib/libc.so.6+0x2d648
  0x392d9ca4

@graphcareful (Contributor, Author):

Changes in force-push

  • Fix bug identified in CI where deletion of group caused a crash

@graphcareful (Contributor, Author):

/ci-repeat 10
dt-repeat=25
tests/rptest/tests/offset_retention_test.py

@dotnwat (Member) commented Feb 9, 2023

/ci-repeat 10
dt-repeat=25
tests/rptest/tests/offset_retention_test.py

I thought the plan was to run more than this?

@dotnwat (Member) commented Feb 9, 2023

/ci-repeat 10
dt-repeat=100
tests/rptest/tests/offset_retention_test.py

1 similar comment from @dotnwat followed.

@dotnwat previously approved these changes Feb 9, 2023

@dotnwat (Member) left a comment:

lgtm pending ci results

src/v/kafka/server/group_manager.cc (outdated, resolved)
@graphcareful (Contributor, Author):

/ci-repeat 10
dt-repeat=100
tests/rptest/tests/offset_retention_test.py::OffsetDeletionTest

- Offset delete requests will route to the group_manager where the
request will be parsed into a list of topic/partitions and eventually
passed to group::delete_offsets.

- The response is built up from the results of the deleted offsets; the
error code kafka::group_subscribed_to_topic will be returned for offsets
that could not be deleted because they still have active subscriptions (a
sketch of this response shaping follows below).
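An illustrative sketch of that response shaping; build_offset_delete_response and partition_result below are hypothetical stand-ins, not the actual handler types:

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

enum class error_code { none, group_subscribed_to_topic };

struct partition_result {
    std::string topic;
    int partition;
    error_code error;
};

// Every requested topic/partition appears in the response, carrying either
// no error or group_subscribed_to_topic when it could not be deleted.
std::vector<partition_result> build_offset_delete_response(
  const std::vector<std::pair<std::string, int>>& requested,
  const std::vector<std::pair<std::string, int>>& not_deletable) {
    std::vector<partition_result> response;
    response.reserve(requested.size());
    for (const auto& tp : requested) {
        const bool blocked
          = std::find(not_deletable.begin(), not_deletable.end(), tp)
            != not_deletable.end();
        response.push_back(
          {tp.first,
           tp.second,
           blocked ? error_code::group_subscribed_to_topic
                   : error_code::none});
    }
    return response;
}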
@graphcareful (Contributor, Author):

/ci-repeat 10
dt-repeat=100
tests/rptest/tests/offset_retention_test.py::OffsetDeletionTest

- The group must be shut down before _groups.erase(it) is called. There
is an active gate, and there could be pending futures executing at the
moment erase is called.

- Calling shutdown() on the group shuts the gate and closes all pending
work so that erase(group) may safely be called (see the sketch below).
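In rough terms, the ordering described in this commit message looks like the sketch below; mini_group and remove_group are hypothetical stand-ins rather than the actual group_manager code:

#include <map>
#include <memory>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

// The gate must be closed, waiting out in-flight work, before the object
// that owns it is destroyed; otherwise seastar::gate's destructor trips the
// "gate destroyed with outstanding requests" assertion seen above.
struct mini_group {
    seastar::gate gate;
    seastar::future<> shutdown() { return gate.close(); }
};

seastar::future<> remove_group(
  std::map<int, std::shared_ptr<mini_group>>& groups, int group_id) {
    auto it = groups.find(group_id);
    if (it == groups.end()) {
        co_return;
    }
    auto group = it->second;    // keep the group alive across the await
    co_await group->shutdown(); // drain pending work and close the gate
    groups.erase(group_id);     // now the group can be destroyed safely
}

Holding a reference to the group across the await keeps it alive while the gate drains, and erasing by key afterwards avoids relying on an iterator that other tasks could have invalidated during the await.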
@graphcareful (Contributor, Author):

Changes in force-push

  • Fixed python fmt error
  • Good to merge on green CI
