Skip to content

Commit

Permalink
MINOR: Test assign() and assignment() in the integration test (apache…
Browse files Browse the repository at this point in the history
…#14086)

A missing piece from KAFKA-14950. This is to test assign() and assignment() in the integration test.

Also fixed an accidental mistake in the committed API.

Reviewers: Jun Rao <junrao@gmail.com>
  • Loading branch information
philipnee committed Jul 28, 2023
1 parent 19f9e1e commit 811ae01
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,12 @@ AbstractRequest.Builder<?> requestBuilder() {

@Override
public String toString() {
return "UnsentRequest(builder=" + requestBuilder + ")";
return "UnsentRequest{" +
"requestBuilder=" + requestBuilder +
", handler=" + handler +
", node=" + node +
", timer=" + timer +
'}';
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
eventHandler.add(event);
try {
return event.complete(Duration.ofMillis(100));
return event.complete(timeout);
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package kafka.api

import kafka.utils.TestUtils.waitUntilTrue
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

import scala.jdk.CollectionConverters._

class BaseAsyncConsumerTest extends AbstractConsumerTest {
@Test
Expand All @@ -29,10 +31,12 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
val startingTimestamp = System.currentTimeMillis()
val cb = new CountConsumerCommitCallback
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
consumer.assign(List(tp).asJava)
consumer.commitAsync(cb)
waitUntilTrue(() => {
cb.successCount == 1
}, "wait until commit is completed successfully", 5000)
assertTrue(consumer.assignment.contains(tp))
}

@Test
Expand All @@ -42,6 +46,9 @@ class BaseAsyncConsumerTest extends AbstractConsumerTest {
val numRecords = 10000
val startingTimestamp = System.currentTimeMillis()
sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
consumer.assign(List(tp).asJava)
consumer.commitSync();

assertTrue(consumer.assignment.contains(tp))
}
}

0 comments on commit 811ae01

Please sign in to comment.