Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add methods to interrupt HandlerPool #143

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

amuttsch
Copy link
Contributor

@amuttsch amuttsch commented May 7, 2018

No description provided.

@@ -28,6 +27,9 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>

private val cellsNotDone = new AtomicReference[Map[Cell[_, _], Queue[SequentialCallbackRunnable[_, _]]]](Map()) // use `values` to store all pending sequential triggers

private var interruptLatch = new CountDownLatch(1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also be @volatile due to assignment on line 466 below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -444,4 +449,20 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>

def reportFailure(t: Throwable): Unit =
t.printStackTrace()

/**
* Interrupt the computation of cells. It can be resumed using the `resume` method.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"It" -> "This handler pool"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

def resume(): Unit = {
isInterrupted = false
interruptLatch.countDown()
interruptLatch = new CountDownLatch(1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it simplify finding bugs if this method would throw an exception (e.g., IllegalStateException) in case it is attempted to resume a non-interrupted pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PropertyStore the interrupt is implemented as @volatile var isInterrupted: () ⇒ Boolean. Therefore I call resume every time the client waits for quiescense, which should resume the store if it was interrupted.


assert(cell2.getResult() == 0)

pool.resume()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to add a test case where more than one task is interrupted. Also, there should be a test case, where only one of two tasks is interrupted, because the interrupt is too late for the first task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more tests

/**
* Interrupt the computation of cells. It can be resumed using the `resume` method.
*/
def interrupt(): Unit = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"interrupt" -> "suspend". The pair suspend/resume is much more common. Moreover, interrupt already has a different meaning for threads on the JVM (including InterruptedException etc.), so it is actually quite confusing to call this method interrupt!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to suspend.

@amuttsch amuttsch force-pushed the feature/interrupt branch from 0c2c4c6 to 56072a0 Compare May 19, 2018 09:25
@amuttsch
Copy link
Contributor Author

I applied your requested changes. Once test fails, because the putNext did not schedule an asynchronous task for whenNextSequential. This problem should be fixed once the new "one-element-queue" branch is merged. I tagged the test as ignored therefore and added a TODO to include it once the branch is merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants