Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Issue 95: Make Kafka OffsetCommit and Pulsar backlog align #105

Merged
merged 9 commits into from
Feb 17, 2020

Conversation

jiazhai
Copy link
Contributor

@jiazhai jiazhai commented Feb 16, 2020

Fix #95.
Make Kafka OffsetCommit and Pulsar backlog align.

  • Add backlog tracker in GroupCoordinator.
  • After fetch message, ack the fetched position.

@jiazhai jiazhai closed this Feb 16, 2020
@jiazhai jiazhai reopened this Feb 16, 2020
@jiazhai jiazhai closed this Feb 17, 2020
@jiazhai jiazhai reopened this Feb 17, 2020
@jiazhai jiazhai merged commit 4c8651f into master Feb 17, 2020
@jiazhai jiazhai deleted the jia/offset_commit_cursor branch February 17, 2020 15:15
@jiazhai jiazhai modified the milestones: 0.0.1, 0.1.0 Feb 17, 2020
@BewareMyPower BewareMyPower mentioned this pull request May 20, 2021
BewareMyPower added a commit that referenced this pull request May 21, 2021
Fixes #392

### Motivation

#105 introduced the `OffsetAcker` to reuse the backlog stat of Pulsar subscription. However, after #296 introduced the continuous offset, when the `OffsetAcker` want to find the `MessageId` or `Position`, which contains the ledger id and entry id of a batch, it needs to get the `PersistentTopic` using `BrokerService#getTopic`. If the current broker was not the owner broker of the partition, `getTopic` would fail.

However, Kafka consumers send the `OFFSET_COMMIT` requests to the coordinator broker, which is associated with the specified group name. A coordinator broker may not own the partition after the pulsar's topic ownership changed, so the `getTopic` would fail and couldn't be recovered, which affect the performance significantly and a lot of warnings logs would be printed. 

### Modifications

- Remove the `OffsetAcker` with the related metrics (`CoordinatorStats`) and tests (`CommitOffsetBacklogTest`)
- Remove the reused consumer stats from #263 with the related tests (`testKafkaConsumerMetrics`)
- Add `testMixedConsumersWithSameSubscription` to cover the case that Kafka consumer and Pulsar consumer subscribes the same topic with same subscription name. Because before this PR, Pulsar consumer were aware of the Kafka consumer's consumed position (committed offset).
- Fix the current tests:
    - `testDeleteTopics`: After this PR, the topic could be deleted even the Kafka consumers were still connected because no broker side Consumer were attached to the topic.
    - `testPulsarProduceKafkaConsume2`: It's a test for Kafka consumer with `enable.auto.commit=true`, however the validation is wrong because even a consumer hasn't committed any offset, the consumer can still consume from the latest offset.

Regarding to the cases that Kafka or Pulsar producer produces and Kafka or Pulsar consumer consumes, they are covered by existed `KafkaRequestTypeTest`. And the offset commit case is covered by `DifferentNamespaceTestBase#testCommitOffset`.
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Oct 30, 2023
Master issue: streamnative/ksn#63

### Modifications

For `SASL_*` requests, forward them to any broker (`authenticateBroker`)
to verify if the authentication succeeded. At the same time, cache these
requests in `BrokerConnectionGroup` because we might need to forward
further requests (e.g. `PRODUCE`) to a new connection, in this case, we
need to forward the `SASL_*` requests again so that the
`KafkaRequestHandler#authenticator` will have the principal info for
authorization.

`SaslPlainProxyTest` is added to verify this patch.

In addition, close the connections to broker in `BrokerConnectionGroup`
when the client connection to proxy is closed.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE] Make sure Kafka offsets sync to Pulsar subscription state
1 participant