Permalink
Browse files

DialogueHelper:

	- Service invocation is always performed async.
	- Possibility to register a callback for each service invocation.
  • Loading branch information...
esteban-aliverti committed Aug 9, 2012
1 parent d7bf55e commit 837b3af735eedfa405f2c3067b1ea071efe5699b
@@ -4,18 +4,23 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.namespace.QName;
import javax.xml.ws.BindingProvider;
-import org.drools.mas.*;
+import org.drools.mas.ACLMessage;
+import org.drools.mas.Act;
+import org.drools.mas.Encodings;
import org.drools.mas.body.acts.AbstractMessageBody;
import org.drools.mas.body.acts.Inform;
-import org.drools.mas.body.content.Action;
import org.drools.mas.body.acts.InformIf;
+import org.drools.mas.body.content.Action;
import org.drools.mas.util.ACLMessageFactory;
import org.drools.mas.util.MessageContentEncoder;
import org.drools.mas.util.MessageContentFactory;
@@ -24,15 +29,45 @@
public class DialogueHelper {
public static int WSDL_RETRIEVAL_TIMEOUT = 2000;
+ public static int EXECUTOR_SERVICE_THREAD_NUMBER = 5;
+
private int connectionTimeout = 0;
private int receiveTimeout = 0;
boolean multiReturnValue = false;
- private AbstractMessageBody returnBody;
private Encodings encode = Encodings.XML;
private URL endpointURL;
private QName qname;
+ private ExecutorService executorService = Executors.newFixedThreadPool(EXECUTOR_SERVICE_THREAD_NUMBER);
+
+ protected static interface DialogueHelperCommand{
+ void execute();
+ }
+
+ private DialogueHelperCallback defaultDialogueHelperCallback = new DialogueHelperCallbackImpl(){
+
+ @Override
+ public void onSuccess(List<ACLMessage> messages) {
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ Logger.getLogger(DialogueHelper.class.getName()).log(Level.SEVERE, "Agent invocation failed", t);
+ }
+
+
+ @Override
+ public long getTimeoutForResponses() {
+ return 0;
+ }
+
+ @Override
+ public long getMinimumWaitTimeForResponses() {
+ return 0;
+ }
+
+ };
public DialogueHelper(String url) {
this(url, 0);
@@ -79,89 +114,186 @@ private void checkEndpointAvailability(int wSDLRetrievalTimeout) {
}
}
+
+
+ public String invokeRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
+ return this.doRequest(sender, receiver, methodName, args, callback);
+ }
+
+ public String invokeRequest(String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
+ return invokeRequest(UUID.randomUUID().toString(), "", methodName, args, callback);
+ }
+
+ /**
+ *
+ * @param sender
+ * @param methodName
+ * @param args
+ * @return
+ * @throws UnsupportedOperationException
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeRequest(String sender, String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
return invokeRequest(sender, "", methodName, args);
}
+ /**
+ *
+ * @param methodName
+ * @param args
+ * @return
+ * @throws UnsupportedOperationException
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeRequest(String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
return invokeRequest(UUID.randomUUID().toString(), "", methodName, args);
}
-
+
+ /**
+ *
+ * @param sender
+ * @param receiver
+ * @param methodName
+ * @param args
+ * @return
+ * @throws UnsupportedOperationException
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args) throws UnsupportedOperationException {
+ return this.doRequest(sender, receiver, methodName, args, null);
+ }
+
+ protected String doRequest(String sender, String receiver, String methodName, LinkedHashMap<String, Object> args, DialogueHelperCallback callback) throws UnsupportedOperationException {
multiReturnValue = false;
for (Object o : args.values()) {
if (o == Variable.v) {
multiReturnValue = true;
break;
}
}
- AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
-
ACLMessageFactory factory = new ACLMessageFactory(encode);
Action action = MessageContentFactory.newActionContent(methodName, args);
ACLMessage req = factory.newRequestMessage(sender, receiver, action);
- asyncServicePort.tell(req);
-// List<ACLMessage> answers = asyncServicePort.getResponses(req.getId());
-
-// if ( answers.size() == 0 ) { return req.getId(); }
-
-// ACLMessage answer = answers.get(0);
-// if ( ! Act.AGREE.equals( answer.getPerformative() ) ) {
-// throw new UnsupportedOperationException(" Request " + methodName + " was not agreed with args " + args );
-// }
-//
-// if ( ! multiReturnValue ) {
-// returnBody = answers.size() == 2 ? ( (Inform) answers.get( 1 ).getBody() ) : null;
-// } else {
-// returnBody = answers.size() == 2 ? ( (InformRef) answers.get( 1 ).getBody() ) : null;
-// }
+ this.tell(req, callback, callback != null);
+
return req.getId();
}
+ public String invokeQueryIf(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
+ return this.doQueryIf(sender, receiver, proposition, callback);
+ }
+
+ /**
+ *
+ * @param sender
+ * @param receiver
+ * @param proposition
+ * @return
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeQueryIf(String sender, String receiver, Object proposition) {
- AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
+ return this.doQueryIf(sender, receiver, proposition, null);
+ }
+
+ protected String doQueryIf(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
ACLMessageFactory factory = new ACLMessageFactory(Encodings.XML);
-
ACLMessage qryif = factory.newQueryIfMessage(sender, receiver, proposition);
- asyncServicePort.tell(qryif);
- List<ACLMessage> answers = asyncServicePort.getResponses(qryif.getId());
-
- if (answers.size() == 0) {
- return qryif.getId();
- }
- returnBody = ((InformIf) answers.get(0).getBody());
+ this.tell(qryif, callback, callback != null);
+
return qryif.getId();
}
+ public String invokeInform(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
+ return this.doInform(sender, receiver, proposition, callback);
+ }
+
+ /**
+ *
+ * @param sender
+ * @param receiver
+ * @param proposition
+ * @return
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeInform(String sender, String receiver, Object proposition) {
- AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
+ return this.doInform(sender, receiver, proposition, null);
+ }
+
+ protected String doInform(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
ACLMessageFactory factory = new ACLMessageFactory(encode);
ACLMessage newInformMessage = factory.newInformMessage(sender, receiver, proposition);
- asyncServicePort.tell(newInformMessage);
+ this.tell(newInformMessage, callback, callback != null);
+
return newInformMessage.getId();
}
+ public String invokeConfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
+ return this.doConfirm(sender, receiver, proposition, callback);
+ }
+
+ /**
+ *
+ * @param sender
+ * @param receiver
+ * @param proposition
+ * @return
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeConfirm(String sender, String receiver, Object proposition) {
- AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
+ return this.doConfirm(sender, receiver, proposition, null);
+ }
+
+ protected String doConfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
ACLMessageFactory factory = new ACLMessageFactory(encode);
ACLMessage newConfirmMessage = factory.newConfirmMessage(sender, receiver, proposition);
- asyncServicePort.tell(newConfirmMessage);
+ this.tell(newConfirmMessage, callback, callback != null);
+
return newConfirmMessage.getId();
}
+ public String invokeDisconfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
+ return this.doDisconfirm(sender, receiver, proposition, callback);
+ }
+
+ /**
+ *
+ * @param sender
+ * @param receiver
+ * @param proposition
+ * @return
+ * @deprecated Without using a DialogueHelperCallback there is no way to
+ * be notified about exceptions in the service invocation.
+ */
+ @Deprecated
public String invokeDisconfirm(String sender, String receiver, Object proposition) {
- AsyncDroolsAgentService asyncServicePort = this.getAsyncDroolsAgentService();
+ return this.doDisconfirm(sender, receiver, proposition, null);
+ }
+
+ protected String doDisconfirm(String sender, String receiver, Object proposition, DialogueHelperCallback callback) {
ACLMessageFactory factory = new ACLMessageFactory(encode);
ACLMessage newDisconfirmMessage = factory.newDisconfirmMessage(sender, receiver, proposition);
- asyncServicePort.tell(newDisconfirmMessage);
+ this.tell(newDisconfirmMessage, callback, callback != null);
+
return newDisconfirmMessage.getId();
}
@@ -243,11 +375,59 @@ private AsyncDroolsAgentService getAsyncDroolsAgentService(){
}
+ protected void tell(final ACLMessage message, DialogueHelperCallback callback, final boolean waitForAnswers){
+
+ final DialogueHelperCallback finalCallback = callback != null? callback : this.defaultDialogueHelperCallback;
+
+ Runnable runnable = new Runnable() {
+
+ private AsyncDroolsAgentService asyncDroolsAgentService;
+
+ public void run() {
+ try{
+ asyncDroolsAgentService = getAsyncDroolsAgentService();
+ asyncDroolsAgentService.tell(message);
+
+ if (waitForAnswers){
+ List<ACLMessage> results = this.waitForAnswers(message.getId(), finalCallback.getExpectedResponsesNumber(), finalCallback.getMinimumWaitTimeForResponses(), finalCallback.getTimeoutForResponses());
+ finalCallback.onSuccess(results);
+ }
+ }catch (Throwable t){
+ finalCallback.onError(t);
+ }
+ }
+
+ private List<ACLMessage> waitForAnswers( String id, int expectedMessagesNumber, long minimumWaitTime, long timeout) {
+
+
+ List<ACLMessage> answers = new ArrayList<ACLMessage>();
+ long waitTime = minimumWaitTime;
+ do {
+ try {
+ Logger.getLogger(DialogueHelper.class.getName()).log(Level.FINER, "Answer for {0} is not ready, wait... ", id);
+ Thread.sleep( waitTime );
+ } catch ( InterruptedException ex ) {
+ Logger.getLogger(DialogueHelper.class.getName()).log(Level.WARNING, "Thread could not be put to sleep", ex);
+ }
+ List<ACLMessage> incomingAnswers = asyncDroolsAgentService.getResponses(id);
+ answers.addAll( incomingAnswers );
+
+ waitTime *= 2;
+ } while ( answers.size() != expectedMessagesNumber && waitTime < timeout );
+ return answers;
+
+ }
+
+ };
+
+ this.executorService.submit(runnable);
+ }
+
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public void setReceiveTimeout(int receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
-}
+}
@@ -0,0 +1,25 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.drools.mas.helpers;
+
+import java.util.List;
+import org.drools.mas.ACLMessage;
+
+/**
+ *
+ * @author esteban
+ */
+public interface DialogueHelperCallback {
+
+ void onSuccess(List<ACLMessage> messages);
+
+ void onError(Throwable t);
+
+ int getExpectedResponsesNumber();
+
+ long getTimeoutForResponses();
+
+ long getMinimumWaitTimeForResponses();
+}
Oops, something went wrong.

0 comments on commit 837b3af

Please sign in to comment.