Skip to content

Commit

Permalink
This is PR #574
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev committed Oct 25, 2021
2 parents 37fdc6f + 7dcba3b commit 277a7dc
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
Expand Down Expand Up @@ -74,6 +75,7 @@ void connect(String cId,
return;
}

boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
Expand Down Expand Up @@ -104,7 +106,7 @@ void connect(String cId,
}

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent && !cleanSession, MqttProperties.NO_PROPERTIES);
// ensure we don't publish before the CONNACK
session.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,16 @@ void sendConnack(MqttConnectReturnCode returnCode) {
}

void sendConnack(MqttConnectReturnCode returnCode, MqttProperties properties) {
sendConnack(returnCode, true, properties);
}

void sendConnack(MqttConnectReturnCode returnCode, boolean sessionPresent, MqttProperties properties) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true, properties);
// [MQTT-3.2.2-4] If a server sends a CONNACK packet containing a non-zero return code it MUST set Session Present to 0.
if (returnCode.byteValue() != (byte) 0x00) {
sessionPresent = false;
}
MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent, properties);
MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
sendToClient(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
Expand Down Expand Up @@ -161,6 +164,28 @@ public void deliveryComplete(IMqttDeliveryToken token) {
producer.close();
}

@Test(timeout = 300000)
public void testSessionPresentWithCleanSession() throws Exception {
MqttClient client = createPahoClient(RandomUtil.randomString());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
IMqttToken result = client.connectWithResult(options);
assertFalse(result.getSessionPresent());
client.disconnect();
}

@Test(timeout = 300000)
public void testSessionPresent() throws Exception {
MqttClient client = createPahoClient(RandomUtil.randomString());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
IMqttToken result = client.connectWithResult(options);
assertFalse(result.getSessionPresent());
client.disconnect();
result = client.connectWithResult(options);
assertTrue(result.getSessionPresent());
}

private MqttClient createPahoClient(String clientId) throws MqttException {
return new MqttClient(protocol + "://localhost:" + getPort(), clientId, new MemoryPersistence());
}
Expand Down

0 comments on commit 277a7dc

Please sign in to comment.