@@ -37,7 +37,6 @@
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
 import org.apache.kafka.storage.internals.log.UnifiedLog;
-import org.apache.kafka.test.MockProducerInterceptor;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -49,6 +48,8 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
|
@@ -165,38 +166,39 @@ public void testSendWithRecreatedTopic() throws Exception {
 
     @ClusterTest
     public void testSendWhileTopicGetRecreated() {
-        int maxNumRecreatTopicAttempts = 20;
+        int maxNumRecreatTopicAttempts = 10;
         List<Uuid> topicIds = new CopyOnWriteArrayList<>();
-        var f = CompletableFuture.runAsync(() -> {
+        var recreateTopicFuture = CompletableFuture.runAsync(() -> {
             for (int i = 1; i <= maxNumRecreatTopicAttempts; i++) {
                 Uuid topicId = recreateTopic();
                 if (topicId != Uuid.ZERO_UUID) {
                     topicIds.add(topicId);
                 }
             }
         });
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
-        configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "");
-        configs.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        configs.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-        var fs = IntStream.range(0, 3).mapToObj(ignored -> CompletableFuture.runAsync(() -> {
-            try (var producer = cluster.producer(configs)) {
+
+        AtomicInteger numSuccess = new AtomicInteger(0);
+        var producerFutures = IntStream.range(0, 2).mapToObj(producerIndex -> CompletableFuture.runAsync(() -> {
+            try (var producer = cluster.producer()) {
                 for (int i = 1; i <= numRecords; i++) {
-                    producer.send(new ProducerRecord<>(topic, "value"));
+                    var resp = producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
+                        (metadata, exception) -> {
+                            if (metadata != null) {
+                                numSuccess.incrementAndGet();
+                            }
+                        }).get();
+                    assertEquals(topic, resp.topic());
                 }
+            } catch (Exception e) {
+                // ignore; a failed send simply leaves numSuccess short of the expected count
             }
         })).toList();
-        f.join();
-        fs.forEach(CompletableFuture::join);
-        // Test will recreate topic successfully multiple times, however few recreation might fail.
-        assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 3);
-        assertEquals(30, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
-        // Producer will encounter some metadata errors during topic recreation as the topic id wouldn't be accurate
-        assertTrue(MockProducerInterceptor.ON_ERROR_COUNT.intValue() != 0);
-        // Producer succeed to send data with some records without crashing
-        assertTrue(MockProducerInterceptor.ON_SUCCESS_COUNT.intValue() != 0);
+        recreateTopicFuture.join();
+        producerFutures.forEach(CompletableFuture::join);
+        // Topic recreation should succeed most of the time, but a few attempts may fail.
+        assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
+        // Both producers must get every record (2 x numRecords) acknowledged despite the recreations.
+        assertEquals(20, numSuccess.intValue());
     }
 
     @ClusterTest
|