Skip to content

Commit

Permalink
esl-client#32 EslFrameDecoder memory leak is fixed.
Browse files Browse the repository at this point in the history
esl-client#23 Outbound deadlock is fixed
esl-client#13 Outbound deadlock is fixed
  • Loading branch information
ykalay7 committed Apr 29, 2022
1 parent 67fa4ec commit 70c3aad
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 17 deletions.
10 changes: 6 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
maven { url "http://repo.maven.apache.org/maven2" }
maven { url "http://maven.springframework.org/release" }
maven { url "http://maven.restlet.org" }
mavenCentral()
}

dependencies {
compile 'io.netty:netty-all:4.1.17.Final'
compile 'com.google.guava:guava:23.4-jre'
compile 'org.slf4j:slf4j-api:1.7.25'
compile 'io.netty:netty-all:4.1.76.Final'
compile 'com.google.guava:guava:29.0-jre'
compile 'org.slf4j:slf4j-api:1.7.30'
runtime 'ch.qos.logback:logback-classic:1.2.3'
testCompile 'junit:junit:4.8.1'
}
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.3-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-3.0-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ public InboundChannelInitializer(ChannelHandler handler) {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("decoder", new EslFrameDecoder(8192));

// now the inbound client logic
pipeline.addLast("clientHandler", handler);
pipeline.addLast("encoder", new StringEncoder());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.freeswitch.esl.client.transport.message.EslFrameDecoder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class OutboundChannelInitializer extends ChannelInitializer<SocketChannel> {

private final IClientHandlerFactory clientHandlerFactory;
private ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
private final EventExecutorGroup customLogicExecutor;

public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory) {
public OutboundChannelInitializer(IClientHandlerFactory clientHandlerFactory, int outboundEventThreadCount) {
this.clientHandlerFactory = clientHandlerFactory;
}

public OutboundChannelInitializer setCallbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
return this;
this.customLogicExecutor = new DefaultEventExecutorGroup(outboundEventThreadCount, new OutboundClientEventThreadFactory());
}

@Override
Expand All @@ -32,9 +33,36 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast("decoder", new EslFrameDecoder(8092, true));

// now the outbound client logic
pipeline.addLast("clientHandler",
pipeline.addLast(this.customLogicExecutor, "clientHandler",
new OutboundClientHandler(
clientHandlerFactory.createClientHandler(),
callbackExecutor));
}

static class OutboundClientEventThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

OutboundClientEventThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "outboundClientEventExecutor-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable runnable) {
Thread rabbitmqTaskWorker = new Thread(group, runnable,
namePrefix + threadNumber.getAndIncrement(),
0);
if (rabbitmqTaskWorker.isDaemon())
rabbitmqTaskWorker.setDaemon(false);
if (rabbitmqTaskWorker.getPriority() != Thread.NORM_PRIORITY)
rabbitmqTaskWorker.setPriority(Thread.NORM_PRIORITY);
return rabbitmqTaskWorker;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
public class SocketClient extends AbstractService {

private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final int DEFAULT_OUTBOUND_EVENT_EXECUTOR_THREAD_COUNT = 32;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final IClientHandlerFactory clientHandlerFactory;
private final SocketAddress bindAddress;
private final int outboundEventExecutorThreadCount;

private Channel serverChannel;

Expand All @@ -52,14 +54,22 @@ public SocketClient(SocketAddress bindAddress, IClientHandlerFactory clientHandl
this.clientHandlerFactory = clientHandlerFactory;
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
this.outboundEventExecutorThreadCount = DEFAULT_OUTBOUND_EVENT_EXECUTOR_THREAD_COUNT;
}
public SocketClient(SocketAddress bindAddress, IClientHandlerFactory clientHandlerFactory, int outboundEventExecutorThreadCount) {
this.bindAddress = bindAddress;
this.clientHandlerFactory = clientHandlerFactory;
this.bossGroup = new NioEventLoopGroup();
this.workerGroup = new NioEventLoopGroup();
this.outboundEventExecutorThreadCount = outboundEventExecutorThreadCount;
}

@Override
protected void doStart() {
final ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new OutboundChannelInitializer(clientHandlerFactory))
.childHandler(new OutboundChannelInitializer(clientHandlerFactory, outboundEventExecutorThreadCount))
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
* read the content-length specified
*/
int contentLength = currentMessage.getContentLength();
ByteBuf bodyBytes = buffer.readBytes(contentLength);
ByteBuf bodyBytes = buffer.readSlice(contentLength);
log.debug("read [{}] body bytes", bodyBytes.writerIndex());
// most bodies are line based, so split on LF
while (bodyBytes.isReadable()) {
Expand Down

0 comments on commit 70c3aad

Please sign in to comment.