Skip to content

Commit

Permalink
Support Pulsar batch handling (#638)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed May 23, 2022
1 parent e135cc5 commit 0d0af93
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 14 deletions.
Expand Up @@ -68,6 +68,8 @@ public boolean accept(String protocol) {
public void initialize(ServiceConfiguration conf) throws Exception {
// init config
mqttConfig = ConfigurationUtils.create(conf.getProperties(), MQTTServerConfiguration.class);
// We have to enable ack batch message individual.
mqttConfig.setAcknowledgmentAtBatchIndexLevelEnabled(true);
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(mqttConfig.getBindAddress());
}

Expand Down
Expand Up @@ -14,38 +14,62 @@
package io.streamnative.pulsar.handlers.mqtt;

import io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer;
import java.util.Objects;
import lombok.Getter;

/**
* Outstanding packet that the broker sent to clients.
*/

@Getter
public class OutstandingPacket {

private final MQTTConsumer consumer;
private final int packetId;
private final long ledgerId;
private final long entryId;
private final int batchIndex;

private final int batchSize;

public OutstandingPacket(MQTTConsumer consumer, int packetId, long ledgerId, long entryId) {
this.consumer = consumer;
this.packetId = packetId;
this.ledgerId = ledgerId;
this.entryId = entryId;
this.batchIndex = -1;
this.batchSize = -1;
}

public MQTTConsumer getConsumer() {
return consumer;
public OutstandingPacket(MQTTConsumer consumer, int packetId, long ledgerId,
long entryId, int batchIndex, int batchSize) {
this.consumer = consumer;
this.packetId = packetId;
this.ledgerId = ledgerId;
this.entryId = entryId;
this.batchIndex = batchIndex;
this.batchSize = batchSize;
}

public int getPacketId() {
return packetId;
public boolean isBatch() {
return batchIndex != -1;
}

public long getLedgerId() {
return ledgerId;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OutstandingPacket that = (OutstandingPacket) o;
return packetId == that.packetId && ledgerId == that.ledgerId
&& entryId == that.entryId && batchIndex == that.batchIndex && batchSize == that.batchSize;
}

public long getEntryId() {
return entryId;
@Override
public int hashCode() {
return Objects.hash(packetId, ledgerId, entryId, batchIndex, batchSize);
}
}
Expand Up @@ -153,8 +153,17 @@ public void processPubAck(MqttAdapterMessage adapter) {
int packetId = msg.variableHeader().messageId();
OutstandingPacket packet = outstandingPacketContainer.remove(packetId);
if (packet != null) {
packet.getConsumer().getSubscription().acknowledgeMessage(
Collections.singletonList(PositionImpl.get(packet.getLedgerId(), packet.getEntryId())),
PositionImpl position;
if (packet.isBatch()) {
long[] ackSets = new long[packet.getBatchSize()];
for (int i = 0; i < packet.getBatchSize(); i++) {
ackSets[i] = packet.getBatchIndex() == i ? 0 : 1;
}
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId(), ackSets);
} else {
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId());
}
packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position),
CommandAck.AckType.Individual, Collections.emptyMap());
packet.getConsumer().getPendingAcks().remove(packet.getLedgerId(), packet.getEntryId());
packet.getConsumer().incrementPermits();
Expand Down
Expand Up @@ -94,16 +94,26 @@ public ChannelPromise sendMessages(List<Entry> entries, EntryBatchSizes batchSiz
List<MqttPublishMessage> messages = PulsarMessageConverter.toMqttMessages(toConsumerTopicName, entry,
packetIdGenerator, qos);
if (MqttQoS.AT_MOST_ONCE != qos) {
messages.stream().map(message -> new OutstandingPacket(this,
message.variableHeader().packetId(), entry.getLedgerId(),
entry.getEntryId())).forEach(outstandingPacketContainer::add);
final boolean isBatch = messages.size() > 1;
if (isBatch) {
for (int i = 0; i < messages.size(); i++) {
int packetId = messages.get(i).variableHeader().packetId();
OutstandingPacket outstandingPacket = new OutstandingPacket(this, packetId, entry.getLedgerId(),
entry.getEntryId(), i, messages.size());
outstandingPacketContainer.add(outstandingPacket);
}
} else {
OutstandingPacket outstandingPacket = new OutstandingPacket(this,
messages.get(0).variableHeader().packetId(), entry.getLedgerId(), entry.getEntryId());
outstandingPacketContainer.add(outstandingPacket);
}
}
for (MqttPublishMessage msg : messages) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Send MQTT message {} to subscriber", pulsarTopicName,
mqttTopicName, super.getSubscription().getName(), msg);
}
int readableBytes = msg.payload().readableBytes();
final int readableBytes = msg.payload().readableBytes();
metricsCollector.addReceived(readableBytes);
if (clientRestrictions.exceedMaximumPacketSize(readableBytes)) {
log.warn("discard msg {}, because it exceeds maximum packet size : {}, msg size {}", msg,
Expand Down
Expand Up @@ -149,6 +149,7 @@ protected MQTTCommonConfiguration initConfig() throws Exception{
mqtt.setAuthorizationEnabled(false);
mqtt.setAllowAutoTopicCreation(true);
mqtt.setBrokerDeleteInactiveTopicsEnabled(false);
mqtt.setAcknowledgmentAtBatchIndexLevelEnabled(true);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Expand Up @@ -16,12 +16,16 @@
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Producer;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -65,4 +69,62 @@ public void testReceiveBatchMessageWithCorrectPacketId() throws Exception {
client.disconnect();
producer.close();
}


@Test
public void testAckBatchMessageIndividual() throws Exception {
final String topic = "persistent://public/default/test-batch-message-1";
final Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("127.0.0.1")
.serverPort(getMqttBrokerPortList().get(0))
.automaticReconnectWithDefaultConfig()
.buildBlocking();
client.connectWith()
.cleanStart(false).sessionExpiryInterval(10).send();
client.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
final Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL, true);
Producer<byte[]> producer = pulsarClient.newProducer()
.enableBatching(true)
.batchingMaxMessages(5)
.topic(topic)
.create();
final List<String> payloads = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final String payload = UUID.randomUUID().toString();
payloads.add(payload);
producer.sendAsync(payload.getBytes());
}
for (int i = 0; i < 50; i++) {
Mqtt5Publish message = publishes.receive();
if (i <= 7) {
message.acknowledge();
String payload = new String(message.getPayloadAsBytes());
payloads.remove(payload);
}
}
admin.topics().unload(topic);
Awaitility.await().until(() -> client.getState().isConnected());
client.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.send();
Assert.assertFalse(payloads.isEmpty());
for (int i = 0; i < 50; i++) {
Optional<Mqtt5Publish> receive = publishes.receive(1, TimeUnit.SECONDS);
if (receive.isPresent()) {
Mqtt5Publish message = receive.get();
message.acknowledge();
String payload = new String(message.getPayloadAsBytes());
payloads.remove(payload);
}
}
Assert.assertTrue(payloads.isEmpty());
publishes.close();
client.disconnect();
producer.close();
}
}

0 comments on commit 0d0af93

Please sign in to comment.