
EmbeddedKafkaBroker#consumeFromEmbeddedTopics ignores latest offset reset strategy #2061

Closed
coney opened this issue Jan 8, 2022 · 3 comments · Fixed by #2064

coney commented Jan 8, 2022

According to the documentation at https://docs.spring.io/spring-kafka/reference/html/#junit:

Starting with version 2.5, the consumerProps method sets the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. This is because, in most cases, you want the consumer to consume any messages sent in a test case. The ConsumerConfig default is latest which means that messages already sent by a test, before the consumer starts, will not receive those records. To revert to the previous behavior, set the property to latest after calling the method.

I created my consumer and manually overrode ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to latest, as below:

    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(generateConsumerGroupName(), "true",
            embeddedKafkaBroker);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    consumer = new DefaultKafkaConsumerFactory<>(consumerProps, keyDeserializer, valueDeserializer)
            .createConsumer();

    // start consuming before any records are written to kafka, make sure offset moves to latest
    embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);

But in the implementation of EmbeddedKafkaBroker#consumeFromEmbeddedTopics, it polls for records to force assignment and, if any records come back, seeks back to the beginning regardless of the configured reset strategy:

    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume) {
        ...
        ConsumerRecords<?, ?> records = null;
        int n = 0;
        while (!assigned.get() && n++ < 600) { // NOSONAR magic #
            records = consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic #
        }
        if (records != null && records.count() > 0) {
            final ConsumerRecords<?, ?> theRecords = records;
            logger.debug(() -> "Records received on initial poll for assignment; re-seeking to beginning; "
                    + theRecords.partitions().stream()
                    .flatMap(p -> theRecords.records(p).stream())
                    // map to same format as send metadata toString()
                    .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                    .collect(Collectors.toList()));
            // the line below ignores the latest strategy and always seeks to the beginning
            consumer.seekToBeginning(records.partitions());
        }
        ...
    }

Could EmbeddedKafkaBroker respect the offset reset config, or provide an option that only creates the consumer without polling for records and resetting the offset?
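
For illustration, a possible test-side workaround (only a sketch under my assumptions, not an existing broker API) would be to subscribe the consumer directly and wait for the assignment yourself, so that the configured auto.offset.reset=latest is honored and no seekToBeginning() is issued:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public final class ManualSubscription {

        private ManualSubscription() {
        }

        // Subscribe and poll only until partitions are assigned; unlike
        // consumeFromEmbeddedTopics, no re-seek is performed afterwards, so the
        // consumer starts from wherever auto.offset.reset says it should.
        public static void subscribeAndAwaitAssignment(Consumer<?, ?> consumer, String topic) {
            CountDownLatch assigned = new CountDownLatch(1);
            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // not relevant for this sketch
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    assigned.countDown();
                }

            });
            int n = 0;
            while (assigned.getCount() > 0 && n++ < 600) {
                consumer.poll(Duration.ofMillis(100)); // drives the rebalance; warm-up records are not processed
            }
            if (assigned.getCount() > 0) {
                throw new IllegalStateException("Partition assignment did not complete for topic " + topic);
            }
        }

    }

This is essentially what consumeFromEmbeddedTopics does, minus the seekToBeginning(...) call.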

garyrussell (Contributor) commented:

Yes, this is wrong, but it is extremely unusual to write a test that wants to ignore records published by the same test (so unusual that nobody has complained in the nearly 4 years since this was added).

Please explain your use case: why do you want the test to ignore records published in the same test?

@garyrussell garyrussell added this to the 2.8.2 milestone Jan 8, 2022
coney (Author) commented Jan 9, 2022

Indeed, it shouldn't ignore records from the current test.

In my scenario, I have an async task that can produce records. The previous test triggered the async task but didn't wait for it to complete, and in the following test the task finally produced a record, which ran into the code snippet I pasted above.

Actually, after I fixed the test by waiting for the async task, all my tests passed without changing the behavior of EmbeddedKafkaBroker#consumeFromEmbeddedTopics, but the reset of the offset to earliest did confuse me for a while.
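
For reference, a minimal sketch of the kind of wait I added (the names below are illustrative, not my actual code): the async task signals a latch once its record has been sent, and the test blocks on it before finishing.

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import org.junit.jupiter.api.Test;

    class AsyncProducerTest {

        @Test
        void waitsForAsyncTaskBeforeFinishing() throws InterruptedException {
            CountDownLatch produced = new CountDownLatch(1);
            startAsyncTask(produced); // hypothetical: the task counts the latch down after sending its record
            assertTrue(produced.await(10, TimeUnit.SECONDS), "async task did not produce a record in time");
        }

        private void startAsyncTask(CountDownLatch produced) {
            // placeholder for the real async work; here it just completes immediately
            new Thread(produced::countDown).start();
        }

    }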

I have several tests working with the same embedded Kafka and the same topic, and it's hard to clear (remove) topics from Kafka after each test, so I manually set the offset reset strategy to latest to make sure the current test only reads newly produced records.
I am wondering whether there is a better way to isolate the impact of different tests.

garyrussell (Contributor) commented:

Of course, we'll fix the bug, but it is generally best practice to use a different topic for each test to avoid this kind of contamination.
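
For example, a minimal sketch of the per-test-topic approach (the helper and naming scheme below are illustrative): each test derives a unique topic name and registers it with the embedded broker, so records from one test can never leak into another.

    import java.util.UUID;

    import org.springframework.kafka.test.EmbeddedKafkaBroker;

    final class TestTopics {

        private TestTopics() {
        }

        // Create a uniquely named topic on the embedded broker and return its name.
        static String uniqueTopic(EmbeddedKafkaBroker broker, String prefix) {
            String topic = prefix + "-" + UUID.randomUUID();
            broker.addTopics(topic);
            return topic;
        }

    }

A test would then call something like String topic = TestTopics.uniqueTopic(embeddedKafkaBroker, "orders") and produce and consume only on that topic.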
