Skip to content
Permalink
Browse files

[JENKINS-27555]: Re-implements JNA agent with non-blocking code

Re-implements JNA agent with a non blocking socket acceptor to
fix socket leak caused by blocking on accept() native call.
  • Loading branch information...
ydubreuil committed Jun 1, 2015
1 parent f0dc141 commit 37c4469c144b705e7e5330c0d16f3bce5dfc11ce
Showing with 94 additions and 42 deletions.
  1. +94 −42 src/main/java/com/cloudbees/jenkins/plugins/sshagent/jna/AgentServer.java
@@ -18,6 +18,7 @@
*/
package com.cloudbees.jenkins.plugins.sshagent.jna;

import jnr.enxio.channels.NativeSelectorProvider;
import jnr.posix.POSIXFactory;
import jnr.unixsocket.UnixServerSocket;
import jnr.unixsocket.UnixServerSocketChannel;
@@ -34,6 +35,11 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
@@ -47,7 +53,8 @@
private UnixSocketAddress address;
private UnixServerSocketChannel channel;
private UnixServerSocket socket;
private volatile boolean stopped = false;
private Selector selector;
private volatile boolean selectable = true;

public AgentServer() {
this(new AgentImpl());
@@ -65,31 +72,48 @@ public String start() throws Exception {
authSocket = createLocalSocketAddress();
address = new UnixSocketAddress(new File(authSocket));
channel = UnixServerSocketChannel.open();
channel.configureBlocking(true);
channel.configureBlocking(false);
socket = channel.socket();
socket.bind(address);
stopped = false;
selector = NativeSelectorProvider.getInstance().openSelector();

channel.register(selector, SelectionKey.OP_ACCEPT, new SshAgentServerSocketHandler());

POSIXFactory.getPOSIX().chmod(authSocket, 0600);
thread = new Thread("SSH Agent thread") {
public void run() {
try {
while (!stopped) {
final UnixSocketChannel clientSock = channel.accept();
clientSock.configureBlocking(true);
new SshAgentSession(clientSock, agent);
}
} catch (Exception e) {
if (!stopped) {
e.printStackTrace();
}
}
}
};

thread = new Thread(new AgentSocketAcceptor(), "SSH Agent socket acceptor " + authSocket);
thread.setDaemon(true);
thread.start();
return authSocket;
}

final class AgentSocketAcceptor implements Runnable {
public void run() {
try {
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
while (selector.select() > 0 && selectable) {
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();

while(selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();

if (key.isValid()) {
EventHandler processor = ((EventHandler) key.attachment());
processor.process(key);
}
}
}

LOGGER.log(Level.FINE, "Death of thread " + Thread.currentThread().getName());
} catch (IOException ioe) {
LOGGER.log(Level.WARNING, "Error while waiting for events", ioe);
}
}
}

static String createLocalSocketAddress() throws IOException {
String name;
if (OsUtils.isUNIX()) {
@@ -105,64 +129,92 @@ static String createLocalSocketAddress() throws IOException {
}

public void close() {
stopped = true;
selectable = false;
selector.wakeup();

// forcibly close remaining sockets
for (SelectionKey key : selector.keys()) {
if (key != null) {
safelyClose(key.channel());
}
}

safelyClose(selector);
agent.close();
safelyClose(channel);
if (authSocket != null) {
FileUtils.deleteQuietly(new File(authSocket));
}
}

protected class SshAgentSession extends AbstractAgentClient implements Runnable {
interface EventHandler {
void process(SelectionKey key) throws IOException;
}

final class SshAgentServerSocketHandler implements EventHandler {
public final void process(SelectionKey key) throws IOException {
try {
UnixSocketChannel clientChannel = channel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ, new SshAgentSessionSocketHandler(clientChannel));
} catch (IOException ex) {
LOGGER.log(Level.WARNING, "failed to accept new connection", ex);
safelyClose(channel);
throw ex;
}
}
}

private final UnixSocketChannel channel;
final class SshAgentSessionSocketHandler extends AbstractAgentClient implements EventHandler {

public SshAgentSession(UnixSocketChannel channel, SshAgent agent) {
private final UnixSocketChannel sessionChannel;

public SshAgentSessionSocketHandler(UnixSocketChannel sessionChannel) {
super(agent);
this.channel = channel;
new Thread(this).start();
this.sessionChannel = sessionChannel;
}

public void run() {
public void process(SelectionKey key) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
while (!stopped) {
buf.rewind();
int result = channel.read(buf);
if (result > 0) {
buf.flip();
messageReceived(new Buffer(buf.array(), buf.position(), buf.remaining()));
} else {
break;
}
int result = sessionChannel.read(buf);

if (result > 0) {
buf.flip();
messageReceived(new Buffer(buf.array(), buf.position(), buf.remaining()));
return;
}
} catch (Exception e) {
if (!stopped) {
e.printStackTrace();

if (result == -1) {
// EOF => remote closed the connection, cancel the selection key and close the channel.
key.cancel();
sessionChannel.close();
}
} finally {
safelyClose(channel);
} catch (IOException e) {
LOGGER.log(Level.INFO, "Could not write response to socket", e);
key.cancel();
safelyClose(sessionChannel);
}
}

protected void reply(Buffer buf) throws IOException {
ByteBuffer b = ByteBuffer.wrap(buf.array(), buf.rpos(), buf.available());
int result = channel.write(b);
int result = sessionChannel.write(b);
if (result < 0) {
throw new IOException("Could not write response to socket");
}
}

}

private static void safelyClose(Closeable channel) {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
// ignore
LOGGER.log(Level.INFO, "Error while closing resource", e);
}
}
}

private static final Logger LOGGER = Logger.getLogger(AgentServer.class.getName());
}

0 comments on commit 37c4469

Please sign in to comment.
You can’t perform that action at this time.