From fe1f6a82bade8174a32629d9f99ac4bbebe05d1c Mon Sep 17 00:00:00 2001 From: changqing Date: Mon, 8 May 2023 09:36:55 +0800 Subject: [PATCH] Fix - Add permit if receive nack command. --- .../io/streamnative/pulsar/handlers/amqp/AmqpChannel.java | 7 ++++++- .../pulsar/handlers/amqp/AmqpPulsarConsumer.java | 4 ++++ .../pulsar/handlers/amqp/UnacknowledgedMessageMap.java | 1 + 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java index a5c28612..97ded232 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java @@ -17,7 +17,6 @@ import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.isBuildInExchange; import static org.apache.qpid.server.protocol.ErrorCodes.INTERNAL_ERROR; import static org.apache.qpid.server.transport.util.Functions.hex; - import com.google.common.annotations.VisibleForTesting; import io.streamnative.pulsar.handlers.amqp.common.exception.AoPException; import io.streamnative.pulsar.handlers.amqp.flow.AmqpFlowCreditManager; @@ -755,6 +754,9 @@ public void messageNAck(long deliveryTag, boolean multiple, boolean requeue) { } else { closeChannel(ErrorCodes.IN_USE, "deliveryTag not found"); } + ackedMessages.forEach(entry -> { + entry.getConsumer().handleFlow(1); + }); if (creditManager.hasCredit() && isBlockedOnCredit()) { unBlockedOnCredit(); } @@ -767,6 +769,9 @@ public void receiveBasicRecover(boolean requeue, boolean sync) { if (!ackedMessages.isEmpty()) { requeue(ackedMessages); } + ackedMessages.forEach(entry -> { + entry.getConsumer().handleFlow(1); + }); if (creditManager.hasCredit() && isBlockedOnCredit()) { unBlockedOnCredit(); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarConsumer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarConsumer.java index 2543a4d2..b1ea233d 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarConsumer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarConsumer.java @@ -113,6 +113,10 @@ public void requeue(List positions) { } } + @Override + public void handleFlow(int permits) { + } + public void close() throws PulsarClientException { this.isClosed = true; this.consumer.pause(); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/UnacknowledgedMessageMap.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/UnacknowledgedMessageMap.java index 3acd7cc9..aab48b91 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/UnacknowledgedMessageMap.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/UnacknowledgedMessageMap.java @@ -36,6 +36,7 @@ public class UnacknowledgedMessageMap { public interface MessageProcessor { void messageAck(Position position); void requeue(List positions); + void handleFlow(int permits); } /**