Skip to content
Permalink
Browse files

[JENKINS-47965, JENKINS-38696] - IOHub should not wait infinitely for…

… Selector in Windows. (#221)

* [JENKINS-47965, JENKINS-38696] - IOHub should not wait infinitely for Selector in Windows.

Windows implementation of NIO Channel Selector does not request "Select now" in https://github.com/jenkinsci/remoting/blob/6165d6fb6a71a7f4fce52ce5fc4cac479052ce04/src/main/java/org/jenkinsci/remoting/protocol/IOHub.java#L436 and hence calls selector#select()... and this method has no timeout.
When the code gets into this branch, Selector will be waiting infinitely without calling selector#isOpen(). Such implementation relies on the Selector implementation, which shout interrupt the select() wait if the selector is being closed. But apparently it does not no my machine (Win 10, amd64, Oracle JDK 8u131)

This change adds wait timeout and makes the IOHub implementation to loop in the logic.

* [JENKINS-47965] - Lame implementation of IOHub Selector wakeup thread for Windows

* [JENKINS-47965] - Polish the implementation

* [JENKINS-38696] - Add Windows back to CI and make @jtnord happy

* [JENKINS-47965] - Simplify the logic

* [JENKINS-47965] - Replace Thread.sleep() by object wait()/notify()

* [JENKINS-47965] - Cleanup the locking logic

* [JENKINS-47965] - Address comments from @jtnord, run Selector Watcher in Unix systems as well
  • Loading branch information...
oleg-nenashev committed Dec 22, 2017
1 parent 3f07563 commit a916c6474de134874818c9847253fb163d360dbc
Showing with 73 additions and 6 deletions.
  1. +1 −2 Jenkinsfile
  2. +72 −4 src/main/java/org/jenkinsci/remoting/protocol/IOHub.java
@@ -8,8 +8,7 @@ properties([[$class: 'BuildDiscarderProperty',
/* These platforms correspond to labels in ci.jenkins.io, see:
* https://github.com/jenkins-infra/documentation/blob/master/ci.adoc
*/
//TODO: Enable Windows once JENKINS-38696 is fixed. No sense to spend CPU cycles before that
List platforms = ['linux']
List platforms = ['linux', 'windows']
Map branches = [:]

for (int i = 0; i < platforms.size(); ++i) {
@@ -55,6 +55,7 @@
import javax.annotation.Nonnull;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import javax.annotation.concurrent.GuardedBy;

import org.jenkinsci.remoting.util.ByteBufferPool;
import org.jenkinsci.remoting.util.DirectByteBufferPool;
import org.kohsuke.accmod.Restricted;
@@ -71,6 +72,13 @@
* Our logger.
*/
private static final Logger LOGGER = Logger.getLogger(IOHub.class.getName());

/**
* Defines the Selector wakeup timeout via a system property. Defaults to {@code 1000ms}.
* @since TODO
*/
private static final long SELECTOR_WAKEUP_TIMEOUT_MS = Long.getLong(IOHub.class.getName() + ".selectorWakeupTimeout", 1000);

/**
* The next ID to use.
*/
@@ -83,6 +91,9 @@
* Our selector.
*/
private final Selector selector;
private volatile boolean ioHubRunning = false;
private final Object selectorLockObject = new Object();

/**
* Our executor.
*/
@@ -126,6 +137,7 @@
*/
private IOHub(Executor executor) throws IOException {
this.selector = Selector.open();
this.ioHubRunning = true;
this.executor = executor;
this.bufferPool = new DirectByteBufferPool(16916, Runtime.getRuntime().availableProcessors() * 4);
}
@@ -140,6 +152,8 @@ private IOHub(Executor executor) throws IOException {
public static IOHub create(Executor executor) throws IOException {
IOHub result = new IOHub(executor);
executor.execute(result);
LOGGER.log(Level.FINE, "Staring an additional Selector wakeup thread. See JENKINS-47965 for more info");
executor.execute(new IOHubSelectorWatcher(result));
return result;
}

@@ -408,6 +422,10 @@ public final void unregister(SelectableChannel channel) {
selectionKey.attach(null);
}

private String getThreadNameBase(String executorThreadName) {
return "IOHub#" + _id + ": Selector[keys:" + selector.keys().size() + ", gen:" + gen + "] / " + executorThreadName;
}

/**
* {@inheritDoc}
*/
@@ -419,9 +437,7 @@ public final void run() {
long cpuOverheatProtection = System.nanoTime();
try {
while (isOpen()) {
selectorThread.setName(
"IOHub#" + _id + ": Selector[keys:" + selector.keys().size() + ", gen:" + gen + "] / " + oldName
);
selectorThread.setName(getThreadNameBase(oldName));
try {
processScheduledTasks();
boolean wantSelectNow = processRegistrations();
@@ -433,8 +449,12 @@ public final void run() {
// in an immediately ready selection key, hence we use the non-blocking form
selected = selector.selectNow();
} else {
selected = selector.select(); // WINDOWS!!! Waiting the whole timeout anyway if timeout specified?!?
// On Windows the select(timeout) operation ALWAYS waits for the timeout,
// so we workaround it by IOHubSelectorWatcher
// "Ubuntu on Windows also qualifies as Windows, so we just rely on the wakeup thread ad use infinite timeout"
selected = selector.select();
}

if (selected == 0) {
// don't stress the GC by creating instantiating the selected keys
continue;
@@ -489,6 +509,54 @@ public final void run() {
// ignore, happens routinely
} finally {
selectorThread.setName(oldName);
ioHubRunning = false;
synchronized (selectorLockObject) {
selectorLockObject.notifyAll();
}
}
}

/**
* This is an artificial thread, which monitors IOHub Selector and wakes it up if it waits for more than 1 second.
* It is a workaround for Selector#select(long timeout) on Windows, where the call always waits for the entire timeout before returning back.
* Since the same behavior happens on Unix emulation layer in Windows, we run this thread on Unix platforms as well.
*/
private static class IOHubSelectorWatcher implements Runnable {

private final IOHub iohub;

public IOHubSelectorWatcher(IOHub iohub) {
this.iohub = iohub;
}

@Override
public void run() {
final Thread watcherThread = Thread.currentThread();
final String oldName = watcherThread.getName();
final String watcherName = "Windows IOHub Watcher for " + iohub.getThreadNameBase(oldName);
LOGGER.log(Level.FINEST, "{0}: Started", watcherName);
try {
watcherThread.setName(watcherName);
while (true) {
synchronized (iohub.selectorLockObject) {
if (iohub.ioHubRunning) {
iohub.selectorLockObject.wait(SELECTOR_WAKEUP_TIMEOUT_MS);
} else {
break;
}
}
iohub.selector.wakeup();
}
} catch (InterruptedException ex) {
// interrupted
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "{0}: Interrupted", ex);
}
return;
} finally {
watcherThread.setName(oldName);
LOGGER.log(Level.FINEST, "{0}: Finished", watcherName);
}
}
}

0 comments on commit a916c64

Please sign in to comment.
You can’t perform that action at this time.