Skip to content
Permalink
Browse files

[JENKINS-45294] - RemoteInvocationHandler#RPCRequest is now a subject…

… for channel status checks (#209)

* [JENKINS-45294] - RemoteInvocationHandler#RPCRequest is now a subject for channel status checks

It is same as #175, but for RPC Requests.
The idea is to prevent hanging of RPC Requests in edge cases when the channel goes down.

* [JENKINS-45294] - Fix the stupid bug

* [JENKINS-45294] - Add direct unit test of hanging RPC Call

* [JENKINS-45294] - Address comments from @jglick
  • Loading branch information
oleg-nenashev committed Nov 7, 2017
1 parent bb1ceee commit b46db643959c9b4c9837d5406cfaf08cad418e6c
@@ -530,7 +530,7 @@ protected Channel(@Nonnull ChannelBuilder settings, @Nonnull CommandTransport tr

if(internalExport(IChannel.class, this, false)!=1)
throw new AssertionError(); // export number 1 is reserved for the channel itself
remoteChannel = RemoteInvocationHandler.wrap(this,1,IChannel.class,true,false);
remoteChannel = RemoteInvocationHandler.wrap(this,1,IChannel.class,true,false,false);

this.remoteCapability = transport.getRemoteCapability();
this.pipeWriter = new PipeWriter(createPipeWriterExecutor());
@@ -678,7 +678,7 @@ private ExecutorService createPipeWriterExecutor() {
*/
@Override
public <T> T export(Class<T> type, T instance) {
return export(type, instance, true);
return export(type, instance, true, true);
}

/**
@@ -695,7 +695,7 @@ private ExecutorService createPipeWriterExecutor() {
* {@code null} if the input instance is {@code null}.
*/
@Nullable
/*package*/ <T> T export(Class<T> type, @CheckForNull T instance, boolean userProxy) {
/*package*/ <T> T export(Class<T> type, @CheckForNull T instance, boolean userProxy, boolean userScope) {
if(instance==null) {
return null;
}
@@ -714,7 +714,7 @@ private ExecutorService createPipeWriterExecutor() {
// either local side will auto-unexport, or the remote side will unexport when it's GC-ed
boolean autoUnexportByCaller = exportedObjects.isRecording();
final int id = internalExport(type, instance, autoUnexportByCaller);
return RemoteInvocationHandler.wrap(null, id, type, userProxy, autoUnexportByCaller);
return RemoteInvocationHandler.wrap(null, id, type, userProxy, autoUnexportByCaller, userScope);
}

/*package*/ <T> int internalExport(Class<T> clazz, T instance) {
@@ -51,7 +51,7 @@
*/
@Nonnull
public synchronized ClassLoader get(int oid) {
return get(RemoteInvocationHandler.wrap(channel,oid,IClassLoader.class,false,false));
return get(RemoteInvocationHandler.wrap(channel,oid,IClassLoader.class,false,false,false));
}

/**
@@ -740,7 +740,10 @@ public static IClassLoader export(@Nonnull ClassLoader cl, Channel local) {
return new RemoteIClassLoader(oid,rcl.proxy);
}
}
return local.export(IClassLoader.class, new ClassLoaderProxy(cl,local), false);
// Remote classloader operates in the System scope (JENKINS-45294).
// It's probably YOLO, but otherwise the termination calls may be unable
// to execute correctly.
return local.export(IClassLoader.class, new ClassLoaderProxy(cl,local), false, false);
}

public static void pin(ClassLoader cl, Channel local) {
@@ -53,6 +53,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jenkinsci.remoting.RoleChecker;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.NoExternalUse;

/**
* Sits behind a proxy object and implements the proxy logic.
@@ -117,27 +119,40 @@
*/
private final Throwable origin;

/**
* Indicates that the handler operates in the user space.
* In such case the requests will be automatically failed if the
* Remoting channel is not fully operational.
*/
private final boolean userSpace;

/**
* Creates a proxy that wraps an existing OID on the remote.
*/
RemoteInvocationHandler(Channel channel, int id, boolean userProxy, boolean autoUnexportByCaller, Class proxyType) {
RemoteInvocationHandler(Channel channel, int id, boolean userProxy,
boolean autoUnexportByCaller, boolean userSpace,
Class proxyType) {
this.channel = channel == null ? null : channel.ref();
this.oid = id;
this.userProxy = userProxy;
this.origin = new Exception("Proxy "+toString()+" was created for "+proxyType);
this.autoUnexportByCaller = autoUnexportByCaller;
this.userSpace = userSpace;
}

/**
* Wraps an OID to the typed wrapper.
*
* @param userProxy If {@code true} (recommended), all commands will be wrapped into {@link UserRequest}s.
* @param userSpace If {@code true} (recommended), the requests will be executed in a user scope
*/
@Nonnull
public static <T> T wrap(Channel channel, int id, Class<T> type, boolean userProxy, boolean autoUnexportByCaller) {
static <T> T wrap(Channel channel, int id, Class<T> type, boolean userProxy, boolean autoUnexportByCaller, boolean userSpace) {
ClassLoader cl = type.getClassLoader();
// if the type is a JDK-defined type, classloader should be for IReadResolve
if(cl==null || cl==ClassLoader.getSystemClassLoader())
cl = IReadResolve.class.getClassLoader();
RemoteInvocationHandler handler = new RemoteInvocationHandler(channel, id, userProxy, autoUnexportByCaller, type);
RemoteInvocationHandler handler = new RemoteInvocationHandler(channel, id, userProxy, autoUnexportByCaller, userSpace, type);
if (channel != null) {
if (!autoUnexportByCaller) {
UNEXPORTER.watch(handler);
@@ -253,7 +268,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
// delegate the rest of the methods to the remote object

boolean async = method.isAnnotationPresent(Asynchronous.class);
RPCRequest req = new RPCRequest(oid, method, args, userProxy ? dc.getClassLoader() : null);
RPCRequest req = userSpace
? new UserRPCRequest(oid, method, args, userProxy ? dc.getClassLoader() : null)
: new RPCRequest(oid, method, args, userProxy ? dc.getClassLoader() : null);
try {
if(userProxy) {
if (async) channelOrFail().callAsync(req);
@@ -825,17 +842,17 @@ private void onChannelTermination(Channel channel) {
* The downside of this is that the classes used as a parameter/return value
* must be available to both JVMs.
*
* If used as {@link Callable} in conjunction with {@link UserRequest},
* this can be used to send a method call to user-level objects, and
* classes for the parameters and the return value are sent remotely if needed.
* For user-space commands and operations, there is a {@link UserRPCRequest} implementation.
*
* @see UserRPCRequest
*/
static final class RPCRequest extends Request<Serializable,Throwable> implements DelegatingCallable<Serializable,Throwable> {
static class RPCRequest extends Request<Serializable,Throwable> implements DelegatingCallable<Serializable,Throwable> {
/**
* Target object id to invoke.
*/
private final int oid;
protected final int oid;

private final String methodName;
protected final String methodName;
/**
* Type name of the arguments to invoke. They are names because
* neither {@link Method} nor {@link Class} is serializable.
@@ -935,12 +952,47 @@ private Method choose(Class[] interfaces) {
return arguments;
}

@Override
public String toString() {
return "RPCRequest("+oid+","+methodName+")";
}

private static final long serialVersionUID = 1L;
}

/**
* User-space version of {@link RPCRequest}.
*
* This is an equivalent of {@link UserRequest} for RPC calls.
* Such kind of requests will not be send over closing or malfunctional channel.
*
* If used as {@link Callable} in conjunction with {@link UserRequest},
* this can be used to send a method call to user-level objects, and
* classes for the parameters and the return value are sent remotely if needed.
*/
static class UserRPCRequest extends RPCRequest {
public UserRPCRequest(int oid, Method m, Object[] arguments, ClassLoader cl) {
super(oid, m, arguments, cl);
}

@Override
public String toString() {
return "UserRPCRequest("+oid+","+methodName+")";
}

// Same implementation as UserRequest
@Override
public void checkIfCanBeExecutedOnChannel(Channel channel) throws IOException {
// Default check for all requests
super.checkIfCanBeExecutedOnChannel(channel);

// We also do not want to run UserRequests when the channel is being closed
if (channel.isClosingOrClosed()) {
throw new ChannelClosedException("The request cannot be executed on channel " + channel + ". "
+ "The channel is closing down or has closed down", channel.getCloseRequestCause());
}
}
}

private static final Object[] EMPTY_ARRAY = new Object[0];
}
@@ -11,15 +11,22 @@
import java.lang.reflect.Proxy;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.MatcherAssert.assertThat;

import org.jenkinsci.remoting.RoleChecker;
import org.jvnet.hudson.test.Issue;
import sun.rmi.runtime.Log;

/**
* @author Kohsuke Kawaguchi
@@ -241,7 +248,57 @@ public void testShouldNotAcceptUserRequestsWhenIsBeingClosed() throws Exception
assertFailsWithChannelClosedException(TestRunnable.forUserRequest_callAsync(delayedRequest, testPayload));
}
}


/**
* Checks if {@link UserRequest}s can be executed during the pending close operation.
* @throws Exception Test Error
*/
@Issue("JENKINS-45294")
public void testShouldNotAcceptUserRPCRequestsWhenIsBeingClosed() throws Exception {

Collection<String> src = new ArrayList<>();
src.add("Hello");
src.add("World");

//TODO: System request will just hang. Once JENKINS-44785 is implemented, all system requests
// in Remoting codebase must have a timeout.
final Collection remoteList = channel.call(new RMIObjectExportedCallable<>(src, Collection.class, true));

try (ChannelCloseLock lock = new ChannelCloseLock(channel)) {
// Call Async
assertFailsWithChannelClosedException(new TestRunnable() {
@Override
public void run(Channel channel) throws Exception, AssertionError {
remoteList.size();
}
});
}
}

private static class RMIObjectExportedCallable<TInterface> implements Callable<TInterface, Exception> {

private final TInterface object;
private final Class<TInterface> clazz;
private final boolean userSpace;

RMIObjectExportedCallable(TInterface object, Class<TInterface> clazz, boolean userSpace) {
this.object = object;
this.clazz = clazz;
this.userSpace = userSpace;
}

@Override
public TInterface call() throws Exception {
// UserProxy is used only for the user space, otherwise it will be wrapped into UserRequest
return Channel.current().export(clazz, object, userSpace, userSpace);
}

@Override
public void checkRoles(RoleChecker checker) throws SecurityException {

}
}

private static final class NeverEverCallable implements Callable<Void, Exception> {

private static final long serialVersionUID = 1L;
@@ -361,11 +418,13 @@ private void assertFailsWithChannelClosedException(TestRunnable call) throws Ass
try {
call.run(channel);
} catch(Exception ex) {
if (ex instanceof ChannelClosedException) {
Logger.getLogger(ChannelTest.class.getName()).log(Level.WARNING, "Call execution failed with exception", ex);
Throwable cause = ex instanceof RemotingSystemException ? ex.getCause() : ex;
if (cause instanceof ChannelClosedException) {
// Fine
return;
} else {
throw new AssertionError("Expected ChannelClosedException, but got another exception", ex);
throw new AssertionError("Expected ChannelClosedException, but got another exception", cause);
}
}
fail("Expected ChannelClosedException, but the call has completed without any exception");
@@ -23,7 +23,8 @@
@Override
protected void setUp() throws Exception {
super.setUp();
checker = channel.export(PipeWriterTestChecker.class, this, false);
// Checker operates using the user-space RMI
checker = channel.export(PipeWriterTestChecker.class, this, false, true);
}

/**

0 comments on commit b46db64

Please sign in to comment.
You can’t perform that action at this time.