-
Notifications
You must be signed in to change notification settings - Fork 20
/
SendAndReceiveTest.java
125 lines (105 loc) · 3.75 KB
/
SendAndReceiveTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package org.apache.activemq.store.cassandra;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.HashSet;
import java.util.Set;
/**
 * Round-trip test against an embedded ActiveMQ broker: a batch of text messages is sent
 * to a queue and then consumed again, verifying count and ordering. One variant restarts
 * the broker between sending and receiving so that the messages have to come back from
 * the persistence adapter.
 */
public class SendAndReceiveTest extends EmbeddedServicesTest {

    public static final String BROKER_URI = "tcp://localhost:61616";

    /** How long a single receive waits for the next expected message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 10000;

    /** How long to wait when checking that the queue has been fully drained, in milliseconds. */
    private static final long DRAIN_TIMEOUT_MS = 2000;

    /** Number of consecutive-or-not receive timeouts tolerated before giving up. */
    private static final int MAX_RECEIVE_TIMEOUTS = 3;

    static final Logger log = LoggerFactory.getLogger(SendAndReceiveTest.class);

    BrokerService broker;
    CassandraPersistenceAdapter adapter;
    ActiveMQConnectionFactory factory;
    ActiveMQQueue queue;
    Connection conn;
    Session session;

    /** Number of messages sent by each test. */
    int messages = 250;

    /** When false the broker is started without the Cassandra persistence adapter. */
    private boolean useCassandra = true;

    @Test
    public void testWithNoRestart() throws Exception {
        doSendAndReceive(false);
    }

    @Test
    public void testWithRestart() throws Exception {
        doSendAndReceive(true);
    }

    /**
     * Sends {@link #messages} text messages to the test queue, optionally restarts the
     * broker, then consumes the queue and asserts that every message arrives in the order
     * it was sent and that nothing else is left on the queue.
     *
     * @param restart whether to stop and re-create the broker (and the JMS connection and
     *                session) between sending and receiving
     * @throws Exception if any JMS or broker operation fails
     */
    public void doSendAndReceive(boolean restart) throws Exception {
        int sent = sendMessages();

        if (restart) {
            // Tears down session/connection/broker and builds fresh ones; the
            // session field used below is the new one.
            stopBrokerService();
            createBrokerService();
        }

        int received = receiveAndVerify(sent);
        Assert.assertEquals("number of messages received", sent, received);
    }

    /**
     * Sends exactly {@link #messages} text messages to the queue.
     *
     * @return the number of messages sent
     */
    private int sendMessages() throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        int sent = 0;
        // Strictly-less-than: the original "<=" sent one message more than configured.
        for (; sent < messages; sent++) {
            String text = getMessageBody(sent);
            log.debug("Sending: {}", text);
            producer.send(session.createTextMessage(text));
        }
        producer.close();
        return sent;
    }

    /**
     * Consumes up to {@code expected} messages, asserting that each body matches the body
     * generated for its position, then asserts that the queue yields nothing further.
     * Gives up early after {@link #MAX_RECEIVE_TIMEOUTS} receive timeouts.
     *
     * @param expected the number of messages that were sent
     * @return the number of messages actually received
     */
    private int receiveAndVerify(int expected) throws JMSException {
        MessageConsumer consumer = session.createConsumer(queue);
        int received = 0;
        int nulls = 0;
        // Plain counters are used for progress so the loop does not depend on how the
        // provider implements Message.equals()/hashCode().
        while (received < expected && nulls < MAX_RECEIVE_TIMEOUTS) {
            Message m = consumer.receive(RECEIVE_TIMEOUT_MS);
            if (m != null) {
                String text = ((TextMessage) m).getText();
                log.debug("Received:{}", text);
                Assert.assertEquals(getMessageBody(received), text);
                log.debug("recievedCount:{}", ++received);
            } else {
                log.error("received null");
                nulls++;
            }
        }
        Assert.assertNull("queue should be empty after consuming all sent messages",
                consumer.receive(DRAIN_TIMEOUT_MS));
        consumer.close();
        return received;
    }

    /** Builds the body of the i-th message. */
    private String getMessageBody(int i) {
        return "TEST:" + i;
    }

    /**
     * Closes the JMS session and connection and stops the broker. Each step runs even if
     * an earlier one throws, and steps whose resource was never created are skipped, so a
     * partially failed setup does not leave the broker running or hide the original
     * failure behind a NullPointerException.
     *
     * @throws Exception if closing a resource or stopping the broker fails
     */
    @After
    public void stopBrokerService() throws Exception {
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            session = null;
            try {
                if (conn != null) {
                    conn.close();
                }
            } finally {
                conn = null;
                if (broker != null) {
                    try {
                        broker.stop();
                    } finally {
                        broker = null;
                    }
                }
            }
        }
    }

    /**
     * Creates and starts a persistent broker listening on {@link #BROKER_URI}, wiring in
     * the Cassandra persistence adapter when {@code useCassandra} is set, then opens a
     * started connection and a non-transacted, auto-acknowledge session against it.
     *
     * @throws Exception if the broker or the JMS connection cannot be started
     */
    @Before
    public void createBrokerService() throws Exception {
        queue = new ActiveMQQueue("test.queue.1");
        broker = new BrokerService();
        broker.addConnector(BROKER_URI);
        broker.setPersistent(true);
        broker.setDataDirectory("target");
        if (useCassandra) {
            adapter = new CassandraPersistenceAdapter();

            // Port and ZooKeeper connect string come from the inherited test fixture.
            CassandraClient cassandraClient = new CassandraClient();
            cassandraClient.setCassandraHost("localhost");
            cassandraClient.setCassandraPort(getCassandraPort());

            ZooKeeperMasterElector elector = new ZooKeeperMasterElector();
            elector.setZookeeperConnectString(getZookeeperConnectString());

            adapter.setCassandraClient(cassandraClient);
            adapter.setMasterElector(elector);
            broker.setPersistenceAdapter(adapter);
        }
        broker.start();

        factory = new ActiveMQConnectionFactory(BROKER_URI);
        conn = factory.createConnection();
        conn.start();
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}