Skip to content

Commit

Permalink
Merge pull request apache#342 from jboss-fuse/ENTESB-7948
Browse files Browse the repository at this point in the history
ENTESB-7948
  • Loading branch information
oscerd committed Apr 11, 2018
2 parents 84b3faa + 25dea7b commit 40252ae
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 64 deletions.
Expand Up @@ -45,7 +45,7 @@
@UriEndpoint(firstVersion = "2.14.0", scheme = "netty4-http", extendsScheme = "netty4", title = "Netty4 HTTP",
syntax = "netty4-http:protocol:host:port/path", consumerClass = NettyHttpConsumer.class, label = "http", lenientProperties = true,
excludeProperties = "textline,delimiter,autoAppendDelimiter,decoderMaxLineLength,encoding,allowDefaultCodec,udpConnectionlessSending,networkInterface"
+ ",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast")
+ ",clientMode,reconnect,reconnectInterval,useByteBuf,udpByteArrayCodec,broadcast,correlationManager")
public class NettyHttpEndpoint extends NettyEndpoint implements AsyncEndpoint, HeaderFilterStrategyAware {

private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class);
Expand Down
3 changes: 2 additions & 1 deletion components/camel-netty4/src/main/docs/netty4-component.adoc
Expand Up @@ -100,7 +100,7 @@ with the following path and query parameters:
|===


==== Query Parameters (71 parameters):
==== Query Parameters (72 parameters):


[width="100%",cols="2,5,^1,2",options="header"]
Expand Down Expand Up @@ -133,6 +133,7 @@ with the following path and query parameters:
| *connectTimeout* (producer) | Time to wait for a socket connection to be available. Value is in millis. | 10000 | int
| *requestTimeout* (producer) | Allows to use a timeout for the Netty producer when calling a remote server. By default no timeout is in use. The value is in milli seconds, so eg 30000 is 30 seconds. The requestTimeout is using Netty's ReadTimeoutHandler to trigger the timeout. | | long
| *clientInitializerFactory* (producer) | To use a custom ClientInitializerFactory | | ClientInitializer Factory
| *correlationManager* (producer) | To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer. This should only be used if you have a way to map requests together with replies such as if there is correlation ids in both the request and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing this you must have a way to correlate the request and reply messages so you can store the right reply on the inflight Camel Exchange before its continued routed. | | NettyCamelState CorrelationManager
| *lazyChannelCreation* (producer) | Channels can be lazily created to avoid exceptions, if the remote server is not up and running when the Camel producer is started. | true | boolean
| *producerPoolEnabled* (producer) | Whether producer pool is enabled or not. Important: Do not turn this off, as the pooling is needed for handling concurrency and reliable request/reply. | true | boolean
| *producerPoolMaxActive* (producer) | Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time. Use a negative value for no limit. | -1 | int
Expand Down
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.netty4;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

public class DefaultNettyCamelStateCorrelationManager implements NettyCamelStateCorrelationManager {

private final Map<Channel, NettyCamelState> cache = new ConcurrentHashMap<Channel, NettyCamelState>();

@Override
public void putState(Channel channel, NettyCamelState state) {
cache.put(channel, state);
}

@Override
public void removeState(ChannelHandlerContext ctx, Channel channel) {
cache.remove(channel);
}

@Override
public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg) {
return cache.get(channel);
}

@Override
public NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause) {
return cache.get(channel);
}
}
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

/**
* To manage and correlate state of {@link NettyCamelState} when doing request/reply via {@link NettyProducer}.
* <p/>
* This SPI allows custom implementations to correlate the request and replies.
*/
public interface NettyCamelStateCorrelationManager {

/**
* Puts the state.
* <p/>
* You can get access to the Camel message from the {@link NettyCamelState} instance.
*
* @param channel the channel
* @param state the Camel state to be stored
*/
void putState(Channel channel, NettyCamelState state);

/**
* Removes the state when the channel is inactive.
*
* @param ctx netty channel handler context
* @param channel the channel
*/
void removeState(ChannelHandlerContext ctx, Channel channel);

/**
* Gets the state when a response message has been received.
*
* @param ctx netty channel handler context
* @param channel the channel
* @param msg the response message
*/
NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Object msg);

