Skip to content

Commit

Permalink
TaskSchedulingIntegrationSpec: shutting down the system
Browse files Browse the repository at this point in the history
  • Loading branch information
luontola committed Nov 27, 2008
1 parent 564a84e commit c4e1b7b
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 26 deletions.
Expand Up @@ -118,9 +118,11 @@ private void register(EntityObject entity, BigInteger id) {
if (state == State.FLUSHING) {
flushQueue.add(entity);
}
Object previous1 = entities.put(entity, id);
Object previous2 = entitiesById.put(id, entity);
assert previous1 == null && previous2 == null : "Registered an entity twise: " + entity + ", " + id;
BigInteger prevIdOfSameEntity = entities.put(entity, id);
EntityObject prevEntityWithSameId = entitiesById.put(id, entity);
assert prevIdOfSameEntity == null && prevEntityWithSameId == null : ""
+ "Registered an entity twise: " + entity + ", " + id
+ " (Previous was: " + prevEntityWithSameId + ", " + prevIdOfSameEntity + ")";
}

public BigInteger firstKey() {
Expand Down
Expand Up @@ -56,8 +56,10 @@ public SystemStartupListener[] get() {
}

private static class SystemShutdownListenerProvider implements Provider<SystemShutdownListener[]> {
@Inject public TaskSchedulingLifecycleManager listener1;

public SystemShutdownListener[] get() {
return new SystemShutdownListener[0];
return new SystemShutdownListener[]{listener1};
}
}
}
Expand Up @@ -32,13 +32,13 @@
package net.orfjackal.dimdwarf.scheduler;

import com.google.inject.Inject;
import net.orfjackal.dimdwarf.events.SystemStartupListener;
import net.orfjackal.dimdwarf.events.*;

