Skip to content

Commit

Permalink
Allow event handlers to block (puniverse/pulsar#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fabio Tudone committed Apr 18, 2016
1 parent c8c67b6 commit 842ed49
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand All @@ -13,12 +13,14 @@
*/
package co.paralleluniverse.actors.behaviors;

import co.paralleluniverse.fibers.SuspendExecution;

/**
* A handler that can be registered with an {@link EventSource} actor to receive all events {@link EventSource#notify(java.lang.Object) sent}
* to the actor.
*
* @author pron
*/
public interface EventHandler<Event> {
void handleEvent(Event event);
void handleEvent(Event event) throws SuspendExecution, InterruptedException;
}
Expand Up @@ -231,7 +231,7 @@ protected void onTerminate(Throwable cause) throws SuspendExecution, Interrupted
handlers.clear();
}

private void notifyHandlers(Event event) {
private void notifyHandlers(Event event) throws InterruptedException, SuspendExecution {
log().debug("{} Got event {}", this, event);
for (ListIterator<EventHandler<Event>> it = handlers.listIterator(); it.hasNext();) {
EventHandler<Event> handler = it.next();
Expand Down
@@ -1,6 +1,6 @@
/*
* Quasar: lightweight threads and actors for the JVM.
* Copyright (c) 2013-2015, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down Expand Up @@ -38,14 +38,14 @@
*
* @author pron
*/
public class EventSourceTest {
public final class EventSourceTest {
@Rule
public TestName name = new TestName();
public final TestName name = new TestName();
@Rule
public TestRule watchman = TestUtil.WATCHMAN;
public final TestRule watchman = TestUtil.WATCHMAN;

@After
public void tearDown() {
public final void tearDown() {
ActorRegistry.clear();
}

Expand All @@ -59,7 +59,7 @@ private EventSource<String> spawnEventSource(Initializer initializer) {
}

private <T extends Actor<Message, V>, Message, V> T spawnActor(T actor) {
Fiber fiber = new Fiber(actor);
final Fiber fiber = new Fiber<>(actor);
fiber.setUncaughtExceptionHandler(new Strand.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Strand s, Throwable e) {
Expand All @@ -72,9 +72,9 @@ public void uncaughtException(Strand s, Throwable e) {
}

@Test
public void testInitializationAndTermination() throws Exception {
public final void testInitializationAndTermination() throws Exception {
final Initializer init = mock(Initializer.class);
EventSource<String> es = spawnEventSource(init);
final EventSource<String> es = spawnEventSource(init);

Thread.sleep(100);
verify(init).init();
Expand All @@ -86,7 +86,7 @@ public void testInitializationAndTermination() throws Exception {
}

@Test
public void testNotify() throws Exception {
public final void testNotify() throws Exception {
final EventHandler<String> handler1 = mock(EventHandler.class);
final EventHandler<String> handler2 = mock(EventHandler.class);

Expand All @@ -113,9 +113,34 @@ public void testNotify() throws Exception {
verify(handler2).handleEvent("goodbye");
}

private static final class BlockingStringEventHandler implements EventHandler<String> {
@Override
public final void handleEvent(String s) throws SuspendExecution, InterruptedException {
Fiber.sleep(10);
}
}

@Test
public final void testBlock() throws Exception {
final EventHandler<String> handler1 = new BlockingStringEventHandler();
final EventHandler<String> handler2 = new BlockingStringEventHandler();

final EventSource<String> es = spawnEventSource(null);

es.addHandler(handler1);
es.addHandler(handler2);

es.notify("hello");

Thread.sleep(100);

es.shutdown();
LocalActor.join(es, 100, TimeUnit.MILLISECONDS);
}

@Ignore
@Test
public void testExceptionThrownInHandler() throws Exception {
public final void testExceptionThrownInHandler() throws Exception {
final Initializer init = mock(Initializer.class);
final EventHandler<String> handler1 = mock(EventHandler.class);
final EventHandler<String> handler2 = mock(EventHandler.class);
Expand All @@ -139,10 +164,10 @@ public void testExceptionThrownInHandler() throws Exception {
}

@Test
public void testRegistration() throws Exception {
EventSource<String> es = new EventSourceActor<String>() {
public final void testRegistration() throws Exception {
final EventSource<String> es = new EventSourceActor<String>() {
@Override
protected void init() throws SuspendExecution, InterruptedException {
protected final void init() throws SuspendExecution, InterruptedException {
// Strand.sleep(1000);
register("test1");
}
Expand Down

0 comments on commit 842ed49

Please sign in to comment.