Skip to content

Commit

Permalink
Non-sticky thread groups in DefaultThreadFactory
Browse files Browse the repository at this point in the history
Motivation:

A recent change to DefaultThreadFactory modified it so that it is sticky
with respect to thread groups. In particular, this change made it so
that DefaultThreadFactory would hold on to the thread group that created
it, and then use that thread group to create threads.

This can have problematic semantics since it can lead to violations of a
tenet of thread groups that a thread can only modify threads in its own
thread group and descendant thread groups. With a sticky thread group, a
thread triggering the creation of a new thread via
DefaultThreadFactory#newThread will be modifying a thread from the
sticky thread group which will not necessarily be its own nor a
descendant thread group. When a security manager is in place that
enforces this requirement, these modifications are now impossible. This
is especially problematic in the context of Netty because certain global
singletons like GlobalEventExecutor will create a
DefaultThreadFactory. If all DefaultThreadFactory instances are sticky
about their thread groups, it means that submitting tasks to the
GlobalEventExecutor singleton can cause a thread to be created from the
DefaultThreadFactory sticky thread group, exactly the problem with
DefaultThreadFactory being sticky about thread groups. A similar problem
arises from the ThreadDeathWatcher.

Modifications:

This commit modifies DefaultThreadFactory so that a null thread group
can be set with the behavior that all threads created by such an
instance will inherit the default thread group (the thread group
provided by the security manager if there is one, otherwise the thread
group of the creating thread). The construction of the instances of
DefaultThreadFactory used by the GlobalEventExecutor singleton and
ThreadDeathWatcher are modified to use this behavior. Additionally, we
also modify the chained constructor invocations of the
DefaultThreadFactory that do not have a parameter to specify a thread
group to use the thread group from the security manager is available,
otherwise the creating thread's thread group. We also add unit tests
ensuring that all of this behavior is maintained.

Result:

It will be possible to have DefaultThreadFactory instances that are not
sticky about the thread group that led to their creation. Instead,
threads created by such a DefaultThreadFactory will inherit the default
thread group which will either be the thread group from the security
manager or the current thread's thread group.
  • Loading branch information
jasontedor authored and normanmaurer committed Jul 14, 2016
1 parent c221d32 commit 27520f5
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 12 deletions.
8 changes: 6 additions & 2 deletions common/src/main/java/io/netty/util/ThreadDeathWatcher.java
Expand Up @@ -41,7 +41,8 @@
public final class ThreadDeathWatcher {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
private static final ThreadFactory threadFactory;
// visible for testing
static final ThreadFactory threadFactory;

private static final Queue<Entry> pendingEntries = PlatformDependent.newMpscQueue();
private static final Watcher watcher = new Watcher();
Expand All @@ -54,7 +55,10 @@ public final class ThreadDeathWatcher {
if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
poolName = serviceThreadPrefix + poolName;
}
threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY);
// because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
// this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
// must not be sticky about its thread group
threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
}

