Skip to content
Permalink
Browse files

[FIXED JENKINS-11251] reimplemented I/O and Request/Response sync

See PipeWriter javadoc for the discussion and the context of this.
This change re-implements the original fix for JENKINS-9189.
  • Loading branch information...
kohsuke committed Jun 15, 2012
1 parent cb1854b commit e0e154d12d7a10759287b187467389c6e643c12b
@@ -137,8 +137,23 @@
*/
private volatile Throwable outClosed = null;

/**
* Requests that are sent to the remote side for execution, yet we are waiting locally until
* we hear back their responses.
*/
/*package*/ final Map<Integer,Request<?,?>> pendingCalls = new Hashtable<Integer,Request<?,?>>();

/**
* Remembers last I/O ID issued from locally to the other side, per thread.
* int[1] is used as a holder of int.
*/
private final ThreadLocal<int[]> lastIoId = new ThreadLocal<int[]>() {
@Override
protected int[] initialValue() {
return new int[1];
}
};

/**
* Records the {@link Request}s being executed on this channel, sent by the remote peer.
*/
@@ -215,6 +230,8 @@
*/
public final AtomicInteger resourceLoadingCount = new AtomicInteger();

private final AtomicInteger ioId = new AtomicInteger();

/**
* Property bag that contains application-specific stuff.
*/
@@ -250,7 +267,7 @@
* in case read/write blocks. It is single thread to ensure FIFO; I/O needs to execute
* in the same order the remote peer told us to execute them.
*/
/*package*/ final ExecutorService pipeWriter;
/*package*/ final PipeWriter pipeWriter;

