Skip to content

Commit

Permalink
Added isEmpty check method to unsafe persistent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 25, 2022
1 parent 8d8ae33 commit f987236
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
10 changes: 9 additions & 1 deletion broker/src/main/java/io/moquette/broker/unsafequeues/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.concurrent.locks.ReentrantLock;

/**
* Not thread safe disk persisted queue.S
* Not thread safe disk persisted queue.
* */
public class Queue {
private static final Logger LOG = LoggerFactory.getLogger(Queue.class);
Expand Down Expand Up @@ -149,6 +149,14 @@ VirtualPointer currentTail() {
return currentTailPtr;
}

public boolean isEmpty() {
if (isTailFirstUsage(currentTailPtr)) {
return currentHeadPtr.compareTo(currentTailPtr) == 0;
} else {
return currentHeadPtr.moveForward(1).compareTo(currentTailPtr) == 0;
}
}

/**
* Read next message or return null if the queue has no data.
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ private ByteBuffer randomPayload(int dataSize) {
return (ByteBuffer) ByteBuffer.wrap(payload);
}

@Test
public void newlyQueueIsEmpty() throws QueueException {
final QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder);
final Queue queue = queuePool.getOrCreate("test");

assertTrue(queue.isEmpty(), "Freshly created queue must be empty");
}

@Test
public void consumedQueueIsEmpty() throws QueueException {
final QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder);
final Queue queue = queuePool.getOrCreate("test");
queue.enqueue(ByteBuffer.wrap("AAAA".getBytes(StandardCharsets.UTF_8)));
Optional<ByteBuffer> data = queue.dequeue();
assertTrue(data.isPresent(), "Some payload is retrieved");

assertEquals(4, data.get().remaining(), "Payload contains what's expected");

assertTrue(queue.isEmpty(), "Queue must be empty after consuming it");
}

@Test
public void insertSomeDataIntoNewQueue() throws QueueException, IOException {
final QueuePool queuePool = QueuePool.loadQueues(tempQueueFolder);
Expand Down

0 comments on commit f987236

Please sign in to comment.