Revert "Dedicate a Kafka producer to each of the publishing SLO bucke…
…ts (#1490)" (#1492)

This reverts commit 3266042.
adyach committed Feb 1, 2023
1 parent 8019fca commit e20e646
Showing 17 changed files with 107 additions and 160 deletions.
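
For orientation: this revert removes the Kafka producers that were dedicated to each publishing SLO bucket (keyed by client id) and restores KafkaFactory's fixed-size producer pool, where each publish call picks a producer by a round-robin index. The sketch below is a simplified illustration of that restored pattern; the class and method names are invented, and producers are created eagerly here instead of lazily under a lock as in the real factory.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a fixed pool of producers selected round-robin,
// mirroring the takeProducer() logic restored in KafkaFactory below.
class ProducerPool {

    private final List<Producer<byte[], byte[]>> producers;
    private final AtomicLong counter = new AtomicLong(0);

    // producerProps must already carry byte-array key/value serializers,
    // as KafkaLocationManager.getKafkaProducerProperties() sets up.
    ProducerPool(final Properties producerProps, final int size) {
        this.producers = new ArrayList<>(size);
        for (int i = 0; i < size; ++i) {
            this.producers.add(new KafkaProducer<>(producerProps));
        }
    }

    Producer<byte[], byte[]> take() {
        // Spread publishing across the fixed number of producers.
        final int index = (int) (counter.incrementAndGet() % producers.size());
        return producers.get(index);
    }
}
```

The full KafkaFactory diff further down additionally reference-counts each producer, so a terminated instance is only closed once it is no longer in use.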
@@ -32,7 +32,6 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.zalando.nakadi.repository.kafka.KafkaTestHelper.createKafkaProperties;

public class KafkaRepositoryAT extends BaseAT {
@@ -48,6 +47,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int ZK_SESSION_TIMEOUT = 30000;
private static final int ZK_CONNECTION_TIMEOUT = 10000;
private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000;
private static final int ACTIVE_PRODUCERS_COUNT = 4;
private static final int NAKADI_SEND_TIMEOUT = 10000;
private static final int NAKADI_POLL_TIMEOUT = 10000;
private static final Long DEFAULT_RETENTION_TIME = 100L;
@@ -57,7 +57,6 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_DELIVERY_TIMEOUT = 30000;
private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000;
private static final int KAFKA_METADATA_MAX_AGE_MS = 1000;
private static final String KAFKA_COMPRESSION_TYPE = "lz4";
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L;
@@ -93,6 +92,7 @@ public void setup() {
DEFAULT_TOPIC_RETENTION,
DEFAULT_TOPIC_ROTATION,
DEFAULT_COMMIT_TIMEOUT,
ACTIVE_PRODUCERS_COUNT,
NAKADI_POLL_TIMEOUT,
NAKADI_SEND_TIMEOUT,
TIMELINE_WAIT_TIMEOUT,
@@ -109,8 +109,7 @@ public void setup() {

kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KAFKA_ENABLE_AUTO_COMMIT,
KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE,
KAFKA_METADATA_MAX_AGE_MS);
KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
Optional.of(DEFAULT_RETENTION_TIME));
@@ -283,11 +282,7 @@ private KafkaTopicRepository createKafkaTopicRepository() {
Mockito
.doReturn(kafkaHelper.createProducer())
.when(factory)
.takeDefaultProducer();
Mockito
.doReturn(kafkaHelper.createProducer())
.when(factory)
.takeProducer(anyString());
.takeProducer();

return new KafkaTopicRepository.Builder()
.setKafkaZookeeper(kafkaZookeeper)
@@ -100,27 +100,20 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartiti
final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1);
NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0");

NakadiTestUtils.repartitionEventType(eventType, 2);
Thread.sleep(1500);

final Subscription subscription = createSubscription(
RandomSubscriptionBuilder.builder()
.withEventType(eventType.getName())
.withStartFrom(BEGIN)
.buildSubscriptionBase());

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.start();

NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1");

waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2)));

Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream()
.anyMatch(batch -> batch.getCursor().getPartition().equals("1")));
.anyMatch(event -> event.getCursor().getPartition().equals("1")));
}

@Test(timeout = 10000)
@@ -118,8 +118,6 @@ public void whenEventTypeRepartitionedSubscriptionStartsStreamNewPartitions() th
NakadiTestUtils.repartitionEventType(eventType, 2);
TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));

Thread.sleep(1500);

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches));
@@ -155,9 +153,8 @@ public void shouldRepartitionTimelinedEventType() throws Exception {
NakadiTestUtils.createTimeline(eventType.getName());

NakadiTestUtils.repartitionEventType(eventType, 2);
TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));

Thread.sleep(1500);
TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
@@ -102,7 +102,7 @@ public EventTypeControllerTestCase() {
@Before
public void init() throws Exception {

final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES,
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "I am warning you",
"I am warning you, even more", "nakadi_archiver", "nakadi_to_s3", 100, 10000);
@@ -35,7 +35,6 @@
import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
import org.zalando.nakadi.service.publishing.EventPublisher;
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.util.SLOBuckets;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
@@ -174,7 +173,7 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
final int eventCount = result.getResponses().size();

final long totalSizeBytes = countingInputStream.getCount();
TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
@@ -246,7 +245,7 @@ private ResponseEntity postEventInternal(final String eventTypeName,
final EventPublishResult result;

final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;
TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

if (delete) {
result = publisher.delete(eventsAsString, eventTypeName);
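
Both variants of the changed lines above map the request's byte count to a coarse SLO bucket name and attach it as the slo_bucket tracing tag; the revert only moves that lookup back from the SLOBuckets helper to TracingService.getSLOBucketName. Purely to illustrate the shape of such a lookup, here is a hypothetical helper with invented thresholds (the real bucket boundaries are not part of this diff):

```java
// Hypothetical bucketing helper; the thresholds are invented for illustration
// and do not reflect Nakadi's actual SLO bucket boundaries.
final class BatchSizeBuckets {

    private BatchSizeBuckets() {
    }

    static String bucketNameFor(final long batchSizeBytes) {
        if (batchSizeBytes < 5_000L) {
            return "small";
        }
        if (batchSizeBytes < 50_000L) {
            return "medium";
        }
        return "large";
    }
}
```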
4 changes: 1 addition & 3 deletions app/src/main/resources/application.yml
@@ -55,6 +55,7 @@ nakadi:
maxConnections: 5
maxStreamMemoryBytes: 50000000 # ~50 MB
kafka:
producers.count: 1
retries: 0
request.timeout.ms: 30000
instanceType: t2.large
@@ -71,7 +72,6 @@
enable.auto.commit: false
delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms
max.block.ms: 5000 # kafka default 60000
metadata.max.age.ms: 300000 # 5 min, default
zookeeper:
connectionString: zookeeper://zookeeper:2181
sessionTimeoutMs: 10000
@@ -166,8 +166,6 @@ nakadi:
AUDIT_LOG_COLLECTION: true
EVENT_OWNER_SELECTOR_AUTHZ: false
ACCESS_LOG_ENABLED: true
kafka:
metadata.max.age.ms: 1000
kpi:
config:
stream-data-collection-frequency-ms: 100
@@ -15,6 +15,7 @@ public class NakadiSettings {
private final long defaultTopicRetentionMs;
private final long defaultTopicRotationMs;
private final long maxCommitTimeout;
private final int kafkaActiveProducersCount;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long timelineWaitTimeoutMs;
@@ -35,6 +36,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs,
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout,
@Value("${nakadi.kafka.producers.count}") final int kafkaActiveProducersCount,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs,
@@ -58,6 +60,7 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultTopicRetentionMs = defaultTopicRetentionMs;
this.defaultTopicRotationMs = defaultTopicRotationMs;
this.maxCommitTimeout = maxCommitTimeout;
this.kafkaActiveProducersCount = kafkaActiveProducersCount;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
@@ -96,6 +99,10 @@ public long getMaxCommitTimeout() {
return maxCommitTimeout;
}

public int getKafkaActiveProducersCount() {
return kafkaActiveProducersCount;
}

public long getKafkaPollTimeoutMs() {
return kafkaPollTimeoutMs;
}
@@ -64,7 +64,8 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
zookeeperSettings.getZkConnectionTimeoutMs(),
nakadiSettings);
final KafkaLocationManager kafkaLocationManager = new KafkaLocationManager(zooKeeperHolder, kafkaSettings);
final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager);
final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager,
nakadiSettings.getKafkaActiveProducersCount());
final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper);
final KafkaTopicRepository kafkaTopicRepository =
new KafkaTopicRepository.Builder()
@@ -13,51 +13,56 @@

import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class KafkaFactory {

public static final String DEFAULT_PRODUCER_CLIENT_ID = "default";

private static final Logger LOG = LoggerFactory.getLogger(KafkaFactory.class);
private final KafkaLocationManager kafkaLocationManager;

private final Map<Producer<byte[], byte[]>, AtomicInteger> useCount;
private final ReadWriteLock rwLock;
private final Map<Producer<byte[], byte[]>, AtomicInteger> useCountByProducer;
private final Map<String, Producer<byte[], byte[]>> activeProducerByClientId;
private final List<Producer<byte[], byte[]>> activeProducers;
private final AtomicLong activeProducerCounter;

public KafkaFactory(final KafkaLocationManager kafkaLocationManager) {
public KafkaFactory(final KafkaLocationManager kafkaLocationManager, final int numActiveProducers) {
this.kafkaLocationManager = kafkaLocationManager;

this.useCount = new ConcurrentHashMap<>();
this.rwLock = new ReentrantReadWriteLock();
this.useCountByProducer = new ConcurrentHashMap<>();
this.activeProducerByClientId = new HashMap<>();

this.activeProducers = new ArrayList<>(numActiveProducers);
for (int i = 0; i < numActiveProducers; ++i) {
this.activeProducers.add(null);
}

this.activeProducerCounter = new AtomicLong(0);
}

@Nullable
private Producer<byte[], byte[]> takeUnderLock(final String clientId, final boolean canCreate) {
private Producer<byte[], byte[]> takeUnderLock(final int index, final boolean canCreate) {
final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock();
lock.lock();
try {
Producer<byte[], byte[]> producer = activeProducerByClientId.get(clientId);
Producer<byte[], byte[]> producer = activeProducers.get(index);
if (null != producer) {
useCountByProducer.get(producer).incrementAndGet();
useCount.get(producer).incrementAndGet();
return producer;
} else if (canCreate) {
producer = createProducerInstance(clientId);
useCountByProducer.put(producer, new AtomicInteger(1));
activeProducerByClientId.put(clientId, producer);
producer = createProducerInstance();
useCount.put(producer, new AtomicInteger(1));
activeProducers.set(index, producer);

LOG.info("New producer instance created with client id '{}': {}", clientId, producer);
LOG.info("New producer instance created: " + producer);
return producer;
} else {
return null;
@@ -73,40 +78,38 @@ private Producer<byte[], byte[]> takeUnderLock(final String clientId, final bool
*
* @return Initialized kafka producer instance.
*/
public Producer<byte[], byte[]> takeProducer(final String clientId) {
Producer<byte[], byte[]> result = takeUnderLock(clientId, false);
public Producer<byte[], byte[]> takeProducer() {
final int index = (int)(activeProducerCounter.incrementAndGet() % activeProducers.size());

Producer<byte[], byte[]> result = takeUnderLock(index, false);
if (null == result) {
result = takeUnderLock(clientId, true);
result = takeUnderLock(index, true);
}
return result;
}

public Producer<byte[], byte[]> takeDefaultProducer() {
return takeProducer(DEFAULT_PRODUCER_CLIENT_ID);
}

/**
* Release kafka producer that was obtained by {@link #takeProducer()} method. If producer was not obtained by
* {@link #takeProducer()} call - method will throw {@link NullPointerException}
*
* @param producer Producer to release.
*/
public void releaseProducer(final Producer<byte[], byte[]> producer) {
final AtomicInteger counter = useCountByProducer.get(producer);
final AtomicInteger counter = useCount.get(producer);
if (counter != null && 0 == counter.decrementAndGet()) {
final boolean deleteProducer;
rwLock.readLock().lock();
try {
deleteProducer = !activeProducerByClientId.containsValue(producer);
deleteProducer = !activeProducers.contains(producer);
} finally {
rwLock.readLock().unlock();
}
if (deleteProducer) {
rwLock.writeLock().lock();
try {
if (counter.get() == 0 && null != useCountByProducer.remove(producer)) {
if (counter.get() == 0 && null != useCount.remove(producer)) {
LOG.info("Stopping producer instance - It was reported that instance should be refreshed " +
"and it is not used anymore: {}", producer);
"and it is not used anymore: " + producer);
producer.close();
}
} finally {
@@ -126,15 +129,14 @@ public void releaseProducer(final Producer<byte[], byte[]> producer) {
* @param producer Producer instance to terminate.
*/
public void terminateProducer(final Producer<byte[], byte[]> producer) {
LOG.info("Received signal to terminate producer: {}", producer);
LOG.info("Received signal to terminate producer " + producer);
rwLock.writeLock().lock();
try {
final Optional<String> clientId = activeProducerByClientId.entrySet().stream()
.filter(kv -> kv.getValue() == producer)
.map(Map.Entry::getKey)
.findFirst();
if (clientId.isPresent()) {
activeProducerByClientId.remove(clientId.get());
final int index = activeProducers.indexOf(producer);
if (index >= 0) {
activeProducers.set(index, null);
} else {
LOG.info("Signal for producer termination already received: " + producer);
}
} finally {
rwLock.writeLock().unlock();
@@ -208,8 +210,8 @@ public void close() {
}
}

protected Producer<byte[], byte[]> createProducerInstance(final String clientId) {
return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(clientId),
protected Producer<byte[], byte[]> createProducerInstance() {
return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(),
new KafkaCrutch(kafkaLocationManager));
}

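
The javadoc restored in the KafkaFactory diff above spells out the producer lifecycle: callers obtain an instance with takeProducer(), must release it with releaseProducer() when done, and can ask for it to be dropped with terminateProducer() if it misbehaves, so the next take creates a fresh one. A hypothetical caller (class, method, and error handling invented for illustration) assuming the post-revert KafkaFactory API:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical caller illustrating the take/release/terminate contract of KafkaFactory.
class BatchSender {

    private final KafkaFactory kafkaFactory;

    BatchSender(final KafkaFactory kafkaFactory) {
        this.kafkaFactory = kafkaFactory;
    }

    void sendBatch(final String topic, final byte[] payload) {
        final Producer<byte[], byte[]> producer = kafkaFactory.takeProducer();
        try {
            producer.send(new ProducerRecord<>(topic, payload));
        } catch (final RuntimeException e) {
            // Ask the factory to drop this instance; a later take() recreates it.
            kafkaFactory.terminateProducer(producer);
            throw e;
        } finally {
            // Always release: the factory reference-counts producers and only closes
            // a terminated instance once its use count drops to zero.
            kafkaFactory.releaseProducer(producer);
        }
    }
}
```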
@@ -116,11 +116,8 @@ public Properties getKafkaConsumerProperties() {
return properties;
}

public Properties getKafkaProducerProperties(final String clientId) {
public Properties getKafkaProducerProperties() {
final Properties producerProps = (Properties) kafkaProperties.clone();

producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -138,7 +135,6 @@ public Properties getKafkaProducerProperties(final String clientId) {
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSettings.getMaxRequestSize());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaSettings.getDeliveryTimeoutMs());
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaSettings.getMaxBlockMs());
producerProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, kafkaSettings.getMetadataMaxAgeMs());
return producerProps;
}
