Permalink
Browse files

Merge branch 'master' into extra166-integration

  • Loading branch information...
2 parents 96dcdf6 + 9074b54 commit 7c965c26b03836264f089b04c7c5d52d5f99905b @vaclav vaclav committed Sep 7, 2012
@@ -30,8 +30,9 @@ h3. Dataflow
* Kanban-style dataflow operator management has been added
* Chaining of Promises using the new _then()_ method
* Added a DSL for easy operator pipe-lining
-* Polished the way operators can be stopped
+* Lifecycle events for operators and selectors were added
* Added support for custom error handlers
+* Polished the way operators can be stopped
* Added synchronous dataflow variables and channels
* Read channels can report their length
@@ -102,12 +102,16 @@ static boolean isControlMessage(final Object message) {
*/
final void checkPoison(final Object data) {
if (data instanceof PoisonPill) {
- owningProcessor.bindAllOutputsAtomically(data);
+ forwardPoisonPill(data);
owningProcessor.terminate();
((PoisonPill) data).countDown();
}
}
+ protected void forwardPoisonPill(final Object data) {
+ owningProcessor.bindAllOutputsAtomically(data);
+ }
+
final void reportException(final Throwable e) {
owningProcessor.reportError(e);
}
@@ -35,9 +35,11 @@
final class ForkingDataflowOperatorActor extends DataflowOperatorActor {
private final Semaphore semaphore;
private final Pool threadPool;
+ private final int maxForks;
ForkingDataflowOperatorActor(final DataflowOperator owningOperator, final PGroup group, final List outputs, final List inputs, final Closure code, final int maxForks) {
super(owningOperator, group, outputs, inputs, code);
+ this.maxForks = maxForks;
this.semaphore = new Semaphore(maxForks);
this.threadPool = group.getThreadPool();
}
@@ -61,4 +63,16 @@ public void run() {
}
});
}
+
+ @Override
+ protected void forwardPoisonPill(final Object data) {
+ try {
+ semaphore.acquire(maxForks);
+ } catch (InterruptedException e) {
+ owningProcessor.reportError(e);
+ } finally {
+ super.forwardPoisonPill(data);
+ semaphore.release(maxForks);
+ }
+ }
}
@@ -35,10 +35,12 @@
final class ForkingDataflowSelectorActor extends DataflowSelectorActor {
private final Semaphore semaphore;
private final Pool threadPool;
+ private final int maxForks;
@SuppressWarnings({"ConstructorWithTooManyParameters"})
ForkingDataflowSelectorActor(final DataflowSelector owningOperator, final PGroup group, final List outputs, final List inputs, final Closure code, final int maxForks) {
super(owningOperator, group, outputs, inputs, code);
+ this.maxForks = maxForks;
this.semaphore = new Semaphore(maxForks);
this.threadPool = group.getThreadPool();
}
@@ -62,4 +64,16 @@ public void run() {
}
});
}
+
+ @Override
+ protected void forwardPoisonPill(final Object data) {
+ try {
+ semaphore.acquire(maxForks);
+ } catch (InterruptedException e) {
+ owningProcessor.reportError(e);
+ } finally {
+ super.forwardPoisonPill(data);
+ semaphore.release(maxForks);
+ }
+ }
}
@@ -130,8 +130,8 @@ public class DataflowProcessorEventTest extends GroovyTestCase {
assert 1 == listener2.countEventsThatStartWith('beforeRun')
assert 1 == listener1.countEventsThatStartWith('messageSentOut')
assert 1 == listener2.countEventsThatStartWith('messageSentOut')
- assert 1 == listener1.countEventsThatStartWith('afterRun')
- assert 1 == listener2.countEventsThatStartWith('afterRun')
+ assert 1 >= listener1.countEventsThatStartWith('afterRun')
+ assert 1 >= listener2.countEventsThatStartWith('afterRun')
final arrived1 = listener1.retrieveEvents {it.startsWith 'messageArrived'}
assert 'messageArrived 10' == arrived1.first()
@@ -168,8 +168,8 @@ public class DataflowProcessorEventTest extends GroovyTestCase {
assert 2 == listener2.countEventsThatStartWith('beforeRun')
assert 2 == listener1.countEventsThatStartWith('messageSentOut')
assert 2 == listener2.countEventsThatStartWith('messageSentOut')
- assert 2 == listener1.countEventsThatStartWith('afterRun')
- assert 2 == listener2.countEventsThatStartWith('afterRun')
+ assert 2 >= listener1.countEventsThatStartWith('afterRun')
+ assert 2 >= listener2.countEventsThatStartWith('afterRun')
final arrived1 = listener1.retrieveEvents {it.startsWith 'messageArrived'}
assert 'messageArrived 10' == arrived1.first()
@@ -198,8 +198,8 @@ public class DataflowProcessorEventTest extends GroovyTestCase {
assert 0 == listener2.countEventsThatStartWith('beforeRun')
assert 1 == listener1.countEventsThatStartWith('messageSentOut')
assert 1 == listener2.countEventsThatStartWith('messageSentOut')
- assert 0 == listener1.countEventsThatStartWith('afterRun')
- assert 0 == listener2.countEventsThatStartWith('afterRun')
+ assert 0 >= listener1.countEventsThatStartWith('afterRun')
+ assert 0 >= listener2.countEventsThatStartWith('afterRun')
op.terminate()
}
@@ -222,8 +222,8 @@ public class DataflowProcessorEventTest extends GroovyTestCase {
assert 0 == listener2.countEventsThatStartWith('beforeRun')
assert 1 == listener1.countEventsThatStartWith('messageSentOut')
assert 1 == listener2.countEventsThatStartWith('messageSentOut')
- assert 0 == listener1.countEventsThatStartWith('afterRun')
- assert 0 == listener2.countEventsThatStartWith('afterRun')
+ assert 0 >= listener1.countEventsThatStartWith('afterRun')
+ assert 0 >= listener2.countEventsThatStartWith('afterRun')
op.terminate()
}
@@ -0,0 +1,91 @@
+// GPars - Groovy Parallel Systems
+//
+// Copyright © 2008-2012 The original author or authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package groovyx.gpars.dataflow.operator
+
+import groovyx.gpars.dataflow.DataflowQueue
+import groovyx.gpars.group.DefaultPGroup
+import groovyx.gpars.group.PGroup
+import java.util.concurrent.CyclicBarrier
+
+/**
+ * @author Vaclav Pech
+ */
+class PoisonWithForkProcessorTest extends GroovyTestCase {
+ private PGroup group
+ final DataflowQueue a = new DataflowQueue()
+ final DataflowQueue b = new DataflowQueue()
+ final DataflowQueue c = new DataflowQueue()
+
+ protected void setUp() {
+ group = new DefaultPGroup(20)
+ super.setUp()
+ }
+
+ protected void tearDown() {
+ group.shutdown()
+ super.tearDown()
+ }
+
+ public void testPoisonWithSequentialWriteIntoChannels() {
+ def op = group.operator(inputs: [a, b], outputs: [c], maxForks: 4) {x, y ->
+ bindOutput x + y
+ }
+ 100.times {
+ a << 10
+ }
+
+ 100.times {
+ b << 20
+ }
+ a << PoisonPill.instance
+ 100.times {
+ assert 30 == c.val
+ }
+ assert PoisonPill.instance == c.val
+ op.join()
+ }
+
+ public void testPoisonWithParallelWriteIntoChannels() {
+ def op = group.operator(inputs: [a, b], outputs: [c], maxForks: 4) {x, y ->
+ bindOutput x + y
+ }
+ final barrier = new CyclicBarrier(2)
+ final t1 = Thread.start {
+ barrier.await()
+ 100.times {
+ a << 10
+ }
+ a << PoisonPill.instance
+ }
+
+ final t2 = Thread.start {
+ barrier.await()
+ 100.times {
+ b << 20
+ }
+ b << PoisonPill.instance
+ }
+
+ [t1, t2]*.join()
+ a << PoisonPill.instance
+ 100.times {
+ assert 30 == c.val
+ }
+ assert PoisonPill.instance == c.val
+ op.join()
+ }
+}

0 comments on commit 7c965c2

Please sign in to comment.