Skip to content

Commit

Permalink
Merge pull request #54 from jenkinsci/fix-memory-leak
Browse files Browse the repository at this point in the history
[JENKINS-28844] Fix memory leak
  • Loading branch information
kohsuke committed Jul 13, 2015
2 parents 05329f5 + b6714b7 commit bf81838
Show file tree
Hide file tree
Showing 9 changed files with 534 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -373,7 +373,7 @@ THE SOFTWARE.
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId> <artifactId>findbugs-maven-plugin</artifactId>
<version>2.5.2</version> <version>3.0.1</version>
<configuration> <configuration>
<excludeFilterFile>${basedir}/src/findbugs-filter.xml</excludeFilterFile> <excludeFilterFile>${basedir}/src/findbugs-filter.xml</excludeFilterFile>
<failOnError>true</failOnError> <failOnError>true</failOnError>
Expand Down
132 changes: 121 additions & 11 deletions src/main/java/hudson/remoting/Channel.java
Expand Up @@ -23,16 +23,17 @@
*/ */
package hudson.remoting; package hudson.remoting;


import org.jenkinsci.remoting.CallableDecorator;
import edu.umd.cs.findbugs.annotations.SuppressWarnings; import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import hudson.remoting.CommandTransport.CommandReceiver; import hudson.remoting.CommandTransport.CommandReceiver;
import hudson.remoting.PipeWindow.Key; import hudson.remoting.PipeWindow.Key;
import hudson.remoting.PipeWindow.Real; import hudson.remoting.PipeWindow.Real;
import hudson.remoting.forward.ForwarderFactory; import hudson.remoting.forward.ForwarderFactory;
import hudson.remoting.forward.ListeningPort; import hudson.remoting.forward.ListeningPort;
import hudson.remoting.forward.PortForwarder; import hudson.remoting.forward.PortForwarder;
import org.jenkinsci.remoting.CallableDecorator;
import org.jenkinsci.remoting.RoleChecker; import org.jenkinsci.remoting.RoleChecker;


