Skip to content

Commit

Permalink
Implement available() method for Windows subprocesses.
Browse files Browse the repository at this point in the history
RELNOTES: None
PiperOrigin-RevId: 351574909
  • Loading branch information
larsrc-google authored and Copybara-Service committed Jan 13, 2021
1 parent 5b04895 commit 082d987
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 0 deletions.
Expand Up @@ -76,6 +76,12 @@ public static long createProcess(
/** Returns an opaque identifier of stderr stream for the process. */
public static native long getStderr(long process);

/**
* Returns an estimate of the number of bytes available to read on the stream. Unlike {@link
* InputStream#available()}, this returns 0 on closed or broken streams.
*/
public static native int streamBytesAvailable(long stream);

/**
* Reads data from the stream into the given array. {@code stream} should come from {@link
* #getStdout(long)} or {@link #getStderr(long)}.
Expand Down
Expand Up @@ -69,6 +69,20 @@ private static final class ProcessInputStream extends InputStream {
this.nativeStream = nativeStream;
}

@Override
public int available() throws IOException {
if (nativeStream == WindowsProcesses.INVALID) {
throw new IllegalStateException("Stream already closed");
}

int result = WindowsProcesses.streamBytesAvailable(nativeStream);
if (result == -1) {
throw new IOException(WindowsProcesses.streamGetLastError(nativeStream));
}

return result;
}

@Override
public int read() throws IOException {
byte[] buf = new byte[1];
Expand Down
58 changes: 58 additions & 0 deletions src/main/native/windows/processes-jni.cc
Expand Up @@ -97,6 +97,33 @@ class NativeOutputStream {

void SetHandle(HANDLE handle) { handle_ = handle; }

jint StreamBytesAvailable(JNIEnv* env) {
if (closed_.load() || handle_ == INVALID_HANDLE_VALUE) {
error_ = L"";
return 0;
}

DWORD avail = 0;
if (!::PeekNamedPipe(handle_, NULL, 0, NULL, &avail, NULL)) {
// Check if either the other end closed the pipe or we did it with
// NativeOutputStream.Close() . In the latter case, we'll get a "system
// call interrupted" error.
if (GetLastError() == ERROR_BROKEN_PIPE || closed_.load()) {
error_ = L"";
return 0;
} else {
DWORD err_code = GetLastError();
error_ = bazel::windows::MakeErrorMessage(WSTR(__FILE__), __LINE__,
L"nativeStreamBytesAvailable",
L"", err_code);
return -1;
}
} else {
error_ = L"";
}
return avail;
}

jint ReadStream(JNIEnv* env, jbyteArray java_bytes, jint offset,
jint length) {
JavaByteArray bytes(env, java_bytes);
Expand Down Expand Up @@ -210,6 +237,7 @@ class NativeProcess {
}
}

// Set up childs stdin pipe.
{
HANDLE pipe_read_h, pipe_write_h;
if (!CreatePipe(&pipe_read_h, &pipe_write_h, &sa, 0)) {
Expand All @@ -220,6 +248,14 @@ class NativeProcess {
}
stdin_process = pipe_read_h;
stdin_ = pipe_write_h;

// "Our" end of the pipe must not be inherited by the child process
if (!SetHandleInformation(pipe_write_h, HANDLE_FLAG_INHERIT, 0)) {
DWORD err_code = GetLastError();
error_ = bazel::windows::MakeErrorMessage(
WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code);
return false;
}
}

if (!stdout_is_stream) {
Expand Down Expand Up @@ -260,6 +296,13 @@ class NativeProcess {
}
stdout_.SetHandle(pipe_read_h);
stdout_process = pipe_write_h;
// "Our" end of the pipe must not be inherited by the child process
if (!SetHandleInformation(pipe_read_h, HANDLE_FLAG_INHERIT, 0)) {
DWORD err_code = GetLastError();
error_ = bazel::windows::MakeErrorMessage(
WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code);
return false;
}
}

if (stderr_same_handle_as_stdout) {
Expand Down Expand Up @@ -314,6 +357,13 @@ class NativeProcess {
}
stderr_.SetHandle(pipe_read_h);
stderr_process = pipe_write_h;
// "Our" end of the pipe must not be inherited by the child process
if (!SetHandleInformation(pipe_read_h, HANDLE_FLAG_INHERIT, 0)) {
DWORD err_code = GetLastError();
error_ = bazel::windows::MakeErrorMessage(
WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code);
return false;
}
}
return proc_.Create(
wpath, bazel::windows::GetJavaWstring(env, java_argv_rest),
Expand Down Expand Up @@ -443,6 +493,14 @@ Java_com_google_devtools_build_lib_windows_WindowsProcesses_readStream(
return stream->ReadStream(env, java_bytes, offset, length);
}

extern "C" JNIEXPORT jint JNICALL
Java_com_google_devtools_build_lib_windows_WindowsProcesses_streamBytesAvailable(
JNIEnv* env, jclass clazz, jlong stream_long) {
NativeOutputStream* stream =
reinterpret_cast<NativeOutputStream*>(stream_long);
return stream->StreamBytesAvailable(env);
}

extern "C" JNIEXPORT jint JNICALL
Java_com_google_devtools_build_lib_windows_WindowsProcesses_getExitCode(
JNIEnv* env, jclass clazz, jlong process_long) {
Expand Down
Expand Up @@ -198,6 +198,44 @@ public void testPartialRead() throws Exception {
assertThat(new String(two, UTF_8)).isEqualTo("LLO");
}

@Test
public void testAvailable_givesBytesFromLiveProcess() throws Exception {
process =
WindowsProcesses.createProcess(mockBinary, mockArgs("O-HELLOWRLD"), null, null, null, null);
byte[] one = new byte[2];
byte[] two = new byte[3];

long stdout = WindowsProcesses.getStdout(process);
// Need to wait until the process has posted its data before we can check available()
assertThat(readStdout(one, 0, 2)).isEqualTo(2);
assertNoStreamError(stdout);
assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(7);
assertNoStreamError(stdout);

assertThat(readStdout(two, 0, 3)).isEqualTo(3);
assertNoStreamError(stdout);
assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(4);
assertNoStreamError(stdout);

WindowsProcesses.closeStream(stdout);
assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(0);
assertThat(WindowsProcesses.streamGetLastError(stdout)).isEmpty();

assertThat(new String(one, UTF_8)).isEqualTo("HE");
assertThat(new String(two, UTF_8)).isEqualTo("LLO");
}

@Test
public void testAvailable_doesNotFailOnDeadProcess() throws Exception {
process = WindowsProcesses.createProcess(mockBinary, mockArgs("X42"), null, null, null, null);
long stdout = WindowsProcesses.getStdout(process);
assertThat(WindowsProcesses.waitFor(process, -1)).isEqualTo(0);
assertThat(WindowsProcesses.getExitCode(process)).isEqualTo(42);
// Windows allows streams to be read after the process has died.
assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isAtLeast(0);
assertThat(WindowsProcesses.streamGetLastError(stdout)).isEmpty();
}

@Test
public void testArrayOutOfBounds() throws Exception {
process =
Expand Down
Expand Up @@ -16,6 +16,7 @@

import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -26,6 +27,7 @@
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.runfiles.Runfiles;
import java.io.File;
import java.io.InputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -125,6 +127,24 @@ public void testSystemDriveIsSet() throws Exception {
assertThat(new String(buf, UTF_8).trim()).isEqualTo("X:");
}

@Test
public void testStreamAvailable_zeroAfterClose() throws Exception {
SubprocessBuilder subprocessBuilder = new SubprocessBuilder(WindowsSubprocessFactory.INSTANCE);
subprocessBuilder.setWorkingDirectory(new File("."));
subprocessBuilder.setArgv(ImmutableList.of(mockBinary, "-jar", mockSubprocess, "OHELLO"));
process = subprocessBuilder.start();
InputStream inputStream = process.getInputStream();
// We don't know if the process has already written to the pipe
assertThat(inputStream.available()).isAnyOf(0, 5);
process.waitFor();
// Windows allows streams to be read after the process has died.
assertThat(inputStream.available()).isAnyOf(0, 5);
inputStream.close();
assertThrows(IllegalStateException.class, inputStream::available)
.getMessage()
.contains("Stream already closed");
}

/**
* An argument and its command-line-escaped counterpart.
*
Expand Down

0 comments on commit 082d987

Please sign in to comment.