Perform producer compression from IO threads (apache#7733)
* Added `zookeeper.snapshot.trust.empty=true` for ZK 3.5 upgrade

* Perform producer compression from IO threads

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
2 people authored and Livio committed Sep 5, 2020
1 parent 6940a58 commit 740b4fb
Showing 1 changed file with 25 additions and 34 deletions.
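
For context on the first commit-message bullet: ZooKeeper 3.5 refuses to start when its data directory contains transaction logs but no snapshot, a state commonly left behind by a 3.4 installation, and `zookeeper.snapshot.trust.empty=true` tells it to trust that state during the upgrade. A minimal sketch of applying the flag to an embedded or test server (`ZkUpgradeBootstrap` is a hypothetical name; in a standalone deployment the property is normally passed as a JVM flag, `-Dzookeeper.snapshot.trust.empty=true`, rather than set in code):

    public final class ZkUpgradeBootstrap {
        public static void main(String[] args) {
            // Trust a data dir that has txn logs but no snapshot during the
            // 3.4 -> 3.5 upgrade; without this, ZooKeeper 3.5 aborts startup.
            System.setProperty("zookeeper.snapshot.trust.empty", "true");
            // ... start the embedded ZooKeeper server here ...
        }
    }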
@@ -97,7 +97,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
private final BlockingQueue<OpSendMsg> pendingCallbacks;
private final Semaphore semaphore;
private volatile Timeout sendTimeout = null;
private volatile Timeout batchMessageAndSendTimeout = null;
private long createProducerTimeout;
private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
@@ -133,6 +132,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne

private final ConnectionHandler connectionHandler;

private ScheduledFuture<?> batchTimerTask;

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
@@ -406,7 +407,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
return;
}
}

try {
synchronized (this) {
int readStartIndex = 0;
@@ -795,10 +796,10 @@ public CompletableFuture<Void> closeAsync() {
sendTimeout = null;
}

Timeout batchTimeout = batchMessageAndSendTimeout;
if (batchTimeout != null) {
batchTimeout.cancel();
batchMessageAndSendTimeout = null;
ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
if (batchTimerTask != null) {
batchTimerTask.cancel(false);
this.batchTimerTask = null;
}

if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
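
The close path above also changes how the timer is cancelled: the old Netty `Timeout.cancel()` is replaced by `ScheduledFuture.cancel(false)`. The `false` matters because the flush task now runs on the connection's IO thread; cancelling without interruption lets a tick that is already mid-flush finish cleanly. A self-contained illustration of that semantic using a plain JDK scheduler (names here are illustrative, not from the patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public final class CancelSemanticsDemo {
        public static void main(String[] args) throws Exception {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            ScheduledFuture<?> tick = pool.scheduleAtFixedRate(
                    () -> System.out.println("flush tick"), 0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(250);
            // cancel(false): no further ticks are scheduled, and a tick that is
            // already executing is allowed to complete rather than be interrupted.
            tick.cancel(false);
            pool.shutdown();
        }
    }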
@@ -1229,8 +1230,24 @@ public void connectionOpened(final ClientCnx cnx) {

if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
TimeUnit.MICROSECONDS);
batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
if (log.isTraceEnabled()) {
log.trace(
"[{}] [{}] Batching the messages from the batch container from timer thread",
topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not
// schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
}
}, 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
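
The hunk above is the heart of the timer change: instead of a one-shot Netty `TimerTask` that re-arms itself after every run (the block deleted below), the batch flush is now scheduled once, at a fixed rate, on the connection's event-loop executor, so the batch (and, per the commit title, its compression) is built on the IO thread. A minimal sketch of that pattern, assuming Netty's `DefaultEventExecutor` as a stand-in for `cnx.ctx().executor()`; `flushBatchContainer` is a hypothetical stand-in for `batchMessageAndSend()`:

    import io.netty.util.concurrent.DefaultEventExecutor;
    import io.netty.util.concurrent.EventExecutor;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class FixedRateBatchFlushSketch {
        private volatile ScheduledFuture<?> batchTimerTask;
        private volatile boolean closed;

        void start(EventExecutor executor, long maxPublishDelayMicros) {
            // Scheduled once; the executor re-runs it every period. No per-tick
            // rescheduling (the old newTimeout(...) dance) is needed.
            batchTimerTask = executor.scheduleAtFixedRate(() -> {
                synchronized (this) {
                    if (closed) {
                        return; // mirrors the Closing/Closed guard in the patch
                    }
                    flushBatchContainer();
                }
            }, 0, maxPublishDelayMicros, TimeUnit.MICROSECONDS);
        }

        synchronized void close() {
            closed = true;
            ScheduledFuture<?> task = batchTimerTask;
            if (task != null) {
                task.cancel(false); // let an in-flight flush finish
                batchTimerTask = null;
            }
        }

        private void flushBatchContainer() {
            // stand-in for ProducerImpl#batchMessageAndSend()
        }

        public static void main(String[] args) throws Exception {
            DefaultEventExecutor executor = new DefaultEventExecutor();
            FixedRateBatchFlushSketch p = new FixedRateBatchFlushSketch();
            p.start(executor, 1_000); // 1,000 us = 1 ms publish delay
            Thread.sleep(5);
            p.close();
            executor.shutdownGracefully();
        }
    }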
@@ -1504,32 +1521,6 @@ private void failPendingBatchMessages(PulsarClientException ex) {
semaphore.release(numMessagesInBatch);
}

TimerTask batchMessageAndSendTask = new TimerTask() {

@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
if (log.isTraceEnabled()) {
log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", topic,
producerName);
}
// semaphore acquired when message was enqueued to container
synchronized (ProducerImpl.this) {
// If it's closing/closed we need to ignore the send batch timer and not schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}

batchMessageAndSend();
// schedule the next batch message task
batchMessageAndSendTimeout = client.timer()
.newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
}
};

@Override
public CompletableFuture<Void> flushAsync() {
CompletableFuture<MessageId> lastSendFuture;
