Skip to content

Commit

Permalink
pmd
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshnsears committed Dec 23, 2018
1 parent fec0d21 commit 54a08b5
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 82 deletions.
49 changes: 21 additions & 28 deletions src/main/java/xqa/IngestBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.*;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand Down Expand Up @@ -42,7 +39,7 @@ public class IngestBalancer extends Thread implements MessageListener {
public int messageBrokerRetryAttempts;
public int insertThreadWait;
public int insertThreadSecondaryWait;
private boolean stop = false;
private boolean stop;
private ThreadPoolExecutor ingestPoolExecutor;

public IngestBalancer() {
Expand All @@ -51,22 +48,19 @@ public IngestBalancer() {
setName("IngestBalancer");
}

public static void main(String[] args) {
public static void main(final String... args) {
try {
IngestBalancer ingestBalancer = new IngestBalancer();
final IngestBalancer ingestBalancer = new IngestBalancer();
ingestBalancer.processCommandLine(args);
ingestBalancer.start();
ingestBalancer.join();
} catch (CommandLineException exception) {
System.exit(0);
} catch (Exception exception) {
} catch (CommandLineException | InterruptedException | ParseException exception) {
logger.error(exception.getMessage());
System.exit(0);
}
}

public void processCommandLine(String[] args) throws ParseException, CommandLineException {
Options options = new Options();
public void processCommandLine(final String... args) throws ParseException, CommandLineException {
final Options options = new Options();

options.addOption("message_broker_host", true, "i.e. xqa-message-broker");
options.addOption("message_broker_port", true, "i.e. 5672");
Expand All @@ -83,18 +77,18 @@ public void processCommandLine(String[] args) throws ParseException, CommandLine
options.addOption("insert_thread_wait", true, "i.e. 10000"); // 10 seconds
options.addOption("insert_thread_secondary_wait", true, "i.e. 1000"); // 1 second

CommandLineParser commandLineParser = new DefaultParser();
final CommandLineParser commandLineParser = new DefaultParser();
setConfigurationValues(options, commandLineParser.parse(options, args));
}

private void setConfigurationValues(Options options, CommandLine commandLine) throws CommandLineException {
private void setConfigurationValues(final Options options, final CommandLine commandLine) throws CommandLineException {
if (commandLine.hasOption("message_broker_host")) {
messageBrokerHost = commandLine.getOptionValue("message_broker_host");
logger.info("message_broker_host=" + messageBrokerHost);
} else {
//showUsage(options);
messageBrokerHost = commandLine.getOptionValue("message_broker_host", "127.0.0.1");
}
// else {
// showUsage(options);
// }

messageBrokerPort = Integer.parseInt(commandLine.getOptionValue("message_broker_port", "5672"));
messageBrokerUsername = commandLine.getOptionValue("message_broker_username", "admin");
Expand All @@ -111,7 +105,7 @@ private void setConfigurationValues(Options options, CommandLine commandLine) th
insertThreadWait = Integer.parseInt(commandLine.getOptionValue("insert_thread_wait", "60000"));
logger.info("insert_thread_wait=" + insertThreadWait);

Map<String, String> env = System.getenv();
final Map<String, String> env = System.getenv();
if (env.get("POOL_SIZE") != null) {
poolSize = Integer.parseInt(env.get("POOL_SIZE"));
} else {
Expand All @@ -129,7 +123,7 @@ private void showUsage(final Options options) throws CommandLineException {
*/

private void initialiseIngestPool() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("InserterThread-%d").setDaemon(true)
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("InserterThread-%d").setDaemon(true)
.build();

ingestPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
Expand All @@ -145,9 +139,10 @@ public void run() {
}

messageBroker.close();
} catch (InterruptedException | JMSException exception) {
logger.error(exception.getMessage());
} catch (Exception exception) {
logger.error(exception.getMessage());
System.exit(1);
} finally {
ingestPoolExecutor.shutdown();
}
Expand All @@ -161,18 +156,18 @@ private void registerListeners() throws Exception {
messageBrokerPassword,
messageBrokerRetryAttempts);

Destination cmdStop = messageBroker.getSession().createTopic(destinationCmdStop);
MessageConsumer cmdStopConsumer = messageBroker.getSession().createConsumer(cmdStop);
final Destination cmdStop = messageBroker.getSession().createTopic(destinationCmdStop);
final MessageConsumer cmdStopConsumer = messageBroker.getSession().createConsumer(cmdStop);
cmdStopConsumer.setMessageListener(this);

Destination ingest = messageBroker.getSession().createQueue(destinationIngest);
MessageConsumer ingestConsumer = messageBroker.getSession().createConsumer(ingest);
final Destination ingest = messageBroker.getSession().createQueue(destinationIngest);
final MessageConsumer ingestConsumer = messageBroker.getSession().createConsumer(ingest);
ingestConsumer.setMessageListener(this);

logger.info("listeners registered");
}

public void onMessage(Message message) {
public void onMessage(final Message message) {
try {
if (message.getJMSDestination().toString().equals(destinationCmdStop)) {
logger.info(MessageLogger.log(MessageLogger.Direction.RECEIVE, message, false));
Expand All @@ -185,8 +180,6 @@ public void onMessage(Message message) {
}
} catch (Exception exception) {
logger.error(exception.getMessage());
exception.printStackTrace();
System.exit(1);
}
}

Expand Down
37 changes: 18 additions & 19 deletions src/main/java/xqa/InserterThread.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package xqa;

import java.io.UnsupportedEncodingException;
import java.text.MessageFormat;
import java.util.List;
import java.util.UUID;
Expand All @@ -20,12 +19,12 @@

class InserterThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(InserterThread.class);
static int threadInstances = 0;
private static int threadInstances;
private final IngestBalancer ingestBalancer;
private final Message ingestMessage;
private MessageBroker inserterThreadMessageBroker;

InserterThread(IngestBalancer ingestBalancer, Message ingestMessage) {
InserterThread(final IngestBalancer ingestBalancer, final Message ingestMessage) {
setName("InserterThread");
synchronized (this) {
this.ingestBalancer = ingestBalancer;
Expand All @@ -48,7 +47,7 @@ public void run() {

sendEventToMessageBroker("START");

Message smallestShard = findSmallestShard(askShardsForSize());
final Message smallestShard = findSmallestShard(askShardsForSize());
if (smallestShard != null) {
insert(smallestShard);
}
Expand All @@ -60,14 +59,13 @@ public void run() {
}
} catch (Exception exception) {
logger.error(exception.getMessage());
System.exit(1);
}

logger.debug("++++++++++++");
}

private synchronized void sendEventToMessageBroker(final String state) throws Exception {
Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
final Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
inserterThreadMessageBroker.getSession().createQueue(ingestBalancer.destinationEvent),
UUID.randomUUID().toString(),
new Gson().toJson(new IngestBalancerEvent(ingestBalancer.serviceId, ingestMessage.getJMSCorrelationID(),
Expand All @@ -77,39 +75,40 @@ private synchronized void sendEventToMessageBroker(final String state) throws Ex
}

private synchronized List<Message> askShardsForSize() throws Exception {
TemporaryQueue sizeReplyToDestination = inserterThreadMessageBroker.createTemporaryQueue();
final TemporaryQueue sizeReplyToDestination = inserterThreadMessageBroker.createTemporaryQueue();

Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
final Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
inserterThreadMessageBroker.getSession().createTopic(ingestBalancer.destinationShardSize),
sizeReplyToDestination, UUID.randomUUID().toString(), "");
inserterThreadMessageBroker.sendMessage(message);

return getSizeResponses(inserterThreadMessageBroker, sizeReplyToDestination);
}

private synchronized List<Message> getSizeResponses(MessageBroker shardSizeMessageBroker,
TemporaryQueue sizeReplyToDestination) throws Exception {
private synchronized List<Message> getSizeResponses(final MessageBroker shardSizeMessageBroker,
final TemporaryQueue sizeReplyToDestination) throws Exception {

logger.debug(MessageFormat.format("{0}: START", ingestMessage.getJMSCorrelationID()));

List<Message> shardSizeResponses = shardSizeMessageBroker.receiveMessagesTemporaryQueue(sizeReplyToDestination,
final List<Message> shardSizeResponses = shardSizeMessageBroker.receiveMessagesTemporaryQueue(sizeReplyToDestination,
ingestBalancer.insertThreadWait, ingestBalancer.insertThreadSecondaryWait);

if (shardSizeResponses.size() == 0) {
if (shardSizeResponses.isEmpty()) {
logger.warn(MessageFormat.format("{0}: END: shardSizeResponses=None; subject={1}",
ingestMessage.getJMSCorrelationID(), ingestMessage.getJMSType()));

placeMessageBackOnOriginatingDestination();
} else
} else {
logger.debug(MessageFormat.format("{0}: END: shardSizeResponses={1}", ingestMessage.getJMSCorrelationID(),
shardSizeResponses.size()));
}

return shardSizeResponses;
}

private synchronized void placeMessageBackOnOriginatingDestination()
throws JMSException, UnsupportedEncodingException, MessageBroker.MessageBrokerException {
Message message =
throws JMSException, MessageBroker.MessageBrokerException {
final Message message =
MessageMaker.createMessage(
inserterThreadMessageBroker.getSession(),
inserterThreadMessageBroker.getSession().createQueue("xqa.ingest"),
Expand All @@ -124,13 +123,13 @@ private synchronized void placeMessageBackOnOriginatingDestination()
inserterThreadMessageBroker.sendMessage(message);
}

public Message findSmallestShard(List<Message> shardSizeResponses) throws Exception {
public Message findSmallestShard(final List<Message> shardSizeResponses) throws Exception {
Message smallestShard = null;

if (shardSizeResponses.size() > 0) {
if (!shardSizeResponses.isEmpty()) {
smallestShard = shardSizeResponses.get(0);

for (Message currentShard : shardSizeResponses) {
for (final Message currentShard : shardSizeResponses) {
if (Integer.valueOf(MessageMaker.getBody(smallestShard)) > Integer
.valueOf(MessageMaker.getBody(currentShard))) {
smallestShard = currentShard;
Expand All @@ -142,7 +141,7 @@ public Message findSmallestShard(List<Message> shardSizeResponses) throws Except
}

private synchronized void insert(final Message smallestShard) throws Exception {
Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
final Message message = MessageMaker.createMessage(inserterThreadMessageBroker.getSession(),
inserterThreadMessageBroker.getSession().createQueue(smallestShard.getJMSReplyTo().toString()),
ingestMessage.getJMSCorrelationID(), MessageMaker.getSubject(ingestMessage),
MessageMaker.getBody(ingestMessage));
Expand Down
17 changes: 9 additions & 8 deletions src/test/java/xqa/SmallestShardTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import javax.jms.BytesMessage;
import javax.jms.Message;
Expand All @@ -19,28 +19,29 @@
class SmallestShardTest {
@Test
void findSmallestShard() throws Exception {
IngestBalancer ingestBalancer = new IngestBalancer();
final IngestBalancer ingestBalancer = new IngestBalancer();
ingestBalancer.processCommandLine(new String[]{"-message_broker_host", "127.0.0.1"});

InserterThread inserterThread = new InserterThread(ingestBalancer, mock(BytesMessage.class));
final InserterThread inserterThread = new InserterThread(ingestBalancer, mock(BytesMessage.class));

List<Message> shardSizeResponses = new Vector<>();
final List<Message> shardSizeResponses = new ArrayList<>();

JmsMessageFactory factory = new JmsTestMessageFactory();
final JmsMessageFactory factory = new JmsTestMessageFactory();

JmsBytesMessage big = factory.createBytesMessage();
final JmsBytesMessage big = factory.createBytesMessage();
big.writeBytes("10".getBytes());
shardSizeResponses.add(big);

JmsBytesMessage smallest = factory.createBytesMessage();
final JmsBytesMessage smallest = factory.createBytesMessage();
smallest.writeBytes("5".getBytes());
shardSizeResponses.add(smallest);

JmsBytesMessage bigger = factory.createBytesMessage();
final JmsBytesMessage bigger = factory.createBytesMessage();
bigger.writeBytes("30".getBytes());
shardSizeResponses.add(bigger);

assertEquals(
"unable to find smallest shard size reponse",
MessageMaker.getBody(smallest),
MessageMaker.getBody(inserterThread.findSmallestShard(shardSizeResponses)));
}
Expand Down
34 changes: 22 additions & 12 deletions src/test/java/xqa/integration/IngestBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,36 @@
class IngestBalancerTest {
@Test
void singleIngest() throws Exception {
IngestBalancer ingestBalancer = new IngestBalancer();
String messageBrokerHost = "0.0.0.0";
final IngestBalancer ingestBalancer = new IngestBalancer();
ingestBalancer.processCommandLine(new String[]{
"-message_broker_host", messageBrokerHost,
"-pool_size", "3",
"-insert_thread_wait", "10000"});
"-message_broker_host",
"0.0.0.0",
"-pool_size",
"3",
"-insert_thread_wait",
"10000"});
ingestBalancer.start();

MockShard mockShard = new MockShard();
final MockShard mockShard = new MockShard();
mockShard.start();

MessageBroker messageBroker = new MessageBroker(messageBrokerHost, 5672, "admin", "admin", 3);
final MessageBroker messageBroker = new MessageBroker(
"0.0.0.0",
5672,
"admin",
"admin",
3);

sendIngestMessage(ingestBalancer.destinationIngest, messageBroker);

while (mockShard.digestOfMostRecentMessage == null) {
Thread.sleep(1000);
}

assertEquals("192a0c3918e308c1374d57256b183045393c1cf9053a8614e9d7bb24b8261358", mockShard.digestOfMostRecentMessage);
assertEquals(
"192a0c3918e308c1374d57256b183045393c1cf9053a8614e9d7bb24b8261358",
mockShard.digestOfMostRecentMessage,
"digest does not match");

sendStopMessage(ingestBalancer.destinationCmdStop, messageBroker);

Expand All @@ -49,8 +59,8 @@ void singleIngest() throws Exception {
messageBroker.close();
}

private void sendIngestMessage(String destination,
MessageBroker messageBroker)
private void sendIngestMessage(final String destination,
final MessageBroker messageBroker)
throws JMSException, IOException, MessageBroker.MessageBrokerException {
messageBroker.sendMessage(MessageMaker.createMessage(
messageBroker.getSession(),
Expand All @@ -60,8 +70,8 @@ private void sendIngestMessage(String destination,
xmlFileContents()));
}

private void sendStopMessage(String destination,
MessageBroker messageBroker)
private void sendStopMessage(final String destination,
final MessageBroker messageBroker)
throws JMSException, UnsupportedEncodingException, MessageBroker.MessageBrokerException {
messageBroker.sendMessage(MessageMaker.createMessage(
messageBroker.getSession(),
Expand Down
Loading

0 comments on commit 54a08b5

Please sign in to comment.