Skip to content

Commit

Permalink
This is PR #576
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev committed Oct 25, 2021
2 parents b9cb06b + 9805d93 commit 4b33231
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;

import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
Expand Down Expand Up @@ -354,18 +356,10 @@ public boolean postAcknowledge(String address, String queue, String nodeID, long

}

private void performAckOnPage(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation) {
if (targetQueue.getPagingStore().isPaging()) {
PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, pageAck);
targetQueue.getPageSubscription().performScanAck();
} else {
if (logger.isDebugEnabled()) {
logger.debug("Post ack Server " + server + " could not find messageID = " + messageID +
" representing nodeID=" + nodeID);
}
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
}

public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}

private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, boolean retry) {
Expand Down Expand Up @@ -487,23 +481,44 @@ public void postAcknowledge(MessageReference ref, AckReason reason) {
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
}

// I need a supress warning annotation here
// because errorProne is issuing an error her, however I really intend to compare PageACK against PagedReference
@SuppressWarnings("ComparableType")
class PageAck implements Comparable<PagedReference>, Runnable {
class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {

final Queue targetQueue;
final String nodeID;
final long messageID;
final ACKMessageOperation operation;
final IOCallback operation;

PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
PageAck(Queue targetQueue, String nodeID, long messageID, IOCallback operation) {
this.targetQueue = targetQueue;
this.nodeID = nodeID;
this.messageID = messageID;
this.operation = operation;
}

/**
* Method to retry the ack before a scan
* @return
*/
@Override
public boolean getAsBoolean() {
try {
recoverContext();
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
if (reference == null) {
return false;
} else {
targetQueue.acknowledge(reference);
OperationContextImpl.getContext().executeOnCompletion(operation);
return true;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return false;
}
}

@Override
public int compareTo(PagedReference reference) {
public int applyAsInt(PagedReference reference) {
String refNodeID = referenceNodeStore.getServerID(reference);
long refMessageID = referenceNodeStore.getID(reference);
if (refNodeID == null) {
Expand All @@ -526,7 +541,7 @@ public int compareTo(PagedReference reference) {

@Override
public void run() {
operation.connectionRun();
operation.done();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ protected AMQPSessionContext newSessionExtension(Session realSession) throws Act
return protonSession;
}

public Map<Session, AMQPSessionContext> getSessions() {
return sessions;
}

public SecurityAuth getSecurityAuth() {
return new LocalSecurity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ public int getReceiverCount() {
return receivers.size();
}

public Map<Receiver, ProtonAbstractReceiver> getReceivers() {
return receivers;
}

public int getSenderCount() {
return senders.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;

import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;

import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page;
Expand Down Expand Up @@ -85,12 +88,14 @@ public interface PageSubscription {
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws Exception;


// Add a scan function to be performed. It will be completed when you call performScan
void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound);

// it will schedule a scan on pages for everything that was added through addScanAck
void performScanAck();
/**
* This method will schedule scanning over Paging, however a retry should be done before the scanning.
* @param retryBeforeScan if this function is called and returns true, the scan for this element will not be called. It would be caller's responsibility to call found.
* @param scanFunction
* @param found
* @param notFound
*/
void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
Expand Down Expand Up @@ -112,16 +114,18 @@ public AtomicInteger getScheduledCleanupCount() {

// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new ConcurrentLongHashMap<>();
private final AtomicInteger scheduledScanCount = new AtomicInteger(0);

/** this variable governs if we need to schedule another runner to look after the scanList. */
private boolean pageScanNeeded = true;
private final LinkedList<PageScan> scanList = new LinkedList();

private static class PageScan {
final Comparable<PagedReference> scanFunction;
final BooleanSupplier retryBeforeScan;
final ToIntFunction<PagedReference> scanFunction;
final Runnable found;
final Runnable notfound;
final Runnable notFound;

public Comparable<PagedReference> getScanFunction() {
public ToIntFunction<PagedReference> getScanFunction() {
return scanFunction;
}

Expand All @@ -130,47 +134,70 @@ public Runnable getFound() {
}

public Runnable getNotfound() {
return notfound;
return notFound;
}

PageScan(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
PageScan(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
this.retryBeforeScan = retryBeforeScan;
this.scanFunction = scanFunction;
this.found = found;
this.notfound = notfound;
this.notFound = notFound;
}
}

@Override
public void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, Runnable notfound) {
PageScan scan = new PageScan(scanFunction, found, notfound);
public void scanAck(BooleanSupplier retryBeforeScan, ToIntFunction<PagedReference> scanFunction, Runnable found, Runnable notFound) {
PageScan scan = new PageScan(retryBeforeScan, scanFunction, found, notFound);
boolean pageScanNeededLocal;
synchronized (scanList) {
scanList.add(scan);
pageScanNeededLocal = this.pageScanNeeded;
this.pageScanNeeded = false;
}
}

@Override
public void performScanAck() {
// we should only have a max of 2 scheduled tasks
// one that's might still be currently running, and another one lined up
// no need for more than that
if (scheduledScanCount.incrementAndGet() < 2) {
executor.execute(this::actualScanAck);
} else {
scheduledScanCount.decrementAndGet();
if (pageScanNeededLocal) {
executor.execute(this::performScanAck);
}
}

private void actualScanAck() {
private void performScanAck() {
try {
PageScan[] localScanList;
synchronized (scanList) {
this.pageScanNeeded = true;
if (scanList.size() == 0) {
return;
}
localScanList = scanList.stream().toArray(i -> new PageScan[i]);
scanList.clear();
}

int retriedFound = 0;
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen.retryBeforeScan != null && scanElemen.retryBeforeScan.getAsBoolean()) {
localScanList[i] = null;
retriedFound++;
}
}

if (retriedFound == localScanList.length) {
return;
}

if (!isPaging()) {
// this would mean that between the submit and now the system left paging mode
// at this point we will just return everything as notFound
for (int i = 0; i < localScanList.length; i++) {
PageScan scanElemen = localScanList[i];
if (scanElemen != null && scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}

return;
}

LinkedList<Runnable> afterCommitList = new LinkedList<>();
TransactionImpl tx = new TransactionImpl(store);
tx.addOperation(new TransactionOperationAbstract() {
Expand All @@ -196,7 +223,7 @@ public void afterCommit(Transaction tx) {
continue;
}

int result = scanElemen.scanFunction.compareTo(reference);
int result = scanElemen.scanFunction.applyAsInt(reference);

if (result >= 0) {
if (result == 0) {
Expand All @@ -209,8 +236,8 @@ public void afterCommit(Transaction tx) {
logger.warn(e.getMessage(), e);
}
} else {
if (scanElemen.notfound != null) {
scanElemen.notfound.run();
if (scanElemen.notFound != null) {
scanElemen.notFound.run();
}
}
localScanList[i] = null;
Expand All @@ -228,8 +255,8 @@ public void afterCommit(Transaction tx) {
}

for (int i = 0; i < localScanList.length; i++) {
if (localScanList[i] != null && localScanList[i].notfound != null) {
localScanList[i].notfound.run();
if (localScanList[i] != null && localScanList[i].notFound != null) {
localScanList[i].notFound.run();
}
localScanList[i] = null;
}
Expand All @@ -241,8 +268,8 @@ public void afterCommit(Transaction tx) {
logger.warn(e.getMessage(), e);
}
}
} finally {
scheduledScanCount.decrementAndGet();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ private void replicaTest(boolean largeMessage,
server_2.startBrokerConnection(brokerConnectionName);
}

snfreplica = server_2.locateQueue(replica.getMirrorSNF());
Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));

if (pagingTarget) {
assertTrue(queueOnServer1.getPagingStore().isPaging());
Expand Down Expand Up @@ -852,7 +852,7 @@ private void consumeMessages(boolean largeMessage,
int LAST_ID,
int port,
boolean assertNull) throws JMSException {
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Expand Down

0 comments on commit 4b33231

Please sign in to comment.