Skip to content
Permalink
Browse files

[JENKINS-8856]

Added additional diagnostic code that will try to read ahead and provide
what's ahead in the input stream.
  • Loading branch information
kohsuke committed Jun 21, 2013
1 parent 2c47b1c commit 68be3b1741d8e9e2da8e31491b1b558a8cdbd33b
@@ -140,6 +140,12 @@ THE SOFTWARE.
<artifactId>constant-pool-scanner</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -2,12 +2,15 @@

import hudson.remoting.Channel.Mode;

import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;

/**
* The default {@link CommandTransport} that has been used historically.
@@ -23,12 +26,21 @@
private final ObjectInputStream ois;
private final ObjectOutputStream oos;
private final Capability remoteCapability;
private final OutputStream underlyingStream;

private ClassicCommandTransport(ObjectInputStream ois, ObjectOutputStream oos, OutputStream underlyingStream, Capability remoteCapability) {
/**
* Transport level {@link InputStream} that we use only for diagnostics in case we detect stream
* corruption. Can be null.
*/
private final @Nullable InputStream rawIn;
/**
* See {@link CommandTransport#getUnderlyingStream()}
*/
private final OutputStream rawOut;

private ClassicCommandTransport(ObjectInputStream ois, ObjectOutputStream oos, InputStream rawIn, OutputStream rawOut, Capability remoteCapability) {
this.ois = ois;
this.oos = oos;
this.underlyingStream = underlyingStream;
this.rawIn= rawIn;
this.rawOut = rawOut;
this.remoteCapability = remoteCapability;
}

@@ -56,7 +68,54 @@ public void closeWrite() throws IOException {
}

public final Command read() throws IOException, ClassNotFoundException {
return Command.readFrom(channel,ois);
try {
return Command.readFrom(channel,ois);
} catch (StreamCorruptedException e) {
throw diagnoseStreamCorruption(e);
}
}

/**
* To diagnose stream corruption, we'll try to read ahead the data.
* This operation can block, so we'll use another thread to do this.
*/
private StreamCorruptedException diagnoseStreamCorruption(StreamCorruptedException e) throws StreamCorruptedException {
if (rawIn==null)
return e; // no source of diagnostics information. can't diagnose.


final ByteArrayOutputStream readAhead = new ByteArrayOutputStream();
final IOException[] error = new IOException[1];

Thread diagnosisThread = new Thread(channel+" stream corruption diagnosis thread") {
public void run() {
int b;
try {
// not all InputStream will look for the thread interrupt flag, so check that explicitly to be defensive
while (!Thread.interrupted() && (b=rawIn.read())!=-1) {
readAhead.write(b);
}
} catch (IOException e) {
error[0] = e;
}
}
};

// wait up to 1 sec to grab as much data as possible
diagnosisThread.start();
try {
diagnosisThread.join(1000);
} catch (InterruptedException _) {
// we are only waiting for a fixed amount of time, so we'll pretend like we were in a busy loop
Thread.currentThread().interrupt();
// fall through
}

IOException diagnosisProblem = error[0]; // capture the error, if any, before we kill the thread
if (diagnosisThread.isAlive())
diagnosisThread.interrupt(); // if it's not dead, kill

return new DiagnosedStreamCorruptionException(e,diagnosisProblem,readAhead.toByteArray());
}

public void closeRead() throws IOException {
@@ -65,7 +124,7 @@ public void closeRead() throws IOException {

@Override
OutputStream getUnderlyingStream() {
return underlyingStream;
return rawOut;
}

public static CommandTransport create(Mode mode, InputStream is, OutputStream os, OutputStream header, ClassLoader base, Capability capability) throws IOException {
@@ -120,7 +179,7 @@ public static CommandTransport create(Mode mode, InputStream is, OutputStream os

return new ClassicCommandTransport(
new ObjectInputStreamEx(mode.wrap(is),base),
oos, os, cap);
oos, is, os, cap);
case 2:
cap = Capability.read(is);
break;
@@ -0,0 +1,38 @@
package hudson.remoting;

import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.StringWriter;

/**
* Signals a {@link StreamCorruptedException} with some additional diagnostic information.
*
* @author Kohsuke Kawaguchi
*/
public class DiagnosedStreamCorruptionException extends StreamCorruptedException {
private final Exception diagnoseFailure;
private final byte[] readAhead;

public DiagnosedStreamCorruptionException(StreamCorruptedException cause, Exception diagnoseFailure, byte[] readAhead) {
initCause(cause);
this.diagnoseFailure = diagnoseFailure;
this.readAhead = readAhead;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(super.toString()).append("\n");
buf.append("Read ahead: "+HexDump.toHex(readAhead));
if (diagnoseFailure!=null) {
StringWriter w = new StringWriter();
PrintWriter p = new PrintWriter(w);
diagnoseFailure.printStackTrace(p);
p.flush();

buf.append("Diagnosis problem:\n ");
buf.append(w.toString().trim().replace("\n","\n "));
}
return buf.toString();
}
}
@@ -0,0 +1,43 @@
package hudson.remoting;

import hudson.remoting.Channel.Mode;
import org.apache.commons.io.output.NullOutputStream;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.PrintWriter;
import java.io.StreamCorruptedException;
import java.io.StringWriter;

import static junit.framework.Assert.*;

/**
* @author Kohsuke Kawaguchi
*/
public class DiagnosedStreamCorruptionExceptionTest {
@Test
public void exercise() throws Exception {
byte[] payload = {
0,0,0,0, /* binary stream preamble*/
(byte)0xAC, (byte)0xED, 0x00, 0x05, /* object input stream header */
1, 2, 3, 4, 5 /* bogus data */
};

ClassicCommandTransport ct = (ClassicCommandTransport) ClassicCommandTransport.create(Mode.BINARY, new ByteArrayInputStream(payload), new NullOutputStream(), new NullOutputStream(), getClass().getClassLoader(), new Capability());

try {
ct.read();
fail();
} catch (DiagnosedStreamCorruptionException e) {
StringWriter s = new StringWriter();
PrintWriter w = new PrintWriter(s);
e.printStackTrace(w);
w.close();

String msg = s.toString();
assertTrue(msg.contains("Read ahead: 02030405"));
assertTrue(msg.contains("invalid type code: 01"));
assertSame(StreamCorruptedException.class, e.getCause().getClass());
}
}
}

0 comments on commit 68be3b1

Please sign in to comment.