@@ -65,6 +65,8 @@ public class Async {
65
65
66
66
private final static Logger LOGGER = LoggerFactory .getLogger ("JavaLite Async" );
67
67
68
+ private final static int MIN_LARGE_MESSAGE_SIZE = 100 * 4096 ;
69
+
68
70
private static final String QUEUE_NAMESPACE = "/queue/" ;
69
71
private Injector injector ;
70
72
private final Configuration config ;
@@ -112,21 +114,24 @@ public Async(String dataDirectory, boolean useLibAio, Injector injector, QueueCo
112
114
configureAcceptor ();
113
115
configureConnectionFactory ();
114
116
configurePaging ();
115
-
116
117
configureQueues (queueConfigs );
117
-
118
- config .setJournalType (useLibAio ? JournalType .ASYNCIO : JournalType .NIO );
118
+ configureJournal (useLibAio );
119
119
config .setThreadPoolMaxSize (-1 );
120
120
config .setScheduledThreadPoolMaxSize (10 );
121
- jmsServer .setConfiguration (config );
122
- jmsServer .setJmsConfiguration (jmsConfig );
121
+
123
122
} catch (AsyncException e ) {
124
123
throw e ;
125
124
} catch (Exception e ) {
126
125
throw new AsyncException ("Failed to start EmbeddedJMS" , e );
127
126
}
128
127
}
129
128
129
+ private void configureJournal (boolean useLibAio ){
130
+ config .setJournalType (useLibAio ? JournalType .ASYNCIO : JournalType .NIO );
131
+ config .setJournalBufferSize_AIO (2 * MIN_LARGE_MESSAGE_SIZE );
132
+ config .setJournalBufferSize_NIO (2 * MIN_LARGE_MESSAGE_SIZE );
133
+ }
134
+
130
135
private void configureLocations (String dataDirectory ) {
131
136
if (dataDirectory == null || !new File (dataDirectory ).exists ()) {
132
137
throw new AsyncException ("Must provide data directory that exists" );
@@ -156,9 +161,11 @@ In my case, below configuration (made on JBoss 7.1.1.Final) has helped:
156
161
<connection-ttl>-1</connection-ttl>
157
162
<reconnect-attempts>-1</reconnect-attempts>
158
163
*/
164
+
159
165
cfConfig .setClientFailureCheckPeriod (Long .MAX_VALUE );
160
166
cfConfig .setConnectionTTL (-1 );
161
167
cfConfig .setReconnectAttempts (-1 );
168
+ cfConfig .setCompressLargeMessages (true );
162
169
jmsConfig .getConnectionFactoryConfigurations ().add (cfConfig );
163
170
}
164
171
@@ -212,7 +219,6 @@ private void configureListeners(Injector injector, List<QueueConfig> queueConfig
212
219
213
220
///******* PUBLIC METHODS BELOW ***********///
214
221
215
-
216
222
/**
217
223
* Sends a command into a queue for processing
218
224
*
@@ -247,7 +253,7 @@ public void send(String queueName, Command command, int deliveryMode) {
247
253
public void send (String queueName , Command command , int deliveryMode , int priority , int timeToLive ) {
248
254
checkStarted ();
249
255
250
- try {
256
+ try ( Session session = producerConnection . createSession ()) {
251
257
checkInRange (deliveryMode , 1 , 2 , "delivery mode" );
252
258
checkInRange (priority , 0 , 9 , "priority" );
253
259
if (timeToLive < 0 )
@@ -257,7 +263,6 @@ public void send(String queueName, Command command, int deliveryMode, int priori
257
263
if (queue == null )
258
264
throw new AsyncException ("Failed to find queue: " + queueName );
259
265
260
- Session session = producerConnection .createSession ();
261
266
TextMessage msg = session .createTextMessage (command .toXml ());
262
267
MessageProducer p = session .createProducer (queue );
263
268
p .send (msg , deliveryMode , priority , timeToLive );
@@ -268,13 +273,15 @@ public void send(String queueName, Command command, int deliveryMode, int priori
268
273
}
269
274
}
270
275
271
-
272
276
/**
273
277
* Starts the server.
274
278
*/
275
279
public void start (){
276
280
277
281
try {
282
+ jmsServer .setConfiguration (config );
283
+ jmsServer .setJmsConfiguration (jmsConfig );
284
+
278
285
jmsServer .start ();
279
286
280
287
ConnectionFactory connectionFactory = (ConnectionFactory ) jmsServer .lookup ("/cf" );
0 commit comments