
Spring Kafka Listener uses stale oauthbearer token, fails to connect after initial token expired. #2612

@mustafacantekir

In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.8.11

I also tried version 2.9.6; the problem persists there as well.

Describe the bug

I am connecting to a secured Kafka cluster that uses the OAUTHBEARER SASL mechanism. The application starts as expected, obtains a token, and connects to Kafka. The token is valid for 30 minutes. After ~25 minutes the Kafka client performs a re-login through ExpiringCredentialRefreshingLogin. The re-login completes successfully and the login context is updated with the new token. However, after 30 minutes the KafkaMessageListenerContainer stops with a TopicAuthorizationException, because the Kafka consumers are still using the stale token and do not pick up the new one. The relevant logs are below.

2023-03-09 17:25:57.510  INFO 62343 --- [           main] .o.i.e.ExpiringCredentialRefreshingLogin : Successfully logged in.
2023-03-09 17:25:57.512  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=:xxx]: Expiring credential re-login thread started.
2023-03-09 17:25:57.514  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=xxx]: Expiring credential valid from Thu Mar 09 17:25:57 CET 2023 to Thu Mar 09 17:55:57 CET 2023
2023-03-09 17:25:57.514  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=xxx]: Proposed refresh time of Thu Mar 09 17:51:23 CET 2023 extends into the desired buffer time of 300 seconds before expiration, so refresh it at the desired buffer begin point, at Thu Mar 09 17:50:57 CET 2023
2023-03-09 17:25:57.515  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=:xxx]: Expiring credential re-login sleeping until: Thu Mar 09 17:50:57 CET 2023
2023-03-09 17:25:57.585  INFO 62343 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-stale-token-bug-1, groupId=stale-token-bug] Subscribed to topic(s): mock-secure-topic
..
..
..
..
After ~25 minutes
2023-03-09 17:50:57.022  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : Initiating re-login for xxx, logout() still needs to be called on a previous login = true
2023-03-09 17:50:57.634  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=xxx]: Expiring credential valid from Thu Mar 09 17:50:57 CET 2023 to Thu Mar 09 18:20:57 CET 2023
2023-03-09 17:50:57.634  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=xxx]: Proposed refresh time of Thu Mar 09 18:16:06 CET 2023 extends into the desired buffer time of 300 seconds before expiration, so refresh it at the desired buffer begin point, at Thu Mar 09 18:15:57 CET 2023
2023-03-09 17:50:57.634  INFO 62343 --- [71-3b21d55ff7c8] .o.i.e.ExpiringCredentialRefreshingLogin : [Principal=:xxx]: Expiring credential re-login sleeping until: Thu Mar 09 18:15:57 CET 2023
2023-03-09 17:55:57.498  WARN 62343 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-stale-token-bug-1, groupId=stale-token-bug] Not authorized to read from partition mock-secure-topic-8.
2023-03-09 17:55:57.501 ERROR 62343 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Authentication/Authorization Exception and no authExceptionRetryInterval set

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [mock-secure-topic]

2023-03-09 17:55:57.502 ERROR 62343 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Fatal consumer exception; stopping container
..
..
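
The refresh times in the log lines above can be checked by hand. The sketch below is a simplified model of the clamping that the log message describes (a proposed refresh time that falls inside the 300-second buffer is moved to the start of the buffer). It is not Kafka's actual implementation; the class and method names are hypothetical, and the timestamps are copied from the first log excerpt.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class RefreshTimeSketch {
    // Hypothetical helper: if the proposed refresh time falls inside the
    // buffer window before expiry, refresh at the start of the buffer instead.
    public static LocalDateTime effectiveRefresh(LocalDateTime proposed,
                                                 LocalDateTime expiry,
                                                 Duration buffer) {
        LocalDateTime bufferBegin = expiry.minus(buffer);
        return proposed.isAfter(bufferBegin) ? bufferBegin : proposed;
    }

    public static void main(String[] args) {
        // Values taken from the log: token expires at 17:55:57,
        // proposed refresh at 17:51:23, buffer of 300 seconds.
        LocalDateTime expiry = LocalDateTime.of(2023, 3, 9, 17, 55, 57);
        LocalDateTime proposed = LocalDateTime.of(2023, 3, 9, 17, 51, 23);
        System.out.println(effectiveRefresh(proposed, expiry, Duration.ofSeconds(300)));
        // prints 2023-03-09T17:50:57, matching the "sleeping until" log line
    }
}
```

So the re-login happens 5 minutes before expiry, which is why the new token is already available when the consumer fails at 17:55:57.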


I also tried setting authExceptionRetryInterval, but it did not fix the issue: the client keeps trying to reconnect to Kafka with the stale token.

To work around the issue, I implemented my own consumer refresher. It regularly checks the MessageListenerContainers associated with the secure Kafka topics and restarts them using the stop and start methods. The workaround works, but I wanted to report the issue here so it can be fixed.
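
A minimal sketch of the shape of such a refresher, for illustration only. It is not the code from my application: the `Restartable` interface is a hypothetical stand-in for the stop/start/isRunning lifecycle methods of a listener container so that the example has no Spring dependency, and the class and method names are made up. In a Spring application the containers could instead be obtained from `KafkaListenerEndpointRegistry.getListenerContainers()`.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ContainerRefresherSketch {
    /** Hypothetical stand-in for a listener container's lifecycle methods. */
    public interface Restartable {
        void stop();
        void start();
        boolean isRunning();
    }

    // Stop each running container and start it again, so that a new
    // consumer (and a new connection) is created. Containers that are
    // already stopped are simply started.
    public static void restartAll(List<? extends Restartable> containers) {
        for (Restartable container : containers) {
            if (container.isRunning()) {
                container.stop();
            }
            container.start();
        }
    }

    // Run restartAll periodically; the period is an assumption and should
    // be shorter than the token lifetime (30 minutes in this report).
    public static ScheduledExecutorService schedule(List<? extends Restartable> containers,
                                                    long periodMinutes) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> restartAll(containers),
                periodMinutes, periodMinutes, TimeUnit.MINUTES);
        return scheduler;
    }
}
```

Restarting a container interrupts consumption and triggers a rebalance, so this is only a stopgap.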

To Reproduce

Steps to reproduce the behavior.

  1. Set up the Kafka config to use the OAUTHBEARER SASL mechanism, configured similarly to the sample app.
  2. Run the application and verify that the consumers connect to Kafka.
  3. When the token expires, the application gets a TopicAuthorizationException and stops the container.

Expected behavior

The Kafka listener should pick up the new token from the login context and make subsequent requests with the valid token.

Sample

A link to a GitHub repository with a minimal, reproducible, sample.

https://github.com/mustafacantekir/stale-token-bug

You can check the Kafka configuration there. Unfortunately, I don't have access to a publicly available auth server that I can share. To run the application, you need to set a valid sasl.oauthbearer.token.endpoint.url and update the clientId and clientSecret options in application.yml.
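
For readers without access to the repository, the sketch below shows roughly which client properties are involved, expressed as a plain Java map rather than the application.yml from the sample. Several values are assumptions: `SASL_SSL` as the security protocol, and the callback handler class name, which is the one I believe ships with kafka-clients 3.1 to 3.3 and moved to a different package in later versions. The class and method names of the sketch itself are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OauthBearerConfigSketch {
    // Builds the security-related consumer properties for OAUTHBEARER.
    // All three arguments must be supplied by the user; none are real values.
    public static Map<String, Object> consumerSecurityProps(String tokenEndpointUrl,
                                                            String clientId,
                                                            String clientSecret) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("security.protocol", "SASL_SSL"); // assumption: TLS listener
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.oauthbearer.token.endpoint.url", tokenEndpointUrl);
        // Assumption: handler location for kafka-clients 3.1 to 3.3.
        props.put("sasl.login.callback.handler.class",
                "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                        + "clientId=\"" + clientId + "\" "
                        + "clientSecret=\"" + clientSecret + "\";");
        return props;
    }
}
```

The same keys can be placed under the Kafka properties section of application.yml; the exact layout in the sample repository may differ from this sketch.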
