Skip to content

Commit

Permalink
DURACLOUD-1257: Update methods to use new RabbitmqConfig class from d…
Browse files Browse the repository at this point in the history
…uracloud-db (duracloud#141)
  • Loading branch information
fozboz authored and Andy Foster committed Jul 22, 2021
1 parent face303 commit 80b7615
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 42 deletions.
Expand Up @@ -28,8 +28,8 @@
* @author Shibo Liu
* Feb 29, 2020
*/
public class RabbitMQSubscriptionManager implements SubscriptionManager {
private Logger log = LoggerFactory.getLogger(RabbitMQSubscriptionManager.class);
public class RabbitmqSubscriptionManager implements SubscriptionManager {
private Logger log = LoggerFactory.getLogger(RabbitmqSubscriptionManager.class);
private Channel mqChannel;
private String mqHost;
private Integer mqPort;
Expand All @@ -42,7 +42,7 @@ public class RabbitMQSubscriptionManager implements SubscriptionManager {
private String consumerName;
private boolean initialized = false;
private List<MessageListener> messageListeners = new ArrayList<>();
public RabbitMQSubscriptionManager(String host, Integer port, String vhost,
public RabbitmqSubscriptionManager(String host, Integer port, String vhost,
String exchange, String username, String password,
String queueName) {
mqHost = host;
Expand Down
Expand Up @@ -22,11 +22,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.duracloud.account.db.model.GlobalProperties;
import org.duracloud.account.db.model.RabbitmqConfig;
import org.duracloud.account.db.repo.GlobalPropertiesRepo;
import org.duracloud.common.cache.AccountComponentCache;
import org.duracloud.common.changenotifier.MessageListener;
import org.duracloud.common.changenotifier.NotifierType;
import org.duracloud.common.changenotifier.RabbitMQSubscriptionManager;
import org.duracloud.common.changenotifier.RabbitmqSubscriptionManager;
import org.duracloud.common.changenotifier.SnsSubscriptionManager;
import org.duracloud.common.changenotifier.SubscriptionManager;
import org.duracloud.common.error.DuraCloudRuntimeException;
Expand Down Expand Up @@ -59,13 +60,14 @@ public SubscriptionManager subscriptionManager(GlobalPropertiesRepo globalProper

if (notifierType == NotifierType.RABBITMQ) {
//RabbitMQ
RabbitmqConfig rabbitmqConfig = props.getRabbitmqConfig();
subscriptionManager =
new RabbitMQSubscriptionManager(props.getRabbitmqHost(),
props.getRabbitmqPort(),
props.getRabbitmqVhost(),
new RabbitmqSubscriptionManager(rabbitmqConfig.getHost(),
rabbitmqConfig.getPort(),
rabbitmqConfig.getVhost(),
props.getRabbitmqExchange(),
props.getRabbitmqUsername(),
props.getRabbitmqPassword(),
rabbitmqConfig.getUsername(),
rabbitmqConfig.getPassword(),
queueName);
} else {
//SNS
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.duracloud.account.db.model.GlobalProperties;
import org.duracloud.account.db.model.RabbitmqConfig;
import org.duracloud.account.db.repo.GlobalPropertiesRepo;
import org.duracloud.common.changenotifier.AccountChangeNotifier;
import org.duracloud.common.changenotifier.NotifierType;
Expand All @@ -34,7 +35,7 @@ public class AccountChangeNotifierImpl implements AccountChangeNotifier {

private AmazonSNS snsClient;

private Channel rabbitMqChannel;
private Channel rabbitmqChannel;

private String rabbitmqExchange;

Expand Down Expand Up @@ -62,18 +63,25 @@ public AccountChangeNotifierImpl(GlobalPropertiesRepo globalPropertiesRepo) {
log.info("Notifier-Type: {}", notifierType.toString());
if (notifierType == NotifierType.RABBITMQ) {
rabbitmqExchange = props.getRabbitmqExchange();
rabbitmqVhost = props.getRabbitmqVhost();

RabbitmqConfig rabbitmqConfig = props.getRabbitmqConfig();
String rabbitmqHost = rabbitmqConfig.getHost();
Integer rabbitmqPort = rabbitmqConfig.getPort();
rabbitmqVhost = rabbitmqConfig.getVhost();
String rabbitmqUsername = rabbitmqConfig.getUsername();
String rabbitmqPassword = rabbitmqConfig.getPassword();

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(props.getRabbitmqUsername());
factory.setPassword(props.getRabbitmqPassword());
factory.setHost(rabbitmqHost);
factory.setPort(rabbitmqPort);
factory.setVirtualHost(rabbitmqVhost);
factory.setHost(props.getRabbitmqHost());
factory.setPort(props.getRabbitmqPort());
log.info("RabbitMQ Host: {}, Vhost: {}, Exchange: {}", props.getRabbitmqHost(),
rabbitmqVhost, rabbitmqExchange);
factory.setUsername(rabbitmqUsername);
factory.setPassword(rabbitmqPassword);
log.info("RabbitMQ Host: {}, Vhost: {}, Exchange: {}",
rabbitmqHost, rabbitmqVhost, rabbitmqExchange);
try {
Connection conn = factory.newConnection();
rabbitMqChannel = conn.createChannel();
rabbitmqChannel = conn.createChannel();
} catch (Exception e) {
log.error("Failed to connect to RabbitMQ because: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -107,7 +115,7 @@ private void publish(EventType eventType, String account) {
try {
log.debug("publishing event={}", event);
if (notifierType == NotifierType.RABBITMQ) {
rabbitMqChannel.basicPublish(rabbitmqExchange, "", null,
rabbitmqChannel.basicPublish(rabbitmqExchange, "", null,
AccountChangeEvent.serialize(event).getBytes());
log.info("published event via RabbitMQ, vhost={}, exchange={}, event={}",
rabbitmqVhost, rabbitmqExchange, event);
Expand Down
Expand Up @@ -39,8 +39,8 @@
* @author Shibo Liu
* Date: 07/03/19
*/
public class RabbitMQTaskQueue implements TaskQueue {
private static Logger log = LoggerFactory.getLogger(RabbitMQTaskQueue.class);
public class RabbitmqTaskQueue implements TaskQueue {
private static Logger log = LoggerFactory.getLogger(RabbitmqTaskQueue.class);

private Channel mqChannel;
private String queueName;
Expand All @@ -57,7 +57,7 @@ public enum MsgProp {
* Creates RabbitMQ task queue, a RabbitMQ server is needed and a DIRECT exchange must be created and bound to
* the queue name provided (routing key must be the same as the queue name)
*/
public RabbitMQTaskQueue(String host, Integer port, String vhost, String exchange, String username, String password,
public RabbitmqTaskQueue(String host, Integer port, String vhost, String exchange, String username, String password,
String queueName) {
try {
this.exchangeName = exchange;
Expand All @@ -79,7 +79,7 @@ public RabbitMQTaskQueue(String host, Integer port, String vhost, String exchang
}
}

public RabbitMQTaskQueue(Connection conn, String exchange, String queueName) {
public RabbitmqTaskQueue(Connection conn, String exchange, String queueName) {
try {
this.exchangeName = exchange;
mqChannel = conn.createChannel();
Expand Down
Expand Up @@ -37,10 +37,10 @@
* Date: 2020.02.12
*/
@RunWith(EasyMockRunner.class)
public class RabbitMQTaskQueueTest extends EasyMockSupport {
public class RabbitmqTaskQueueTest extends EasyMockSupport {

private Connection connection;
private RabbitMQTaskQueue queue;
private RabbitmqTaskQueue queue;
private Channel channel;
private String queueName = "test-queue";
private String exchange = "test-exchange";
Expand All @@ -49,7 +49,7 @@ public class RabbitMQTaskQueueTest extends EasyMockSupport {
public void setup() {
}

private void setupRabbitMQClient() throws IOException {
private void setupRabbitmqClient() throws IOException {
connection = createMock("Connection", Connection.class);
channel = createMock("Channel", Channel.class);
InetAddress address = InetAddress.getByName("127.0.0.1");
Expand All @@ -68,12 +68,12 @@ public void tearDown() {
}

private void createSubject() {
queue = new RabbitMQTaskQueue(connection, exchange, queueName);
queue = new RabbitmqTaskQueue(connection, exchange, queueName);
}

@Test
public void testMarshallTask() throws IOException {
setupRabbitMQClient();
setupRabbitmqClient();
replayAll();
createSubject();

Expand All @@ -83,11 +83,11 @@ public void testMarshallTask() throws IOException {

Task task = queue.marshallTask(msgBody, 0, queueName, exchange);

assertThat(task.getProperty(RabbitMQTaskQueue.MsgProp.DELIVERY_TAG.name()),
assertThat(task.getProperty(RabbitmqTaskQueue.MsgProp.DELIVERY_TAG.name()),
is(equalTo("0")));
assertThat(task.getProperty(RabbitMQTaskQueue.MsgProp.EXCHANGE.name()),
assertThat(task.getProperty(RabbitmqTaskQueue.MsgProp.EXCHANGE.name()),
is(equalTo("test-exchange")));
assertThat(task.getProperty(RabbitMQTaskQueue.MsgProp.ROUTING_KEY.name()),
assertThat(task.getProperty(RabbitmqTaskQueue.MsgProp.ROUTING_KEY.name()),
is(equalTo("test-queue")));
assertThat(task.getType(), is(equalTo(Task.Type.DUP)));
assertThat(task.getProperty("key1"), is(equalTo("value1")));
Expand All @@ -97,7 +97,7 @@ public void testMarshallTask() throws IOException {

@Test
public void testUnmarshallTask() throws IOException {
setupRabbitMQClient();
setupRabbitmqClient();
replayAll();
createSubject();

Expand All @@ -115,7 +115,7 @@ public void testUnmarshallTask() throws IOException {

@Test
public void testPut() throws IOException {
setupRabbitMQClient();
setupRabbitmqClient();
replayAll();
createSubject();

Expand All @@ -132,7 +132,7 @@ protected Task createSampleAuditTask(long time) {

@Test
public void testPutMuliple() throws IOException {
setupRabbitMQClient();
setupRabbitmqClient();
replayAll();
createSubject();
Set<Task> tasks = new HashSet<>();
Expand Down
Expand Up @@ -8,6 +8,7 @@
package org.duracloud.durastore.util;

import org.duracloud.account.db.model.DuracloudMill;
import org.duracloud.account.db.model.RabbitmqConfig;
import org.duracloud.account.db.repo.DuracloudMillRepo;
import org.duracloud.common.queue.QueueType;
import org.duracloud.storage.domain.AuditConfig;
Expand All @@ -25,15 +26,16 @@ public AuditConfigBuilder(DuracloudMillRepo millRepo) {
public AuditConfig build() {
AuditConfig config = new AuditConfig();
DuracloudMill mill = millRepo.findAll().get(0);
RabbitmqConfig rmqConf = mill.getRabbitmqConfig();
config.setAuditLogSpaceId(mill.getAuditLogSpaceId());
config.setAuditQueueName(mill.getAuditQueue());
config.setQueueType(QueueType.fromString(mill.getQueueType()));
config.setRabbitmqHost(mill.getRabbitmqHost());
config.setRabbitmqPort(mill.getRabbitmqPort());
config.setRabbitmqVhost(mill.getRabbitmqVhost());
config.setRabbitmqHost(rmqConf.getHost());
config.setRabbitmqPort(rmqConf.getPort());
config.setRabbitmqVhost(rmqConf.getVhost());
config.setRabbitmqExchange(mill.getRabbitmqExchange());
config.setRabbitmqUsername(mill.getRabbitmqUsername());
config.setRabbitmqPassword(mill.getRabbitmqPassword());
config.setRabbitmqUsername(rmqConf.getUsername());
config.setRabbitmqPassword(rmqConf.getPassword());
return config;
}

Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.aws.SQSTaskQueue;
import org.duracloud.common.queue.noop.NoopTaskQueue;
import org.duracloud.common.queue.rabbitmq.RabbitMQTaskQueue;
import org.duracloud.common.queue.rabbitmq.RabbitmqTaskQueue;
import org.duracloud.common.rest.DuraCloudRequestContextUtil;
import org.duracloud.common.util.UserUtil;
import org.duracloud.durastore.test.MockRetryStorageProvider;
Expand Down Expand Up @@ -150,7 +150,7 @@ private void configureAuditQueue(AuditConfig auditConfig) {
String password = auditConfig.getRabbitmqPassword();
log.info("Configuring Audit queue with host: {}, port: {}, vhost: {}, exchange: {}, queue: {}",
host, port, vhost, exchange, queueName);
this.auditQueue = new RabbitMQTaskQueue(host, port, vhost, exchange, username, password, queueName);
this.auditQueue = new RabbitmqTaskQueue(host, port, vhost, exchange, username, password, queueName);
} else {
//AWS - SQS
this.auditQueue = new SQSTaskQueue(queueName);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -232,7 +232,7 @@
<instrumentedTests>**/*Test*__*.class</instrumentedTests>
<innerClasses>**/*$*</innerClasses>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<duracloud.db.version>7.0.0</duracloud.db.version>
<duracloud.db.version>7.1.0-SNAPSHOT</duracloud.db.version>
<duraspace-codestyle.version>1.1.0</duraspace-codestyle.version>
<org.springframework.version>4.2.5.RELEASE</org.springframework.version>
<org.springframework.security.version>4.0.4.RELEASE</org.springframework.security.version>
Expand Down

0 comments on commit 80b7615

Please sign in to comment.