Skip to content

Commit

Permalink
Minor fixes (Azure#141)
Browse files Browse the repository at this point in the history
* Changing message return daemon wakeup interval from 100 milliseconds to 20 milliseconds.
* Changing message return loop wake up interval to 1 millisecond.
* Removing filterExpiredMessages smarts from receiver.
  • Loading branch information
yvgopal authored and nemakam committed Oct 11, 2017
1 parent 9fa3ea1 commit d79e75c
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 119 deletions.
Expand Up @@ -353,7 +353,7 @@ public CompletableFuture<IMessage> receiveAsync() {

@Override
public CompletableFuture<IMessage> receiveAsync(Duration serverWaitTime) {
CompletableFuture<IMessage> receiveFuture = this.internalReceiver.receiveAsync(1, serverWaitTime).thenApplyAsync(c ->
return this.internalReceiver.receiveAsync(1, serverWaitTime).thenApplyAsync(c ->
{
if (c == null)
return null;
Expand All @@ -362,8 +362,6 @@ else if (c.isEmpty())
else
return MessageConverter.convertAmqpMessageToBrokeredMessage(c.toArray(new MessageWithDeliveryTag[0])[0]);
});

return this.filterLockExpiredMessage(receiveFuture, serverWaitTime);
}

@Override
Expand All @@ -373,7 +371,7 @@ public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageC

@Override
public CompletableFuture<Collection<IMessage>> receiveBatchAsync(int maxMessageCount, Duration serverWaitTime) {
CompletableFuture<Collection<IMessage>> receiveFuture = this.internalReceiver.receiveAsync(maxMessageCount, serverWaitTime).thenApplyAsync(c ->
return this.internalReceiver.receiveAsync(maxMessageCount, serverWaitTime).thenApplyAsync(c ->
{
if (c == null)
return null;
Expand All @@ -382,8 +380,6 @@ else if (c.isEmpty())
else
return convertAmqpMessagesWithDeliveryTagsToBrokeredMessages(c);
});

return this.filterLockExpiredMessages(receiveFuture, maxMessageCount, serverWaitTime);
}

@Override
Expand Down Expand Up @@ -500,116 +496,6 @@ private void ensurePeekLockReceiveMode() {
throw new UnsupportedOperationException("Operations Complete/Abandon/DeadLetter/Defer cannot be called on a receiver opened in ReceiveAndDelete mode.");
}
}

private boolean isMessageLockExpired(IMessage msg)
{
return msg.getLockedUntilUtc().isBefore(Instant.now());
}

private CompletableFuture<IMessage> filterLockExpiredMessage(CompletableFuture<IMessage> receivedFuture, Duration serverWaitTime)
{
if(this.receiveMode == ReceiveMode.RECEIVEANDDELETE)
{
return receivedFuture;
}
else
{
Instant startTime = Instant.now();
return receivedFuture.thenCompose((msg) -> {
if(msg == null)
{
return receivedFuture;
}
else
{
if(isMessageLockExpired(msg))
{
// Message lock already expired. Receive another message
TRACE_LOGGER.warn("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired", msg.getSequenceNumber(), msg.getMessageId(), this.getEntityPath());
Duration remainingWaitTime = serverWaitTime.minus(Duration.between(startTime, Instant.now()));
if(remainingWaitTime.isNegative() || remainingWaitTime.isZero())
{
return CompletableFuture.completedFuture(null);
}
else
{
TRACE_LOGGER.debug("Ignored the lock exipred message and receiving again from '{}'", this.getEntityPath());
return this.receiveAsync(remainingWaitTime);
}
}
else
{
return CompletableFuture.completedFuture(msg);
}
}
});
}
}

private CompletableFuture<Collection<IMessage>> filterLockExpiredMessages(CompletableFuture<Collection<IMessage>> receivedFuture, int maxMessageCount, Duration serverWaitTime)
{
if(this.receiveMode == ReceiveMode.RECEIVEANDDELETE)
{
return receivedFuture;
}
else
{
Instant startTime = Instant.now();
return receivedFuture.thenCompose((messages) -> {
if(messages == null || messages.size() == 0)
{
return receivedFuture;
}
else
{
boolean areMessagesRemoved = false;
Iterator<IMessage> msgIterator = messages.iterator();
while(msgIterator.hasNext())
{
IMessage msg = msgIterator.next();
if(isMessageLockExpired(msg))
{
// Message lock already expired. remove it
TRACE_LOGGER.warn("Lock of the prefetched message with sequence number '{}' and id '{}' from '{}' already expired. Removing it from the list of messages returned to the caller.", msg.getSequenceNumber(), msg.getMessageId(), this.getEntityPath());
msgIterator.remove();
areMessagesRemoved = true;
}
else
{
// Break the loop. As next messages in the list are received from the entity after current message and are definitely not expired. No need to check them
break;
}
}

if(areMessagesRemoved)
{
if(messages.size() > 0)
{
// There are some messages still in the list.. Just return them to the caller
return CompletableFuture.completedFuture(messages);
}
else
{
Duration remainingWaitTime = serverWaitTime.minus(Duration.between(startTime, Instant.now()));
if(remainingWaitTime.isNegative() || remainingWaitTime.isZero())
{
return CompletableFuture.completedFuture(null);
}
else
{
TRACE_LOGGER.debug("All messages in the received list are lock expired. So receiving again from '{}'", this.getEntityPath());
return this.receiveBatchAsync(maxMessageCount, remainingWaitTime);
}
}
}
else
{
return receivedFuture;
}
}
});
}
}

private CompletableFuture<Boolean> checkIfValidRequestResponseLockTokenAsync(UUID lockToken) {
CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
Expand Down
Expand Up @@ -66,8 +66,8 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
{
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires
private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(100); // Wakes up every 100 milliseconds
private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1000); // Wakes up every second
private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1); // Wakes up every 1 millisecond
private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500); // Wakes up every 500 milliseconds
private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200);
private static final int CREDIT_FLOW_BATCH_SIZE = 50;// Arbitrarily chosen 50 to avoid sending too many flows in case prefetch count is large

Expand Down
Expand Up @@ -40,7 +40,7 @@ public static String generateSharedAccessSignatureToken(String sasKeyName, Strin
throw new IllegalArgumentException("validityInSeconds should be positive");
}

String validUntil = String.valueOf(Instant.now().getEpochSecond() + validityInSeconds);
String validUntil = String.valueOf(Instant.now().getEpochSecond() + validityInSeconds);
try
{
String utf8EncodingName = StandardCharsets.UTF_8.name();
Expand Down

0 comments on commit d79e75c

Please sign in to comment.