Skip to content

Commit

Permalink
Merge pull request #3 from tools4j/feature/thread-like
Browse files Browse the repository at this point in the history
 Introduce ThreadLike interface and move all thread like stuff to a new run package
  • Loading branch information
terzerm committed Jul 23, 2019
2 parents 33e9be5 + ffae33d commit 77f670d
Show file tree
Hide file tree
Showing 14 changed files with 349 additions and 85 deletions.
9 changes: 6 additions & 3 deletions src/main/java/org/tools4j/nobark/loop/Loop.java
Expand Up @@ -27,6 +27,9 @@
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

import org.tools4j.nobark.run.ShutdownableThread;
import org.tools4j.nobark.run.StoppableThread;

/**
* A loop performing a series of {@link Step steps} in an iterative manner as long as the {@link LoopCondition} is true.
* If no step performs any work in a whole iteration, an {@link IdleStrategy} is invoked.
Expand Down Expand Up @@ -107,7 +110,7 @@ public static StoppableThread start(final IdleStrategy idleStrategy,
Objects.requireNonNull(exceptionHandler);
Objects.requireNonNull(steps);
return StoppableThread.start(
running -> new Loop(workDone -> running.isRunning(), idleStrategy, exceptionHandler, steps),
running -> new Loop(workDone -> running.keepRunning(), idleStrategy, exceptionHandler, steps),
threadFactory);
}

Expand All @@ -131,8 +134,8 @@ public static ShutdownableThread start(final IdleStrategy idleStrategy,
Objects.requireNonNull(exceptionHandler);
Objects.requireNonNull(stepProviders);
return ShutdownableThread.start(
main -> mainLoop(workDone -> main.isRunning(), idleStrategy, exceptionHandler, stepProviders),
shutdown -> shutdownLoop(workDone -> workDone && shutdown.isRunning(), IdleStrategy.NO_OP, exceptionHandler, stepProviders),
main -> mainLoop(workDone -> main.keepRunning(), idleStrategy, exceptionHandler, stepProviders),
shutdown -> shutdownLoop(workDone -> workDone && shutdown.keepRunning(), IdleStrategy.NO_OP, exceptionHandler, stepProviders),
threadFactory);
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/tools4j/nobark/loop/StepProvider.java
Expand Up @@ -26,6 +26,8 @@
import java.util.Objects;
import java.util.concurrent.ThreadFactory;

import org.tools4j.nobark.run.ShutdownableThread;

/**
* Provider for {@link Step} distinguishing between normal (main loop) steps and shutdown steps that are used during the
* termination phase of a {@link ShutdownableThread} as returned by
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/org/tools4j/nobark/run/Joinable.java
@@ -0,0 +1,63 @@
/**
* The MIT License (MIT)
*
* Copyright (c) 2019 nobark (tools4j), Marco Terzer, Anton Anufriev
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.tools4j.nobark.run;

/**
* Joinable is a running service such as a thread that can be {@link #join() joined} to await
* its termination.
*/
@FunctionalInterface
public interface Joinable {
/**
* Waits for this Joinable to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws IllegalStateException
* if any thread has interrupted the current thread
* @see Thread#join()
*/
default void join() {
join(0);
}

/**
* Waits at most {@code millis} milliseconds for this Joinable to
* die. A timeout of {@code 0} means to wait forever.
*
* @param millis
* the time to wait in milliseconds
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
* @throws IllegalStateException
* if any thread has interrupted the current thread
* @see Thread#join(long)
*/
void join(long millis);
}
Expand Up @@ -21,7 +21,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.tools4j.nobark.loop;
package org.tools4j.nobark.run;

/**
* A factory for a {@link Runnable} that can be stopped by complying with a
Expand Down Expand Up @@ -54,6 +54,6 @@ interface RunningCondition {
*
* @return true to keep running and false to abort
*/
boolean isRunning();
boolean keepRunning();
}
}
Expand Up @@ -21,11 +21,14 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.tools4j.nobark.loop;
package org.tools4j.nobark.run;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.tools4j.nobark.loop.Step;

/**
* Shutdownable is a running service such as a thread that can be shutdown orderly or abruptly in a way similar to
* {@link ExecutorService}.
Expand Down Expand Up @@ -53,8 +56,10 @@ public interface Shutdownable {
* to do that.
* <p>
* Invocation has no additional effect if already shut down.
*
* @return list of tasks that never commenced execution if any, otherwise an empty list
*/
void shutdownNow();
List<Runnable> shutdownNow();

/**
* Returns {@code true} if this service has been shut down.
Expand All @@ -64,22 +69,21 @@ public interface Shutdownable {
boolean isShutdown();

/**
* Returns {@code true} if this service has terminated following shut down. Note that
* {@code isTerminated} is never {@code true} unless either {@code shutdown} or
* {@code shutdownNow} was called first.
* Returns {@code true} if this service has terminated.
*
* @return {@code true} if the service has terminated following shut down
* @return {@code true} if the service has terminated.
*/
boolean isTerminated();

