Skip to content

Commit

Permalink
bugfix messaging with custom quenames
Browse files Browse the repository at this point in the history
  • Loading branch information
sboesebeck committed May 24, 2018
1 parent c4bdd2f commit 91528ff
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>de.caluga</groupId>
<artifactId>morphium</artifactId>
<version>3.2.1</version>
<version>3.2.3</version>
<packaging>jar</packaging>
<parent>
<groupId>org.sonatype.oss</groupId>
Expand Down
2 changes: 1 addition & 1 deletion src/de/caluga/morphium/messaging/Messaging.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ private void lockAndProcess(Msg obj) {
} catch (InterruptedException e) {

}
obj=morphium.reread(obj);
obj=morphium.reread(obj,getCollectionName());
if (obj!=null && obj.getLockedBy()!=null && obj.getLockedBy().equals(id)) {
List<Msg> lst = new ArrayList<>();
lst.add(obj);
Expand Down
85 changes: 85 additions & 0 deletions test/de/caluga/test/mongo/suite/MessagingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,91 @@ public void mutlithreaddedMessagingPerformanceTest() throws Exception {
}



@Test
public void exclusiveMessageCustomQueueTest() throws Exception {
morphium.dropCollection(Msg.class);
morphium.dropCollection(Msg.class,"test",null);
morphium.dropCollection(Msg.class,"test2",null);
Messaging sender = new Messaging(morphium,"test", 100, false);
sender.start();
Messaging sender2 = new Messaging(morphium,"test2", 100, false);
sender2.start();

gotMessage1 = false;
gotMessage2 = false;
gotMessage3 = false;
gotMessage4 = false;

Messaging m1 = new Messaging(morphium, "test",100, false);
m1.addMessageListener((msg, m) -> {
gotMessage1 = true;
return null;
});
Messaging m2 = new Messaging(morphium, "test",100, false);
m2.addMessageListener((msg, m) -> {
gotMessage2 = true;
return null;
});
Messaging m3 = new Messaging(morphium, "test2",100, false);
m3.addMessageListener((msg, m) -> {
gotMessage3 = true;
return null;
});
Messaging m4 = new Messaging(morphium, "test2",100, false);
m4.addMessageListener((msg, m) -> {
gotMessage4 = true;
return null;
});

m1.start();
m2.start();
m3.start();
m4.start();

Msg m = new Msg();
m.setExclusive(true);
m.setName("A message");

sender.queueMessage(m);
Thread.sleep(5000);

assert(!gotMessage3);
assert(!gotMessage4);

int rec = 0;
if (gotMessage1) {
rec++;
}
if (gotMessage2) {
rec++;
}
assert (rec == 1):"rec is "+rec;

gotMessage1=false;
gotMessage2=false;

m = new Msg();
m.setExclusive(true);
m.setName("A message");
sender2.storeMessage(m);
Thread.sleep(500);
assert(!gotMessage1);
assert(!gotMessage2);

rec = 0;
if (gotMessage3) {
rec++;
}
if (gotMessage4) {
rec++;
}
assert (rec == 1):"rec is "+rec;
assert (m1.getNumberOfMessages() == 0);
m1.terminate();
m2.terminate();
m3.terminate();
}
@Test
public void exclusiveMessageTest() throws Exception {
morphium.dropCollection(Msg.class);
Expand Down

0 comments on commit 91528ff

Please sign in to comment.