Skip to content

Commit

Permalink
execution.AsyncQueue and catnap.ConcurrentQueue (#757)
Browse files Browse the repository at this point in the history
* New AsyncQueue

* Add ConcurrentQueue

* Fix Mima

* Fix test

* Remove the retryDelay param

* Add tryOffer/tryPoll tests

* Fix test for Scala 2.11

* Add bounded/unbounded buffer capacity to queues

* Small fixes, rename configure to custom
  • Loading branch information
alexandru committed Oct 31, 2018
1 parent 1afaaa9 commit 840c090
Show file tree
Hide file tree
Showing 35 changed files with 2,060 additions and 146 deletions.
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -67,3 +68,4 @@ class ArrayStackBenchmark {
sum
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -76,3 +77,4 @@ class ObservableMapTaskBenchmark {
Await.result(p.future, Duration.Inf)
}
}
*/
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

/*
package monix.benchmarks

import java.util.concurrent.TimeUnit
import monix.eval.Task
import org.openjdk.jmh.annotations._
Expand Down Expand Up @@ -71,3 +71,4 @@ class TaskAttemptBenchmark {
Await.result(loop(0).runAsync, Duration.Inf)
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -77,3 +78,4 @@ class TaskDeepBindBenchmark {
Await.result(loop(0).runAsync, Duration.Inf)
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -73,3 +74,4 @@ class TaskHandleErrorBenchmark {
Await.result(loop(0).runAsync, Duration.Inf)
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -69,4 +70,5 @@ object TaskMapCallsBenchmark {
}
sum
}
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -91,3 +92,4 @@ object TaskMapStreamBenchmark {
Task.pure(acc)
}
}
*/
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -78,3 +79,4 @@ class TaskShallowBindBenchmark {
Await.result(task.runToFuture, Duration.Inf)
}
}
*/
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.benchmarks

import java.util.concurrent.TimeUnit
import monix.execution.ChannelType.{MPMC, SPMC, SPSC}
import monix.execution.{AsyncQueue, CancelableFuture, ChannelType}
import org.openjdk.jmh.annotations._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

/** To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark AsyncQueueBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within SBT:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.AsyncQueueBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread".
* Please note that benchmarks should be usually executed at least in
* 10 iterations (as a rule of thumb), but more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class AsyncQueueBenchmark {
@Param(Array("10000"))
var size: Int = _

@Benchmark
def spsc(): Long = {
test(producers = 1, workers = 1, channelType = SPSC)
}

@Benchmark
def spmc(): Long = {
test(producers = 1, workers = 4, channelType = SPMC)
}

@Benchmark
def mpmc(): Long = {
test(producers = 4, workers = 4, channelType = MPMC)
}

def test(producers: Int, workers: Int, channelType: ChannelType): Long = {
val queue = AsyncQueue[Int](capacity = 1024, channelType = channelType)
val workers = 1

def producer(n: Int): Future[Long] =
if (n > 0)
queue.offer(n).flatMap(_ => producer(n - 1))
else
CancelableFuture.successful(0L)

def consumer(n: Int, acc: Long): Future[Long] =
if (n > 0) queue.poll().flatMap(i => consumer(n - 1, acc + i))
else CancelableFuture.successful(acc)

val producerTasks = (0 until producers).map(_ => producer(size / producers))
val workerTasks = (0 until workers).map(_ => consumer(size / workers, 0))

val futures = producerTasks ++ workerTasks
val r = for (l <- Future.sequence(futures)) yield l.sum
Await.result(r, Duration.Inf)
}
}

@@ -0,0 +1,85 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.benchmarks

import java.util.concurrent.TimeUnit
import monix.execution.CancelableFuture
import monix.execution.misc.AsyncQueue
import org.openjdk.jmh.annotations._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

/** To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark AsyncQueueBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within SBT:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.AsyncQueueBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread".
* Please note that benchmarks should be usually executed at least in
* 10 iterations (as a rule of thumb), but more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class AsyncQueueBenchmark {
@Param(Array("10000"))
var size: Int = _

@Benchmark
def spsc(): Long = {
test(producers = 1, workers = 1)
}

@Benchmark
def spmc(): Long = {
test(producers = 1, workers = 4)
}

@Benchmark
def mpmc(): Long = {
test(producers = 4, workers = 4)
}

def test(producers: Int, workers: Int): Long = {
val queue = AsyncQueue[Int]()
val workers = 1

def producer(n: Int): Future[Long] =
if (n > 0)
CancelableFuture.successful(queue.offer(1)).flatMap(_ => producer(n - 1))
else
CancelableFuture.successful(0L)

def consumer(n: Int, acc: Long): Future[Long] =
if (n > 0) queue.poll().flatMap(i => consumer(n - 1, acc + i))
else CancelableFuture.successful(acc)

val producerTasks = (0 until producers).map(_ => producer(size / producers))
val workerTasks = (0 until workers).map(_ => consumer(size / workers, 0))

val futures = producerTasks ++ workerTasks
val r = for (l <- Future.sequence(futures)) yield l.sum
Await.result(r, Duration.Inf)
}
}

Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

/*
package monix.benchmarks
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -154,3 +155,4 @@ object TaskShiftBenchmark {
ctx.scheduler.executeTrampolined(() => cb.onSuccess(()))
}
}
*/
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -354,6 +354,7 @@ lazy val executionJVM = project.in(file("monix-execution/jvm"))
.settings(testSettings)
.settings(requiredMacroDeps)
.settings(executionCommon)
.settings(libraryDependencies += "org.jctools" % "jctools-core" % jcToolsVersion)
.settings(libraryDependencies += "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion)
.settings(mimaSettings("monix-execution"))

Expand Down Expand Up @@ -434,7 +435,6 @@ lazy val reactiveJVM = project.in(file("monix-reactive/jvm"))
.configure(profile)
.dependsOn(executionJVM, evalJVM % "compile->compile; test->test")
.settings(reactiveCommon)
.settings(libraryDependencies += "org.jctools" % "jctools-core" % jcToolsVersion)
.settings(mimaSettings("monix-reactive"))
.settings(doctestTestSettings)

Expand Down

0 comments on commit 840c090

Please sign in to comment.