This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[FEATURE] cursor replication with kop #1778

Open
nareshv opened this issue Mar 28, 2023 · 3 comments
Labels
type/feature Indicates new functionality

Comments

@nareshv

nareshv commented Mar 28, 2023

Is your feature request related to a problem? Please describe.
One of the things Pulsar solves well is subscription cursor replication for native Pulsar applications. We expected the same behavior when using KoP, but in practice things are a bit different with Kafka's consumer groups.

Describe the solution you'd like
Kafka consumer group offset replication should work across clusters, similar to the native subscription replication for Pulsar consumers.

Describe alternatives you've considered
None

Additional context
We have two clusters running KoP with namespace-level replication set up.

  1. Start a Kafka producer against cluster-scus and produce 10900 messages to topic-1.
  2. Start a Kafka consumer against cluster-scus on topic-1; it stops at offset 10900, lag 0.
  3. Stop the Kafka consumer on cluster-scus.
  4. Start the Kafka consumer against cluster-wus on topic-1; the consumer starts at offset 500, lag 10400.
  5. Stop the Kafka consumer on cluster-wus.
  6. Start the Kafka consumer against cluster-scus on topic-1; the consumer resumes from offset 10900, lag 0.
Consumer Logs
--- on SCUS ---

Kafka version: 2.8.0 Cluster ID: cluster-scus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=10900, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[x.x.x.x:9092 (id: 1 rack: null)], epoch=absent}}


--- on WUS ---

Kafka version: 2.8.0 Cluster ID: cluster-wus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=500, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[y.y.y.y:9092 (id: 2 rack: null)], epoch=absent}}


--- on SCUS ---
Kafka version: 2.8.0 Cluster ID: cluster-scus
Setting offset for partition topic-1-0 to the committed offset FetchPosition{offset=10900, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[x.x.x.x:9092 (id: 683016547 rack: null)], epoch=absent}}
@nareshv nareshv added the type/feature Indicates new functionality label Mar 28, 2023
@BewareMyPower
Collaborator

KoP does not use cursors to store the committed offsets. Like Kafka, KoP stores the offsets in the offset topic, which is public/__kafka/__consumer_offsets by default. If this topic is replicated successfully, the offsets should also be loaded successfully.

I think the reason is that KoP reads the committed offset directly from its in-memory cache, and it only loads that cache from the offset topic when starting.

@nareshv
Author

nareshv commented Jun 28, 2023

Are there any configs to set up for the consumer offsets to be replicated, or will the default Pulsar settings work? How can we inspect KoP's cache and flush it if needed for a topic or consumer group?

@BewareMyPower
Collaborator

TL;DR: the current KoP does not work well with geo-replication; we might need to improve it later.

Are there any configs to set up for the consumer offsets to be replicated, or will the default Pulsar settings work?

You only need to replicate the public/__kafka/__consumer_offsets topic.

How can we inspect KoP's cache and flush it if needed for a topic or consumer group?

All Kafka admin operations just query KoP's cache.

To understand this issue, you might need to know some implementation details:

  1. KoP loads the metadata from the offset topic into the memory cache only when:
    1.1 It handles the first request that queries group or offset metadata (e.g. when a consumer subscribes)
    1.2 Namespace bundle ownership changes (e.g. a broker goes down)
  2. When it receives certain requests (e.g. offset commit) from Kafka clients, KoP updates the memory cache and persists the metadata to the offset topic.
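The two rules above can be illustrated with a toy model. This is only a sketch under stated assumptions, not KoP's actual code: `OffsetCache`, `fetch`, `commit`, and `on_ownership_change` are hypothetical names, and the offset topic is modeled as a plain list of (group, offset) records.

```python
class OffsetCache:
    """Toy model of a load-once offset cache (hypothetical, not KoP's real code)."""

    def __init__(self, offset_topic):
        self.offset_topic = offset_topic  # list of (group, offset) records
        self.cache = None                 # None means "not loaded yet"
        self.loads = 0                    # how many times the topic was replayed

    def _load(self):
        # Replay the whole topic; dict() keeps the last record per group.
        self.cache = dict(self.offset_topic)
        self.loads += 1

    def fetch(self, group):
        # Rule 1.1: load lazily, on the first group/offset query only.
        if self.cache is None:
            self._load()
        return self.cache.get(group)

    def on_ownership_change(self):
        # Rule 1.2: a bundle ownership change triggers a fresh load.
        self._load()

    def commit(self, group, offset):
        # Rule 2: update the cache and persist the record to the offset topic.
        if self.cache is None:
            self._load()
        self.cache[group] = offset
        self.offset_topic.append((group, offset))


topic = [("group-A", 101), ("group-A", 201)]
kop = OffsetCache(topic)
first = kop.fetch("group-A")    # triggers the only load
second = kop.fetch("group-A")   # served from memory, no reload
kop.commit("group-A", 301)      # cache and topic are both updated
print(first, second, kop.loads, topic[-1])
```

In this model, nothing other than an ownership change ever causes the topic to be read again, which is the property that matters for the replication problem described next.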

Assuming you replicated the offset topic from cluster-A to cluster-B, if a Kafka consumer connects to KoP before the offset topic is fully replicated, KoP might not load the latest metadata, e.g.

  • __consumer_offsets in cluster-A: group-A-offset-101, group-A-offset-201, ..., group-A-offset-1001
  • __consumer_offsets in cluster-B: group-A-offset-101, group-A-offset-201

Then the consumer connected to cluster-B will start consuming from offset 201, not 1001.
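A minimal sketch of why the partially replicated topic yields 201: replaying the records in order and keeping the last one per group gives a different answer on each cluster. The `latest_offsets` helper and the (group, offset) record shape are assumptions made for illustration; the real offset topic stores richer records.

```python
def latest_offsets(records):
    """Replay (group, offset) records in order; the last one per group wins."""
    latest = {}
    for group, offset in records:
        latest[group] = offset
    return latest


# Offsets 101, 201, ..., 1001 committed by group-A on cluster-A.
cluster_a = [("group-A", offset) for offset in range(101, 1002, 100)]
# Replication has delivered only the first two records to cluster-B so far.
cluster_b = cluster_a[:2]

offset_a = latest_offsets(cluster_a)["group-A"]
offset_b = latest_offsets(cluster_b)["group-A"]
print(offset_a, offset_b)
```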

What's worse, once the metadata loading has been triggered, the cached metadata is not updated afterwards, e.g.

  1. A consumer in group-A connects to KoP.
  2. KoP loads the metadata from the offset topic and keeps the (group-A, 201) pair in memory.
  3. The consumer does nothing and exits.
  4. After a while, the __consumer_offsets topic in cluster-B becomes fully replicated.
  5. A consumer in group-A connects to KoP again.
  6. KoP still returns offset 201 to the consumer, even though the latest offset in __consumer_offsets is 1001.
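The six steps above can be replayed in a few lines. This is a simplified simulation, assuming a cache that is filled once and never refreshed; `committed_offset`, `cache`, and `offset_topic_b` are hypothetical names, not KoP internals.

```python
# Offset topic on cluster-B while replication is still catching up.
offset_topic_b = [("group-A", 101), ("group-A", 201)]
cache = {}      # hypothetical in-memory cache
loaded = False  # the topic is replayed only once


def committed_offset(group):
    """Return the cached offset, replaying the topic only on the first call."""
    global loaded
    if not loaded:
        cache.update(dict(offset_topic_b))  # last record per group wins
        loaded = True
    return cache.get(group)


before = committed_offset("group-A")      # steps 1-2: the load sees 201
offset_topic_b.append(("group-A", 1001))  # step 4: replication catches up
after = committed_offset("group-A")       # steps 5-6: answered from the cache
in_topic = dict(offset_topic_b)["group-A"]
print(before, after, in_topic)
```

The gap between `after` and `in_topic` is the stale read described in step 6: the topic holds the newer offset, but nothing prompts the cache to look at it again.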
