diff --git a/build.gradle b/build.gradle index 7fcc84f..6dfbea5 100644 --- a/build.gradle +++ b/build.gradle @@ -8,13 +8,15 @@ sourceCompatibility = 1.8 targetCompatibility = 1.8 repositories { - maven { url "http://repo.maven.apache.org/maven2" } + maven { url "http://maven.springframework.org/release" } + maven { url "http://maven.restlet.org" } + mavenCentral() } dependencies { - compile 'io.netty:netty-all:4.1.17.Final' - compile 'com.google.guava:guava:23.4-jre' - compile 'org.slf4j:slf4j-api:1.7.25' + compile 'io.netty:netty-all:4.1.76.Final' + compile 'com.google.guava:guava:29.0-jre' + compile 'org.slf4j:slf4j-api:1.7.30' runtime 'ch.qos.logback:logback-classic:1.2.3' testCompile 'junit:junit:4.8.1' } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 20acdad..f05cc53 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.3-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-all.zip diff --git a/src/main/java/org/freeswitch/esl/client/inbound/InboundChannelInitializer.java b/src/main/java/org/freeswitch/esl/client/inbound/InboundChannelInitializer.java index ea70595..a47ba72 100644 --- a/src/main/java/org/freeswitch/esl/client/inbound/InboundChannelInitializer.java +++ b/src/main/java/org/freeswitch/esl/client/inbound/InboundChannelInitializer.java @@ -23,10 +23,9 @@ public InboundChannelInitializer(ChannelHandler handler) { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("decoder", new EslFrameDecoder(8192)); - // now the inbound client logic pipeline.addLast("clientHandler", handler); - pipeline.addLast("encoder", new StringEncoder()); } } diff --git a/src/main/java/org/freeswitch/esl/client/outbound/OutboundChannelInitializer.java b/src/main/java/org/freeswitch/esl/client/outbound/OutboundChannelInitializer.java index 197abca..5c45e8d 100644 --- a/src/main/java/org/freeswitch/esl/client/outbound/OutboundChannelInitializer.java +++ b/src/main/java/org/freeswitch/esl/client/outbound/OutboundChannelInitializer.java @@ -4,23 +4,24 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; import org.freeswitch.esl.client.transport.message.EslFrameDecoder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class OutboundChannelInitializer extends ChannelInitializer { private final IClientHandlerFactory clientHandlerFactory; - private ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(); + private final EventExecutorGroup customLogicExecutor; - public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) { + public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, int outboundEventThreadCount) { this.clientHandlerFactory = clientHandlerFactory; - } - - public OutboundChannelInitializer setCallbackExecutor(ExecutorService callbackExecutor) { - this.callbackExecutor = callbackExecutor; - return this; + this.customLogicExecutor = new DefaultEventExecutorGroup(outboundEventThreadCount, new OutboundClientEventThreadFactory()); } @Override @@ -32,9 +33,36 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast("decoder", new EslFrameDecoder(8092, true)); // now the outbound client logic - pipeline.addLast("clientHandler", + pipeline.addLast(this.customLogicExecutor, "clientHandler", new OutboundClientHandler( clientHandlerFactory.createClientHandler(), callbackExecutor)); } + + static class OutboundClientEventThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + OutboundClientEventThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "outboundClientEventExecutor-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable runnable) { + Thread rabbitmqTaskWorker = new Thread(group, runnable, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (rabbitmqTaskWorker.isDaemon()) + rabbitmqTaskWorker.setDaemon(false); + if (rabbitmqTaskWorker.getPriority() != Thread.NORM_PRIORITY) + rabbitmqTaskWorker.setPriority(Thread.NORM_PRIORITY); + return rabbitmqTaskWorker; + } + } } diff --git a/src/main/java/org/freeswitch/esl/client/outbound/SocketClient.java b/src/main/java/org/freeswitch/esl/client/outbound/SocketClient.java index 00f6154..b1488bf 100644 --- a/src/main/java/org/freeswitch/esl/client/outbound/SocketClient.java +++ b/src/main/java/org/freeswitch/esl/client/outbound/SocketClient.java @@ -40,10 +40,12 @@ public class SocketClient extends AbstractService { private final Logger log = LoggerFactory.getLogger(this.getClass()); + private static final int DEFAULT_OUTBOUND_EVENT_EXECUTOR_THREAD_COUNT = 32; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private final IClientHandlerFactory clientHandlerFactory; private final SocketAddress bindAddress; + private final int outboundEventExecutorThreadCount; private Channel serverChannel; @@ -52,6 +54,14 @@ public SocketClient(SocketAddress bindAddress, IClientHandlerFactory clientHandl this.clientHandlerFactory = clientHandlerFactory; this.bossGroup = new NioEventLoopGroup(); this.workerGroup = new NioEventLoopGroup(); + this.outboundEventExecutorThreadCount = DEFAULT_OUTBOUND_EVENT_EXECUTOR_THREAD_COUNT; + } + public SocketClient(SocketAddress bindAddress, IClientHandlerFactory clientHandlerFactory, int outboundEventExecutorThreadCount) { + this.bindAddress = bindAddress; + this.clientHandlerFactory = clientHandlerFactory; + this.bossGroup = new NioEventLoopGroup(); + this.workerGroup = new NioEventLoopGroup(); + this.outboundEventExecutorThreadCount = outboundEventExecutorThreadCount; } @Override @@ -59,7 +69,7 @@ protected void doStart() { final ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .childHandler(new OutboundChannelInitializer(clientHandlerFactory)) + .childHandler(new OutboundChannelInitializer(clientHandlerFactory, outboundEventExecutorThreadCount)) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); diff --git a/src/main/java/org/freeswitch/esl/client/transport/message/EslFrameDecoder.java b/src/main/java/org/freeswitch/esl/client/transport/message/EslFrameDecoder.java index f1bb095..1a2d4d7 100644 --- a/src/main/java/org/freeswitch/esl/client/transport/message/EslFrameDecoder.java +++ b/src/main/java/org/freeswitch/esl/client/transport/message/EslFrameDecoder.java @@ -136,7 +136,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List ou * read the content-length specified */ int contentLength = currentMessage.getContentLength(); - ByteBuf bodyBytes = buffer.readBytes(contentLength); + ByteBuf bodyBytes = buffer.readSlice(contentLength); log.debug("read [{}] body bytes", bodyBytes.writerIndex()); // most bodies are line based, so split on LF while (bodyBytes.isReadable()) {