Skip to content

Commit

Permalink
JAMES-3454 Make RegistrationQueueName immutable
Browse files Browse the repository at this point in the history
We rely on an already known queue name, based in the EventBusId
  • Loading branch information
chibenwa committed Nov 24, 2020
1 parent da91c9b commit aff9981
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import org.apache.james.backends.rabbitmq.ReceiverProvider;
Expand Down Expand Up @@ -76,7 +75,6 @@ public class KeyRegistrationHandler implements SimpleConnectionPool.Reconnection
private final MailboxListenerExecutor mailboxListenerExecutor;
private final RetryBackoffConfiguration retryBackoff;
private Optional<Disposable> receiverSubscriber;
private AtomicBoolean registrationQueueInitialized = new AtomicBoolean(false);

KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer,
Sender sender, ReceiverProvider receiverProvider,
Expand All @@ -90,7 +88,7 @@ public class KeyRegistrationHandler implements SimpleConnectionPool.Reconnection
this.receiver = receiverProvider.createReceiver();
this.mailboxListenerExecutor = mailboxListenerExecutor;
this.retryBackoff = retryBackoff;
this.registrationQueue = new RegistrationQueueName();
this.registrationQueue = new RegistrationQueueName(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString());
this.registrationBinder = new RegistrationBinder(sender, registrationQueue);
this.receiverSubscriber = Optional.empty();

Expand Down Expand Up @@ -120,12 +118,6 @@ private void declareQueue(Sender sender) {
.timeout(TOPOLOGY_CHANGES_TIMEOUT)
.map(AMQP.Queue.DeclareOk::getQueue)
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))
.doOnSuccess(queueName -> {
if (!registrationQueueInitialized.get()) {
registrationQueue.initialize(queueName);
registrationQueueInitialized.set(true);
}
})
.block();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,14 @@

package org.apache.james.mailbox.events;

import java.util.Optional;

import com.google.common.base.Preconditions;

class RegistrationQueueName {
private Optional<String> queueName;

RegistrationQueueName() {
this.queueName = Optional.empty();
}
private final String queueName;

void initialize(String queueName) {
Preconditions.checkNotNull(queueName);
Preconditions.checkState(!this.queueName.isPresent(), "'queueName' must be empty for initializing");
this.queueName = Optional.of(queueName);
RegistrationQueueName(String queueName) {
this.queueName = queueName;
}

String asString() {
return queueName.orElseThrow(() -> new IllegalStateException("'queueName' is not yet initialized"));
return queueName;
}
}

0 comments on commit aff9981

Please sign in to comment.