Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed #573 ByteBuf reference counting #600

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -37,6 +38,7 @@
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import io.netty.util.ReferenceCountUtil;

final class MQTTConnection {

Expand Down Expand Up @@ -377,7 +379,6 @@ void processPublish(MqttPublishMessage msg) {
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
Expand Down Expand Up @@ -419,11 +420,19 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("OUT {}", msg.fixedHeader().messageType());
}
if (channel.isWritable()) {

// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
Object retainedDup = msg;
if (msg instanceof ByteBufHolder) {
retainedDup = ((ByteBufHolder) msg).retainedDuplicate();
}

ChannelFuture channelFuture;
if (brokerConfig.isImmediateBufferFlush()) {
channelFuture = channel.writeAndFlush(msg);
channelFuture = channel.writeAndFlush(retainedDup);
} else {
channelFuture = channel.write(msg);
channelFuture = channel.write(retainedDup);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
}
Expand Down
6 changes: 3 additions & 3 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ private void publishRetainedMessagesForSubscriptions(String clientID, List<Subsc

final ByteBuf payloadBuf = Unpooled.wrappedBuffer(retainedMsg.getPayload());
targetSession.sendRetainedPublishOnSessionAtQos(retainedMsg.getTopic(), qos, payloadBuf);
// We made the buffer, we must release it.
payloadBuf.release();
}
}
}
Expand Down Expand Up @@ -202,7 +204,7 @@ void receivedPublishQos1(MQTTConnection connection, Topic topic, String username
interceptor.notifyTopicPublished(msg, clientId, username);
}

private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publishingQos) {
private void publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos) {
Set<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);

for (final Subscription sub : topicMatchingSubscriptions) {
Expand All @@ -213,8 +215,6 @@ private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publi
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
// we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0
ByteBuf payload = origPayload.retainedDuplicate();
targetSession.sendPublishOnSessionAtQos(topic, qos, payload);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
Expand Down
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private <T, U> T loadClass(String className, Class<T> intrface, Class<U> constru
* Use the broker to publish a message. It's intended for embedding applications. It can be used
* only after the integration is correctly started with startServer.
*
* @param msg the message to forward.
* @param msg the message to forward. The ByteBuf in the message will be released.
* @param clientId the id of the sending integration.
* @throws IllegalStateException if the integration is not yet started
*/
Expand All @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
}

public void stopServer() {
Expand Down
70 changes: 60 additions & 10 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import io.moquette.broker.SessionRegistry.PublishedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -183,7 +185,12 @@ boolean isClean() {
}

public void processPubRec(int packetId) {
inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
Expand All @@ -200,7 +207,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();
Expand Down Expand Up @@ -238,15 +250,26 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Adding to a map, retain.
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload));
// If there already was something, release it.
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

// TODO drainQueueToConnection();?
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -255,15 +278,26 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Retain before adding to map
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
// If there already was something, release it.
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

drainQueueToConnection();
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -283,7 +317,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand All @@ -305,8 +343,7 @@ public void resendInflightNotAcked() {
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
mqttConnection.sendPublish(publishMsg);
}
}
Expand Down Expand Up @@ -341,7 +378,13 @@ private void drainQueueToConnection() {
}
inflightSlots.decrementAndGet();
int sendPacketId = mqttConnection.nextPacketId();
inflightWindow.put(sendPacketId, msg);

// Putting it in a map, but the retain is cancelled out by the below release.
EnqueuedMessage old = inflightWindow.put(sendPacketId, msg);
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
Expand All @@ -352,6 +395,7 @@ private void drainQueueToConnection() {
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
// we fetched msg from a map, but the release is cancelled out by the above retain
}
}

Expand All @@ -374,12 +418,18 @@ void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload
}

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
qos2Receiving.put(messageID, msg);
msg.retain(); // retain to put in the inflight map
// Retain before putting msg in map.
ReferenceCountUtil.retain(msg);

MqttPublishMessage old = qos2Receiving.put(messageID, msg);
// In case of evil client with duplicate msgid.
ReferenceCountUtil.release(old);

mqttConnection.sendPublishReceived(messageID);
}

public void receivedPubRelQos2(int messageID) {
// Done with the message, remove from queue and release payload.
final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID);
ReferenceCountUtil.release(removedMsg);
}
Expand Down
23 changes: 23 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@
public class SessionRegistry {

public abstract static class EnqueuedMessage {

/**
* Releases any held resources. Must be called when the EnqueuedMessage is no
* longer needed.
*/
public void release() {}

/**
* Retains any held resources. Must be called when the EnqueuedMessage is added
* to a store.
*/
public void retain() {}
}

public static class PublishedMessage extends EnqueuedMessage {
Expand All @@ -63,6 +75,17 @@ public MqttQoS getPublishingQos() {
public ByteBuf getPayload() {
return payload;
}

@Override
public void release() {
payload.release();
}

@Override
public void retain() {
payload.retain();
}

}

public static final class PubRelMarker extends EnqueuedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) {

@Override
public void onPublish(InterceptPublishMessage msg) {
msg.getPayload().release();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void notifyTopicPublished(final MqttPublishMessage msg, final String clie
for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
+ "interceptorId={}", clientID, messageId, topic, handler.getID());
handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
// Sending to the outside, make a retainedDuplicate.
handler.onPublish(new InterceptPublishMessage(msg.retainedDuplicate(), clientID, username));
}
} finally {
ReferenceCountUtil.release(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface InterceptHandler {

void onConnectionLost(InterceptConnectionLostMessage msg);

/**
* Called when a message is published. The receiver MUST release the payload of the message, either
* by calling super.onPublish, or by calling msg.getPayload.release() directly.
*
* @param msg The message that was published.
*/
void onPublish(InterceptPublishMessage msg);

void onSubscribe(InterceptSubscribeMessage msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void write(WriteBuffer buff, Object obj) {
final ByteBuf casted = (ByteBuf) obj;
final int payloadSize = casted.readableBytes();
byte[] rawBytes = new byte[payloadSize];
casted.copy().readBytes(rawBytes);
casted.copy().readBytes(rawBytes).release();
buff.putInt(payloadSize);
buff.put(rawBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
@Test
public void dropConnectionOnPublishWithInvalidTopicFormat() {
// Connect message with clean session set to true and client id is null.
final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8));
MqttPublishMessage publish = MqttMessageBuilders.publish()
.topicName("")
.retained(false)
.qos(MqttQoS.AT_MOST_ONCE)
.payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build();
.payload(payload).build();

sut.processPublish(publish);

// Verify
assertFalse(channel.isOpen(), "Connection should be closed by the broker");
payload.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,13 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
connection.processConnect(connectMessage);
ConnectionTestUtils.assertConnectAccepted(channel);

final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!");
this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish()
.payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"))
.payload(payload1)
.qos(AT_LEAST_ONCE)
.build());
// Retaining a msg does not release the payload.
payload1.release();

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
Expand All @@ -241,6 +244,8 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());
// receivedPublishQos0 does not release payload.
anyPayload.release();

// Verify
assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic");
Expand Down
7 changes: 7 additions & 0 deletions broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() {

// Verify
assertTrue(queuedMessages.isEmpty(), "Messages should be drained");

// release the rest, to avoid leaking buffers
for (int i = 2; i <= 11; i++) {
client.pubAckReceived(i);
}
client.closeImmediately();
testChannel.close();
}

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
Expand Down
Loading