Skip to content

Commit

Permalink
DSL: .gateway() SubFlow support
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Oct 31, 2014
1 parent e6af7f7 commit c347780
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
Expand Up @@ -212,8 +212,7 @@ public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector) {
return this.filter(payloadType, genericSelector, null);
}

public <P> B filter(GenericSelector<P> genericSelector,
Consumer<FilterEndpointSpec> endpointConfigurer) {
public <P> B filter(GenericSelector<P> genericSelector, Consumer<FilterEndpointSpec> endpointConfigurer) {
return filter(null, genericSelector, endpointConfigurer);
}

Expand All @@ -227,13 +226,11 @@ public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector,
return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer);
}

public <H extends MessageHandler> B handleWithAdapter(
Function<Adapters, MessageHandlerSpec<?, H>> adapters) {
public <H extends MessageHandler> B handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters) {
return handleWithAdapter(adapters, null);
}

public <H extends MessageHandler> B handleWithAdapter(
Function<Adapters, MessageHandlerSpec<?, H>> adapters,
public <H extends MessageHandler> B handleWithAdapter(Function<Adapters, MessageHandlerSpec<?, H>> adapters,
Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
return handle(adapters.apply(new Adapters()), endpointConfigurer);
}
Expand Down Expand Up @@ -290,8 +287,7 @@ public <H extends MessageHandler> B handle(MessageHandlerSpec<?, H> messageHandl
return handle(messageHandlerSpec.get(), endpointConfigurer);
}

public <H extends MessageHandler> B handle(H messageHandler,
Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
public <H extends MessageHandler> B handle(H messageHandler, Consumer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandler);
return this.register(new GenericEndpointSpec<H>(messageHandler), endpointConfigurer);
}
Expand Down Expand Up @@ -529,8 +525,7 @@ public B route(String beanName, String method) {
return this.route(beanName, method, null);
}

public B route(String beanName, String method,
Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
public B route(String beanName, String method, Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(beanName, method, routerConfigurer, null);
}

Expand All @@ -548,8 +543,7 @@ public B route(String expression, Consumer<RouterSpec<ExpressionEvaluatingRouter
return this.route(expression, routerConfigurer, null);
}

public B route(String expression,
Consumer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
public B route(String expression, Consumer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
Consumer<GenericEndpointSpec<ExpressionEvaluatingRouter>> endpointConfigurer) {
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer,
endpointConfigurer);
Expand All @@ -559,8 +553,7 @@ public <S, T> B route(Function<S, T> router) {
return this.route(null, router);
}

public <S, T> B route(Function<S, T> router,
Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
public <S, T> B route(Function<S, T> router, Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(null, router, routerConfigurer);
}

Expand All @@ -573,8 +566,7 @@ public <P, T> B route(Class<P> payloadType, Function<P, T> router,
return this.route(payloadType, router, routerConfigurer, null);
}

public <S, T> B route(Function<S, T> router,
Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
public <S, T> B route(Function<S, T> router, Consumer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
Consumer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return route(null, router, routerConfigurer, endpointConfigurer);
}
Expand All @@ -588,8 +580,7 @@ public <P, T> B route(Class<P> payloadType, Function<P, T> router,
return route(methodInvokingRouter, routerConfigurer, endpointConfigurer);
}

public <R extends AbstractMappingMessageRouter> B route(R router,
Consumer<RouterSpec<R>> routerConfigurer,
public <R extends AbstractMappingMessageRouter> B route(R router, Consumer<RouterSpec<R>> routerConfigurer,
Consumer<GenericEndpointSpec<R>> endpointConfigurer) {
Collection<Object> componentsToRegister = null;
if (routerConfigurer != null) {
Expand Down Expand Up @@ -646,31 +637,40 @@ public B route(AbstractMessageRouter router) {
return route(router, null);
}

public <R extends AbstractMessageRouter> B route(R router,
Consumer<GenericEndpointSpec<R>> endpointConfigurer) {
public <R extends AbstractMessageRouter> B route(R router, Consumer<GenericEndpointSpec<R>> endpointConfigurer) {
return handle(router, endpointConfigurer);
}

public B gateway(String requestChannel) {
return gateway(requestChannel, null);
}

public B gateway(String requestChannel,
Consumer<GatewayEndpointSpec> endpointConfigurer) {
public B gateway(String requestChannel, Consumer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}

public B gateway(MessageChannel requestChannel) {
return gateway(requestChannel, null);
}

public B gateway(MessageChannel requestChannel,
Consumer<GatewayEndpointSpec> endpointConfigurer) {
public B gateway(MessageChannel requestChannel, Consumer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}

private <S extends ConsumerEndpointSpec<S, ?>> B register(S endpointSpec,
Consumer<S> endpointConfigurer) {
public B gateway(IntegrationFlow flow) {
return gateway(flow, null);
}

public B gateway(IntegrationFlow flow, Consumer<GatewayEndpointSpec> endpointConfigurer) {
Assert.notNull(flow);
final DirectChannel requestChannel = new DirectChannel();
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(requestChannel);
flow.accept(flowBuilder);
addComponent(flowBuilder.get());
return gateway(requestChannel, endpointConfigurer);
}

private <S extends ConsumerEndpointSpec<S, ?>> B register(S endpointSpec, Consumer<S> endpointConfigurer) {
if (endpointConfigurer != null) {
endpointConfigurer.accept(endpointSpec);
}
Expand Down
Expand Up @@ -765,7 +765,7 @@ public void testGatewayFlow() throws Exception {

Message<?> receive = replyChannel.receive(2000);
assertNotNull(receive);
assertEquals("FOO", receive.getPayload());
assertEquals("From Gateway SubFlow: FOO", receive.getPayload());
assertNull(this.gatewayError.receive(1));

message = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();
Expand Down Expand Up @@ -1228,6 +1228,7 @@ public IntegrationFlow routeMultiMethodInvocationFlow() {
public IntegrationFlow gatewayFlow() {
return IntegrationFlows.from("gatewayInput")
.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway(f -> f.transform("From Gateway SubFlow: "::concat))
.get();
}

Expand Down

0 comments on commit c347780

Please sign in to comment.