Skip to content

Commit

Permalink
JBTHR-14: Executor shutdown hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed Aug 6, 2010
1 parent 065bf97 commit 4da2261
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 4 deletions.
38 changes: 38 additions & 0 deletions src/main/java/org/jboss/threads/EventListener.java
@@ -0,0 +1,38 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2010, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.threads;

/**
* An event listener which handles a single event and accepts an attachment of some type.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
public interface EventListener<A> extends java.util.EventListener {

/**
* Handle the event.
*
* @param attachment the attachment
*/
void handleEvent(A attachment);
}
Expand Up @@ -30,8 +30,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.threads.management.ThreadPoolExecutorMBean;

public final class JBossScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor implements ThreadPoolExecutorMBean {
public final class JBossScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor implements ThreadPoolExecutorMBean, ShutdownListenable {

private final SimpleShutdownListenable shutdownListenable = new SimpleShutdownListenable();
private final AtomicInteger rejectCount = new AtomicInteger();

public JBossScheduledThreadPoolExecutor(int corePoolSize) {
Expand Down Expand Up @@ -96,6 +97,15 @@ public void setRejectedExecutionHandler(final RejectedExecutionHandler handler)
super.setRejectedExecutionHandler(new CountingRejectHandler(handler));
}

/** {@inheritDoc} */
public <A> void addShutdownListener(final EventListener<A> shutdownListener, final A attachment) {
shutdownListenable.addShutdownListener(shutdownListener, attachment);
}

protected void terminated() {
shutdownListenable.shutdown();
}

private final class CountingRejectHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegate;

Expand Down
12 changes: 11 additions & 1 deletion src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
Expand Up @@ -34,8 +34,9 @@
/**
*
*/
public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BlockingExecutor, BoundedQueueThreadPoolExecutorMBean, ShutdownListenable {

private final SimpleShutdownListenable shutdownListenable = new SimpleShutdownListenable();
private final AtomicInteger rejectCount = new AtomicInteger();

public JBossThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
Expand Down Expand Up @@ -134,6 +135,15 @@ public void setRejectedExecutionHandler(final RejectedExecutionHandler handler)
super.setRejectedExecutionHandler(new CountingRejectHandler(handler));
}

/** {@inheritDoc} */
public <A> void addShutdownListener(final EventListener<A> shutdownListener, final A attachment) {
shutdownListenable.addShutdownListener(shutdownListener, attachment);
}

protected void terminated() {
shutdownListenable.shutdown();
}

private final class CountingRejectHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegate;

Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/jboss/threads/QueueExecutor.java
Expand Up @@ -42,8 +42,9 @@
/**
* An executor which uses a regular queue to hold tasks. The executor may be tuned at runtime in many ways.
*/
public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedQueueThreadPoolExecutorMBean, ShutdownListenable {
private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
private final SimpleShutdownListenable shutdownListenable = new SimpleShutdownListenable();

private final Lock lock = new ReentrantLock();
// signal when a task is written to the queue
Expand Down Expand Up @@ -657,6 +658,11 @@ public void setHandoffExecutor(final Executor handoffExecutor) {
}
}

/** {@inheritDoc} */
public <A> void addShutdownListener(final EventListener<A> shutdownListener, final A attachment) {
shutdownListenable.addShutdownListener(shutdownListener, attachment);
}

// call with lock held!
private void startNewThread(final Runnable task) {
final Thread thread = threadFactory.newThread(new Worker(task));
Expand Down Expand Up @@ -822,12 +828,17 @@ public void run() {
Thread.interrupted();
}
} finally {
boolean last = false;
lock.lock();
try {
workers.remove(Thread.currentThread());
last = stop && workers.isEmpty();
} finally {
lock.unlock();
}
if (last) {
shutdownListenable.shutdown();
}
}
}
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/jboss/threads/QueuelessExecutor.java
Expand Up @@ -43,9 +43,11 @@
* A queueless thread pool. If one or more threads are waiting for work when a task is submitted, it will be used.
* Otherwise, if fewer than the maximum threads are started, a new thread is created.
*/
public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedThreadPoolExecutorMBean {
public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedThreadPoolExecutorMBean, ShutdownListenable {

private static final Logger log = Logger.getLogger("org.jboss.threads.executor");

private final SimpleShutdownListenable shutdownListenable = new SimpleShutdownListenable();
private final ThreadFactory threadFactory;
private final DirectExecutor taskExecutor;

Expand Down Expand Up @@ -498,6 +500,12 @@ public void executeBlocking(final Runnable task, final long timeout, final TimeU
}

public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
throw new RejectedExecutionException("Not implemented");
}

/** {@inheritDoc} */
public <A> void addShutdownListener(final EventListener<A> shutdownListener, final A attachment) {
shutdownListenable.addShutdownListener(shutdownListener, attachment);
}

private static long clipHigh(long value) {
Expand Down Expand Up @@ -590,14 +598,19 @@ public void run() {
}
}
} finally {
boolean last = false;
lock.lock();
try {
if (stop && runningThreads.remove(thread) && runningThreads.isEmpty()) {
threadDeath.signalAll();
last = true;
}
} finally {
lock.unlock();
}
if (last) {
shutdownListenable.shutdown();
}
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/org/jboss/threads/ShutdownListenable.java
@@ -0,0 +1,40 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2010, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.threads;

/**
* An object which can have shutdown listeners registered on it.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
public interface ShutdownListenable {

/**
* Add a shutdown listener. If the target object is already shut down, the listener is invoked directly.
*
* @param shutdownListener the listener
* @param attachment the attachment value to pass to the listener
* @param <A> the attachment type
*/
<A> void addShutdownListener(EventListener<A> shutdownListener, A attachment);
}
81 changes: 81 additions & 0 deletions src/main/java/org/jboss/threads/SimpleShutdownListenable.java
@@ -0,0 +1,81 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2010, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.threads;

import java.util.ArrayList;
import java.util.List;

/**
* A simple shutdown-listenable registry.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
public final class SimpleShutdownListenable implements ShutdownListenable {
private List<Registration<?>> list = new ArrayList<Registration<?>>();

/** {@inheritDoc} */
public <A> void addShutdownListener(final EventListener<A> shutdownListener, final A attachment) {
synchronized (this) {
final Registration<A> reg = new Registration<A>(shutdownListener, attachment);
if (list == null) {
reg.run();
} else {
list.add(reg);
}
}
}

/**
* Run and remove all registered listeners, and mark this object as having been shut down so that
* future listeners are invoked immediately.
*/
public void shutdown() {
synchronized (this) {
final List<Registration<?>> list = this.list;
this.list = null;
if (list != null) {
for (Registration<?> registration : list) {
registration.run();
}
}
}
}

private static final class Registration<A> {
private final EventListener<A> listener;
private final A attachment;

private Registration(final EventListener<A> listener, final A attachment) {
this.listener = listener;
this.attachment = attachment;
}

void run() {
try {
listener.handleEvent(attachment);
} catch (Throwable t) {
// todo log it
}
}
}
}

0 comments on commit 4da2261

Please sign in to comment.