Skip to content

Commit

Permalink
[pulsar-client] fix deadlock on send failure (apache#6488)
Browse files Browse the repository at this point in the history
(cherry picked from commit ad5415a)
  • Loading branch information
rdhabalia authored and jiazhai committed May 17, 2020
1 parent c0fc335 commit 4d984ab
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -3283,4 +3284,36 @@ public void testConsumerStartMessageIdAtExpectedPos(boolean batching, boolean st
consumer.close();
producer.close();
}

/**
* It verifies that message failure successfully releases semaphore and client successfully receives
* InvalidMessageException.
*
* @throws Exception
*/
@Test
public void testReleaseSemaphoreOnFailMessages() throws Exception {
log.info("-- Starting {} test --", methodName);

int maxPendingMessages = 10;
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().enableBatching(false)
.blockIfQueueFull(true).maxPendingMessages(maxPendingMessages)
.topic("persistent://my-property/my-ns/my-topic2");

Producer<byte[]> producer = producerBuilder.create();
List<Future<MessageId>> futures = Lists.newArrayList();

// Asynchronously produce messages
byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
for (int i = 0; i < maxPendingMessages + 10; i++) {
Future<MessageId> future = producer.sendAsync(message);
try {
future.get();
fail("should fail with InvalidMessageException");
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException.InvalidMessageException);
}
}
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds %d bytes",
producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
callback.sendComplete(invalidMessageException);
completeCallbackAndReleaseSemaphore(callback, invalidMessageException);
return;
}
}
Expand All @@ -360,7 +360,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s can not reuse the same message", producerName, topic));
callback.sendComplete(invalidMessageException);
completeCallbackAndReleaseSemaphore(callback, invalidMessageException);
compressedPayload.release();
return;
}
Expand Down Expand Up @@ -455,11 +455,9 @@ public void sendAsync(Message<?> message, SendCallback callback) {
}
}
} catch (PulsarClientException e) {
semaphore.release();
callback.sendComplete(e);
completeCallbackAndReleaseSemaphore(callback, e);
} catch (Throwable t) {
semaphore.release();
callback.sendComplete(new PulsarClientException(t));
completeCallbackAndReleaseSemaphore(callback, new PulsarClientException(t));
}
}

Expand All @@ -471,8 +469,9 @@ private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
return true;
}
if (!isMultiSchemaEnabled(true)) {
callback.sendComplete(new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic)));
PulsarClientException.InvalidMessageException e = new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s is disabled the `MultiSchema`", producerName, topic));
completeCallbackAndReleaseSemaphore(callback, e);
return false;
}
SchemaHash schemaHash = SchemaHash.of(msg.getSchema());
Expand Down Expand Up @@ -872,6 +871,11 @@ private void releaseSemaphoreForSendOp(OpSendMsg op) {
semaphore.release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1);
}

private void completeCallbackAndReleaseSemaphore(SendCallback callback, Exception exception) {
semaphore.release();
callback.sendComplete(exception);
}

/**
* Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the
* message header-payload again.
Expand Down

0 comments on commit 4d984ab

Please sign in to comment.