/**
* ClassLaoder that remote classloaders should use as the basis.
@@ -403,7 +420,7 @@ public Channel(String name, ExecutorService exec, CommandTransport transport, bo
remoteChannel = RemoteInvocationHandler.wrap(this,1,IChannel.class,true,false);

this.remoteCapability = transport.getRemoteCapability();
this.pipeWriter = createPipeWriter();
this.pipeWriter = new PipeWriter(createPipeWriterExecutor());

this.transport = transport;

@@ -453,7 +470,7 @@ public void onClosed(Channel channel, IOException cause) {}
* reader thread (thus prevent blockage.) Otherwise let the channel reader thread do it,
* which is the historical behaviour.
*/
private ExecutorService createPipeWriter() {
private ExecutorService createPipeWriterExecutor() {
if (remoteCapability.supportsPipeThrottling())
return Executors.newSingleThreadExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -1024,6 +1041,28 @@ public ListeningPort createRemoteToLocalPortForwarding(int recvPort, String forw
ForwarderFactory.create(forwardHost, forwardPort));
}

/**
* Dispenses the unique I/O ID.
*
* When a {@link Channel} requests an activity that happens in {@link #pipeWriter},
* the sender assigns unique I/O ID to this request, which enables later
* commands to sync up with their executions.
*
* @see PipeWriter
*/
/*package*/ int newIoId() {
int v = ioId.incrementAndGet();
lastIoId.get()[0] = v;
return v;
}

/**
* Gets the last I/O ID issued by the calling thread, or 0 if none is recorded.
*/
/*package*/ int lastIoId() {
return lastIoId.get()[0];
}

/**
* Blocks until all the I/O packets sent before this gets fully executed by the remote side, then return.
*
@@ -1034,15 +1073,34 @@ public void syncIO() throws IOException, InterruptedException {
call(new IOSyncer());
}

// Barrier doesn't work because IOSyncer is a Callable and not Command
// (yet making it Command would break JENKINS-5977, which introduced this split in the first place!)
// /**
// * Non-blocking version of {@link #syncIO()} that has a weaker commitment.
// *
// * This method only guarantees that any later remote commands will happen after all the I/O packets sent before
// * this method call gets fully executed. This is faster in that it it doesn't wait for a response
// * from the other side, yet it normally achieves the desired semantics.
// */
// public void barrierIO() throws IOException {
// callAsync(new IOSyncer());
// }

public void syncLocalIO() throws InterruptedException {
Thread t = Thread.currentThread();
String old = t.getName();
t.setName("I/O sync: "+old);
try {
pipeWriter.submit(new Runnable() {
// no one waits for the completion of this Runnable, so not using I/O ID
pipeWriter.submit(0,new Runnable() {
public void run() {
// noop
}
}).get();
} catch (ExecutionException e) {
throw new AssertionError(e); // impossible
} finally {
t.setName(old);
}
}

@@ -193,7 +193,8 @@ public ConnectCommand(int oidRos, int oidPos) {
}

protected void execute(final Channel channel) {
channel.pipeWriter.submit(new Runnable() {
// ordering barrier not needed for this I/O call, so not giving I/O ID.
channel.pipeWriter.submit(0,new Runnable() {
public void run() {
try {
final ProxyOutputStream ros = (ProxyOutputStream) channel.getExportedObject(oidRos);
@@ -0,0 +1,180 @@
package hudson.remoting;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;

/**
* I/O task scheduler.
*
* <p>
* Remoting handles I/O in a separate thread to prevent deadlock (JENKINS-5977), but that means occasionally
* we need to synchronize between {@link Command}s and I/O (JENKINS-9189, JENKINS-7871, JENKINS-11251.)
*
* This class controls the task submission to I/O thread, and provides means to synchronize with it as follows:
*
* <ol>
* <li>
* The sending {@link Channel} assigns an unique I/O ID (via {@link Channel#ioId} to every I/O {@link Command}
* that it sends out. It also remembers the last I/O ID issued by the thread via {@link Channel#lastIoId}.
*
* <li>
* The receiving {@link Channel} uses that in {@link PipeWriter#submit(int, Runnable)} to enable re-discovery later.
*
* <li>
* {@link Future}s are maintained and made discoverable by their I/O ID as I/O operations take place.
*
* <li>
* When sending {@link Channel} later issues a {@link Request}, it uses {@link Channel#lastIoId} to recover
* which I/O operation needs to take place before the {@link Request} can happen.
*
* <li>
* The receiving {@link Channel} honors that before it gets {@link Request}.
*
* <li>
* By the same token, the receiving {@link Channel} also records what I/O operations are issued by the
* closure sent by {@link Request}. When it sends back {@link Response}, it also sends out the last I/O ID
* issued by the closure ({@link Response#lastIoId}.
*
* <li>
* {@link Request} on the sending {@link Channel} honors this "last I/O ID" before it returns with the
* response.
* </ol>
*
* I/O ID tracking and synchronization is per thread. This prevents the regression of JENKINS-5977.
*
* <h2>Backward Compatibility</h2>
* <p>
* When one side (say sender) predates I/O ID, the other side sees all I/O IDs as 0. So the receiver won't actually does the
* {@link Future} book-keeping, and it will not wait for any I/O operations on its side, thanks to 0 being a special value.
* Similarly, all the I/O ID tracking the receiver does get ignored by the sender.
*
* <h2>Motivation</h2>
* <p>
* This change is motivated by the fact that a certain degree of synchronization between {@link Request}/{@link Response}s
* and I/O operations are desirable. The original issue was JENKINS-9189, which had the following:
*
* <pre>
* OutputStream os = new RemoteOutputStream(...);
* channel.call(new Callable() {
* os.write(...);
* });
* os.close();
* </pre>
*
* <p>
* The normal expectation is that by the time closure returns, all 'os.write()' operations are completed.
* Yet since I/O and commands are separate, so unless the remoting library does synchronization,
* 'os.close()' can happen before some of 'os.write()' calls, and ends up in a file truncation.
*
* <p>
* That was fixed by 9cdd9cc0c5640beeb6bf36a4b26fa1ddcce7fd60 in the core originally, by adding
* synchronization between I/O calls and {@link Response}, but then later
* we discovered JENKINS-11251, which basically boils down to the following:
*
* <pre>
* FilePath f = ...;
* OutputStream os = f.write()
* IOUtils.copy(data,os)
* os.close()
*
* f.act(new Callable() {
* ... act on this newly created file ...
* });
* </pre>
*
* <p>
* This now requires {@link Response} and I/O call coordination.
*
* <p>
* This I/O ID based approach unifies both kind of coordination.
*
* @author Kohsuke Kawaguchi
*/
class PipeWriter {
private static final class FutureHolder {
private Future<?> f;

public synchronized Future<?> set(Future<?> f) {
this.f = f;
notifyAll();
return f;
}

public synchronized Future<?> get() throws InterruptedException {
while (f==null)
wait();
return f;
}
}

private final Map<Integer,FutureHolder> pendingIO = Collections.synchronizedMap(new HashMap<Integer, FutureHolder>());

/**
* Actually carries out the {@link Runnable}s.
*/
private final ExecutorService base;

public PipeWriter(ExecutorService base) {
this.base = base;
}

public void shutdown() {
base.shutdown();
}

/**
* @param id
* I/O ID that used later in {@link #get(int)}. The value 0 has a special meaning
* that indicates that no sync is needed later. Otherwise the caller is responsible
* for assigning unique values.
*
* @return
* Future object that can be used to wait for the completion of the submitted I/O task.
*/
public Future<?> submit(final int id, final Runnable command) {
if (id==0)
return base.submit(command);

// this indirection is needed to ensure that put happens before remove
// if we store Future itself, then remove can happen before put, and
// we'll end up leaking
FutureHolder fh = new FutureHolder();

FutureHolder old = pendingIO.put(id, fh);
assert old==null;

return fh.set(base.submit(new Runnable() {
public void run() {
try {
command.run();
} finally {
FutureHolder old = pendingIO.remove(id);
assert old!=null;
}
}
}));
}

/**
* Gets the {@link Future} object that can be used to wait for the completion of the submitted I/O task.
*
* Unlike POSIX wait() call, {@link PipeWriter} doesn't require that someone waits for the completion
* of an I/O task. The consequence of that is that {@link PipeWriter} cannot differentiate between
* invalid ID vs ID that was used and completed long time ago. In both cases, a {@link Future} object
* that's already in the signaled state is returned.
*
* @return
* never null. Unlike normal contract of {@link Future}, the object returned from this method
* cannot necessarily be able to distinguish the normal and abnormal completion of the task.
*/
public Future<?> get(int id) throws InterruptedException {
FutureHolder f = pendingIO.get(id);
if (f==null) return SIGNALED; // already completed
return f.get();
}

private static final Future<?> SIGNALED = new AsyncFutureImpl<Object>(new Object());
}
Oops, something went wrong.

0 comments on commit e0e154d

Please sign in to comment.
You can’t perform that action at this time.