auto.offset.reset - smallest vs earliest #16

Closed
l15k4 opened this Issue May 17, 2016 · 6 comments

Contributor

l15k4 commented May 17, 2016

Hey,

I believe smallest is deprecated in Kafka 0.9.x:

you can configure the consumer by setting auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" for the old consumer.

Btw, there are some issues with auto.create.topics.enable:
https://issues.apache.org/jira/browse/KAFKA-3334

Have you thought about adding support for creating topics on start?
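
For reference, a minimal sketch of the distinction quoted above: the old consumer accepts "smallest", while the new consumer introduced in 0.9 accepts "earliest". The OffsetReset object and its members are hypothetical names used only for illustration; they are not part of Kafka or of this project.

```scala
import java.util.Properties

// Hypothetical helper: picks the auto.offset.reset value that matches the
// consumer API in use, so tests read a topic from its beginning.
object OffsetReset {
  sealed trait ConsumerApi
  case object OldConsumer extends ConsumerApi // the pre-0.9 consumer
  case object NewConsumer extends ConsumerApi // the new consumer added in 0.9

  def fromBeginning(api: ConsumerApi): String = api match {
    case OldConsumer => "smallest"
    case NewConsumer => "earliest"
  }

  // Builds only the properties discussed in this issue; a real consumer
  // needs more (connection settings, deserializers, and so on).
  def consumerProps(api: ConsumerApi, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", fromBeginning(api))
    props
  }
}
```

Keeping the choice in one place means a switch between consumer APIs only has to touch a single value.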


manub May 17, 2016

Owner

Good catch - thanks for this. I'll definitely change the value of auto.offset.reset, or feel free to submit a PR if you'd like!

Regarding the auto creation of topics, #15 added a way to create topics manually. It still lacks documentation, but that will be fixed by the end of the week and a new release (possibly 0.5.1 or 0.6.0) will be pushed. While this won't create topics automatically at start, it will let users create topics before actually producing to or consuming from them.


l15k4 May 17, 2016

Contributor

I tried the current 0.6.0-SNAPSHOT version, and now after calling EmbeddedKafka.stop() a ZooKeeper client thread is left hanging, repeatedly trying to reconnect to the ZooKeeper instance that has been stopped. I suspect the ZK client connection comes from:

EmbeddedKafka.createCustomTopic("topic1", new Properties())

19:37:31.842 WARN  org.apache.zookeeper.ClientCnxn - Session 0x154bfcb1b8d0002 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

What do you think? This is my code, using reactive-akka: https://gist.github.com/l15k4/5cdd6be19ec2fcb9a3a32c23f0f6866c


manub May 17, 2016

Owner

Could you please check whether this is fixed by the changes I pushed to master for #17? The new consumer doesn't connect to ZooKeeper anymore.


l15k4 May 17, 2016

Contributor

That didn't help. It really is caused by:

EmbeddedKafka.createCustomTopic("topic1", new Properties())

Everything is OK if I don't call it, but if I do:

  override def beforeAll(): Unit = try super.beforeAll() finally {
    EmbeddedKafka.start()
    EmbeddedKafka.createCustomTopic("topic1", new Properties())
    EmbeddedKafka.createCustomTopic("topic2", new Properties())
  }
  override def afterAll(): Unit = try super.afterAll() finally {
    EmbeddedKafka.stop()
  }

Then:

[info] KafkaPlayground:
[info] - publish events from topic to topic (5 seconds, 320 milliseconds)
[info] Run completed in 8 seconds, 748 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 9 s, completed May 18, 2016 12:33:51 AM
> 00:33:53.475 WARN  org.apache.zookeeper.ClientCnxn - Session 0x154c0da78020001 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
00:33:53.492 WARN  org.apache.zookeeper.ClientCnxn - Session 0x154c0da78020002 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
00:33:54.706 WARN  org.apache.zookeeper.ClientCnxn - Session 0x154c0da78020002 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused

I believe this must be called after creating the topic:

org.I0Itec.zkclient.ZkClient#close

l15k4 May 17, 2016

Contributor

So instead of:

  def createCustomTopic(topic: String, topicConfig: Properties)(implicit config: EmbeddedKafkaConfig): Unit = {
    AdminUtils.createTopic(ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled), topic, 1, 1, topicConfig)
  }

It must be:

  def createCustomTopic(topic: String, topicConfig: Properties)(implicit config: EmbeddedKafkaConfig): Unit = {
    val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
    try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicConfig) finally zkUtils.close()
  }
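
The same close-after-use idea can be factored into a small helper, so every call site releases its client even when the body throws. This is only a sketch: withCleanup and FakeZkClient are hypothetical names made up for illustration, and the only thing assumed about the real ZooKeeper client is that it has a close() method, as used in the snippet above.

```scala
// Hypothetical loan-pattern helper: runs `body` with the resource and always
// runs `cleanup` afterwards, whether `body` returns or throws.
object Loan {
  def withCleanup[R, A](resource: R)(cleanup: R => Unit)(body: R => A): A =
    try body(resource) finally cleanup(resource)
}

// Stand-in for a ZooKeeper client, present only to keep the sketch
// self-contained. It records whether close() was called.
final class FakeZkClient {
  var closed = false

  def createTopic(name: String): String = {
    require(!closed, "client already closed")
    s"created $name"
  }

  def close(): Unit = closed = true
}
```

With such a helper, a call would look like Loan.withCleanup(new FakeZkClient)(_.close())(_.createTopic("topic1")), and the client is closed by the time the call returns.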

manub May 17, 2016

Owner

That's good. Sorry, I had limited time today and couldn't test your snippet, but it does make sense to me. Do you want to submit a PR for this?