/**
* Gets the state when some error occurred.
*
* @param ctx netty channel handler context
* @param channel the channel
* @param cause the error
*/
NettyCamelState getState(ChannelHandlerContext ctx, Channel channel, Throwable cause);

}
Expand Up @@ -110,6 +110,8 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
private boolean udpByteArrayCodec;
@UriParam(label = "common")
private boolean reuseChannel;
@UriParam(label = "producer,advanced")
private NettyCamelStateCorrelationManager correlationManager;

/**
* Returns a copy of this configuration
Expand Down Expand Up @@ -655,6 +657,21 @@ public void setReuseChannel(boolean reuseChannel) {
this.reuseChannel = reuseChannel;
}

public NettyCamelStateCorrelationManager getCorrelationManager() {
return correlationManager;
}

/**
* To use a custom correlation manager to manage how request and reply messages are mapped when using request/reply with the netty producer.
* This should only be used if you have a way to map requests together with replies such as if there is correlation ids in both the request
* and reply messages. This can be used if you want to multiplex concurrent messages on the same channel (aka connection) in netty. When doing
* this you must have a way to correlate the request and reply messages so you can store the right reply on the inflight Camel Exchange before
* its continued routed.
*/
public void setCorrelationManager(NettyCamelStateCorrelationManager correlationManager) {
this.correlationManager = correlationManager;
}

