Skip to content
Permalink
Browse files
[JENKINS-45023] - Prevent execution of commands on closed or beingClo…
…sed channels (#175)

* [JENKINS-45023] - Prevent execution of commands on closed or beingClosed channels

This is a major update of Request execution logic in remoting Channels, which should improve stability of the channel and prevent hanging of commands if the channel gets closed.

- [x] - `Channel#close()` does not always wait of synchronization to happen. There is a sender status check before the lock gets acquired. TODO: find an issue for that
- [x] - `Channel#isClosingOrClosed()` now returns `null` once the first `Channel#close()` command arrives, we do not even wait till it acquires the instance lock. The API is [used outside Remoting](https://github.com/search?q=org%3Ajenkinsci+isClosingOrClosed&type=Code), but it seems that the change is correct in that cases
- [x] - `Channel#call()` and `Channel#callAsync()` now fail if the channel `isClosingOrClosed()`. These calls implement `UserRequest`, and I do not think there is a valid case for even trying any user-space request
- [x] - Offer new API in `hudson.remoting.Request`, which allows checking the channel state before invoking a call. By default it just checks if the channel is closed (just “fail fast” without command initialization)
- [x] - Implement the new API in `UserRequest`, to prevent low-level API calls on a channel, which `isClosingOrClosed()`

* [JENKINS-45023] - Address Javadoc comments from @jglick

* [JENKINS-45023] - Chanel#terminate() should also immediately set the closeRequested flag

* [JENKINS-45023] - Use ChannelClosedException when channel is being closed and cannot accept commands

* [JENKINS-45023] - Add functional tests for the deadlocked channel

* [JENKINS-45023] - UserRequest constructor should not hang when the channel shutdown is pendind && the lock cannot be acquired
  • Loading branch information
oleg-nenashev committed Jul 4, 2017
1 parent 47b20da commit 67edc4bc4896ce99f3246b6b7cac25fbb94f71b3
@@ -326,6 +326,21 @@ public class Channel implements VirtualChannel, IChannel, Closeable {

/*package*/ @Nonnull final ClassFilter classFilter;

/**
* Indicates that close of the channel has been requested.
* When the value is {@code true}, it does not make sense to execute new user-space commands like {@link UserRequest}.
*/
private boolean closeRequested = false;

/**
* Stores cause of the close Request.
*
* In the case of race condition between multiple close operations,
* this field stores just one of them.
*/
@CheckForNull
private Throwable closeRequestCause = null;

/**
* Communication mode used in conjunction with {@link ClassicCommandTransport}.
*
@@ -586,14 +601,43 @@ public boolean isOutClosed() {
return outClosed!=null;
}

/**
* Get why the sender side of the channel has been closed.
* @return Close cause or {@code null} if the sender side is active.
* {@code null} result does not guarantee that the channel is actually operational.
* @since TODO
*/
@CheckForNull
public final Throwable getSenderCloseCause() {
return outClosed;
}

/**
* Returns {@code true} if the channel is either in the process of closing down or has closed down.
*
* If the result is {@code true}, it means that the channel will be closed at some point by Remoting,
* and that it makes no sense to send any new {@link UserRequest}s to the remote side.
* Invocations like {@link #call(hudson.remoting.Callable)} and {@link #callAsync(hudson.remoting.Callable)}
* will just fail as well.
* @since 2.33
*/
public boolean isClosingOrClosed() {
return inClosed != null || outClosed != null;
return closeRequested || inClosed != null || outClosed != null;
}

/**
* Gets cause of the close request.
*
* @return {@link #outClosed} if not {@code null}, value of the transient cache
* {@link #closeRequestCause} otherwise.
* The latter one may show random cause in the case of race conditions.
* @since TODO
*/
@CheckForNull
public Throwable getCloseRequestCause() {
return outClosed != null ? outClosed : closeRequestCause;
}

/**
* Creates the {@link ExecutorService} for writing to pipes.
*
@@ -839,6 +883,12 @@ public void setJarCache(@Nonnull JarCache jarCache) {
*/
public <V,T extends Throwable>
V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
if (isClosingOrClosed()) {
// No reason to even try performing a user request
throw new ChannelClosedException("Remote call on " + name + " failed. "
+ "The channel is closing down or has closed down", getCloseRequestCause());
}

UserRequest<V,T> request=null;
try {
request = new UserRequest<V, T>(this, callable);
@@ -869,6 +919,12 @@ V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
*/
public <V,T extends Throwable>
Future<V> callAsync(final Callable<V,T> callable) throws IOException {
if (isClosingOrClosed()) {
// No reason to even try performing a user request
throw new ChannelClosedException("Remote call on " + name + " failed. "
+ "The channel is closing down or has closed down", getCloseRequestCause());
}

final Future<UserResponse<V,T>> f = new UserRequest<V,T>(this, callable).callAsync(this);
return new FutureAdapter<V,UserResponse<V,T>>(f) {
protected V adapt(UserResponse<V,T> r) throws ExecutionException {
@@ -886,16 +942,28 @@ protected V adapt(UserResponse<V,T> r) throws ExecutionException {
*
* This is an SPI for {@link CommandTransport} implementation to notify
* {@link Channel} when the underlying connection is severed.
*
* Once the call is called {@link #closeRequested} will be set immediately to prevent further executions
* of {@link UserRequest}s.
*
* @param e
* The error that caused the connection to be aborted. Never null.
*/
@java.lang.SuppressWarnings("ToArrayCallWithZeroLengthArrayArgument")
@SuppressWarnings("ITA_INEFFICIENT_TO_ARRAY") // intentionally; race condition on listeners otherwise
public void terminate(@Nonnull IOException e) {

if (e == null) {
throw new IllegalArgumentException("Cause is null. Channel cannot be closed properly in such case");
}
closeRequested = true;
if (closeRequestCause == null) {
// Cache the cause value just in case it takes long to acquire the lock
closeRequestCause = e;
}

try {
synchronized (this) {
if (e == null) throw new IllegalArgumentException();
synchronized (this) {
outClosed = inClosed = e;
// we need to clear these out early in order to ensure that a GC operation while
// proceding with the close does not result in a batch of UnexportCommand instances
@@ -1275,12 +1343,15 @@ public void dumpDiagnostics(@Nonnull PrintWriter w) throws IOException {
/**
* {@inheritDoc}
*/
public synchronized void close() throws IOException {
public void close() throws IOException {
close(null);
}

/**
* Closes the channel.
*
* Once the call is called {@link #closeRequested} will be set immediately to prevent further executions
* of {@link UserRequest}s.
*
* @param diagnosis
* If someone (either this side or the other side) tries to use a channel that's already closed,
@@ -1290,32 +1361,45 @@ public synchronized void close() throws IOException {
*
* @since 2.8
*/
public synchronized void close(@CheckForNull Throwable diagnosis) throws IOException {
public void close(@CheckForNull Throwable diagnosis) throws IOException {
if(outClosed!=null) return; // already closed

try {
send(new CloseCommand(this, diagnosis));
} catch (ChannelClosedException e) {
logger.log(Level.FINEST, "Channel is already closed", e);
terminate(e);
return;
} catch (IOException e) {
// send should only ever - worst case - throw an IOException so we'll just catch that and not Throwable
logger.log(Level.WARNING, "Having to terminate early", e);
terminate(e);
return;
closeRequested = true;
if (closeRequestCause == null) {
// Cache the cause value just in case it takes long to acquire the lock
// TODO: This IOException wrapper is copy-pasted from the original logic, but do we actually need it when diagnosis is non-null?
closeRequestCause = new IOException(diagnosis);
}
outClosed = new IOException().initCause(diagnosis); // last command sent. no further command allowed. lock guarantees that no command will slip inbetween
notifyAll();
try {
transport.closeWrite();
} catch (IOException e) {
// there's a race condition here.
// the remote peer might have already responded to the close command
// and closed the connection, in which case our close invocation
// could fail with errors like
// "java.io.IOException: The pipe is being closed"
// so let's ignore this error.

synchronized(this) {
if(outClosed!=null) {
// It has been closed while we were waiting for the lock
return;
}

try {
send(new CloseCommand(this, diagnosis));
} catch (ChannelClosedException e) {
logger.log(Level.FINEST, "Channel is already closed", e);
terminate(e);
return;
} catch (IOException e) {
// send should only ever - worst case - throw an IOException so we'll just catch that and not Throwable
logger.log(Level.WARNING, "Having to terminate early", e);
terminate(e);
return;
}
outClosed = new IOException(diagnosis); // last command sent. no further command allowed. lock guarantees that no command will slip inbetween
notifyAll();
try {
transport.closeWrite();
} catch (IOException e) {
// there's a race condition here.
// the remote peer might have already responded to the close command
// and closed the connection, in which case our close invocation
// could fail with errors like
// "java.io.IOException: The pipe is being closed"
// so let's ignore this error.
}
}

// termination is done by CloseCommand when we received it.
@@ -1,24 +1,38 @@
package hudson.remoting;

import java.io.IOException;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;

/**
* Indicates that the channel is already closed.
* Indicates that the channel is already closed or being closed.
*
* @author Kohsuke Kawaguchi
*/
public class ChannelClosedException extends IOException {
/**
* @deprecated
* Use {@link #ChannelClosedException(Throwable)}.
* Use {@link #ChannelClosedException(Throwable)} or {@link #ChannelClosedException(java.lang.String, java.lang.Throwable)}.
* This constructor will not include cause of the termination.
*/
@Deprecated
public ChannelClosedException() {
super("channel is already closed");
}

public ChannelClosedException(Throwable cause) {
super("channel is already closed");
initCause(cause);
super("channel is already closed", cause);
}

/**
* Constructor.
*
* @param message Message
* @param cause Cause of the channel close/termination.
* May be {@code null} if it cannot be determined when the exception is constructed.
* @since TODO
*/
public ChannelClosedException(@Nonnull String message, @CheckForNull Throwable cause) {
super(message, cause);
}
}
@@ -33,6 +33,7 @@
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
* Request/response pattern over {@link Channel}, the layer-1 service.
@@ -103,6 +104,21 @@ protected Request() {
}
}

/**
* Checks if the request can be executed on the channel.
*
* @param channel Channel
* @throws IOException Error with explanation if the request cannot be executed.
* @since TODO
*/
public void checkIfCanBeExecutedOnChannel(@Nonnull Channel channel) throws IOException {
final Throwable senderCloseCause = channel.getSenderCloseCause();
if (senderCloseCause != null) {
// Sender is closed, we won't be able to send anything
throw new ChannelClosedException(senderCloseCause);
}
}

/**
* Sends this request to a remote system, and blocks until we receives a response.
*
@@ -118,6 +134,7 @@ protected Request() {
* If the {@link #perform(Channel)} throws an exception.
*/
public final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
checkIfCanBeExecutedOnChannel(channel);
lastIoId = channel.lastIoId();

// Channel.send() locks channel, and there are other call sequences
@@ -199,6 +216,8 @@ public final RSP call(Channel channel) throws EXC, InterruptedException, IOExcep
* If there's an error during the communication.
*/
public final hudson.remoting.Future<RSP> callAsync(final Channel channel) throws IOException {
checkIfCanBeExecutedOnChannel(channel);

response=null;
lastIoId = channel.lastIoId();

@@ -70,6 +70,16 @@
*/
public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
this.toString = c.toString();
if (local.isClosingOrClosed()) {
Throwable createdAtValue = createdAt;
if (createdAtValue == null) {
// If Command API changes, the cause may be null here (e.g. if it stops recording cause by default)
createdAtValue = new IllegalStateException("Command is created for the channel being interrupted");
}
throw new ChannelClosedException("Cannot create UserRequest for channel " + local +
". The channel is closed or being closed.", createdAtValue);
}


// Before serializing anything, check that we actually have a classloader for it
final ClassLoader cl = getClassLoader(c);
@@ -90,6 +100,18 @@ public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
this.classLoaderProxy = RemoteClassLoader.export(cl, local);
}

@Override
public void checkIfCanBeExecutedOnChannel(Channel channel) throws IOException {
// Default check for all requests
super.checkIfCanBeExecutedOnChannel(channel);

// We also do not want to run UserRequests when the channel is being closed
if (channel.isClosingOrClosed()) {
throw new ChannelClosedException("The request cannot be executed on channel " + channel + ". "
+ "The channel is closing down or has closed down", channel.getCloseRequestCause());
}
}

/**
* Retrieves classloader for the callable.
* For {@link DelegatingCallable} the method will try to retrieve a classloader specified there.
@@ -39,29 +39,37 @@ public interface VirtualChannel {
*
* <p>
* Sends {@link Callable} to the remote system, executes it, and returns its result.
*
* Such calls will be considered as user-space requests.
* If the channel cannot execute the requests (e.g. when it is being closed),
* the operations may be rejected even if the channel is still active.
*
* @param callable Callable to be executed
* @throws InterruptedException
* If the current thread is interrupted while waiting for the completion.
* @throws IOException
* If there's any error in the communication between {@link Channel}s.
* @throws T User exception defined by the callable
*/
<V,T extends Throwable>
V call(Callable<V,T> callable) throws IOException, T, InterruptedException;
V call(@Nonnull Callable<V,T> callable) throws IOException, T, InterruptedException;

/**
* Makes an asynchronous remote procedure call.
*
* <p>
* Similar to {@link #call(Callable)} but returns immediately.
* The result of the {@link Callable} can be obtained through the {@link Future} object.
* Such calls will be considered as user-space requests.
* If the channel cannot execute the requests (e.g. when it is being closed),
* the operations may be rejected even if the channel is still active.
*
* @return
* The {@link Future} object that can be used to wait for the completion.
* @throws IOException
* If there's an error during the communication.
*/
<V,T extends Throwable>
Future<V> callAsync(final Callable<V,T> callable) throws IOException;
Future<V> callAsync(@Nonnull final Callable<V,T> callable) throws IOException;

/**
* Performs an orderly shut down of this channel (and the remote peer.)

0 comments on commit 67edc4b

Please sign in to comment.