@@ -37,7 +37,7 @@
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
 import org.apache.kafka.storage.internals.log.UnifiedLog;
-import org.apache.kafka.test.MockProducerInterceptor;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
@@ -49,6 +49,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
@@ -163,40 +164,40 @@ public void testSendWithRecreatedTopic() throws Exception {
         }
     }
 
+    @Timeout(90)
     @ClusterTest
     public void testSendWhileTopicGetRecreated() {
-        int maxNumRecreatTopicAttempts = 20;
+        int maxNumRecreatTopicAttempts = 10;
         List<Uuid> topicIds = new CopyOnWriteArrayList<>();
-        var f = CompletableFuture.runAsync(() -> {
+        var recreateTopicFuture = CompletableFuture.runAsync(() -> {
             for (int i = 1; i <= maxNumRecreatTopicAttempts; i++) {
                 Uuid topicId = recreateTopic();
                 if (topicId != Uuid.ZERO_UUID) {
                     topicIds.add(topicId);
                 }
             }
         });
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
-        configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "");
-        configs.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        configs.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
-        var fs = IntStream.range(0, 3).mapToObj(ignored -> CompletableFuture.runAsync(() -> {
-            try (var producer = cluster.producer(configs)) {
+
+        AtomicInteger numSuccess = new AtomicInteger(0);
+        var producerFutures = IntStream.range(0, 2).mapToObj(producerIndex -> CompletableFuture.runAsync(() -> {
+            try (var producer = cluster.producer()) {
                 for (int i = 1; i <= numRecords; i++) {
-                    producer.send(new ProducerRecord<>(topic, "value"));
+                    var resp = producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
+                        (metadata, exception) -> {
+                            if (metadata != null) {
+                                numSuccess.incrementAndGet();
+                            }
+                        }).get();
+                    assertEquals(resp.topic(), topic);
                 }
+            } catch (Exception e) {
+                // ignore
             }
         })).toList();
-        f.join();
-        fs.forEach(CompletableFuture::join);
-        // Test will recreate topic successfully multiple times, however few recreation might fail.
-        assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 3);
-        assertEquals(30, MockProducerInterceptor.ON_ACKNOWLEDGEMENT_COUNT.intValue());
-        // Producer will encounter some metadata errors during topic recreation as the topic id wouldn't be accurate
-        assertTrue(MockProducerInterceptor.ON_ERROR_COUNT.intValue() != 0);
-        // Producer succeed to send data with some records without crashing
-        assertTrue(MockProducerInterceptor.ON_SUCCESS_COUNT.intValue() != 0);
+        recreateTopicFuture.join();
+        producerFutures.forEach(CompletableFuture::join);
+        assertTrue(Math.abs(maxNumRecreatTopicAttempts - topicIds.size()) <= 5);
+        assertEquals(20, numSuccess.intValue());
     }
 
     @ClusterTest
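Note on the pattern for readers of this diff: the rewrite drops the MockProducerInterceptor counters and instead counts broker acks with a send Callback that updates a shared AtomicInteger, blocking on each returned future. Below is a minimal standalone sketch of that pattern. It is not part of this commit: the bootstrap address and topic name are placeholder assumptions, and it checks the callback's exception argument rather than metadata != null, since the exception is non-null exactly when a send fails.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class AckCountingSketch {
    public static void main(String[] args) {
        // Placeholder connection settings; point these at a real broker.
        Map<String, Object> configs = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // The callback runs on the producer's I/O thread, so the counter
        // it updates must be thread-safe.
        AtomicInteger numSuccess = new AtomicInteger(0);
        try (var producer = new KafkaProducer<byte[], byte[]>(configs)) {
            for (int i = 1; i <= 10; i++) {
                try {
                    // Blocking on get() serializes the sends, as the test does;
                    // the callback still fires once per completed request.
                    producer.send(new ProducerRecord<>("demo-topic", null, ("value" + i).getBytes()),
                        (metadata, exception) -> {
                            if (exception == null) {
                                numSuccess.incrementAndGet();
                            }
                        }).get();
                } catch (Exception e) {
                    // A send can fail while the topic is being recreated;
                    // like the test, treat it as a non-fatal miss.
                }
            }
        }
        System.out.println("acknowledged sends: " + numSuccess.get());
    }
}

In the commit itself the hand-built config map disappears because cluster.producer() already returns a producer for the embedded cluster whose serializers accept the raw byte[] payloads the new test sends.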