diff --git a/src/main/java/xqa/IngestBalancer.java b/src/main/java/xqa/IngestBalancer.java index 5a34af2..da1a982 100644 --- a/src/main/java/xqa/IngestBalancer.java +++ b/src/main/java/xqa/IngestBalancer.java @@ -1,13 +1,11 @@ package xqa; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -58,12 +56,12 @@ public static void main(final String... args) { ingestBalancer.processCommandLine(args); ingestBalancer.start(); ingestBalancer.join(); - } catch (CommandLineException | InterruptedException | ParseException exception) { + } catch (InterruptedException | ParseException exception) { logger.error(exception.getMessage()); } } - public void processCommandLine(final String... args) throws ParseException, CommandLineException { + public void processCommandLine(final String... args) throws ParseException { final Options options = new Options(); options.addOption("message_broker_host", true, "i.e. xqa-message-broker"); @@ -85,14 +83,11 @@ public void processCommandLine(final String... args) throws ParseException, Comm setConfigurationValues(options, commandLineParser.parse(options, args)); } - private void setConfigurationValues(final Options options, final CommandLine commandLine) throws CommandLineException { + private void setConfigurationValues(final Options options, final CommandLine commandLine) { if (commandLine.hasOption("message_broker_host")) { messageBrokerHost = commandLine.getOptionValue("message_broker_host"); logger.info("message_broker_host=" + messageBrokerHost); } -// else { -// showUsage(options); -// } messageBrokerPort = Integer.parseInt(commandLine.getOptionValue("message_broker_port", "5672")); messageBrokerUsername = commandLine.getOptionValue("message_broker_username", "admin"); @@ -109,23 +104,10 @@ private void setConfigurationValues(final Options options, final CommandLine com insertThreadWait = Integer.parseInt(commandLine.getOptionValue("insert_thread_wait", "60000")); logger.info("insert_thread_wait=" + insertThreadWait); - final Map env = System.getenv(); - if (env.get("POOL_SIZE") != null) { - poolSize = Integer.parseInt(env.get("POOL_SIZE")); - } else { - poolSize = Integer.parseInt(commandLine.getOptionValue("pool_size", "4")); - } + poolSize = Integer.parseInt(commandLine.getOptionValue("pool_size", "4")); logger.info("pool_size=" + poolSize); } - /* - private void showUsage(final Options options) throws CommandLineException { - HelpFormatter formater = new HelpFormatter(); - formater.printHelp("IngestBalancer", options); - throw new IngestBalancer.CommandLineException(); - } - */ - private void initialiseIngestPool() { final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("InserterThread-%d").setDaemon(true) .build(); @@ -143,8 +125,6 @@ public void run() { } messageBroker.close(); - } catch (InterruptedException | JMSException exception) { - logger.error(exception.getMessage()); } catch (Exception exception) { logger.error(exception.getMessage()); } finally { @@ -186,10 +166,4 @@ public void onMessage(final Message message) { logger.error(exception.getMessage()); } } - - @SuppressWarnings("serial") - public class CommandLineException extends Exception { - public CommandLineException() { - } - } } diff --git a/src/test/java/xqa/integration/IngestBalancerTest.java b/src/test/java/xqa/integration/IngestBalancerTest.java index 80102fe..13cca2c 100644 --- a/src/test/java/xqa/integration/IngestBalancerTest.java +++ b/src/test/java/xqa/integration/IngestBalancerTest.java @@ -59,6 +59,36 @@ void singleIngest() throws Exception { messageBroker.close(); } + @Test + void noResponseFromShard() throws Exception { + final IngestBalancer ingestBalancer = new IngestBalancer(); + ingestBalancer.processCommandLine(new String[]{ + "-message_broker_host", + "0.0.0.0", + "-pool_size", + "3", + "-insert_thread_wait", + "500"}); + ingestBalancer.start(); + + final MessageBroker messageBroker = new MessageBroker( + "0.0.0.0", + 5672, + "admin", + "admin", + 3); + + sendIngestMessage(ingestBalancer.destinationIngest, messageBroker); + + Thread.sleep(2000); + + sendStopMessage(ingestBalancer.destinationCmdStop, messageBroker); + + ingestBalancer.join(); + + messageBroker.close(); + } + private void sendIngestMessage(final String destination, final MessageBroker messageBroker) throws JMSException, IOException, MessageBroker.MessageBrokerException {