/
RequestMessageBean.java
320 lines (290 loc) · 14.1 KB
/
RequestMessageBean.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package org.kie.services.remote.jms;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.bind.JAXBException;
import org.jboss.resteasy.spi.UnauthorizedException;
import org.jbpm.services.task.commands.TaskCommand;
import org.jbpm.services.task.exception.PermissionDeniedException;
import org.kie.api.command.Command;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.manager.Context;
import org.kie.api.runtime.manager.RuntimeEngine;
import org.kie.api.runtime.manager.RuntimeManager;
import org.kie.api.task.TaskService;
import org.kie.internal.runtime.manager.context.EmptyContext;
import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext;
import org.kie.internal.task.api.InternalTaskService;
import org.kie.services.client.api.command.AcceptedCommands;
import org.kie.services.client.serialization.jaxb.JaxbSerializationProvider;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsRequest;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsResponse;
import org.kie.services.client.serialization.jaxb.impl.JaxbExceptionResponse;
import org.kie.services.remote.cdi.RuntimeManagerManager;
import org.kie.services.remote.exception.DomainNotFoundBadRequestException;
import org.kie.services.remote.exception.KieRemoteServicesInternalError;
import org.kie.services.remote.rest.RestProcessRequestBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is the link between incoming request (whether via REST or JMS or .. whatever)
* and the bean that processes the requests, the {@link RestProcessRequestBean}.
* </p>
* Responses to requests are <b>not</b> placed on the reply-to queue, but on the answer queue.
* </p> Because there are multiple queues to which an instance of this class could listen to, the (JMS queue) configuration is
* done in the ejb-jar.xml file, which allows us to configure instances of one class to listen to more than one queue.
*/
public class RequestMessageBean implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(RequestMessageBean.class);
@Resource(mappedName = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
private String RESPONSE_QUEUE_NAME = null;
private static String RESPONSE_QUEUE_NAME_PROPERTY = "kie.services.jms.queues.response";
@Inject
private RuntimeManagerManager runtimeMgrMgr;
@Inject
private TaskService taskService;
@PostConstruct
public void init() {
RESPONSE_QUEUE_NAME = System.getProperty(RESPONSE_QUEUE_NAME_PROPERTY, "queue/KIE.RESPONSE.ALL");
}
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public void onMessage(Message message) {
boolean failure = false;
String msgCorrId = null;
try {
msgCorrId = message.getJMSCorrelationID();
} catch (JMSException jmse) {
logger.warn("Unable to retrieve JMS correlation id from message! This id is needed to be able to match a request to a response message.",
jmse);
}
if( msgCorrId == null ) {
logger.warn("JMS correlation id is empty! This id is needed to be able to match a request to a response message.");
}
// 1. get request
int[] serializationTypeHolder = new int[1];
JaxbCommandsRequest cmdsRequest = deserializeRequest(message, msgCorrId, serializationTypeHolder);
// 2. process request
JaxbCommandsResponse jaxbResponse;
if (cmdsRequest != null) {
jaxbResponse = processJaxbCommandsRequest(cmdsRequest);
} else {
// Failure reasons have been logged in deserializeRequest().
logger.error("Stopping processing of request message due to errors: see above.");
return;
}
// 3. create session
Connection connection = null;
Session session = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException jmse) {
logger.error("Unable to open new session to send response message to message " + msgCorrId, jmse);
failure = true;
} finally {
if (failure) {
if (connection != null) {
try {
connection.close();
connection = null;
session.close();
session = null;
} catch (JMSException jmse) {
logger.warn("Unable to close connection or session after failing to create connection or session.", jmse);
}
}
// Unable to create connection/session, so no need to try send the message either
return;
}
}
// 4. create response message
Message msg = serializeResponse(session, msgCorrId, serializationTypeHolder[0], jaxbResponse);
try {
msg.setJMSCorrelationID(msgCorrId);
} catch (JMSException jmse) {
logger.warn( "Unable to set correlation id of response to msg id " + msgCorrId, jmse );
}
// 5. send response message
if (!failure) {
try {
InitialContext context = new InitialContext();
Queue responseQueue = (Queue) context.lookup(RESPONSE_QUEUE_NAME);
MessageProducer producer = session.createProducer(responseQueue);
producer.send(msg);
} catch (NamingException ne) {
logger.error("Unable to lookup response queue (" + RESPONSE_QUEUE_NAME + ") to send msg " + msgCorrId
+ " (Is " + RESPONSE_QUEUE_NAME_PROPERTY + " incorrect?).",
ne );
} catch (JMSException jmse) {
logger.error("Unable to send msg " + msgCorrId + " to " + RESPONSE_QUEUE_NAME, jmse );
} finally {
if (connection != null) {
try {
connection.close();
connection = null;
session.close();
session = null;
} catch (JMSException jmse) {
logger.error("Unable to close connection or session.", jmse);
}
}
}
}
}
private JaxbCommandsRequest deserializeRequest(Message message, String msgId, int[] serializationTypeHolder) {
JaxbCommandsRequest cmdMsg = null;
try {
serializationTypeHolder[0] = message.getIntProperty("serialization");
if (serializationTypeHolder[0] == 1) {
String msgStrContent = ((BytesMessage) message).readUTF();
cmdMsg = (JaxbCommandsRequest) JaxbSerializationProvider.convertStringToJaxbObject(msgStrContent);
} else {
throw new KieRemoteServicesInternalError("Unknown serialization type when deserializing message " + msgId + ":" + serializationTypeHolder[0]);
}
} catch (JMSException jmse) {
logger.error("Unable to read information from message " + msgId + ".", jmse);
} catch( JAXBException jaxbe) {
throw new KieRemoteServicesInternalError("Unable to convert String to " + JaxbCommandsRequest.class.getSimpleName() + " [msg id: " + msgId + "].", jaxbe);
}
return cmdMsg;
}
private Message serializeResponse(Session session, String msgId, int serializationType, JaxbCommandsResponse jaxbResponse) {
BytesMessage byteMsg = null;
try {
byteMsg = session.createBytesMessage();
byteMsg.setIntProperty("serialization", serializationType);
if (serializationType == 1) {
String xmlStr = JaxbSerializationProvider.convertJaxbObjectToString(jaxbResponse);
byteMsg.writeUTF(xmlStr);
} else {
throw new KieRemoteServicesInternalError("Unknown serialization type when deserializing message " + msgId + ":" + serializationType);
}
} catch (JMSException jmse) {
logger.error("Unable to create response message or write to it [msg id: " + msgId + "].", jmse);
} catch( JAXBException jaxbe) {
throw new KieRemoteServicesInternalError("Unable to serialize " + jaxbResponse.getClass().getSimpleName() + " to a String.", jaxbe);
}
return byteMsg;
}
public JaxbCommandsResponse processJaxbCommandsRequest(JaxbCommandsRequest request) {
// If exceptions are happening here, then there is something REALLY wrong and they should be thrown.
JaxbCommandsResponse jaxbResponse = new JaxbCommandsResponse(request);
List<Command<?>> commands = request.getCommands();
if (commands != null) {
for (int i = 0; i < commands.size(); ++i) {
Command<?> cmd = commands.get(i);
if( ! AcceptedCommands.getSet().contains(cmd.getClass())) {
UnsupportedOperationException uoe = new UnsupportedOperationException(cmd.getClass().getName()
+ " is not a supported command.");
jaxbResponse.addException(uoe, i, cmd);
continue;
}
Object cmdResult = null;
if (cmd instanceof TaskCommand<?>) {
cmdResult = internalDoTaskOperation(cmd, jaxbResponse, i);
} else {
cmdResult = internalDoKieSessionOperation( cmd, request, jaxbResponse, i);
}
if (cmdResult != null) {
try {
// addResult could possibly throw an exception, which is why it's here and not above
jaxbResponse.addResult(cmdResult, i, cmd);
} catch (Exception e) {
logger.error("Unable to add result from " + cmd.getClass().getSimpleName() + "/" + i + " because of "
+ e.getClass().getSimpleName(), e);
jaxbResponse.addException(e, i, cmd);
}
}
}
}
if (commands == null || commands.isEmpty()) {
logger.info("Commands request object with no commands sent!");
}
return jaxbResponse;
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public Object internalDoTaskOperation(Command<?> cmd, JaxbCommandsResponse jaxbResponse, int i) {
Object cmdResult;
try {
cmdResult = doTaskOperation(cmd);
} catch( UnauthorizedException ue ) {
Throwable cause = ue.getCause();
if( cause instanceof PermissionDeniedException ) {
PermissionDeniedException pde = (PermissionDeniedException) cause;
logger.warn(pde.getMessage());
jaxbResponse.addException(pde, i, cmd);
return null;
}
throw ue;
}
return cmdResult;
}
@TransactionAttribute(TransactionAttributeType.NEVER)
public Object internalDoKieSessionOperation(Command<?> cmd, JaxbCommandsRequest request, JaxbCommandsResponse jaxbResponse, int i) {
Object cmdResult;
try {
cmdResult = doKieSessionOperation(cmd, request.getDeploymentId(), request.getProcessInstanceId());
} catch( DomainNotFoundBadRequestException dnfbre ) {
logger.warn( dnfbre.getMessage() );
jaxbResponse.addException(dnfbre, i, cmd);
return null;
}
return cmdResult;
}
private Object doKieSessionOperation(Command<?> cmd, String deploymentId, Long processInstanceId) {
Object result = null;
try {
KieSession kieSession = getRuntimeEngine(deploymentId, processInstanceId).getKieSession();
result = kieSession.execute(cmd);
} catch( Exception e ) {
JaxbExceptionResponse exceptResp = new JaxbExceptionResponse(e, cmd);
logger.warn( "Unable to execute " + exceptResp.getCommandName() + " because of " + e.getClass().getSimpleName() + ": " + e.getMessage());
logger.trace("Stack trace: \n", e);
result = exceptResp;
}
return result;
}
private Object doTaskOperation(Command<?> cmd) {
Object result = null;
try {
result = ((InternalTaskService) taskService).execute(cmd);
} catch( PermissionDeniedException pde ) {
throw new UnauthorizedException(pde.getMessage(), pde);
} catch( Exception e ) {
JaxbExceptionResponse exceptResp = new JaxbExceptionResponse(e, cmd);
logger.warn( "Unable to execute " + exceptResp.getCommandName() + " because of " + e.getClass().getSimpleName() + ": " + e.getMessage());
logger.trace("Stack trace: \n", e);
result = exceptResp;
}
return result;
}
protected RuntimeEngine getRuntimeEngine(String domainName, Long processInstanceId) {
RuntimeManager runtimeManager = runtimeMgrMgr.getRuntimeManager(domainName);
Context<?> runtimeContext;
if (processInstanceId != null) {
runtimeContext = new ProcessInstanceIdContext(processInstanceId);
} else {
runtimeContext = EmptyContext.get();
}
if( runtimeManager == null ) {
throw new DomainNotFoundBadRequestException("No runtime manager could be found for domain '" + domainName + "'.");
}
return runtimeManager.getRuntimeEngine(runtimeContext);
}
}