/**
* Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs,
* whichever happens first.
* Blocks until this service has terminated, or the timeout occurs, whichever happens first.
* Zero timeout means waiting forever.
*
* @param timeout the maximum time to wait
* @param timeout the maximum time to wait, zero to wait indefinitely
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws IllegalStateException if any thread has interrupted the current thread
*/
boolean awaitTermination(long timeout, TimeUnit unit);
}
Expand Up @@ -21,8 +21,10 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.tools4j.nobark.loop;
package org.tools4j.nobark.run;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,12 +36,11 @@
* A thread that performs a main {@link java.lang.Runnable runnable} in a new thread and another shutdown runnable the
* graceful {@link #shutdown} phase of the thread. The thread is started immediately upon construction.
*/
public class ShutdownableThread implements Shutdownable {
public class ShutdownableThread implements ThreadLike, Shutdownable {

private static final int RUNNING = 0;
private static final int SHUTDOWN = 1;
private static final int SHUTDOWN_NOW = 2;
private static final int TERMINATED = 4;

private final RunnableFactory mainRunnableFactory;
private final RunnableFactory shutdownRunnableFactory;
Expand All @@ -51,9 +52,9 @@ public class ShutdownableThread implements Shutdownable {
* Constructor for shutdownable thread; it is recommended to use the static start(..) methods instead.
*
* @param mainRunnableFactory the factory for the main runnable;
* the <i>{@link #isRunning}</i> condition is passed to the factory as lambda
* the <i>{@link #keepMainRunning}</i> condition is passed to the factory as lambda
* @param shutdownRunnableFactory the factory for the shutdown phase runnable;
* the <i>{@link #isShutdownRunning}</i> condition is passed to the factory as lambda
* the <i>{@link #keepShutdownRunning}</i> condition is passed to the factory as lambda
* @param threadFactory the factory to provide the thread
*/
protected ShutdownableThread(final RunnableFactory mainRunnableFactory,
Expand All @@ -69,9 +70,9 @@ protected ShutdownableThread(final RunnableFactory mainRunnableFactory,
* Creates, starts and returns a new shutdownable thread.
*
* @param mainRunnableFactory the factory for the main runnable;
* the <i>{@link #isRunning}</i> condition is passed to the factory as lambda
* the <i>{@link #keepMainRunning}</i> condition is passed to the factory as lambda
* @param shutdownRunnableFactory the factory for the shutdown phase runnable;
* the <i>{@link #isShutdownRunning}</i> condition is passed to the factory as lambda
* the <i>{@link #keepShutdownRunning}</i> condition is passed to the factory as lambda
* @param threadFactory the factory to provide the thread
* @return the newly created and started shutdownable thread
*/
Expand All @@ -82,37 +83,32 @@ public static ShutdownableThread start(final RunnableFactory mainRunnableFactory
}

private void run() {
final Runnable main = mainRunnableFactory.create(this::isRunning);
final Runnable shutdown = shutdownRunnableFactory.create(this::isShutdownRunning);
final Runnable main = mainRunnableFactory.create(this::keepMainRunning);
final Runnable shutdown = shutdownRunnableFactory.create(this::keepShutdownRunning);
main.run();
shutdown.run();
notifyTerminated();
}

@Override
public void shutdown() {
state.compareAndSet(RUNNING, SHUTDOWN);
}


@Override
public void shutdownNow() {
public List<Runnable> shutdownNow() {
final int shutdownAndNow = SHUTDOWN | SHUTDOWN_NOW;
if (!state.compareAndSet(RUNNING, shutdownAndNow)) {
state.compareAndSet(SHUTDOWN, shutdownAndNow);
}
return Collections.emptyList();
}

private void notifyTerminated() {
if (!state.compareAndSet(SHUTDOWN, SHUTDOWN | TERMINATED)) {
state.compareAndSet(SHUTDOWN | SHUTDOWN_NOW, SHUTDOWN | SHUTDOWN_NOW | TERMINATED);
}
}

private boolean isRunning() {
private boolean keepMainRunning() {
return (state.get() & SHUTDOWN) == 0;
}

private boolean isShutdownRunning() {
private boolean keepShutdownRunning() {
return (state.get() & SHUTDOWN_NOW) == 0;
}

Expand All @@ -123,7 +119,7 @@ public boolean isShutdown() {

@Override
public boolean isTerminated() {
return (state.get() & TERMINATED) != 0;
return threadState() == Thread.State.TERMINATED;
}

@Override
Expand All @@ -134,9 +130,6 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) {
if (isTerminated()) {
return true;
}
if (timeout == 0) {
return isTerminated();
}
final long millis = unit.toMillis(timeout);
final long nanos = unit.toNanos(timeout - unit.convert(millis, TimeUnit.MILLISECONDS));
try {
Expand All @@ -151,6 +144,27 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) {
return isTerminated();
}

@Override
public Thread.State threadState() {
return thread.getState();
}

/**
* Calls {@link #shutdown()}.
*/
@Override
public void stop() {
shutdown();
}

/**
* Delegates to {@link #awaitTermination(long, TimeUnit)}.
*/
@Override
public void join(final long millis) {
awaitTermination(millis, TimeUnit.MILLISECONDS);
}

/**
* Returns the name of the thread that was created with the thread factory passed to the constructor.
*
Expand Down
Expand Up @@ -21,7 +21,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.tools4j.nobark.loop;
package org.tools4j.nobark.run;

/**
* Stoppable is a running service such as a thread that can be stopped either by calling
Expand Down

0 comments on commit 77f670d

Please sign in to comment.