Permalink
Browse files

Fixed termination to consume last message rather that read it at

the beginning of the next execution of the consumer.
  • Loading branch information...
1 parent ffa4e85 commit 90e2f85d9f03d6afab524f59212c08f9cefe2a0d schuchert@yahoo.com committed Jul 10, 2013
Showing with 17 additions and 15 deletions.
  1. +2 −1 README
  2. +15 −14 consumer/src/main/java/net/timico/messaging/App.java
View
3 README
@@ -15,4 +15,5 @@ To use:
You can continue to execute produce.sh. When you want to stop the madness,
you can hit ^c in consumer and mq shells.
-Alternatively, you can try using end.sh in the producer directory.
+Alternatively, you can use end.sh in the producer directory and hit ^c
+in the mq directory.
@@ -20,7 +20,7 @@
private Connection connection;
private Session session;
private MessageConsumer consumer;
- private boolean justStarted = true;
+ private volatile boolean shutdown = false;
public static void main( String[] args )
{
@@ -30,18 +30,20 @@ public static void main( String[] args )
public void run()
{
- try
- {
+ try {
ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test");
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
+ while(!shutdown) {
+ Thread.sleep(1000);
+ }
+ System.exit(0);
}
- catch (Exception e)
- {
+ catch (Exception e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
@@ -54,15 +56,14 @@ public void onMessage(Message message)
if (message instanceof TextMessage)
{
TextMessage txtMessage = (TextMessage)message;
- System.out.println("Message received: " + txtMessage.getText());
- if("terminate".equals(txtMessage.getText())) {
- if(justStarted) {
- System.out.println("ignoring terminate from last shutdown");
- justStarted = false;
- } else {
- System.out.println("Shutting down");
- System.exit(0);
- }
+ String body = txtMessage.getText();
+ if(!body.startsWith("=")) {
+ System.out.printf("Message received: ");
+ }
+ System.out.println(body);
+ if("terminate".equals(body)) {
+ System.out.println("Shutting down");
+ shutdown = true;
}
}
else

0 comments on commit 90e2f85

Please sign in to comment.