Skip to content
Permalink
Browse files

[JENKINS-11251 JENKINS-9189] Resurrecting what's deleted in e0e154d

When communicating with remoting < 2.15, this allows them to continue to
perform some degree of syncing, so that they can still enjoy the fix for
JENKINS-9189.

None of these code is exposed via API outside remoting, so at some point
we can revert this change to simplify the code a bit and eliminate the
redundancy, because as long as >= 2.15 remoting talk to each other,
PipeWriter does everything we need.
  • Loading branch information
kohsuke committed Jun 16, 2012
1 parent c00a150 commit 8ffed0da4996934bfc28bf6b08c258d367a1c526
Showing with 69 additions and 6 deletions.
  1. +37 −6 src/main/java/hudson/remoting/ProxyOutputStream.java
  2. +32 −0 src/main/java/hudson/remoting/Request.java
@@ -156,12 +156,41 @@ protected void finalize() throws Throwable {
}
}

/**
* I/O operations in remoting gets executed by a separate pipe thread asynchronously.
* So if a closure performs some I/O (such as writing to the RemoteOutputStream) then returns,
* it is possible that the calling thread unblocks before the I/O actually completes.
* <p>
* This race condition creates a truncation problem like JENKINS-9189 or JENKINS-7871.
* The initial fix for this was to introduce {@link Channel#syncLocalIO()}, but given the
* recurrence in JENKINS-9189, I concluded that it's too error prone to expect the user of the
* remoting to make such a call in the right place.
* <p>
* So the goal of this code is to automatically ensure the proper ordering of the return from
* the {@link Request#call(Channel)} and the I/O operations done during the call. We do this
* by attributing I/O call to a {@link Request}, then keeping track of the last I/O operation
* performed.
*
* @deprecated as of 2.16
* {@link PipeWriter} does this job better, but kept for backward compatibility to communicate
* with earlier version of remoting without losing the original fix to JENKINS-9189 completely.
*/
private static void markForIoSync(Channel channel, int requestId, java.util.concurrent.Future<?> ioOp) {
Request<?,?> call = channel.pendingCalls.get(requestId);
// call==null if:
// 1) the remote peer uses old version that doesn't set the requestId field
// 2) a bug in the code, but in that case we are being defensive
if (call!=null)
call.lastIo = ioOp;
}

/**
* {@link Command} for sending bytes.
*/
private static final class Chunk extends Command {
private final int oid;
private final int ioId;
private final int requestId = Request.getCurrentRequestId();
private final byte[] buf;

public Chunk(int ioId, int oid, byte[] buf, int start, int len) {
@@ -181,7 +210,7 @@ public Chunk(int ioId, int oid, byte[] buf, int start, int len) {

protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(ioId,new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(ioId,new Runnable() {
public void run() {
try {
os.write(buf);
@@ -210,7 +239,7 @@ public void run() {
}
}
}
});
}));
}

public String toString() {
@@ -225,6 +254,7 @@ public String toString() {
*/
private static final class Flush extends Command {
private final int oid;
private final int requestId = Request.getCurrentRequestId();
private final int ioId;

public Flush(int ioId, int oid) {
@@ -235,15 +265,15 @@ public Flush(int ioId, int oid) {

protected void execute(Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(ioId,new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(ioId,new Runnable() {
public void run() {
try {
os.flush();
} catch (IOException e) {
// ignore errors
}
}
});
}));
}

public String toString() {
@@ -288,6 +318,7 @@ public String toString() {
*/
private static final class EOF extends Command {
private final int oid;
private final int requestId = Request.getCurrentRequestId();
private final int ioId;

public EOF(int ioId, int oid) {
@@ -298,7 +329,7 @@ public EOF(int ioId, int oid) {

protected void execute(final Channel channel) {
final OutputStream os = (OutputStream) channel.getExportedObject(oid);
channel.pipeWriter.submit(ioId,new Runnable() {
markForIoSync(channel,requestId,channel.pipeWriter.submit(ioId,new Runnable() {
public void run() {
channel.unexport(oid);
try {
@@ -307,7 +338,7 @@ public void run() {
// ignore errors
}
}
});
}));
}

public String toString() {
@@ -87,6 +87,14 @@
*/
/*package*/ volatile transient int responseIoId;

/**
*
* @deprecated as of 2.16
* {@link PipeWriter} does this job better, but kept for backward compatibility to communicate
* with earlier version of remoting without losing the original fix to JENKINS-9189 completely.
*/
/*package*/ volatile transient Future<?> lastIo;

protected Request() {
synchronized(Request.class) {
id = nextId++;
@@ -144,6 +152,13 @@ public final RSP call(Channel channel) throws EXC, InterruptedException, IOExcep
t.setName(name);
}

if (lastIo != null)
try {
lastIo.get();
} catch (ExecutionException e) {
// ignore the I/O error
}

try {
channel.pipeWriter.get(responseIoId).get();
} catch (ExecutionException e) {
@@ -302,6 +317,7 @@ private int calcLastIoId() {
public void run() {
try {
Command rsp;
CURRENT.set(Request.this);
startIoId = channel.lastIoId();
try {
// make sure any I/O preceding this has completed
@@ -313,6 +329,8 @@ public void run() {
} catch (Throwable t) {
// error return
rsp = new Response<RSP,Throwable>(id,calcLastIoId(),t);
} finally {
CURRENT.set(null);
}
if(chainCause)
rsp.createdAt.initCause(createdAt);
@@ -347,6 +365,20 @@ public void run() {
*/
public static boolean chainCause = Boolean.getBoolean(Request.class.getName()+".chainCause");

/**
* Set to the {@link Request} object during {@linkplain #perform(Channel) the execution of the call}.
*
* @deprecated as of 2.16
* {@link PipeWriter} does this job better, but kept for backward compatibility to communicate
* with earlier version of remoting without losing the original fix to JENKINS-9189 completely.
*/
/*package*/ static ThreadLocal<Request> CURRENT = new ThreadLocal<Request>();

/*package*/ static int getCurrentRequestId() {
Request r = CURRENT.get();
return r!=null ? r.id : 0;
}

/**
* Interrupts the execution of the remote computation.
*/

0 comments on commit 8ffed0d

Please sign in to comment.
You can’t perform that action at this time.