Problems running embedded kafka with multiple scalatests #28

Closed
IoDmitri opened this issue Jul 12, 2016 · 9 comments

Comments

@IoDmitri

I'm having issues with embedded Kafka when I run it within multiple ScalaTest suites. Here is my SO question, which shows more of the code and details the issue.

@manub
Owner

manub commented Jul 13, 2016

I replied to the SO question - you should probably look into EmbeddedKafka.start() and EmbeddedKafka.stop() on beforeAll and afterAll.
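
The suggestion above could look roughly like the following. This is a minimal sketch, not the maintainer's own code: it assumes the library's `net.manub.embeddedkafka` package and ScalaTest's `BeforeAndAfterAll` trait, and the suite name, test names, and ports are hypothetical.

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Hypothetical suite: one embedded broker is shared by every test in it.
class SharedBrokerSpec extends FunSuite with BeforeAndAfterAll {

  // Illustrative ports (assumption); any free ports would do.
  implicit val kafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

  // Start Kafka and ZooKeeper once, before the first test of the suite.
  override protected def beforeAll(): Unit = {
    super.beforeAll()
    EmbeddedKafka.start()
  }

  // Stop them once, after the last test of the suite.
  override protected def afterAll(): Unit = {
    try EmbeddedKafka.stop()
    finally super.afterAll()
  }

  test("first test uses the shared broker") {}
  test("second test uses the same broker") {}
}
```

With this layout the broker is shared across the tests of one suite, so topics and offsets created by one test remain visible to the next.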

@blbradley

@IoDmitri There are certain situations when using this library that could cause your tests to fail because of SBT's parallel execution of tests.

I've found that disabling parallel execution helps with those situations: parallelExecution in Test := false.

@manub
Owner

manub commented Jul 19, 2016

@IoDmitri any news on this?

@IoDmitri
Author

IoDmitri commented Aug 2, 2016

@manub The issue has been resolved by running start and stop before and after each test; this prevents the shutdown/startup from bleeding into other test suites.

@manub
Owner

manub commented Aug 3, 2016

Brilliant!

manub closed this as completed Aug 3, 2016

@prayagupa

prayagupa commented Jan 23, 2017

@manub
Strange, adding start and stop in beforeEach and afterEach does not work for me. I also turned off sbt parallelExecution.

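import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class KafkaEventConsumerSpecs extends FunSuite with BeforeAndAfterEach {

  implicit val streamingConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

  // Start a fresh broker before every test
  override protected def beforeEach(): Unit = {
    EmbeddedKafka.start()
  }

  // Stop the broker after every test
  override protected def afterEach(): Unit = {
    EmbeddedKafka.stop()
  }

  test("consumes 1") {}
  test("consumes 2") {}
}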

sbt

name := "es-driver"

version := "1.0"

scalaVersion := "2.11.8"

parallelExecution in Test := false

In the above example, the topics from the first test are visible to the second test.

@manub
Owner

manub commented Jan 24, 2017

@prayagupd which version of scalatest-embedded-kafka are you using? You may be hit by a bug I fixed in ca287be.

@prayagupa

@manub I'm using 0.11.0 (https://github.com/nihil-os/streaming-driver/blob/master/build.sbt#L29). The issue actually occurs when running multiple tests at the same time. If I run them individually, they work fine.

I saw:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=testKafkaEventConsumer
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:695)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:584)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)

It seems the first test did not clean up the consumer info; since the second test has a similar group.id and consumer.id, it inherits the consumer position from the first test.

@manub
Owner

manub commented Jan 25, 2017

It seems to me you're registering the same consumerId twice, and Kafka doesn't allow that. If you're running tests in parallel, you may want to change the consumerId for every test (and potentially the groupId, to avoid registering the two tests in the same group, which will lead to unexpected behaviour).
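
One way to follow this advice is to derive the identifiers from the test name plus a random suffix. The sketch below is an illustration under assumptions, not code from this thread: `UniqueConsumerIds` and `consumerProps` are hypothetical names, the default bootstrap address is a placeholder, and it assumes the `id` in the JMX name from the stack trace corresponds to the consumer's `client.id` setting.

```scala
import java.util.{Properties, UUID}

// Hypothetical helper: builds consumer settings whose ids are unique per call.
object UniqueConsumerIds {

  def consumerProps(testName: String,
                    bootstrapServers: String = "localhost:9092"): Properties = {
    // A random suffix keeps ids distinct even if two tests share a name.
    val suffix = UUID.randomUUID().toString
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    // A distinct group.id means no committed offsets are inherited from another test.
    props.put("group.id", s"$testName-group-$suffix")
    // A distinct client.id should avoid a clash on the app-info MBean name (assumption).
    props.put("client.id", s"$testName-client-$suffix")
    props
  }
}
```

Each test would then call `UniqueConsumerIds.consumerProps("consumes-1")` (or similar), add its deserializer settings, and pass the result to its own `KafkaConsumer`.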
