Skip to content

Commit

Permalink
Fixes #569: NoSuchElementException in Session.drainQueueToConnection (#…
Browse files Browse the repository at this point in the history
…570)

Session.drainQueueToConnection checks sessionQueue.isEmpty(), and then
uses sessionQueue.remove(). However, between the check and the remove,
another thread can steal the last message, causing the remove() to throw
an exception. Better to use sessionQueue.poll() with a null-check.
  • Loading branch information
hylkevds committed May 1, 2021
1 parent 78aeed4 commit 55d8d4c
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,11 @@ private MqttPublishMessage publishNotRetainedDuplicated(InFlightPacket notAckPac
private void drainQueueToConnection() {
// consume the queue
while (!sessionQueue.isEmpty() && inflighHasSlotsAndConnectionIsUp()) {
final SessionRegistry.EnqueuedMessage msg = sessionQueue.remove();
final SessionRegistry.EnqueuedMessage msg = sessionQueue.poll();
if (msg == null) {
// Our message was already fetched by another Thread.
return;
}
inflightSlots.decrementAndGet();
int sendPacketId = mqttConnection.nextPacketId();
inflightWindow.put(sendPacketId, msg);
Expand Down

0 comments on commit 55d8d4c

Please sign in to comment.