/**
Expand Down
Expand Up @@ -16,7 +16,6 @@

package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;

import java.util.Locale;
Expand Down Expand Up @@ -64,7 +63,7 @@ public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}

private static String toPoolName(Class<?> poolType) {
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
Expand Down Expand Up @@ -96,11 +95,12 @@ public DefaultThreadFactory(String poolName, boolean daemon, int priority, Threa
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = ObjectUtil.checkNotNull(threadGroup, "threadGroup");
this.threadGroup = threadGroup;
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, Thread.currentThread().getThreadGroup());
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}

@Override
Expand All @@ -126,7 +126,6 @@ public Thread newThread(Runnable r) {
return t;
}

// TODO: Once we can break the API we should add ThreadGroup to the arguments of this method.
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
Expand Down
Expand Up @@ -49,7 +49,12 @@ public void run() {
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

private final ThreadFactory threadFactory = new DefaultThreadFactory(getClass());
// because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
// be sticky about its thread group
// visible for testing
final ThreadFactory threadFactory =
new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
private final TaskRunner taskRunner = new TaskRunner();
private final AtomicBoolean started = new AtomicBoolean();
volatile Thread thread;
Expand Down
28 changes: 26 additions & 2 deletions common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java
Expand Up @@ -21,9 +21,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class ThreadDeathWatcherTest {

Expand Down Expand Up @@ -113,4 +116,25 @@ public void run() {
// And the task should not run.
assertThat(run.get(), is(false));
}

@Test(timeout = 2000)
public void testThreadGroup() throws InterruptedException {
final ThreadGroup group = new ThreadGroup("group");
final AtomicReference<ThreadGroup> capturedGroup = new AtomicReference<ThreadGroup>();
final Thread thread = new Thread(group, new Runnable() {
@Override
public void run() {
final Thread t = ThreadDeathWatcher.threadFactory.newThread(new Runnable() {
@Override
public void run() {
}
});
capturedGroup.set(t.getThreadGroup());
}
});
thread.start();
thread.join();

assertEquals(group, capturedGroup.get());
}
}
@@ -0,0 +1,266 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.util.concurrent;

import org.junit.Test;

import java.security.Permission;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class DefaultThreadFactoryTest {
@Test(timeout = 2000)
public void testDescendantThreadGroups() throws InterruptedException {
final SecurityManager current = System.getSecurityManager();

try {
// install security manager that only allows parent thread groups to mess with descendant thread groups
System.setSecurityManager(new SecurityManager() {
@Override
public void checkAccess(ThreadGroup g) {
final ThreadGroup source = Thread.currentThread().getThreadGroup();

if (source != null) {
if (!source.parentOf(g)) {
throw new SecurityException("source group is not an ancestor of the target group");
}
super.checkAccess(g);
}
}

// so we can restore the security manager at the end of the test
@Override
public void checkPermission(Permission perm) {
}
});

// holder for the thread factory, plays the role of a global singleton
final AtomicReference<DefaultThreadFactory> factory = new AtomicReference<DefaultThreadFactory>();
final AtomicInteger counter = new AtomicInteger();
final Runnable task = new Runnable() {
@Override
public void run() {
counter.incrementAndGet();
}
};

final AtomicReference<Throwable> interrupted = new AtomicReference<Throwable>();

// create the thread factory, since we are running the thread group brother, the thread
// factory will now forever be tied to that group
// we then create a thread from the factory to run a "task" for us
final Thread first = new Thread(new ThreadGroup("brother"), new Runnable() {
@Override
public void run() {
factory.set(new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, null));
final Thread t = factory.get().newThread(task);
t.start();
try {
t.join();
} catch (InterruptedException e) {
interrupted.set(e);
Thread.currentThread().interrupt();
}
}
});
first.start();
first.join();

assertNull(interrupted.get());

// now we will use factory again, this time from a sibling thread group sister
// if DefaultThreadFactory is "sticky" about thread groups, a security manager
// that forbids sibling thread groups from messing with each other will strike this down
final Thread second = new Thread(new ThreadGroup("sister"), new Runnable() {
@Override
public void run() {
final Thread t = factory.get().newThread(task);
t.start();
try {
t.join();
} catch (InterruptedException e) {
interrupted.set(e);
Thread.currentThread().interrupt();
}
}
});
second.start();
second.join();

assertNull(interrupted.get());

assertEquals(2, counter.get());
} finally {
System.setSecurityManager(current);
}
}

// test that when DefaultThreadFactory is constructed with a sticky thread group, threads
// created by it have the sticky thread group
@Test(timeout = 2000)
public void testDefaultThreadFactoryStickyThreadGroupConstructor() throws InterruptedException {
final ThreadGroup sticky = new ThreadGroup("sticky");
runStickyThreadGroupTest(
new Callable<DefaultThreadFactory>() {
@Override
public DefaultThreadFactory call() throws Exception {
return new DefaultThreadFactory("test", false, Thread.NORM_PRIORITY, sticky);
}
},
sticky);
}

// test that when DefaultThreadFactory is constructed it is sticky to the thread group from the thread group of the
// thread that created it
@Test(timeout = 2000)
public void testDefaulThreadFactoryInheritsThreadGroup() throws InterruptedException {
final ThreadGroup sticky = new ThreadGroup("sticky");

runStickyThreadGroupTest(
new Callable<DefaultThreadFactory>() {
@Override
public DefaultThreadFactory call() throws Exception {
final AtomicReference<DefaultThreadFactory> factory =
new AtomicReference<DefaultThreadFactory>();
final Thread thread = new Thread(sticky, new Runnable() {
@Override
public void run() {
factory.set(new DefaultThreadFactory("test"));
}
});
thread.start();
thread.join();
return factory.get();
}
},
sticky);
}

// test that when a security manager is installed that provides a ThreadGroup, DefaultThreadFactory inherits from
// the security manager
@Test(timeout = 2000)
public void testDefaultThreadFactoryInheritsThreadGroupFromSecurityManager() throws InterruptedException {
final SecurityManager current = System.getSecurityManager();

try {
final ThreadGroup sticky = new ThreadGroup("sticky");
System.setSecurityManager(new SecurityManager() {
@Override
public ThreadGroup getThreadGroup() {
return sticky;
}

// so we can restore the security manager at the end of the test
@Override
public void checkPermission(Permission perm) {
}
});
runStickyThreadGroupTest(
new Callable<DefaultThreadFactory>() {
@Override
public DefaultThreadFactory call() throws Exception {
return new DefaultThreadFactory("test");
}
},
sticky);
} finally {
System.setSecurityManager(current);
}
}

private void runStickyThreadGroupTest(
final Callable<DefaultThreadFactory> callable,
final ThreadGroup expected) throws InterruptedException {
final AtomicReference<ThreadGroup> captured = new AtomicReference<ThreadGroup>();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();

final Thread first = new Thread(new ThreadGroup("wrong"), new Runnable() {
@Override
public void run() {
final DefaultThreadFactory factory;
try {
factory = callable.call();
} catch (Exception e) {
exception.set(e);
throw new RuntimeException(e);
}
final Thread t = factory.newThread(new Runnable() {
@Override
public void run() {
}
});
captured.set(t.getThreadGroup());
}
});
first.start();
first.join();

assertNull(exception.get());

assertEquals(expected, captured.get());
}

// test that when DefaultThreadFactory is constructed without a sticky thread group, threads
// created by it inherit the correct thread group
@Test(timeout = 2000)
public void testDefaultThreadFactoryNonStickyThreadGroupConstructor() throws InterruptedException {

final AtomicReference<DefaultThreadFactory> factory = new AtomicReference<DefaultThreadFactory>();
final AtomicReference<ThreadGroup> firstCaptured = new AtomicReference<ThreadGroup>();

final ThreadGroup firstGroup = new ThreadGroup("first");
final Thread first = new Thread(firstGroup, new Runnable() {
@Override
public void run() {
factory.set(new DefaultThreadFactory("sticky", false, Thread.NORM_PRIORITY, null));
final Thread t = factory.get().newThread(new Runnable() {
@Override
public void run() {
}
});
firstCaptured.set(t.getThreadGroup());
}
});
first.start();
first.join();

assertEquals(firstGroup, firstCaptured.get());

final AtomicReference<ThreadGroup> secondCaptured = new AtomicReference<ThreadGroup>();

final ThreadGroup secondGroup = new ThreadGroup("second");
final Thread second = new Thread(secondGroup, new Runnable() {
@Override
public void run() {
final Thread t = factory.get().newThread(new Runnable() {
@Override
public void run() {
}
});
secondCaptured.set(t.getThreadGroup());
}
});
second.start();
second.join();

assertEquals(secondGroup, secondCaptured.get());
}
}

0 comments on commit 27520f5

Please sign in to comment.