Permalink
Browse files

[TORQUE-470] Synchronous Message Processors

This makes it possible to receive a value after processing a message using the message processor.
  • Loading branch information...
1 parent 95f46be commit 36d59d451f3a6f754dc01b35fd7f0c18038e926f @goldmann goldmann committed Feb 8, 2013
View
@@ -28,3 +28,4 @@ Infinispan-FileCacheStore
rubyObj.*
schema.rb
*.cache
+build/assembly/d*
@@ -1688,6 +1688,78 @@ end</programlisting>
</example>
</para>
</section>
+
+ <section id="synchronous-message-processors">
+ <title>Synchronous Message Processors</title>
+
+ <para>
+ Message processors by default are asynchronous, but sometimes you
+ would like to receive some value after the message was processed.
+ in TorqueBox you can mark a selected message processor as
+ synchronous which would send the return value of the
+ <methodname>on_message(body)</methodname> method back to the sender.
+ </para>
+
+ <example>
+ <title>Synchronous message processor</title>
+
+ <para>
+ Consider the following simple message processor:
+
+ <programlisting>require 'torquebox-messaging'
+
+class SynchronousProcessor &lt; TorqueBox::Messaging::MessageProcessor
+ def on_message(body)
+ "Got #{body} but I want bacon!"
+ end
+end</programlisting>
+
+ In this case every received message (we assume a text message
+ in this example) will be transformed and returned back.
+
+ </para>
+ </example>
+
+ <para>
+ You can use the <methodname>publish_and_receive</methodname> method available
+ in the <classname>TorqueBox::Messaging::Queue</classname> objects to send messages
+ to synchronous message processors and receive the return values.
+ </para>
+
+ <example>
+ <title>Message sender</title>
+
+ <para><programlisting>queue = TorqueBox::Messaging::Queue.new('/queues/samplequeue')
+queue.publish_and_receive("something") # => "Got something but I want bacon!"</programlisting>
+
+ As you can see, the message sender receives the return value from
+ the processor.
+
+ </para>
+ </example>
+
+ <para>
+ To make a message processor work synchronously, you need to set the
+ <literal>synchronous</literal> parameter.
+ </para>
+
+ <example>
+ <title>Deployment descriptor configuration</title>
+ <para>Using the DSL:<programlisting>TorqueBox.configure do
+ queue "/queues/samplequeue" do
+ processor SynchronousProcessor do
+ synchronous true
+ end
+ end
+end</programlisting></para>
+
+ <para>Using YAML:<programlisting>messaging:
+ /queues/samplequeue:
+ SynchronousProcessor:
+ synchronous: true</programlisting></para>
+
+ </example>
+ </section>
</section>
<section id="backgroundable">
@@ -80,6 +80,7 @@ def self.load_configuration(file)
:concurrency,
{ :singleton => [true, false] },
{ :durable => [true, false] },
+ { :synchronous => [true, false] },
{ :xa => [true, false] },
:client_id,
:config,
@@ -25,9 +25,18 @@ class MessageProcessor
attr_accessor :message
def initialize
- @message = nil
+ @message = nil
+ @proxy = nil
end
-
+
+ def initialize_proxy(group)
+ @proxy = MessageProcessorProxy.new(group)
+ end
+
+ def method_missing(method, *args, &block)
+ @proxy.send( method, *args, &block )
+ end
+
def on_message(body)
throw "Your subclass must implement on_message(body)"
end
@@ -39,12 +48,17 @@ def on_error(error)
def process!(message)
@message = message
begin
- on_message( message.decode )
+ value = on_message(message.decode)
+ reply(value) if synchronous?
rescue Exception => e
on_error( e )
end
end
+ def reply(value)
+ TorqueBox::Messaging::Queue.new(@message.jms_message.jms_destination.queue_name).publish(value, :correlation_id => @message.jms_message.jms_message_id)
+ end
+
class << self
# List all available message processors for current application.
@@ -159,6 +173,14 @@ def durable?
@group.durable
end
+ # Returns true if the message processor is synchronous,
+ # false otherwise
+ #
+ # @return Boolean
+ def synchronous?
+ @group.synchronous
+ end
+
def to_s
"[MessageProcessorProxy: #{name}]"
end
@@ -39,12 +39,14 @@ def on_message(body)
end
it "should process text messages" do
+ @processor.should_receive(:synchronous?).and_return(false)
@message = TorqueBox::Messaging::Message.new(@jms_session, "foo", :marshal_base64)
@processor.process! @message
@processor.body.should eql("foo")
end
it "should process non-text messages" do
+ @processor.should_receive(:synchronous?).and_return(false)
payload = {:foo => "foo", :sym => :sym, "bar" => :bar}
@message = TorqueBox::Messaging::Message.new(@jms_session, payload, :marshal_base64)
@processor.process! @message
@@ -0,0 +1,7 @@
+require 'torquebox-messaging'
+
+class SynchronousProcessor < TorqueBox::Messaging::MessageProcessor
+ def on_message(body)
+ "Got #{body} but I want bacon!"
+ end
+end
@@ -10,8 +10,19 @@ queues:
durable: false
/queue/echo_backchannel:
durable: false
+ /queue/synchronous:
+ durable: false
+ /queue/synchronous_with_selectors:
+ durable: false
messaging:
+ /queue/synchronous:
+ SynchronousProcessor:
+ synchronous: true
+ /queue/synchronous_with_selectors:
+ SynchronousProcessor:
+ synchronous: true
+ selector: "awesomeness IS NOT NULL AND awesomeness > 10"
/queue/simple_queue: SimpleProcessor
/queue/stateless_queue: StatelessProcessor
/queue/echo_queue:
@@ -515,18 +515,20 @@ def ensure_no_xa_error
version: #{RUBY_VERSION[0,3]}
END
- describe "message processors management" do
+ context "message processors management" do
describe "list" do
it "should list all available message processors" do
processors = TorqueBox::Messaging::MessageProcessor.list
- processors.size.should == 4
+ processors.size.should == 6
processors.map { |p| p.name }.sort.should == [
"/queue/simple_queue.SimpleProcessor",
"/queue/stateless_queue.StatelessProcessor",
"/queues/torquebox/messaging_processor_tests/tasks/torquebox_backgroundable.TorqueBox::Messaging::BackgroundableProcessor",
- "/queue/echo_queue.Torquebox::Messaging::EchoProcessor"
+ "/queue/echo_queue.Torquebox::Messaging::EchoProcessor",
+ "/queue/synchronous.SynchronousProcessor",
+ "/queue/synchronous_with_selectors.SynchronousProcessor"
].sort
end
end
@@ -579,6 +581,25 @@ def ensure_no_xa_error
end
end
end
+
+ context "synchronous message processors" do
+ before(:each) do
+ @queue = TorqueBox::Messaging::Queue.new('/queue/synchronous')
+ @queue_with_selectors = TorqueBox::Messaging::Queue.new('/queue/synchronous_with_selectors')
+ end
+
+ it "should reply to the message" do
+ @queue.publish_and_receive("something", :timeout => 2000).should eql("Got something but I want bacon!")
+ end
+
+ it "should reply to the message when a selector is provided" do
+ @queue_with_selectors.publish_and_receive("bike", :timeout => 2000, :properties => {"awesomeness" => 20}).should eql("Got bike but I want bacon!")
+ end
+
+ it "should timeout since the selector is not satisfied" do
+ @queue_with_selectors.publish_and_receive("food", :timeout => 500, :properties => {"awesomeness" => 5}).should be_nil
+ end
+ end
end
def with_queue(name)
@@ -52,6 +52,7 @@
-concurrency: integer
-selector: string
-durable: boolean
+ -synchronous: boolean
-config:
type: map
arbitrary: true
@@ -68,6 +69,7 @@
-concurrency: integer
-selector: string
-durable: boolean
+ -synchronous: boolean
-config:
arbitrary: true
-client_id: string
@@ -42,9 +42,10 @@ public void onMessage(Message message) {
if (getConsumer() == null) {
return; // racist!
}
+
MessageProcessorComponent component = (MessageProcessorComponent) group.getComponentResolver().resolve( this.currentRuby );
try {
- component.process( message, getSession() );
+ component.process( message, getSession(), group );
if (isXAEnabled()) {
commitXATransaction();
} else {
@@ -81,7 +81,7 @@ public void setRubyConfig(Map<String, Object> rubyConfig) {
}
public void setConcurrency(Integer concurrency) {
- if (concurrency != null && concurrency > 0)
+ if (concurrency != null && concurrency >= 0)
this.concurrency = concurrency;
}
@@ -97,7 +97,16 @@ public void setDurable(Boolean durable) {
public Boolean isDurable() {
return this.durable;
}
-
+
+ public boolean isSynchronous() {
+ return synchronous;
+ }
+
+ public void setSynchronous(Boolean synchronous) {
+ if (synchronous != null)
+ this.synchronous = synchronous;
+ }
+
public String getClientID() {
return clientID;
}
@@ -127,6 +136,7 @@ public boolean isXAEnabled() {
private String destinationName;
private String messageSelector;
private int concurrency = 1;
+ private boolean synchronous = false;
private boolean durable = false; //only has meaning for Topic processors
private String clientID; //only has meaning for Topic processors
private boolean singleton = false;
@@ -24,21 +24,18 @@
import org.jruby.RubyModule;
import org.torquebox.core.component.AbstractRubyComponent;
+import org.torquebox.messaging.MessageProcessorGroup;
public class MessageProcessorComponent extends AbstractRubyComponent {
public MessageProcessorComponent() {
}
-
- public void process(Message message) {
- process( message, (Session) null );
- }
-
- public void process(Message message, Session session) {
+ public void process(Message message, Session session, MessageProcessorGroup group) {
RubyModule messageWrapperClass = getClass( "TorqueBox::Messaging::Message" );
Object wrappedMessage = _callRubyMethod( messageWrapperClass, "new", message );
+ _callRubyMethodIfDefined("initialize_proxy", group);
_callRubyMethod( findMiddleware(), "invoke", session, wrappedMessage, getRubyComponent() );
}
@@ -86,6 +86,7 @@ protected void deploy(DeploymentPhaseContext phaseContext, final MessageProcesso
service.setMessageSelector( metaData.getMessageSelector() );
service.setName( metaData.getName() );
service.setXAEnabled( metaData.isXAEnabled() );
+ service.setSynchronous( metaData.isSynchronous() );
ServiceBuilder<BaseMessageProcessorGroup> builder = phaseContext.getServiceTarget().addService( baseServiceName, service )
.addDependency( MessagingServices.messageProcessorComponentResolver( unit, name ), ComponentResolver.class, service.getComponentResolverInjector() )
@@ -120,10 +120,24 @@ public static MessageProcessorMetaData subscribe(String handler, String destinat
options = Collections.EMPTY_MAP;
MessageProcessorMetaData result = new MessageProcessorMetaData();
result.setDurable( (Boolean) options.get( "durable" ) );
- result.setClientID( (String) options.get( "client_id" ) );
- result.setDestinationName( destination );
- result.setMessageSelector( (String) options.get( "selector" ) );
- if (options.containsKey( "singleton" )) {
+ result.setSynchronous((Boolean) options.get("synchronous"));
+ result.setClientID((String) options.get("client_id"));
+ result.setDestinationName(destination);
+ result.setMessageSelector((String) options.get("selector"));
+
+ // In case this is a synchronous message processor
+ // we do not want to receive own replies
+ if (result.isSynchronous()) {
+ if (result.getMessageSelector() == null) {
+ // Message selector was not provided
+ result.setMessageSelector( "JMSCorrelationID IS NULL" );
+ } else {
+ // Message selector was provided, we need to use it
+ result.setMessageSelector( "(JMSCorrelationID IS NULL) AND (" + result.getMessageSelector() + ")" );
+ }
+ }
+
+ if (options.containsKey("singleton")) {
result.setSingleton( (Boolean) options.get( "singleton" ) );
}
if (options.containsKey( "xa" )) {
@@ -133,6 +147,7 @@ public static MessageProcessorMetaData subscribe(String handler, String destinat
result.setRubyRequirePath( StringUtils.underscore( handler ) );
result.setRubyConfig( (Map) options.get( "config" ) );
result.setConcurrency( (Integer) options.get( "concurrency" ) );
+
return result;
}
@@ -75,7 +75,7 @@ protected void deploy(DeploymentPhaseContext phaseContext, DeploymentUnit unit,
processorMetaData.setDestinationName( queueName );
processorMetaData.setRubyClassName( task.getRubyClassName(), task.getLocation() );
processorMetaData.setConcurrency( task.getConcurrency() );
- processorMetaData.setMessageSelector( "JMSCorrelationID IS NULL" );
+ processorMetaData.setMessageSelector( "JMSCorrelationID IS NULL" );
processorMetaData.setXAEnabled( task.isXAEnabled() );
unit.addToAttachmentList( MessageProcessorMetaData.ATTACHMENTS_KEY, processorMetaData );
}
Oops, something went wrong. Retry.

0 comments on commit 36d59d4

Please sign in to comment.