Skip to content

Commit

Permalink
improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshnsears committed Dec 23, 2018
1 parent 2fb4523 commit c281a85
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
34 changes: 4 additions & 30 deletions src/main/java/xqa/IngestBalancer.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package xqa;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
Expand Down Expand Up @@ -58,12 +56,12 @@ public static void main(final String... args) {
ingestBalancer.processCommandLine(args);
ingestBalancer.start();
ingestBalancer.join();
} catch (CommandLineException | InterruptedException | ParseException exception) {
} catch (InterruptedException | ParseException exception) {
logger.error(exception.getMessage());
}
}

public void processCommandLine(final String... args) throws ParseException, CommandLineException {
public void processCommandLine(final String... args) throws ParseException {
final Options options = new Options();

options.addOption("message_broker_host", true, "i.e. xqa-message-broker");
Expand All @@ -85,14 +83,11 @@ public void processCommandLine(final String... args) throws ParseException, Comm
setConfigurationValues(options, commandLineParser.parse(options, args));
}

private void setConfigurationValues(final Options options, final CommandLine commandLine) throws CommandLineException {
private void setConfigurationValues(final Options options, final CommandLine commandLine) {
if (commandLine.hasOption("message_broker_host")) {
messageBrokerHost = commandLine.getOptionValue("message_broker_host");
logger.info("message_broker_host=" + messageBrokerHost);
}
// else {
// showUsage(options);
// }

messageBrokerPort = Integer.parseInt(commandLine.getOptionValue("message_broker_port", "5672"));
messageBrokerUsername = commandLine.getOptionValue("message_broker_username", "admin");
Expand All @@ -109,23 +104,10 @@ private void setConfigurationValues(final Options options, final CommandLine com
insertThreadWait = Integer.parseInt(commandLine.getOptionValue("insert_thread_wait", "60000"));
logger.info("insert_thread_wait=" + insertThreadWait);

final Map<String, String> env = System.getenv();
if (env.get("POOL_SIZE") != null) {
poolSize = Integer.parseInt(env.get("POOL_SIZE"));
} else {
poolSize = Integer.parseInt(commandLine.getOptionValue("pool_size", "4"));
}
poolSize = Integer.parseInt(commandLine.getOptionValue("pool_size", "4"));
logger.info("pool_size=" + poolSize);
}

/*
private void showUsage(final Options options) throws CommandLineException {
HelpFormatter formater = new HelpFormatter();
formater.printHelp("IngestBalancer", options);
throw new IngestBalancer.CommandLineException();
}
*/

private void initialiseIngestPool() {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("InserterThread-%d").setDaemon(true)
.build();
Expand All @@ -143,8 +125,6 @@ public void run() {
}

messageBroker.close();
} catch (InterruptedException | JMSException exception) {
logger.error(exception.getMessage());
} catch (Exception exception) {
logger.error(exception.getMessage());
} finally {
Expand Down Expand Up @@ -186,10 +166,4 @@ public void onMessage(final Message message) {
logger.error(exception.getMessage());
}
}

@SuppressWarnings("serial")
public class CommandLineException extends Exception {
public CommandLineException() {
}
}
}
30 changes: 30 additions & 0 deletions src/test/java/xqa/integration/IngestBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,36 @@ void singleIngest() throws Exception {
messageBroker.close();
}

@Test
void noResponseFromShard() throws Exception {
final IngestBalancer ingestBalancer = new IngestBalancer();
ingestBalancer.processCommandLine(new String[]{
"-message_broker_host",
"0.0.0.0",
"-pool_size",
"3",
"-insert_thread_wait",
"500"});
ingestBalancer.start();

final MessageBroker messageBroker = new MessageBroker(
"0.0.0.0",
5672,
"admin",
"admin",
3);

sendIngestMessage(ingestBalancer.destinationIngest, messageBroker);

Thread.sleep(2000);

sendStopMessage(ingestBalancer.destinationCmdStop, messageBroker);

ingestBalancer.join();

messageBroker.close();
}

private void sendIngestMessage(final String destination,
final MessageBroker messageBroker)
throws JMSException, IOException, MessageBroker.MessageBrokerException {
Expand Down

0 comments on commit c281a85

Please sign in to comment.