Skip to content

Commit

Permalink
WFLY-3770 Graceful shutdown for EE concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 9, 2014
1 parent 4e16f37 commit c2a06d8
Show file tree
Hide file tree
Showing 14 changed files with 578 additions and 11 deletions.
4 changes: 4 additions & 0 deletions ee/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
<groupId>org.wildfly.core</groupId>
<artifactId>wildfly-server</artifactId>
</dependency>
<dependency>
<groupId>org.wildfly.core</groupId>
<artifactId>wildfly-request-controller</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.invocation</groupId>
<artifactId>jboss-invocation</artifactId>
Expand Down
142 changes: 142 additions & 0 deletions ee/src/main/java/org/jboss/as/ee/concurrent/ControlPointUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 2110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.as.ee.concurrent;

import org.wildfly.extension.requestcontroller.ControlPoint;

import javax.enterprise.concurrent.ManagedTask;
import javax.enterprise.concurrent.ManagedTaskListener;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;

/**
* Class that manages the control point for executor services
*
* @author Stuart Douglas
*/
public class ControlPointUtils {

public static Runnable doWrap(Runnable runnable,ControlPoint controlPoint) {
if(controlPoint == null) {
return runnable;
}
try {
controlPoint.forceBeginRequest();
return new ControlledRunnable(runnable, controlPoint);
} catch (Exception e) {
throw new RejectedExecutionException(e);
}
}

public static <T> Callable<T> doWrap(Callable<T> callable, ControlPoint controlPoint) {
if(controlPoint == null) {
return callable;
}
try {
controlPoint.forceBeginRequest();
return new ControlledCallable<>(callable, controlPoint);
} catch (Exception e) {
throw new RejectedExecutionException(e);
}
}

/**
* Runnable that wraps a a runnable to allow server suspend/resume to work correctly.
*
*/
static class ControlledRunnable implements Runnable, ManagedTask {

private final Runnable runnable;
private final ControlPoint controlPoint;

ControlledRunnable(Runnable runnable, ControlPoint controlPoint) {
this.runnable = runnable;
this.controlPoint = controlPoint;
}

@Override
public void run() {
try {
runnable.run();
} finally {
controlPoint.requestComplete();
}
}
@Override
public Map<String, String> getExecutionProperties() {
if(runnable instanceof ManagedTask) {
((ManagedTask) runnable).getExecutionProperties();
}
return null;
}

@Override
public ManagedTaskListener getManagedTaskListener() {
if(runnable instanceof ManagedTask) {
((ManagedTask) runnable).getManagedTaskListener();
}
return null;
}
}

/**
* Runnable that wraps a a runnable to allow server suspend/resume to work correctly.
*
*/
static class ControlledCallable<T> implements Callable<T>, ManagedTask {

private final Callable<T> callable;
private final ControlPoint controlPoint;

ControlledCallable(Callable<T> callable, ControlPoint controlPoint) {
this.callable = callable;
this.controlPoint = controlPoint;
}

@Override
public T call() throws Exception {
try {
return callable.call();
} finally {
controlPoint.requestComplete();
}
}

@Override
public Map<String, String> getExecutionProperties() {
if(callable instanceof ManagedTask) {
((ManagedTask) callable).getExecutionProperties();
}
return null;
}

@Override
public ManagedTaskListener getManagedTaskListener() {
if(callable instanceof ManagedTask) {
((ManagedTask) callable).getManagedTaskListener();
}
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2014, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 2110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.as.ee.concurrent;

import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.wildfly.extension.requestcontroller.ControlPoint;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.jboss.as.ee.concurrent.ControlPointUtils.doWrap;

/**
* @author Stuart Douglas
*/
public class ManagedExecutorServiceImpl extends org.glassfish.enterprise.concurrent.ManagedExecutorServiceImpl {

private final ControlPoint controlPoint;

public ManagedExecutorServiceImpl(String name, ManagedThreadFactoryImpl managedThreadFactory, long hungTaskThreshold, boolean longRunningTasks, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long threadLifeTime, ContextServiceImpl contextService, RejectPolicy rejectPolicy, BlockingQueue<Runnable> queue, ControlPoint controlPoint) {
super(name, managedThreadFactory, hungTaskThreshold, longRunningTasks, corePoolSize, maxPoolSize, keepAliveTime, keepAliveTimeUnit, threadLifeTime, contextService, rejectPolicy, queue);
this.controlPoint = controlPoint;
}

public ManagedExecutorServiceImpl(String name, ManagedThreadFactoryImpl managedThreadFactory, long hungTaskThreshold, boolean longRunningTasks, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long threadLifeTime, int queueCapacity, ContextServiceImpl contextService, RejectPolicy rejectPolicy, ControlPoint controlPoint) {
super(name, managedThreadFactory, hungTaskThreshold, longRunningTasks, corePoolSize, maxPoolSize, keepAliveTime, keepAliveTimeUnit, threadLifeTime, queueCapacity, contextService, rejectPolicy);
this.controlPoint = controlPoint;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(doWrap(task, controlPoint));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(doWrap(task, controlPoint), result);
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(doWrap(task, controlPoint));
}

@Override
public void execute(Runnable command) {
super.execute(doWrap(command, controlPoint));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,89 @@

import org.glassfish.enterprise.concurrent.ContextServiceImpl;
import org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl;
import org.jboss.as.ee.logging.EeLogger;
import org.wildfly.extension.requestcontroller.ControlPoint;
import org.wildfly.extension.requestcontroller.RunResult;

import javax.enterprise.concurrent.LastExecution;
import javax.enterprise.concurrent.Trigger;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.jboss.as.ee.concurrent.ControlPointUtils.doWrap;

/**
* WildFly's extension of Java EE 7 RI {@link org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl}.
*
* @author Eduardo Martins
*/
public class ManagedScheduledExecutorServiceImpl extends org.glassfish.enterprise.concurrent.ManagedScheduledExecutorServiceImpl {

public ManagedScheduledExecutorServiceImpl(String name, ManagedThreadFactoryImpl managedThreadFactory, long hungTaskThreshold, boolean longRunningTasks, int corePoolSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long threadLifeTime, ContextServiceImpl contextService, RejectPolicy rejectPolicy) {
private final ControlPoint controlPoint;

public ManagedScheduledExecutorServiceImpl(String name, ManagedThreadFactoryImpl managedThreadFactory, long hungTaskThreshold, boolean longRunningTasks, int corePoolSize, long keepAliveTime, TimeUnit keepAliveTimeUnit, long threadLifeTime, ContextServiceImpl contextService, RejectPolicy rejectPolicy, ControlPoint controlPoint) {
super(name, managedThreadFactory, hungTaskThreshold, longRunningTasks, corePoolSize, keepAliveTime, keepAliveTimeUnit, threadLifeTime, contextService, rejectPolicy);
this.controlPoint = controlPoint;
}

@Override
public void execute(Runnable command) {
super.execute(doWrap(command, controlPoint));
}

@Override
public Future<?> submit(Runnable task) {
return super.submit(doWrap(task, controlPoint));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(doWrap(task, controlPoint), result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(doWrap(task, controlPoint));
}

@Override
public ScheduledFuture<?> schedule(Runnable command, Trigger trigger) {
final CancellableTrigger ctrigger = new CancellableTrigger(trigger);
ctrigger.future = super.schedule(command, ctrigger);
ctrigger.future = super.schedule(new ControlledScheduledRunnable(command, controlPoint, this), ctrigger);
return ctrigger.future;
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, Trigger trigger) {
final CancellableTrigger ctrigger = new CancellableTrigger(trigger);
ctrigger.future = super.schedule(callable, ctrigger);
ctrigger.future = super.schedule(new ControlledScheduledCallable(callable, controlPoint), ctrigger);
return ctrigger.future;
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return super.schedule(new ControlledScheduledRunnable(command, controlPoint, this), delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return super.schedule(new ControlledScheduledCallable(callable, controlPoint), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return super.scheduleAtFixedRate(new ControlledScheduledRunnable(command, controlPoint, this), initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return super.scheduleWithFixedDelay(new ControlledScheduledRunnable(command, controlPoint, this), initialDelay, delay, unit);
}

/**
* A {@link javax.enterprise.concurrent.Trigger} wrapper that stops scheduling if the related {@link java.util.concurrent.ScheduledFuture} is cancelled.
*/
Expand All @@ -82,4 +132,72 @@ public boolean skipRun(LastExecution lastExecution, Date date) {
return trigger.skipRun(lastExecution, date);
}
}

private static class ControlledScheduledRunnable implements Runnable {

private final Runnable delegate;
private final ControlPoint controlPoint;
private final ExecutorService executorService;

private ControlledScheduledRunnable(Runnable delegate, ControlPoint controlPoint, ExecutorService executorService) {
this.delegate = delegate;
this.controlPoint = controlPoint;
this.executorService = executorService;
}

@Override
public void run() {
if (controlPoint == null) {
delegate.run();
} else
try {
if (controlPoint.beginRequest() == RunResult.RUN) {
try {
delegate.run();
} finally {
controlPoint.requestComplete();
}
return;
} else {
throw EeLogger.ROOT_LOGGER.cannotRunScheduledTask(delegate);
}
} catch (Exception e) {
EeLogger.ROOT_LOGGER.failedToRunTask(e);
}
}
}


private static class ControlledScheduledCallable<V> implements Callable<V> {

private final Callable<V> delegate;
private final ControlPoint controlPoint;

private ControlledScheduledCallable(Callable<V> delegate, ControlPoint controlPoint) {
this.delegate = delegate;
this.controlPoint = controlPoint;
}

@Override
public V call() throws Exception {
if (controlPoint == null) {
return delegate.call();
} else {
try {
if (controlPoint.beginRequest() == RunResult.RUN) {
try {
return delegate.call();
} finally {
controlPoint.requestComplete();
}
}
} catch (Exception e) {
EeLogger.ROOT_LOGGER.failedToRunTask(e);
}
throw EeLogger.ROOT_LOGGER.cannotRunScheduledTask(delegate);
}

}
}

}

0 comments on commit c2a06d8

Please sign in to comment.