Skip to content

Commit

Permalink
Fixed #831: changed several cases where a shared readerIndex was chan…
Browse files Browse the repository at this point in the history
…ged (#832)

DebugUtils.payload2Str set the readerIndex to the readableBytes, witch
was simply wrong. Utils.readBytesAndRewind was unused and thus removed.
SegmentedPersistentQueueSerDes use of readBytes (that changes the
readerIndex) was changed to getBytes which does not change the readerIndex.
  • Loading branch information
hylkevds committed Apr 30, 2024
1 parent 073cd63 commit 858a0f3
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 16 deletions.
5 changes: 1 addition & 4 deletions broker/src/main/java/io/moquette/broker/DebugUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
public final class DebugUtils {

public static String payload2Str(ByteBuf content) {
final int readerPin = content.readableBytes();
final String result = content.toString(StandardCharsets.UTF_8);
content.readerIndex(readerPin);
return result;
return content.toString(StandardCharsets.UTF_8);
}

private DebugUtils() {
Expand Down
9 changes: 0 additions & 9 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.moquette.broker;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -42,14 +41,6 @@ public static int messageId(MqttMessage msg) {
return ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
}

public static byte[] readBytesAndRewind(ByteBuf payload) {
byte[] payloadContent = new byte[payload.readableBytes()];
int mark = payload.readerIndex();
payload.readBytes(payloadContent);
payload.readerIndex(mark);
return payloadContent;
}

public static MqttVersion versionFromConnect(MqttConnectMessage msg) {
return MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void writeMessageExpiry(ByteBuffer buff, Instant messageExpiry) {
private void writePayload(ByteBuffer target, ByteBuf source) {
final int payloadSize = source.readableBytes();
byte[] rawBytes = new byte[payloadSize];
source.readBytes(rawBytes).release();
source.getBytes(source.readerIndex(), rawBytes).release();
target.putInt(payloadSize);
target.put(rawBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,35 @@

class SegmentedPersistentQueueSerDesTest {

private static final String TEST_STRING = "Some fancy things";

@Test
public void givenEnqueuedMessageWithoutMqttPropertyThenItCanBeProperlySerialized() {
SegmentedPersistentQueueSerDes sut = new SegmentedPersistentQueueSerDes();

final Topic topic = Topic.asTopic("/metering/temperature");
ByteBuf payload = Unpooled.wrappedBuffer("Some fancy things".getBytes(StandardCharsets.UTF_8));
ByteBuf payload = Unpooled.wrappedBuffer(TEST_STRING.getBytes(StandardCharsets.UTF_8));
payload.retain();
SessionRegistry.EnqueuedMessage messageToSerialize = new SessionRegistry.PublishedMessage(
topic, MqttQoS.AT_MOST_ONCE, payload, true, Instant.MAX);

ByteBuffer serialized = sut.toBytes(messageToSerialize);
assertEquals(TEST_STRING, payload.toString(StandardCharsets.UTF_8), "Buffer should not have changed.");
payload.release();

final SessionRegistry.EnqueuedMessage decoded = sut.fromBytes(serialized);
assertTrue(decoded instanceof SessionRegistry.PublishedMessage);
final SessionRegistry.PublishedMessage casted = (SessionRegistry.PublishedMessage) decoded;
assertEquals(topic, casted.getTopic());
assertEquals(TEST_STRING, casted.getPayload().toString(StandardCharsets.UTF_8), "Decoded message not the same.");
}

@Test
public void givenEnqueuedMessageContainingMqttPropertyThenItCanBeProperlySerialized() {
SegmentedPersistentQueueSerDes sut = new SegmentedPersistentQueueSerDes();

final Topic topic = Topic.asTopic("/metering/temperature");
ByteBuf payload = Unpooled.wrappedBuffer("Some fancy things".getBytes(StandardCharsets.UTF_8));
ByteBuf payload = Unpooled.wrappedBuffer(TEST_STRING.getBytes(StandardCharsets.UTF_8));
int subscriptionId = 123;
MqttProperties.IntegerProperty intProperty = new MqttProperties.IntegerProperty(
MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), subscriptionId);
Expand Down

0 comments on commit 858a0f3

Please sign in to comment.