private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
if (handlers != null) {
for (T handler : handlers) {
Expand Down
Expand Up @@ -18,8 +18,6 @@

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -64,7 +62,7 @@ public class NettyProducer extends DefaultAsyncProducer {
private CamelLogger noReplyLogger;
private EventLoopGroup workerGroup;
private ObjectPool<ChannelFuture> pool;
private Map<Channel, NettyCamelState> nettyCamelStatesMap = new ConcurrentHashMap<Channel, NettyCamelState>();
private NettyCamelStateCorrelationManager correlationManager;

public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
super(nettyEndpoint);
Expand All @@ -87,13 +85,24 @@ public CamelContext getContext() {
return context;
}

public NettyCamelStateCorrelationManager getCorrelationManager() {
return correlationManager;
}

protected boolean isTcp() {
return configuration.getProtocol().equalsIgnoreCase("tcp");
}

@Override
protected void doStart() throws Exception {
super.doStart();

if (configuration.getCorrelationManager() != null) {
correlationManager = configuration.getCorrelationManager();
} else {
correlationManager = new DefaultNettyCamelStateCorrelationManager();
}

if (configuration.getWorkerGroup() == null) {
// create new pool which we should shutdown when stopping as its not shared
workerGroup = new NettyWorkerPoolBuilder()
Expand Down Expand Up @@ -301,7 +310,7 @@ public void onComplete(Exchange exchange) {
}

// setup state as attachment on the channel, so we can access the state later when needed
putState(channel, new NettyCamelState(producerCallback, exchange));
correlationManager.putState(channel, new NettyCamelState(producerCallback, exchange));
// here we need to setup the remote address information here
InetSocketAddress remoteAddress = null;
if (!isTcp()) {
Expand Down Expand Up @@ -372,28 +381,6 @@ protected Object getRequestBody(Exchange exchange) throws Exception {
return body;
}

/**
* To get the {@link NettyCamelState} from the given channel.
*/
public NettyCamelState getState(Channel channel) {
return nettyCamelStatesMap.get(channel);
}

/**
* To remove the {@link NettyCamelState} stored on the channel,
* when no longer needed
*/
public void removeState(Channel channel) {
nettyCamelStatesMap.remove(channel);
}

/**
* Put the {@link NettyCamelState} into the map use the given channel as the key
*/
public void putState(Channel channel, NettyCamelState state) {
nettyCamelStatesMap.put(channel, state);
}

protected EventLoopGroup getWorkerGroup() {
// prefer using explicit configured thread pools
EventLoopGroup wg = configuration.getWorkerGroup();
Expand All @@ -420,7 +407,7 @@ protected ChannelFuture openConnection() throws Exception {
clientBootstrap.option(ChannelOption.SO_REUSEADDR, configuration.isReuseAddress());
clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout());

//TODO need to check it later
//TODO need to check it later;
// set any additional netty options
/*
if (configuration.getOptions() != null) {
Expand Down Expand Up @@ -526,7 +513,6 @@ public void setConfiguration(NettyConfiguration configuration) {
this.configuration = configuration;
}


public ChannelGroup getAllChannels() {
return allChannels;
}
Expand Down Expand Up @@ -656,8 +642,7 @@ public void operationComplete(ChannelFuture future) {
}

/**
* This class is used to release body in case when some error occured and body was not handed over
* to netty
* This class is used to release body in case when some error occurred and body was not handed over to netty
*/
private static final class BodyReleaseCallback implements AsyncCallback {
private volatile Object body;
Expand Down
Expand Up @@ -75,8 +75,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
LOG.debug("Closing channel as an exception was thrown from Netty", cause);
}

Exchange exchange = getExchange(ctx);
AsyncCallback callback = getAsyncCallback(ctx);
NettyCamelState state = getState(ctx, cause);
Exchange exchange = state != null ? state.getExchange() : null;
AsyncCallback callback = state != null ? state.getCallback() : null;

// the state may not be set
if (exchange != null && callback != null) {
Expand All @@ -102,35 +103,38 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.trace("Channel closed: {}", ctx.channel());
}

Exchange exchange = getExchange(ctx);
AsyncCallback callback = getAsyncCallback(ctx);
NettyCamelState state = getState(ctx, null);
Exchange exchange = state != null ? state.getExchange() : null;
AsyncCallback callback = state != null ? state.getCallback() : null;

// remove state
producer.removeState(ctx.channel());
producer.getCorrelationManager().removeState(ctx, ctx.channel());

// to keep track of open sockets
producer.getAllChannels().remove(ctx.channel());

// this channel is maybe closing graceful and the exchange is already done
// and if so we should not trigger an exception
boolean doneUoW = exchange.getUnitOfWork() == null;

NettyConfiguration configuration = producer.getConfiguration();
if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
// To avoid call the callback.done twice
exceptionHandled = true;
// session was closed but no message received. This could be because the remote server had an internal error
// and could not return a response. We should count down to stop waiting for a response
String address = configuration != null ? configuration.getAddress() : "";
if (LOG.isDebugEnabled()) {
LOG.debug("Channel closed but no message received from address: {}", address);
}
// don't fail the exchange if we actually specify to disconnect
if (!configuration.isDisconnect()) {
exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
if (exchange != null) {
// this channel is maybe closing graceful and the exchange is already done
// and if so we should not trigger an exception
boolean doneUoW = exchange.getUnitOfWork() == null;

NettyConfiguration configuration = producer.getConfiguration();
if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
// To avoid call the callback.done twice
exceptionHandled = true;
// session was closed but no message received. This could be because the remote server had an internal error
// and could not return a response. We should count down to stop waiting for a response
String address = configuration.getAddress();
if (LOG.isDebugEnabled()) {
LOG.debug("Channel closed but no message received from address: {}", address);
}
// don't fail the exchange if we actually specify to disconnect
if (!configuration.isDisconnect()) {
exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
}
// signal callback
callback.done(false);
}
// signal callback
callback.done(false);
}

// make sure the event can be processed by other handlers
Expand All @@ -151,12 +155,13 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
ctx.pipeline().remove(handler);
}

Exchange exchange = getExchange(ctx);
NettyCamelState state = getState(ctx, msg);
Exchange exchange = state != null ? state.getExchange() : null;
if (exchange == null) {
// we just ignore the received message as the channel is closed
return;
}
AsyncCallback callback = getAsyncCallback(ctx);
AsyncCallback callback = state.getCallback();

Message message;
try {
Expand Down Expand Up @@ -246,14 +251,12 @@ protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ct
}
}

private Exchange getExchange(ChannelHandlerContext ctx) {
NettyCamelState state = producer.getState(ctx.channel());
return state != null ? state.getExchange() : null;
private NettyCamelState getState(ChannelHandlerContext ctx, Object msg) {
return producer.getCorrelationManager().getState(ctx, ctx.channel(), msg);
}

private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
NettyCamelState state = producer.getState(ctx.channel());
return state != null ? state.getCallback() : null;
private NettyCamelState getState(ChannelHandlerContext ctx, Throwable cause) {
return producer.getCorrelationManager().getState(ctx, ctx.channel(), cause);
}

}

0 comments on commit 40252ae

Please sign in to comment.