Skip to content

Commit

Permalink
CAMEL-17474: camel-core - set transacted when copying exchange in mul… (
Browse files Browse the repository at this point in the history
apache#7040)

* CAMEL-17474: camel-core - set transacted when copying exchange in multicast processor

* CAMEL-17474: camel-core - similar fix with RecipientList and Split EIP

* CAMEL-17474: add a test for Enrich EIP

* CAMEL-17474 add assertions to check the copied exchanges are marked as transactional
  • Loading branch information
zhfeng committed Mar 1, 2022
1 parent e0f667d commit 2c7b88e
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@ protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange
for (Processor processor : processors) {
// copy exchange, and do not share the unit of work
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());

if (streamCache != null) {
if (index > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
Expand Down Expand Up @@ -288,6 +289,7 @@ protected ProcessorExchangePair createProcessorExchangePair(
Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) {
// copy exchange, and do not share the unit of work
Exchange copy = processorExchangeFactory.createCorrelatedCopy(exchange, false);
copy.adapt(ExtendedExchange.class).setTransacted(exchange.isTransacted());

// if we share unit of work, we need to prepare the child exchange
if (isShareUnitOfWork()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Route;
Expand Down Expand Up @@ -251,6 +252,7 @@ public ProcessorExchangePair next() {
// create a correlated copy as the new exchange to be routed in the splitter from the copy
// and do not share the unit of work
Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
newExchange.adapt(ExtendedExchange.class).setTransacted(original.isTransacted());
// If the splitter has an aggregation strategy
// then the StreamCache created by the child routes must not be
// closed by the unit of work of the child route, but by the unit of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,50 @@
*/
package org.apache.camel.itest.tx;

import java.util.Arrays;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.transaction.RequiresNewJtaTransactionPolicy;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class JtaRouteTest extends CamelTestSupport {
@EndpointInject("mock:splitted")
private MockEndpoint splitted;

@EndpointInject("direct:requires_new")
private ProducerTemplate start;
@EndpointInject("mock:test")
private MockEndpoint test;

@EndpointInject("mock:a")
private MockEndpoint a;

@EndpointInject("mock:b")
private MockEndpoint b;

@EndpointInject("mock:c")
private MockEndpoint c;

@EndpointInject("mock:result")
private MockEndpoint result;

@EndpointInject("direct:split_test")
private ProducerTemplate split;

@EndpointInject("direct:multicast_test")
private ProducerTemplate multicast;

@EndpointInject("direct:recipient_test")
private ProducerTemplate recipient;

@EndpointInject("direct:enrich_test")
private ProducerTemplate enrich;

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
Expand All @@ -38,23 +68,104 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
context.getRegistry().bind("PROPAGATION_REQUIRES_NEW", new RequiresNewJtaTransactionPolicy());

from("direct:requires_new")
from("direct:split_test")
.transacted("PROPAGATION_REQUIRES_NEW")
.split(body()).delimiter("_").to("direct:splitted").end()
.log("after splitter log which you will never see...")
.transform().constant("requires_new");
.to("direct:split");

from("direct:split")
.split(body()).delimiter("_").to("mock:splitted").end()
.log("after splitter log which you will never see...");

from("direct:multicast_test").routeId("r.route1")
.log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
.transacted()
.to("direct:multicast")
.log("will never get here");

from("direct:multicast").routeId("r.route2")
.log(LoggingLevel.DEBUG, "Entering route: ${routeId}")
.multicast()
.to("log:r.test", "direct:route3", "mock:test")
.end();

from("direct:route3").routeId("r.route3")
.process(e -> Assertions.assertTrue(e.isTransacted()))
.log(LoggingLevel.DEBUG, "Entering route: ${routeId}");

from("direct:recipient_test")
.transacted()
.to("direct:recipient");

from("direct:recipient")
.recipientList(constant("mock:a", "mock:b", "mock:c"));

from("direct:enrich_test")
.transacted()
.to("direct:enrich");

from("direct:enrich")
.enrich("direct:content", new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange == null) {
return oldExchange;
}
Object oldBody = oldExchange.getIn().getBody();
Object newBody = newExchange.getIn().getBody();
oldExchange.getIn().setBody(oldBody + " " + newBody);
return oldExchange;
}
})
.to("mock:result");

from("direct:content").transform().constant("Enrich");

from("direct:splitted").to("mock:splitted");
}
};
}

@Test
void testTransactedSplit() throws Exception {
splitted.setExpectedMessageCount(2);
splitted.expectedBodiesReceived("requires", "new");
splitted.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));

start.sendBody("requires_new");

split.sendBody("requires_new");
splitted.assertIsSatisfied();
}

@Test
public void testTransactedMultiCast() throws Exception {
test.setExpectedMessageCount(1);
test.expectedBodiesReceived("multicast");
splitted.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));

multicast.sendBody("multicast");
test.assertIsSatisfied();
}

@Test
public void testTransactedRecipient() throws Exception {
Arrays.asList(a, b, c).forEach(m -> {
m.setExpectedMessageCount(1);
m.expectedBodiesReceived("recipient");
m.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));
});

recipient.sendBody("recipient");

for (MockEndpoint m : Arrays.asList(a, b, c)) {
m.assertIsSatisfied();
}
}

@Test
public void testTransactedEnrich() throws Exception {
result.setExpectedMessageCount(1);
result.expectedBodiesReceived("Message Enrich");
result.whenAnyExchangeReceived(e -> Assertions.assertTrue(e.isTransacted()));

enrich.sendBody("Message");
result.assertIsSatisfied();
}
}

0 comments on commit 2c7b88e

Please sign in to comment.