Unable to see topic partition offsets in version 1.3.3.4 #368

Closed
Ashish123gs opened this issue Mar 17, 2017 · 23 comments

Comments

@Ashish123gs

Hi,

I am using the latest version of KM, 1.3.3.4.
I added a Kafka cluster running version 0.10.0.1 and enabled JMX metrics and other features.
I am not able to see the offsets for the topic partitions, although I can see the consumer offsets.

Previously I was using version 1.3.0.8 and was able to see the topic partition offsets. I looked at the code for 1.3.0.8 and saw that it uses ZK to fetch offset details.

In version 1.3.3.4 I see that it uses Kafka to fetch offset details.

@Ashish123gs
Author

I think I know why topic offsets are not displayed.

In KafkaStateActor.scala, line 490, the code does not collect topic offsets if the cluster is secured (my Kafka cluster is secured):

def getTopicPartitionOffsets(topic: String, interactive: Boolean) : Future[PartitionOffsetsCapture] = {
  if((interactive || loadOffsets) && !secureKafka) {
    partitionOffsetsCache.get(topic)
  } else {
    emptyPartitionOffsetsCapture
  }
}

I disabled that check and it worked for me.

Can we remove that check in the next version?
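
To make the effect of that condition concrete, here is a minimal sketch of the same boolean logic on its own. The object and method names are hypothetical and not part of Kafka Manager; only the condition itself is taken from the snippet above.

```scala
// Hypothetical, simplified model of the check in getTopicPartitionOffsets.
object OffsetGate {
  // Offsets are fetched only when the request is interactive (or background
  // loading is enabled) AND the cluster is not marked as secured.
  def shouldFetchOffsets(interactive: Boolean, loadOffsets: Boolean, secureKafka: Boolean): Boolean =
    (interactive || loadOffsets) && !secureKafka
}
```

With secureKafka set to true the function returns false whatever the other two flags are, which matches the empty offsets seen on a secured cluster.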

@Ashish123gs
Author

I also had to use the unsecured port to get the partition offsets.
The problem is that KM always uses the secured port from the Kafka cluster. In my case it tries the secured port and fails with the error below; I think the old 0.8.2.1 API doesn't support security.

Is there any way to get partition offsets by connecting to a secured port?

[error] k.m.a.c.OffsetCacheActive - [topic=MY_TOPIC] An error has occurred while getting topic offsets from broker List((BrokerIdentity(50,abc.com,XXXX,YYYY,true),1))
java.io.EOFException: null

@michaelsauter

I'm running 1.3.3.4 against a test cluster which is not secured and can't see topic offsets either.

Log output:

[warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster foo!
java.lang.NullPointerException: null
        at java.nio.ByteBuffer.wrap(ByteBuffer.java:396) ~[na:1.8.0_121]
        at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$3.apply(KafkaStateActor.scala:259) [kafka-manager.kafka-manager-1.3.3.4-sans-externalized.jar:na]
        at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$3.apply(KafkaStateActor.scala:235) [kafka-manager.kafka-manager-1.3.3.4-sans-externalized.jar:na]
        at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.8.jar:na]
        at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:235) [kafka-manager.kafka-manager-1.3.3.4-sans-externalized.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

@DavidLiuXh

Please refer to #296

@dongjubae

Hello. Same issue for me. Is there any update for the issue?

@dongjubae

Thanks simplesteph. Now I understand what the problem is :)
I hope Kafka Manager will be updated so that it works well with secured clusters.

@patelh
Collaborator

patelh commented Jun 5, 2017

Yes, will be working on this. What security do you have enabled on your clusters?

@Ashish123gs
Author

We use SASL_PLAINTEXT

@patelh
Collaborator

patelh commented Jun 6, 2017

Do you provide the JAAS config as JVM params and the consumer properties file with security.protocol set to SASL_PLAINTEXT?

@Ashish123gs
Author

Nope.

I think the problem is the use of the old Scala consumer API to fetch the topic offsets.
The old Scala consumer API doesn't support security, and I am not sure how we could use the new consumer API to fetch topic offsets.

I think we have to rely on the old Scala API to get the topic offsets, and to do that we have to use the unsecured port.

If the Kafka team provides an API in the new client to fetch topic offsets, that might help us get rid of the old Scala API in Kafka Manager.

@Ashish123gs
Author

In Kafka Manager we use the old Scala consumer API to fetch topic offsets:

// Get the latest offsets for the partitions of the topic,
// Code based off of the GetOffsetShell tool in kafka.tools, kafka 0.8.2.1
private[this] def loadPartitionOffsets(topic: String): Future[PartitionOffsetsCapture] = {
  // Get partition leader broker information
  val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)

  val clientId = "partitionOffsetGetter"
  val time = -1
  val nOffsets = 1
  val simpleConsumerBufferSize = 256 * 1024
  val currentActiveBrokerSet:Set[String] = getBrokerList().list.map(_.host).toSet

@Ashish123gs
Author

So this leaves us with only one option:

  1. Every Kafka cluster registered in KM should have an unsecured port open.
  2. Internally, Kafka Manager should use the unsecured port to fetch topic offset details.

I have done this as a workaround:

https://github.com/yahoo/kafka-manager/blob/1.3.3.4/app/kafka/manager/model/ActorModel.scala

Changes:

266 for {
267   tpl <- hostPortResult
268   host = tpl._1
269   port <- portResult // use the unsecured port always
270   secure = false // make it false
271   jmxPort <- jmxPortResult
272 } yield {
273   BrokerIdentity(brokerId, host, port, jmxPort, secure)

Let us know your thoughts.

@patelh
Collaborator

patelh commented Jun 6, 2017

The consumer offset code already uses the new Kafka consumer. The topic/producer offset code still uses the old consumer. Please provide KM with the JVM params to configure JAAS and a corresponding consumer.properties file with the correct security.protocol.
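
As a rough illustration of the consumer settings being asked for, the sketch below builds the equivalent of a one-line consumer.properties in code. The object name is hypothetical, and only the security.protocol key comes from this discussion; any further SASL settings depend on the cluster and are left out. The JAAS file itself would be passed separately as a JVM parameter such as -Djava.security.auth.login.config=/path/to/jaas.conf (the path is a placeholder).

```scala
import java.util.Properties

// Hypothetical helper, not part of Kafka Manager.
object SecureConsumerProps {
  // Builds the settings that would normally be written to consumer.properties.
  // Only security.protocol is set here; other SASL/SSL keys are cluster-specific.
  def build(securityProtocol: String): Properties = {
    val props = new Properties()
    props.setProperty("security.protocol", securityProtocol)
    props
  }
}
```

For the setup described in this thread the call would be SecureConsumerProps.build("SASL_PLAINTEXT").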

@Ashish123gs
Author

Using the JAAS file will connect to the secured port; will that get the topic offset details?

@patelh
Collaborator

patelh commented Jun 6, 2017

Topic offsets are only available through the simple consumer, last I checked. That is why it still uses the unsecured method. The consumer offsets use the new consumer, so to see lag you have to provide JAAS and consumer.properties.

@patelh
Collaborator

patelh commented Jun 6, 2017

The real issue is on line 261, where we take the first successfully parsed item from the endpoint list. This means that if the secure endpoint is listed first in the JSON, it gets used. What we should do is try to find an unsecured one first and use a secured one as a fallback.

              parsedList.find(_.isSuccess).fold({
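
The "unsecured first, secured as fallback" selection could look roughly like the sketch below. It is not the actual Kafka Manager code: the Endpoint case class and EndpointChooser object are hypothetical stand-ins, and the parsed list is assumed to be a List of Try values like the one that find(_.isSuccess) is applied to above.

```scala
import scala.util.{Success, Try}

// Hypothetical sketch, not the real ActorModel code.
object EndpointChooser {
  // Stand-in for a parsed broker endpoint.
  final case class Endpoint(host: String, port: Int, secure: Boolean)

  // Prefer the first successfully parsed unsecured endpoint;
  // otherwise fall back to the first successfully parsed endpoint of any kind.
  def choose(parsed: List[Try[Endpoint]]): Option[Endpoint] = {
    val ok = parsed.collect { case Success(e) => e }
    ok.find(!_.secure).orElse(ok.headOption)
  }
}
```

With a list that contains a secured endpoint followed by an unsecured one, choose returns the unsecured one. With only secured endpoints it returns the first of them, and with no successful parse it returns None.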

@Ashish123gs
Author

I think it makes sense to look for the unsecured port first and then fall back to the secured port.

Or

How about a 'Use security' option when registering a cluster in Kafka Manager?
If someone enables 'Use security', they have to provide the JAAS and consumer.properties files, and internally we only use the secured port.

Wouldn't an explicit option be more transparent for users?

@patelh
Collaborator

patelh commented Jun 6, 2017

'Use security' at the time of adding a cluster doesn't work unless there is a way to provide the JAAS configuration programmatically.

@Ashish123gs
Author

Agree.

I was suggesting a checkbox.

It would make clear to users whether they need to provide a JAAS/consumer properties file or not.
It's more about clarity: the user will know that selecting 'security' means providing a JAAS/consumer properties file,
and Kafka Manager would fail if the JAAS file is not provided after selecting 'security'.
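
A fail-fast check along those lines might be sketched as follows. Everything here is hypothetical (Kafka Manager has no such SecurityCheck object); the only real name is the java.security.auth.login.config system property, which is what the -D JVM parameter sets.

```scala
// Hypothetical validation for a 'Use security' checkbox.
object SecurityCheck {
  // JVM system property normally set via -Djava.security.auth.login.config=...
  val JaasProperty = "java.security.auth.login.config"

  // If 'security' is selected, a non-empty JAAS path must be present;
  // otherwise return an error message instead of continuing.
  def validate(useSecurity: Boolean, jaasPath: Option[String]): Either[String, Unit] =
    if (useSecurity && jaasPath.forall(_.trim.isEmpty))
      Left(s"'Use security' is selected but -D$JaasProperty is not set")
    else
      Right(())

  // Convenience wrapper that reads the real system property.
  def validateFromJvm(useSecurity: Boolean): Either[String, Unit] =
    validate(useSecurity, sys.props.get(JaasProperty))
}
```

Passing the path in as an Option keeps the rule easy to test without touching JVM-wide state.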

@weand

weand commented Jun 10, 2017

  • I've configured SASL_PLAINTEXT as the first endpoint in Kafka
  • provided JAAS configuration for KafkaManager via -Djava.security.auth.login.config
  • provided corresponding consumer.properties for KafkaManager with proper settings

With this setup I also don't see any topic offsets anymore.

Switching back to PLAINTEXT as the first endpoint works, but leaves Kafka unsecured.

@patelh Do I understand correctly that Kafka does not offer an API for getting the topic offsets when using SASL?

@tanuj83

tanuj83 commented Jul 19, 2017

I have the same issue with SSL, using Kafka 0.10.0 and KM 1.3.3.6:
k.m.a.c.OffsetCacheActive - [topic=__consumer_offsets] An error has occurred while getting topic offsets from broker List((BrokerIdentity(

@patelh
Collaborator

patelh commented Aug 2, 2017

Use the latest version; you can now provide the security protocol to use in the cluster config.

@Ashish123gs
Author

This is awesome.