/**
* @author Esko Luontola
* @since 28.11.2008
*/
public class TaskSchedulingLifecycleManager implements SystemStartupListener {
public class TaskSchedulingLifecycleManager implements SystemStartupListener, SystemShutdownListener {

private final TaskSchedulerImpl taskScheduler;
private final TaskThreadPool taskThreadPool;
Expand All @@ -53,4 +53,8 @@ public void onStartup() {
taskScheduler.start();
taskThreadPool.start();
}

public void onShutdown() {
taskThreadPool.shutdown();
}
}
Expand Up @@ -31,7 +31,7 @@

package net.orfjackal.dimdwarf.scheduler;

import com.google.inject.Inject;
import com.google.inject.*;
import net.orfjackal.dimdwarf.tasks.TaskExecutor;
import org.slf4j.*;

Expand All @@ -43,7 +43,7 @@
* @author Esko Luontola
* @since 26.11.2008
*/
//@Singleton // TODO: a test must fail first
@Singleton
@ThreadSafe
public class TaskThreadPool {
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(TaskThreadPool.class);
Expand Down Expand Up @@ -77,15 +77,15 @@ public int getRunningTasks() {
}

public void shutdown() {
logger.info("Shutting down...");
logger.info("Shutting down {}...", getClass().getSimpleName());
consumer.interrupt();
try {
consumer.join();
} catch (InterruptedException e) {
logger.error("Interrupted while shutting down", e);
}
workers.shutdown();
logger.info("Shutdown finished");
logger.info("{} has been shut down", getClass().getSimpleName());
}

@SuppressWarnings({"ToArrayCallWithZeroLengthArrayArgument"})
Expand Down Expand Up @@ -129,7 +129,7 @@ public void run() {
runningTasks.add(taskHasFinished);
taskContext.execute(task);
} catch (Throwable t) {
logger.error("Task threw an exception", t);
logger.warn("Task threw an exception", t);
} finally {
runningTasks.remove(taskHasFinished);
taskHasFinished.countDown();
Expand Down
Expand Up @@ -35,13 +35,15 @@
import jdave.*;
import jdave.junit4.JDaveRunner;
import net.orfjackal.dimdwarf.api.TaskScheduler;
import net.orfjackal.dimdwarf.db.inmemory.InMemoryDatabaseManager;
import net.orfjackal.dimdwarf.entities.EntityIdFactoryImpl;
import net.orfjackal.dimdwarf.modules.CommonModules;
import net.orfjackal.dimdwarf.server.ServerLifecycleManager;
import net.orfjackal.dimdwarf.tasks.TaskExecutor;
import org.junit.runner.RunWith;

import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.*;

/**
* @author Esko Luontola
Expand All @@ -51,29 +53,52 @@
@Group({"fast"})
public class TaskSchedulingIntegrationSpec extends Specification<Object> {

private Injector injector;
private TaskExecutor taskContext;
private Provider<TaskScheduler> scheduler;
private TaskThreadPool pool;
private TestSpy spy;

private Provider<ServerLifecycleManager> server;

public void create() throws Exception {
Injector injector = Guice.createInjector(new CommonModules());
startupTheServer(new CommonModules());
}

private void startupTheServer(Module... modules) {
injector = Guice.createInjector(modules);
server = injector.getProvider(ServerLifecycleManager.class);
server.get().start();

taskContext = injector.getInstance(TaskExecutor.class);
scheduler = injector.getProvider(TaskScheduler.class);
pool = injector.getInstance(TaskThreadPool.class);
spy = injector.getInstance(TestSpy.class);
}

server = injector.getProvider(ServerLifecycleManager.class);
server.get().start();
private void shutdownTheServer() {
server.get().shutdown();
}

private void restartTheServer() {
shutdownTheServer();
final InMemoryDatabaseManager db = injector.getInstance(InMemoryDatabaseManager.class);
final EntityIdFactoryImpl idFactory = injector.getInstance(EntityIdFactoryImpl.class);

startupTheServer(
new CommonModules(),
new AbstractModule() {
protected void configure() {
bind(InMemoryDatabaseManager.class).toInstance(db);
bind(EntityIdFactoryImpl.class).toInstance(idFactory);
}
});
}

public void destroy() throws Exception {
server.get().shutdown();
shutdownTheServer();
}

public class WhenAOneTimeTasksIsScheduled {

public class WhenAOneTimeTaskIsScheduled {

public void create() throws InterruptedException {
taskContext.execute(new Runnable() {
Expand All @@ -89,6 +114,36 @@ public void itIsExecutedOnce() {
}
}

public class WhenARepeatedTaskIsScheduled {

public void create() throws InterruptedException {
taskContext.execute(new Runnable() {
public void run() {
scheduler.get().scheduleAtFixedRate(new ExecutionLoggingTask("A"), 0, 0, TimeUnit.MILLISECONDS);
}
});
spy.executionCount.acquire(2);
}

public void itIsExecutedManyTimes() {
specify(spy.executions, should.containAll("A:1", "A:2"));
}

public void afterShuttingDownItIsNoMoreExecuted() throws InterruptedException {
shutdownTheServer();
spy.executions.clear();
Thread.sleep(10); // TODO: figure out a more reliable thread synchronization method than sleeping
specify(spy.executions, should.containExactly());
}

public void afterRestartTheExecutionIsContinued() throws InterruptedException {
restartTheServer();
spy.executionCount.acquire(1);
specify(spy.executions, should.not().containAny("A:1", "A:2"));
specify(spy.executions, should.containAny("A:3", "A:4"));
}
}


@Singleton
public static class TestSpy {
Expand All @@ -114,9 +169,9 @@ public ExecutionLoggingTask(String dummyId) {
public void run() {
myExecutionCount++;
spy.logExecution(getDummyId(), myExecutionCount);
System.out.println("TaskSchedulingIntegrationSpec$ExecutionLoggingTask.run");
System.out.println("getDummyId() = " + getDummyId());
System.out.println("myExecutionCount = " + myExecutionCount);
System.err.println("TaskSchedulingIntegrationSpec$ExecutionLoggingTask.run");
System.err.println("getDummyId() = " + getDummyId());
System.err.println("myExecutionCount = " + myExecutionCount);
}
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ public TaskBootstrap takeNextTask() throws InterruptedException {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void destroy() throws Exception {
checking(new Expectations() {{
allowing(logger).info(with(any(String.class)));
allowing(logger).info(with(any(String.class)), with(any(Object.class)));
allowing(logger).info(with(any(String.class)), with(any(Throwable.class)));
}});
pool.shutdown();
Expand Down Expand Up @@ -292,7 +292,7 @@ public void run() {

public Expectations theExceptionIsLogged() {
return new Expectations() {{
one(logger).error("Task threw an exception", exception);
one(logger).warn("Task threw an exception", exception);
}};
}

Expand All @@ -317,9 +317,9 @@ public void run() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private Expectations theShutdownIsLogged() {
return new Expectations() {{
one(logger).info("Shutting down...");
one(logger).info("Shutting down {}...", "TaskThreadPool");
allowing(logger).info(with(equal("Task consumer was interrupted")), with(aNonNull(InterruptedException.class)));
one(logger).info("Shutdown finished");
one(logger).info("{} has been shut down", "TaskThreadPool");
}};
}

Expand Down

0 comments on commit c4e1b7b

Please sign in to comment.