Permalink
Browse files

Ability to use Beanstalkd as a message queue

Added switch `consumer.awaitJob` to specify either the component should wait for Exchange completion (following Beanstalkd nature) or delete the job immediately
  • Loading branch information...
1 parent df45fc9 commit abcb3ed36421a68195172ee365134d1fc3d0ba49 @alaz alaz committed Nov 5, 2012
View
@@ -66,7 +66,9 @@ Producer behaviour is affected by the @command@ parameter which tells what to do
h3. Consumer parameters
-By default the consumer calls @delete@ on successful job completion and calls @bury@ on failure. You can choose which command to execute in the case of failure by specifying @consumer.onFailure@ parameter in the URI. It can take values of @bury@, @delete@ or @release@.
+The consumer may delete the job immediately after reserving it or wait until Camel routes process it. While the first scenario is more like a "message queue", the second is similar to "job queue". This behavior is controlled by @consumer.awaitJob@ parameter, which equals @true@ by default (following Beanstalkd nature).
+
+When synchronous, the consumer calls @delete@ on successful job completion and calls @bury@ on failure. You can choose which command to execute in the case of failure by specifying @consumer.onFailure@ parameter in the URI. It can take values of @bury@, @delete@ or @release@.
There is a boolean parameter @consumer.useBlockIO@ which corresponds to the same parameter in JavaBeanstalkClient library. By default it is @true@.
View
@@ -46,7 +46,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <camel.version>2.10.1</camel.version>
+ <camel.version>2.10.2</camel.version>
<jdk.version>1.6</jdk.version>
<java.src.version>1.5</java.src.version>
</properties>
@@ -71,7 +71,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
- <version>1.7.1</version>
+ <version>1.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -83,7 +83,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>1.9.0</version>
+ <version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -32,6 +32,7 @@
* <li><code>jobDelay</code></li>
* <li><code>jobTimeToRun</code></li>
* <li><code>consumer.onFailure</code></li>
+ * <li><code>consumer.awaitJob</code></li>
* </ul>
*
* @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
@@ -34,15 +34,18 @@
/**
* PollingConsumer to read Beanstalk jobs.
*
+ * The consumer may delete the job immediately or based on successful {@link Exchange}
+ * completion. The behavior is configurable by <code>consumer.awaitJob</code>
+ * flag (by default <code>true</code>)
+ *
* This consumer will add a {@link Synchronization} object to every {@link Exchange}
* object it creates in order to react on successful exchange completion or failure.
*
* In the case of successful completion, Beanstalk's <code>delete</code> method is
* called upon the job. In the case of failure the default reaction is to call
* <code>bury</code>.
*
- * The only configuration this consumer may have is the reaction on failures: possible
- * variants are "bury", "release" or "delete"
+ * The reaction on failures is configurable: possible variants are "bury", "release" or "delete"
*
* @author <a href="mailto:azarov@osinka.com">Alexander Azarov</a>
*/
@@ -51,6 +54,7 @@
String onFailure = BeanstalkComponent.COMMAND_BURY;
boolean useBlockIO = true;
+ boolean deleteImmediately = false;
private Client client = null;
private ExecutorService executor = null;
@@ -101,7 +105,10 @@ public Exchange call() throws Exception {
}
}
- exchange.addOnCompletion(sync);
+ if (deleteImmediately)
+ client.delete(job.getJobId());
+ else
+ exchange.addOnCompletion(sync);
return exchange;
} catch (BeanstalkException e) {
@@ -147,6 +154,14 @@ public void setUseBlockIO(boolean useBlockIO) {
this.useBlockIO = useBlockIO;
}
+ public boolean getAwaitJob() {
+ return !deleteImmediately;
+ }
+
+ public void setAwaitJob(boolean awaitingCompletion) {
+ this.deleteImmediately = !awaitingCompletion;
+ }
+
@Override
public BeanstalkEndpoint getEndpoint() {
return (BeanstalkEndpoint) super.getEndpoint();
@@ -23,7 +23,7 @@
import org.junit.Test;
import static org.mockito.Mockito.*;
-public class ConsumerTest extends BeanstalkMockTestSupport {
+public class AwaitingConsumerTest extends BeanstalkMockTestSupport {
final String testMessage = "hello, world";
@EndpointInject(uri = "beanstalk:tube")
@@ -49,6 +49,7 @@ public void testReceive() throws Exception {
result.assertIsSatisfied(100);
verify(client, atLeast(1)).reserve(0);
+ verify(client, atLeast(1)).delete(jobId);
}
@Test
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2010 Osinka <http://osinka.ru>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.osinka.camel.beanstalk;
+
+import com.surftools.BeanstalkClient.BeanstalkException;
+import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+public class ConsumerCompletionTest extends BeanstalkMockTestSupport {
+ final String testMessage = "hello, world";
+
+ boolean shouldIdie = false;
+ final Processor processor = new Processor() {
+ @Override
+ public void process(Exchange exchange) throws InterruptedException {
+ if (shouldIdie) throw new InterruptedException("die");
+ }
+ };
+
+ @Test
+ public void testDeleteOnComplete() throws Exception {
+ final long jobId = 111;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+ final Job jobMock = mock(Job.class);
+
+ when(jobMock.getJobId()).thenReturn(jobId);
+ when(jobMock.getData()).thenReturn(payload);
+ when(client.reserve(anyInt()))
+ .thenReturn(jobMock)
+ .thenReturn(null);
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMinimumMessageCount(1);
+ result.expectedBodiesReceived(testMessage);
+ result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+ result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+ result.assertIsSatisfied(1000);
+
+ verify(client, atLeastOnce()).reserve(anyInt());
+ verify(client).delete(jobId);
+ }
+
+ @Test
+ public void testReleaseOnFailure() throws Exception {
+ shouldIdie = true;
+ final long jobId = 111;
+ final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
+ final int delay = BeanstalkComponent.DEFAULT_DELAY;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+ final Job jobMock = mock(Job.class);
+
+ when(jobMock.getJobId()).thenReturn(jobId);
+ when(jobMock.getData()).thenReturn(payload);
+ when(client.reserve(anyInt()))
+ .thenReturn(jobMock)
+ .thenReturn(null);
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMinimumMessageCount(1);
+ result.assertIsNotSatisfied(1000);
+
+ verify(client, atLeastOnce()).reserve(anyInt());
+ verify(client).release(jobId, priority, delay);
+ }
+
+ @Test
+ public void testBeanstalkException() throws Exception {
+ shouldIdie = false;
+ final Job jobMock = mock(Job.class);
+ final long jobId = 111;
+ final byte[] payload = Helper.stringToBytes(testMessage);
+
+ when(jobMock.getJobId()).thenReturn(jobId);
+ when(jobMock.getData()).thenReturn(payload);
+ when(client.reserve(anyInt()))
+ .thenThrow(new BeanstalkException("test"))
+ .thenReturn(jobMock);
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(1);
+ result.expectedBodiesReceived(testMessage);
+ result.expectedPropertyReceived(Headers.JOB_ID, jobId);
+ result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
+ result.assertIsSatisfied(100);
+
+ verify(client, atLeast(1)).reserve(anyInt());
+ verify(client, times(1)).close();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("beanstalk:tube?consumer.onFailure=release").process(processor).to("mock:result");
+ }
+ };
+ }
+}
@@ -16,14 +16,16 @@
package com.osinka.camel.beanstalk;
import com.surftools.BeanstalkClient.Job;
+import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
+
import static org.mockito.Mockito.*;
-public class CompletionTest extends BeanstalkMockTestSupport {
+public class ImmediateConsumerTest extends BeanstalkMockTestSupport {
final String testMessage = "hello, world";
boolean shouldIdie = false;
@@ -35,34 +37,32 @@ public void process(Exchange exchange) throws InterruptedException {
};
@Test
- public void testDeleteOnComplete() throws Exception {
+ public void testDeleteOnSuccess() throws Exception {
+ final Job jobMock = mock(Job.class);
final long jobId = 111;
final byte[] payload = Helper.stringToBytes(testMessage);
- final Job jobMock = mock(Job.class);
when(jobMock.getJobId()).thenReturn(jobId);
when(jobMock.getData()).thenReturn(payload);
when(client.reserve(anyInt()))
- .thenReturn(jobMock)
- .thenReturn(null);
+ .thenReturn(jobMock)
+ .thenReturn(null);
MockEndpoint result = getMockEndpoint("mock:result");
- result.expectedMinimumMessageCount(1);
+ result.expectedMessageCount(1);
result.expectedBodiesReceived(testMessage);
result.expectedPropertyReceived(Headers.JOB_ID, jobId);
result.message(0).header(Headers.JOB_ID).isEqualTo(jobId);
- result.assertIsSatisfied(1000);
+ result.assertIsSatisfied(100);
- verify(client, atLeastOnce()).reserve(anyInt());
- verify(client).delete(jobId);
+ verify(client, atLeast(1)).reserve(0);
+ verify(client, atLeast(1)).delete(jobId);
}
@Test
- public void testReleaseOnFailure() throws Exception {
+ public void testDeleteOnFailure() throws Exception {
shouldIdie = true;
final long jobId = 111;
- final long priority = BeanstalkComponent.DEFAULT_PRIORITY;
- final int delay = BeanstalkComponent.DEFAULT_DELAY;
final byte[] payload = Helper.stringToBytes(testMessage);
final Job jobMock = mock(Job.class);
@@ -77,15 +77,15 @@ public void testReleaseOnFailure() throws Exception {
result.assertIsNotSatisfied(1000);
verify(client, atLeastOnce()).reserve(anyInt());
- verify(client).release(jobId, priority, delay);
+ verify(client, atLeast(1)).delete(jobId);
}
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
- from("beanstalk:tube?consumer.onFailure=release").process(processor).to("mock:result");
+ from("beanstalk:tube?consumer.awaitJob=false").process(processor).to("mock:result");
}
};
}

0 comments on commit abcb3ed

Please sign in to comment.