From e20e64641f2eefbd6fbaa2b4d31f72a520024660 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 1 Feb 2023 11:03:29 +0100 Subject: [PATCH] Revert "Dedicate a Kafka producer to each of the publishing SLO buckets (#1490)" (#1492) This reverts commit 3266042803ee4aa0d2988bff9636c9406407ff65. --- .../repository/kafka/KafkaRepositoryAT.java | 13 +--- .../nakadi/webservice/hila/HilaAT.java | 9 +-- .../webservice/hila/HilaRepartitionAT.java | 5 +- .../EventTypeControllerTestCase.java | 2 +- .../nakadi/EventPublishingController.java | 5 +- app/src/main/resources/application.yml | 4 +- .../zalando/nakadi/config/NakadiSettings.java | 7 ++ .../repository/KafkaRepositoryCreator.java | 3 +- .../nakadi/repository/kafka/KafkaFactory.java | 76 ++++++++++--------- .../kafka/KafkaLocationManager.java | 6 +- .../repository/kafka/KafkaSettings.java | 9 +-- .../kafka/KafkaTopicRepository.java | 27 ++----- .../nakadi/service/TracingService.java | 15 ++++ .../org/zalando/nakadi/util/SLOBuckets.java | 19 ----- .../repository/kafka/KafkaFactoryTest.java | 49 +++++------- .../kafka/KafkaTopicRepositoryTest.java | 16 +--- .../publishing/EventPublisherTest.java | 2 +- 17 files changed, 107 insertions(+), 160 deletions(-) delete mode 100644 core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index f69d1bc1cf..83d5b5d6cd 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.zalando.nakadi.repository.kafka.KafkaTestHelper.createKafkaProperties; public class KafkaRepositoryAT extends BaseAT { @@ -48,6 +47,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int ZK_SESSION_TIMEOUT = 30000; private static final int ZK_CONNECTION_TIMEOUT = 10000; private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000; + private static final int ACTIVE_PRODUCERS_COUNT = 4; private static final int NAKADI_SEND_TIMEOUT = 10000; private static final int NAKADI_POLL_TIMEOUT = 10000; private static final Long DEFAULT_RETENTION_TIME = 100L; @@ -57,7 +57,6 @@ public class KafkaRepositoryAT extends BaseAT { private static final int KAFKA_REQUEST_TIMEOUT = 30000; private static final int KAFKA_DELIVERY_TIMEOUT = 30000; private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000; - private static final int KAFKA_METADATA_MAX_AGE_MS = 1000; private static final String KAFKA_COMPRESSION_TYPE = "lz4"; private static final int KAFKA_BATCH_SIZE = 1048576; private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L; @@ -93,6 +92,7 @@ public void setup() { DEFAULT_TOPIC_RETENTION, DEFAULT_TOPIC_ROTATION, DEFAULT_COMMIT_TIMEOUT, + ACTIVE_PRODUCERS_COUNT, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT, @@ -109,8 +109,7 @@ public void setup() { kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY, KAFKA_LINGER_MS, KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KAFKA_ENABLE_AUTO_COMMIT, - KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE, - KAFKA_METADATA_MAX_AGE_MS); + KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE); kafkaHelper = new KafkaTestHelper(KAFKA_URL); defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY, Optional.of(DEFAULT_RETENTION_TIME)); @@ -283,11 +282,7 @@ private KafkaTopicRepository createKafkaTopicRepository() { Mockito .doReturn(kafkaHelper.createProducer()) .when(factory) - .takeDefaultProducer(); - Mockito - .doReturn(kafkaHelper.createProducer()) - .when(factory) - .takeProducer(anyString()); + .takeProducer(); return new KafkaTopicRepository.Builder() .setKafkaZookeeper(kafkaZookeeper) diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index 70a2636f50..9a9769fe71 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -100,27 +100,20 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartiti final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1); NakadiTestUtils.publishBusinessEventWithUserDefinedPartition( eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0"); - NakadiTestUtils.repartitionEventType(eventType, 2); - Thread.sleep(1500); - final Subscription subscription = createSubscription( RandomSubscriptionBuilder.builder() .withEventType(eventType.getName()) .withStartFrom(BEGIN) .buildSubscriptionBase()); - final TestStreamingClient clientAfterRepartitioning = TestStreamingClient .create(URL, subscription.getId(), "") .start(); - NakadiTestUtils.publishBusinessEventWithUserDefinedPartition( eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1"); - waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2))); - Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream() - .anyMatch(batch -> batch.getCursor().getPartition().equals("1"))); + .anyMatch(event -> event.getCursor().getPartition().equals("1"))); } @Test(timeout = 10000) diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java index d511312967..febe4c85b3 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java @@ -118,8 +118,6 @@ public void whenEventTypeRepartitionedSubscriptionStartsStreamNewPartitions() th NakadiTestUtils.repartitionEventType(eventType, 2); TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false))); - Thread.sleep(1500); - final TestStreamingClient clientAfterRepartitioning = TestStreamingClient .create(URL, subscription.getId(), "") .startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches)); @@ -155,9 +153,8 @@ public void shouldRepartitionTimelinedEventType() throws Exception { NakadiTestUtils.createTimeline(eventType.getName()); NakadiTestUtils.repartitionEventType(eventType, 2); - TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false))); - Thread.sleep(1500); + TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false))); final TestStreamingClient clientAfterRepartitioning = TestStreamingClient .create(URL, subscription.getId(), "") diff --git a/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java b/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java index 1c10f57c9d..5a6f86adf4 100644 --- a/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java +++ b/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java @@ -102,7 +102,7 @@ public EventTypeControllerTestCase() { @Before public void init() throws Exception { - final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, + final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES, NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "I am warning you", "I am warning you, even more", "nakadi_archiver", "nakadi_to_s3", 100, 10000); diff --git a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java index 3d608b1cd6..9a5ff2407c 100644 --- a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java +++ b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java @@ -35,7 +35,6 @@ import org.zalando.nakadi.service.publishing.BinaryEventPublisher; import org.zalando.nakadi.service.publishing.EventPublisher; import org.zalando.nakadi.service.publishing.NakadiKpiPublisher; -import org.zalando.nakadi.util.SLOBuckets; import javax.servlet.http.HttpServletRequest; import java.io.IOException; @@ -174,7 +173,7 @@ private ResponseEntity postBinaryEvents(final String eventTypeName, final int eventCount = result.getResponses().size(); final long totalSizeBytes = countingInputStream.getCount(); - TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes)); + TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes)); reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount); reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client); @@ -246,7 +245,7 @@ private ResponseEntity postEventInternal(final String eventTypeName, final EventPublishResult result; final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length; - TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes)); + TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes)); if (delete) { result = publisher.delete(eventsAsString, eventTypeName); diff --git a/app/src/main/resources/application.yml b/app/src/main/resources/application.yml index 1b7d6e6b40..af77696b31 100644 --- a/app/src/main/resources/application.yml +++ b/app/src/main/resources/application.yml @@ -55,6 +55,7 @@ nakadi: maxConnections: 5 maxStreamMemoryBytes: 50000000 # ~50 MB kafka: + producers.count: 1 retries: 0 request.timeout.ms: 30000 instanceType: t2.large @@ -71,7 +72,6 @@ nakadi: enable.auto.commit: false delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms max.block.ms: 5000 # kafka default 60000 - metadata.max.age.ms: 300000 # 5 min, default zookeeper: connectionString: zookeeper://zookeeper:2181 sessionTimeoutMs: 10000 @@ -166,8 +166,6 @@ nakadi: AUDIT_LOG_COLLECTION: true EVENT_OWNER_SELECTOR_AUTHZ: false ACCESS_LOG_ENABLED: true - kafka: - metadata.max.age.ms: 1000 kpi: config: stream-data-collection-frequency-ms: 100 diff --git a/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java index 5d6d1b80b0..b94b6e3873 100644 --- a/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java +++ b/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java @@ -15,6 +15,7 @@ public class NakadiSettings { private final long defaultTopicRetentionMs; private final long defaultTopicRotationMs; private final long maxCommitTimeout; + private final int kafkaActiveProducersCount; private final long kafkaPollTimeoutMs; private final long kafkaSendTimeoutMs; private final long timelineWaitTimeoutMs; @@ -35,6 +36,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo @Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs, @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs, @Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout, + @Value("${nakadi.kafka.producers.count}") final int kafkaActiveProducersCount, @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs, @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs, @Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs, @@ -58,6 +60,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo this.defaultTopicRetentionMs = defaultTopicRetentionMs; this.defaultTopicRotationMs = defaultTopicRotationMs; this.maxCommitTimeout = maxCommitTimeout; + this.kafkaActiveProducersCount = kafkaActiveProducersCount; this.kafkaPollTimeoutMs = kafkaPollTimeoutMs; this.kafkaSendTimeoutMs = kafkaSendTimeoutMs; this.eventMaxBytes = eventMaxBytes; @@ -96,6 +99,10 @@ public long getMaxCommitTimeout() { return maxCommitTimeout; } + public int getKafkaActiveProducersCount() { + return kafkaActiveProducersCount; + } + public long getKafkaPollTimeoutMs() { return kafkaPollTimeoutMs; } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java index 1ff1618635..b342a97447 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java @@ -64,7 +64,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic zookeeperSettings.getZkConnectionTimeoutMs(), nakadiSettings); final KafkaLocationManager kafkaLocationManager = new KafkaLocationManager(zooKeeperHolder, kafkaSettings); - final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager); + final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager, + nakadiSettings.getKafkaActiveProducersCount()); final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper); final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository.Builder() diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java index fa88ee30a4..cfb840f038 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java @@ -13,51 +13,56 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class KafkaFactory { - public static final String DEFAULT_PRODUCER_CLIENT_ID = "default"; - private static final Logger LOG = LoggerFactory.getLogger(KafkaFactory.class); private final KafkaLocationManager kafkaLocationManager; - + private final Map, AtomicInteger> useCount; private final ReadWriteLock rwLock; - private final Map, AtomicInteger> useCountByProducer; - private final Map> activeProducerByClientId; + private final List> activeProducers; + private final AtomicLong activeProducerCounter; - public KafkaFactory(final KafkaLocationManager kafkaLocationManager) { + public KafkaFactory(final KafkaLocationManager kafkaLocationManager, final int numActiveProducers) { this.kafkaLocationManager = kafkaLocationManager; + this.useCount = new ConcurrentHashMap<>(); this.rwLock = new ReentrantReadWriteLock(); - this.useCountByProducer = new ConcurrentHashMap<>(); - this.activeProducerByClientId = new HashMap<>(); + + this.activeProducers = new ArrayList<>(numActiveProducers); + for (int i = 0; i < numActiveProducers; ++i) { + this.activeProducers.add(null); + } + + this.activeProducerCounter = new AtomicLong(0); } @Nullable - private Producer takeUnderLock(final String clientId, final boolean canCreate) { + private Producer takeUnderLock(final int index, final boolean canCreate) { final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock(); lock.lock(); try { - Producer producer = activeProducerByClientId.get(clientId); + Producer producer = activeProducers.get(index); if (null != producer) { - useCountByProducer.get(producer).incrementAndGet(); + useCount.get(producer).incrementAndGet(); return producer; } else if (canCreate) { - producer = createProducerInstance(clientId); - useCountByProducer.put(producer, new AtomicInteger(1)); - activeProducerByClientId.put(clientId, producer); + producer = createProducerInstance(); + useCount.put(producer, new AtomicInteger(1)); + activeProducers.set(index, producer); - LOG.info("New producer instance created with client id '{}': {}", clientId, producer); + LOG.info("New producer instance created: " + producer); return producer; } else { return null; @@ -73,18 +78,16 @@ private Producer takeUnderLock(final String clientId, final bool * * @return Initialized kafka producer instance. */ - public Producer takeProducer(final String clientId) { - Producer result = takeUnderLock(clientId, false); + public Producer takeProducer() { + final int index = (int)(activeProducerCounter.incrementAndGet() % activeProducers.size()); + + Producer result = takeUnderLock(index, false); if (null == result) { - result = takeUnderLock(clientId, true); + result = takeUnderLock(index, true); } return result; } - public Producer takeDefaultProducer() { - return takeProducer(DEFAULT_PRODUCER_CLIENT_ID); - } - /** * Release kafka producer that was obtained by {@link #takeProducer()} method. If producer was not obtained by * {@link #takeProducer()} call - method will throw {@link NullPointerException} @@ -92,21 +95,21 @@ public Producer takeDefaultProducer() { * @param producer Producer to release. */ public void releaseProducer(final Producer producer) { - final AtomicInteger counter = useCountByProducer.get(producer); + final AtomicInteger counter = useCount.get(producer); if (counter != null && 0 == counter.decrementAndGet()) { final boolean deleteProducer; rwLock.readLock().lock(); try { - deleteProducer = !activeProducerByClientId.containsValue(producer); + deleteProducer = !activeProducers.contains(producer); } finally { rwLock.readLock().unlock(); } if (deleteProducer) { rwLock.writeLock().lock(); try { - if (counter.get() == 0 && null != useCountByProducer.remove(producer)) { + if (counter.get() == 0 && null != useCount.remove(producer)) { LOG.info("Stopping producer instance - It was reported that instance should be refreshed " + - "and it is not used anymore: {}", producer); + "and it is not used anymore: " + producer); producer.close(); } } finally { @@ -126,15 +129,14 @@ public void releaseProducer(final Producer producer) { * @param producer Producer instance to terminate. */ public void terminateProducer(final Producer producer) { - LOG.info("Received signal to terminate producer: {}", producer); + LOG.info("Received signal to terminate producer " + producer); rwLock.writeLock().lock(); try { - final Optional clientId = activeProducerByClientId.entrySet().stream() - .filter(kv -> kv.getValue() == producer) - .map(Map.Entry::getKey) - .findFirst(); - if (clientId.isPresent()) { - activeProducerByClientId.remove(clientId.get()); + final int index = activeProducers.indexOf(producer); + if (index >= 0) { + activeProducers.set(index, null); + } else { + LOG.info("Signal for producer termination already received: " + producer); } } finally { rwLock.writeLock().unlock(); @@ -208,8 +210,8 @@ public void close() { } } - protected Producer createProducerInstance(final String clientId) { - return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(clientId), + protected Producer createProducerInstance() { + return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(), new KafkaCrutch(kafkaLocationManager)); } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java index 298631e0dc..f0d67319df 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java @@ -116,11 +116,8 @@ public Properties getKafkaConsumerProperties() { return properties; } - public Properties getKafkaProducerProperties(final String clientId) { + public Properties getKafkaProducerProperties() { final Properties producerProps = (Properties) kafkaProperties.clone(); - - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, @@ -138,7 +135,6 @@ public Properties getKafkaProducerProperties(final String clientId) { producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSettings.getMaxRequestSize()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaSettings.getDeliveryTimeoutMs()); producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaSettings.getMaxBlockMs()); - producerProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, kafkaSettings.getMetadataMaxAgeMs()); return producerProps; } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java index 59baeb0f31..728d3899bc 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaSettings.java @@ -25,7 +25,6 @@ public class KafkaSettings { private final int maxBlockMs; private final String clientRack; private final String compressionType; - private final int metadataMaxAgeMs; @Autowired public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries, @@ -39,8 +38,7 @@ public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries, @Value("${nakadi.kafka.delivery.timeout.ms}") final int deliveryTimeoutMs, @Value("${nakadi.kafka.max.block.ms}") final int maxBlockMs, @Value("${nakadi.kafka.client.rack:}") final String clientRack, - @Value("${nakadi.kafka.compression.type:lz4}") final String compressionType, - @Value("${nakadi.kafka.metadata.max.age.ms}") final int metadataMaxAgeMs) { + @Value("${nakadi.kafka.compression.type:lz4}") final String compressionType) { this.retries = retries; this.requestTimeoutMs = requestTimeoutMs; this.batchSize = batchSize; @@ -53,7 +51,6 @@ public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries, this.maxBlockMs = maxBlockMs; this.clientRack = clientRack; this.compressionType = compressionType; - this.metadataMaxAgeMs = metadataMaxAgeMs; } public int getRetries() { @@ -103,8 +100,4 @@ public String getClientRack() { public String getCompressionType() { return compressionType; } - - public int getMetadataMaxAgeMs() { - return metadataMaxAgeMs; - } } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 4d165ece8d..bc88aa2ed1 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -52,7 +52,6 @@ import org.zalando.nakadi.repository.NakadiTopicConfig; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.service.TracingService; -import org.zalando.nakadi.util.SLOBuckets; import javax.annotation.Nullable; import java.io.Closeable; @@ -219,23 +218,21 @@ public void repartition(final String topic, final int partitionsNumber) throws C TopicConfigException { try (AdminClient adminClient = AdminClient.create(kafkaLocationManager.getProperties())) { adminClient.createPartitions(ImmutableMap.of(topic, NewPartitions.increaseTo(partitionsNumber))); - final long timeoutMillis = TimeUnit.SECONDS.toMillis(5); final Boolean areNewPartitionsAdded = Retryer.executeWithRetry(() -> { try (Consumer consumer = kafkaFactory.getConsumer()) { - final List partitions = consumer.partitionsFor(topic); - LOG.info("Repartitioning topic {} partitions: {}, expected: {}", - topic, partitions.size(), partitionsNumber); - return partitions.size() == partitionsNumber; + return consumer.partitionsFor(topic).size() == partitionsNumber; } }, new RetryForSpecifiedTimeStrategy(timeoutMillis) - .withWaitBetweenEachTry(1000L) + .withWaitBetweenEachTry(100L) .withResultsThatForceRetry(Boolean.FALSE)); - if (!Boolean.TRUE.equals(areNewPartitionsAdded)) { throw new TopicConfigException(String.format("Failed to repartition topic to %s", partitionsNumber)); } + final Producer producer = kafkaFactory.takeProducer(); + kafkaFactory.terminateProducer(producer); + kafkaFactory.releaseProducer(producer); } catch (Exception e) { throw new CannotAddPartitionToTopicException(String .format("Failed to increase the number of partition for %s topic to %s", topic, @@ -307,14 +304,7 @@ public boolean topicExists(final String topic) throws TopicRepositoryException { public void syncPostBatch( final String topicId, final List batch, final String eventType, final boolean delete) throws EventPublishingException { - - long totalRawSize = 0; - for (final BatchItem item : batch) { - totalRawSize += item.getEventSize(); - } - final String sloBucketName = SLOBuckets.getNameForBatchSize(totalRawSize); - - final Producer producer = kafkaFactory.takeProducer(sloBucketName); + final Producer producer = kafkaFactory.takeProducer(); try { final Map partitionToBroker = producer.partitionsFor(topicId).stream() .filter(partitionInfo -> partitionInfo.leader() != null) @@ -453,8 +443,7 @@ private void logFailedEvents(final String topicId, final String eventType, final */ public List sendEvents(final String topic, final List nakadiRecords) { - - final Producer producer = kafkaFactory.takeDefaultProducer(); + final Producer producer = kafkaFactory.takeProducer(); final CountDownLatch latch = new CountDownLatch(nakadiRecords.size()); final Map responses = new ConcurrentHashMap<>(); try { @@ -715,7 +704,7 @@ public Map getSizeStats() { } public List listPartitionNamesInternal(final String topicId) { - final Producer producer = kafkaFactory.takeDefaultProducer(); + final Producer producer = kafkaFactory.takeProducer(); try { return unmodifiableList(producer.partitionsFor(topicId) .stream() diff --git a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java index a4719d787d..4d2572553a 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java @@ -17,8 +17,23 @@ import static io.opentracing.propagation.Format.Builtin.TEXT_MAP; public class TracingService { + private static final String BUCKET_NAME_5_KB = "<5K"; + private static final String BUCKET_NAME_5_50_KB = "5K-50K"; + private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K"; + + private static final long BUCKET_5_KB = 5000L; + private static final long BUCKET_50_KB = 50000L; public static final String ERROR_DESCRIPTION = "error.description"; + public static String getSLOBucketName(final long batchSize) { + if (batchSize > BUCKET_50_KB) { + return BUCKET_NAME_MORE_THAN_50_KB; + } else if (batchSize < BUCKET_5_KB) { + return BUCKET_NAME_5_KB; + } + return BUCKET_NAME_5_50_KB; + } + public static Tracer.SpanBuilder buildNewSpan(final String operationName) { return GlobalTracer.get().buildSpan(operationName); } diff --git a/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java b/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java deleted file mode 100644 index 7a88fe95c9..0000000000 --- a/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.zalando.nakadi.util; - -public class SLOBuckets { - private static final String BUCKET_NAME_5_KB = "<5K"; - private static final String BUCKET_NAME_5_50_KB = "5K-50K"; - private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K"; - - private static final long BUCKET_5_KB = 5000L; - private static final long BUCKET_50_KB = 50000L; - - public static String getNameForBatchSize(final long batchSize) { - if (batchSize > BUCKET_50_KB) { - return BUCKET_NAME_MORE_THAN_50_KB; - } else if (batchSize < BUCKET_5_KB) { - return BUCKET_NAME_5_KB; - } - return BUCKET_NAME_5_50_KB; - } -} diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java index 84e7203bea..5a824a6c57 100644 --- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java @@ -12,27 +12,27 @@ public class KafkaFactoryTest { private static class FakeKafkaFactory extends KafkaFactory { - FakeKafkaFactory() { - super(null); + FakeKafkaFactory(final int numActiveProducers) { + super(null, numActiveProducers); } @Override - protected Producer createProducerInstance(final String clientId) { + protected Producer createProducerInstance() { return Mockito.mock(Producer.class); } } @Test public void whenSingleProducerThenTheSameProducerIsGiven() { - final KafkaFactory factory = new FakeKafkaFactory(); - final Producer producer1 = factory.takeDefaultProducer(); + final KafkaFactory factory = new FakeKafkaFactory(1); + final Producer producer1 = factory.takeProducer(); try { Assert.assertNotNull(producer1); } finally { factory.releaseProducer(producer1); } - final Producer producer2 = factory.takeDefaultProducer(); + final Producer producer2 = factory.takeProducer(); try { Assert.assertSame(producer1, producer2); } finally { @@ -42,10 +42,10 @@ public void whenSingleProducerThenTheSameProducerIsGiven() { @Test public void verifySingleProducerIsClosedAtCorrectTime() { - final KafkaFactory factory = new FakeKafkaFactory(); + final KafkaFactory factory = new FakeKafkaFactory(1); final List> producers1 = IntStream.range(0, 10) - .mapToObj(ignore -> factory.takeDefaultProducer()).collect(Collectors.toList()); + .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList()); final Producer producer = producers1.get(0); Assert.assertNotNull(producer); producers1.forEach(p -> Assert.assertSame(producer, p)); @@ -55,8 +55,8 @@ public void verifySingleProducerIsClosedAtCorrectTime() { final List> producers2 = IntStream.range(0, 10) - .mapToObj(ignore -> factory.takeDefaultProducer()).collect(Collectors.toList()); - final Producer additionalProducer = factory.takeDefaultProducer(); + .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList()); + final Producer additionalProducer = factory.takeProducer(); Assert.assertSame(producer, additionalProducer); producers2.forEach(p -> Assert.assertSame(producer, p)); @@ -73,14 +73,14 @@ public void verifySingleProducerIsClosedAtCorrectTime() { @Test public void verifyNewProducerCreatedAfterCloseOfSingle() { - final KafkaFactory factory = new FakeKafkaFactory(); - final Producer producer1 = factory.takeDefaultProducer(); + final KafkaFactory factory = new FakeKafkaFactory(1); + final Producer producer1 = factory.takeProducer(); Assert.assertNotNull(producer1); factory.terminateProducer(producer1); factory.releaseProducer(producer1); Mockito.verify(producer1, Mockito.times(1)).close(); - final Producer producer2 = factory.takeDefaultProducer(); + final Producer producer2 = factory.takeProducer(); Assert.assertNotNull(producer2); Assert.assertNotSame(producer1, producer2); factory.releaseProducer(producer2); @@ -88,24 +88,13 @@ public void verifyNewProducerCreatedAfterCloseOfSingle() { } @Test - public void testTakingProducerForTheSameOrDifferentKey() { - final KafkaFactory factory = new FakeKafkaFactory(); + public void testGoldenPathWithManyActiveProducers() { + final KafkaFactory factory = new FakeKafkaFactory(4); - final Producer producer1 = factory.takeProducer("key1"); - Assert.assertNotNull(producer1); - - final Producer producer2 = factory.takeProducer("key2"); - Assert.assertNotNull(producer2); - - Assert.assertNotSame(producer1, producer2); + final List> producers = IntStream.range(0, 10) + .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList()); - final Producer producer3 = factory.takeProducer("key1"); - Assert.assertNotNull(producer3); - - Assert.assertSame(producer3, producer1); - - factory.releaseProducer(producer1); - factory.releaseProducer(producer2); - factory.releaseProducer(producer3); + producers.forEach(Assert::assertNotNull); + producers.forEach(factory::releaseProducer); } } diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index f3e71555ea..a9859b3905 100644 --- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -119,7 +119,6 @@ private enum ConsumerOffsetMode { private final KafkaTopicRepository kafkaTopicRepository; private final KafkaProducer kafkaProducer; - private final KafkaProducer defaultKafkaProducer; private final KafkaFactory kafkaFactory; @SuppressWarnings("unchecked") @@ -128,12 +127,6 @@ public KafkaTopicRepositoryTest() throws IOException { when(kafkaProducer.partitionsFor(anyString())).then( invocation -> partitionsOfTopic((String) invocation.getArguments()[0]) ); - - defaultKafkaProducer = mock(KafkaProducer.class); - when(defaultKafkaProducer.partitionsFor(anyString())).then( - invocation -> partitionsOfTopic((String) invocation.getArguments()[0]) - ); - nakadiRecordMapper = TestUtils.getNakadiRecordMapper(); kafkaFactory = createKafkaFactory(); kafkaTopicRepository = createKafkaRepository(kafkaFactory); @@ -456,7 +449,7 @@ public void testSendNakadiRecordsOk() { final var nakadiRecord = getTestNakadiRecord("0"); final List nakadiRecords = Lists.newArrayList(nakadiRecord, nakadiRecord, nakadiRecord); - when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> { + when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> { final Callback callback = (Callback) invocation.getArguments()[1]; callback.onCompletion(null, null); return null; @@ -478,7 +471,7 @@ public void testSendNakadiRecordsHalfPublished() throws IOException { getTestNakadiRecord("3")); final Exception exception = new Exception(); - when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> { + when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> { final ProducerRecord record = (ProducerRecord) invocation.getArguments()[0]; final Callback callback = (Callback) invocation.getArguments()[1]; if (record.partition() % 2 == 0) { @@ -512,7 +505,7 @@ public void testSendNakadiRecordsHalfSubmitted() throws IOException { getTestNakadiRecord("3")); final KafkaException exception = new KafkaException(); - when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> { + when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> { final ProducerRecord record = (ProducerRecord) invocation.getArguments()[0]; final Callback callback = (Callback) invocation.getArguments()[1]; if (record.partition() <= 1) { @@ -603,8 +596,7 @@ private KafkaFactory createKafkaFactory() { when(kafkaFactory.getConsumer(KAFKA_CLIENT_ID)).thenReturn(consumer); when(kafkaFactory.getConsumer()).thenReturn(consumer); - when(kafkaFactory.takeDefaultProducer()).thenReturn(defaultKafkaProducer); - when(kafkaFactory.takeProducer(anyString())).thenReturn(kafkaProducer); + when(kafkaFactory.takeProducer()).thenReturn(kafkaProducer); return kafkaFactory; } diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java index e0f8c7ce87..0b8348151a 100644 --- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java @@ -91,7 +91,7 @@ public class EventPublisherTest { protected final Enrichment enrichment = mock(Enrichment.class); protected final AuthorizationValidator authzValidator = mock(AuthorizationValidator.class); protected final TimelineService timelineService = Mockito.mock(TimelineService.class); - protected final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, + protected final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1, NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT_MS, NAKADI_EVENT_MAX_BYTES, NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "", "", "nakadi_archiver", "nakadi_to_s3", 100, 10000);