import javax.annotation.CheckForNull;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
Expand Down Expand Up @@ -153,12 +154,7 @@ public class Channel implements VirtualChannel, IChannel, Closeable {
* Remembers last I/O ID issued from locally to the other side, per thread. * Remembers last I/O ID issued from locally to the other side, per thread.
* int[1] is used as a holder of int. * int[1] is used as a holder of int.
*/ */
private final ThreadLocal<int[]> lastIoId = new ThreadLocal<int[]>() { private final ThreadLocal<int[]> lastIoId = new ThreadLastIoId();
@Override
protected int[] initialValue() {
return new int[1];
}
};


/** /**
* Records the {@link Request}s being executed on this channel, sent by the remote peer. * Records the {@link Request}s being executed on this channel, sent by the remote peer.
Expand Down Expand Up @@ -194,6 +190,14 @@ protected int[] initialValue() {
* and therefore should be OK. * and therefore should be OK.
*/ */
private final WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>> pipeWindows = new WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>>(); private final WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>> pipeWindows = new WeakHashMap<PipeWindow.Key, WeakReference<PipeWindow>>();
/**
* There are cases where complex object cycles can cause a closed channel to fail to be garbage collected,
* these typically arrise when an {@link #export(Class, Object)} is {@link #setProperty(Object, Object)}
* (a supported and intended use case), the {@link Ref} allows us to break the object cycle on channel
* termination and simplify the circles into chains which can then be collected easily by the garbage collector.
* @since FIXME after merge
*/
private final Ref reference = new Ref(this);


/** /**
* Registered listeners. * Registered listeners.
Expand Down Expand Up @@ -504,6 +508,17 @@ public void terminate(IOException e) {
}); });
} }


/**
* Gets the {@link Ref} for this {@link Channel}. The {@link Ref} will be {@linkplain Ref#clear()}ed when
* the channel is terminated in order to break any complex object cycles.
* @return the {@link Ref} for this {@link Channel}
* @since FIXME after merge
*/
@Nonnull
/*package*/ Ref ref() {
return reference;
}

/** /**
* Callback "interface" for changes in the state of {@link Channel}. * Callback "interface" for changes in the state of {@link Channel}.
*/ */
Expand Down Expand Up @@ -814,6 +829,10 @@ public void terminate(IOException e) {
synchronized (this) { synchronized (this) {
if (e == null) throw new IllegalArgumentException(); if (e == null) throw new IllegalArgumentException();
outClosed = inClosed = e; outClosed = inClosed = e;
// we need to clear these out early in order to ensure that a GC operation while
// proceding with the close does not result in a batch of UnexportCommand instances
// being attempted to send over the locked and now closing channel.
RemoteInvocationHandler.notifyChannelTermination(this);
try { try {
transport.closeRead(); transport.closeRead();
} catch (IOException x) { } catch (IOException x) {
Expand All @@ -833,6 +852,8 @@ public void terminate(IOException e) {
executingCalls.clear(); executingCalls.clear();
} }
exportedObjects.abort(e); exportedObjects.abort(e);
// break any object cycles into simple chains to simplify work for the garbage collector
reference.clear();
} finally { } finally {
notifyAll(); notifyAll();
} }
Expand Down Expand Up @@ -936,7 +957,7 @@ public boolean isInClosed() {
* Returns true if this channel has any of the security restrictions enabled. * Returns true if this channel has any of the security restrictions enabled.
* *
* @deprecated * @deprecated
* Use methods like {@link #allowsRemoteClassLoading()} and {@link #allowsArbitraryCallable()} * Use methods like {@link #isRemoteClassLoadingAllowed()} and {@link #isArbitraryCallableAllowed()}
* to test individual features. * to test individual features.
*/ */
@Deprecated @Deprecated
Expand All @@ -948,7 +969,7 @@ public boolean isRestricted() {
* Activates/disables all the security restriction mode. * Activates/disables all the security restriction mode.
* *
* @deprecated * @deprecated
* Use methods like {@link #allowClassLoading(boolean)} and {@link #allowArbitraryCallable(boolean)} * Use methods like {@link #setRemoteClassLoadingAllowed(boolean)} and {@link #setArbitraryCallableAllowed(boolean)}
* to control individual features. * to control individual features.
*/ */
@Deprecated @Deprecated
Expand Down Expand Up @@ -1128,7 +1149,14 @@ public synchronized void close() throws IOException {
public synchronized void close(Throwable diagnosis) throws IOException { public synchronized void close(Throwable diagnosis) throws IOException {
if(outClosed!=null) return; // already closed if(outClosed!=null) return; // already closed


send(new CloseCommand(this,diagnosis)); try {
send(new CloseCommand(this, diagnosis));
} catch (IOException e) {
// send should only ever - worst case - throw an IOException so we'll just catch that and not Throwable
logger.log(Level.WARNING, "Having to terminate early", e);
terminate(e);
return;
}
outClosed = new IOException().initCause(diagnosis); // last command sent. no further command allowed. lock guarantees that no command will slip inbetween outClosed = new IOException().initCause(diagnosis); // last command sent. no further command allowed. lock guarantees that no command will slip inbetween
notifyAll(); notifyAll();
try { try {
Expand Down Expand Up @@ -1430,7 +1458,11 @@ private void updateLastHeard() {


/*package*/ static Channel setCurrent(Channel channel) { /*package*/ static Channel setCurrent(Channel channel) {
Channel old = CURRENT.get(); Channel old = CURRENT.get();
CURRENT.set(channel); if (channel == null) {
CURRENT.remove();
} else {
CURRENT.set(channel);
}
return old; return old;
} }


Expand Down Expand Up @@ -1484,4 +1516,82 @@ public static Channel current() {
// to avoid situations like this, create proxy classes that we need during the classloading // to avoid situations like this, create proxy classes that we need during the classloading
jarLoaderProxy=RemoteInvocationHandler.getProxyClass(JarLoader.class); jarLoaderProxy=RemoteInvocationHandler.getProxyClass(JarLoader.class);
} }

/**
* Do not use an anonymous inner class as that can cause a {@code this} reference to escape.
*/
private static class ThreadLastIoId extends ThreadLocal<int[]> {
@Override
protected int[] initialValue() {
return new int[1];
}
}

/**
* A reference for the {@link Channel} that can be cleared out on {@link #close()}/{@link #terminate(IOException)}.
* Could probably be replaced with {@link AtomicReference} but then we would not retain the only change being
* from valid channel to {@code null} channel symmantics of this class.
* @since FIXME after merge
* @see #reference
*/
/*package*/ static final class Ref {
/**
* The channel.
*/
@CheckForNull
private Channel channel;

/**
* Constructor.
* @param channel the {@link Channel}.
*/
private Ref(@CheckForNull Channel channel) {
this.channel = channel;
}

/**
* Returns the {@link Channel} or {@code null} if the the {@link Channel} has been closed/terminated.
* @return the {@link Channel} or {@code null}
*/
@CheckForNull
public Channel channel() {
return channel;
}

/**
* Clears the {@link #channel} to signify that the {@link Channel} has been closed and break any complex
* object cycles that might prevent the full garbage collection of the channel's associated object tree.
*/
public void clear() {
channel = null;
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
// compare based on instance identity
return this == o;
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return System.identityHashCode(this);
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Channel.Ref{");
sb.append("channel=").append(channel);
sb.append('}');
return sb.toString();
}
}
} }
4 changes: 3 additions & 1 deletion src/main/java/hudson/remoting/JarCacheSupport.java
Expand Up @@ -36,7 +36,9 @@ public abstract class JarCacheSupport extends JarCache {
/** /**
* Throttle the jar downloading activity so that it won't eat up all the channel bandwidth. * Throttle the jar downloading activity so that it won't eat up all the channel bandwidth.
*/ */
private final ExecutorService downloader = new AtmostOneThreadExecutor(); private final ExecutorService downloader = new AtmostOneThreadExecutor(
new NamingThreadFactory(new DaemonThreadFactory(), JarCacheSupport.class.getSimpleName())
);


@Override @Override
public Future<URL> resolve(final Channel channel, final long sum1, final long sum2) throws IOException, InterruptedException { public Future<URL> resolve(final Channel channel, final long sum1, final long sum2) throws IOException, InterruptedException {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/hudson/remoting/Launcher.java
Expand Up @@ -158,7 +158,7 @@ public void setConnectTo(String target) {
System.err.println("Illegal parameter: "+target); System.err.println("Illegal parameter: "+target);
System.exit(1); System.exit(1);
} }
connectionTarget = new InetSocketAddress(tokens[0],Integer.valueOf(tokens[1])); connectionTarget = new InetSocketAddress(tokens[0],Integer.parseInt(tokens[1]));
} }


/** /**
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/hudson/remoting/NamingThreadFactory.java
@@ -0,0 +1,56 @@
/*
* The MIT License
*
* Copyright 2013 Jesse Glick.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package hudson.remoting;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Thread factory that sets thread name so we know who is responsible for so many threads being created.
* @since FIXME after merge
*/
public class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;

/**
* Creates a new naming factory.
* @param delegate a baseline factory, such as {@link Executors#defaultThreadFactory} or {@link DaemonThreadFactory}
* @param name an identifier to be used in thread names; might be e.g. your {@link Class#getSimpleName}
*/
public NamingThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName(String.format("%s [#%d]", name, threadNum.incrementAndGet()));
return result;
}
}
31 changes: 24 additions & 7 deletions src/main/java/hudson/remoting/RemoteClassLoader.java
Expand Up @@ -46,6 +46,8 @@


import org.jenkinsci.constant_pool_scanner.ConstantPoolScanner; import org.jenkinsci.constant_pool_scanner.ConstantPoolScanner;


import javax.annotation.CheckForNull;

import static hudson.remoting.Util.*; import static hudson.remoting.Util.*;
import static java.util.logging.Level.*; import static java.util.logging.Level.*;


Expand Down Expand Up @@ -83,7 +85,7 @@ final class RemoteClassLoader extends URLClassLoader {
/** /**
* Remote peer that the {@link #proxy} is connected to. * Remote peer that the {@link #proxy} is connected to.
*/ */
private final Channel channel; private /*mostly final*/ Channel.Ref channel;


private final Map<String,URLish> resourceMap = new HashMap<String,URLish>(); private final Map<String,URLish> resourceMap = new HashMap<String,URLish>();
private final Map<String,Vector<URLish>> resourcesMap = new HashMap<String,Vector<URLish>>(); private final Map<String,Vector<URLish>> resourcesMap = new HashMap<String,Vector<URLish>>();
Expand Down Expand Up @@ -113,13 +115,24 @@ public static ClassLoader create(ClassLoader parent, IClassLoader proxy) {


private RemoteClassLoader(ClassLoader parent, IClassLoader proxy) { private RemoteClassLoader(ClassLoader parent, IClassLoader proxy) {
super(new URL[0],parent); super(new URL[0],parent);
this.channel = RemoteInvocationHandler.unwrap(proxy); final Channel channel = RemoteInvocationHandler.unwrap(proxy);
this.channel = channel == null ? null : channel.ref();
this.underlyingProxy = proxy; this.underlyingProxy = proxy;
if (!channel.remoteCapability.supportsPrefetch() || channel.getJarCache()==null) if (channel == null || !channel.remoteCapability.supportsPrefetch() || channel.getJarCache()==null)
proxy = new DumbClassLoaderBridge(proxy); proxy = new DumbClassLoaderBridge(proxy);
this.proxy = proxy; this.proxy = proxy;
} }


/**
* Returns the backing channel or {@code null} if the channel is disconnected or otherwise unavailable.
* @return the backing channel or {@code null}.
* @since FIXME after merge
*/
@CheckForNull
private Channel channel() {
return this.channel == null ? null : this.channel.channel();
}

/** /**
* If this {@link RemoteClassLoader} represents a classloader from the specified channel, * If this {@link RemoteClassLoader} represents a classloader from the specified channel,
* return its exported OID. Otherwise return -1. * return its exported OID. Otherwise return -1.
Expand All @@ -133,7 +146,8 @@ protected Class<?> findClass(String name) throws ClassNotFoundException {
// first attempt to load from locally fetched jars // first attempt to load from locally fetched jars
return super.findClass(name); return super.findClass(name);
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
if(!channel.isRemoteClassLoadingAllowed()) final Channel channel = channel();
if(channel == null || !channel.isRemoteClassLoadingAllowed())
throw e; throw e;
// delegate to remote // delegate to remote
if (channel.remoteCapability.supportsMultiClassLoaderRPC()) { if (channel.remoteCapability.supportsMultiClassLoaderRPC()) {
Expand Down Expand Up @@ -309,7 +323,8 @@ private Class<?> loadClassFile(String name, byte[] bytes) {
throw new ClassFormatError(name + " is <8 bytes long"); throw new ClassFormatError(name + " is <8 bytes long");
} }
short bytecodeLevel = (short) ((bytes[6] << 8) + (bytes[7] & 0xFF) - 44); short bytecodeLevel = (short) ((bytes[6] << 8) + (bytes[7] & 0xFF) - 44);
if (bytecodeLevel > channel.maximumBytecodeLevel) { final Channel channel = channel();
if (channel != null && bytecodeLevel > channel.maximumBytecodeLevel) {
throw new ClassFormatError("this channel is restricted to JDK 1." + channel.maximumBytecodeLevel + " compatibility but " + name + " was compiled for 1." + bytecodeLevel); throw new ClassFormatError("this channel is restricted to JDK 1." + channel.maximumBytecodeLevel + " compatibility but " + name + " was compiled for 1." + bytecodeLevel);
} }


Expand Down Expand Up @@ -347,7 +362,8 @@ private void definePackage(String name) {
public URL findResource(String name) { public URL findResource(String name) {
// first attempt to load from locally fetched jars // first attempt to load from locally fetched jars
URL url = super.findResource(name); URL url = super.findResource(name);
if(url!=null || !channel.isRemoteClassLoadingAllowed()) return url; final Channel channel = channel();
if(url!=null || channel == null || !channel.isRemoteClassLoadingAllowed()) return url;


try { try {
if(resourceMap.containsKey(name)) { if(resourceMap.containsKey(name)) {
Expand Down Expand Up @@ -393,7 +409,8 @@ private static Vector<URL> toURLs(Vector<URLish> src) throws MalformedURLExcepti
} }


public Enumeration<URL> findResources(String name) throws IOException { public Enumeration<URL> findResources(String name) throws IOException {
if(!channel.isRemoteClassLoadingAllowed()) final Channel channel = channel();
if(channel == null || !channel.isRemoteClassLoadingAllowed())
return EMPTY_ENUMERATION; return EMPTY_ENUMERATION;


// TODO: use the locally fetched jars to speed up the look up // TODO: use the locally fetched jars to speed up the look up
Expand Down

0 comments on commit bf81838

Please sign in to comment.