Skip to content

Commit

Permalink
Issues #100, #88 - remove Scheduler#env, make Task fast
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandru committed Dec 29, 2015
1 parent 70aaba9 commit be87ebb
Show file tree
Hide file tree
Showing 34 changed files with 267 additions and 256 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
package monifu.concurrent.schedulers

import java.util.concurrent.TimeUnit
import monifu.concurrent.Scheduler.{Environment, Platform}
import monifu.concurrent.cancelables.BooleanCancelable
import monifu.concurrent.schedulers.Timer.{clearTimeout, setTimeout}
import monifu.concurrent.{Cancelable, UncaughtExceptionReporter}

/**
* An `AsyncScheduler` schedules tasks to happen in the future with
* the given `ScheduledExecutorService` and the tasks themselves are
* executed on the given `ExecutionContext`.
*/
* An `AsyncScheduler` schedules tasks to happen in the future with
* the given `ScheduledExecutorService` and the tasks themselves are
* executed on the given `ExecutionContext`.
*/
final class AsyncScheduler private (reporter: UncaughtExceptionReporter)
extends ReferenceScheduler {

Expand All @@ -52,8 +50,6 @@ final class AsyncScheduler private (reporter: UncaughtExceptionReporter)

override def reportFailure(t: Throwable): Unit =
reporter.reportFailure(t)

override val env = Environment(256, Platform.JS)
}

object AsyncScheduler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package monifu.concurrent.schedulers

import java.util.concurrent.TimeUnit
import monifu.concurrent.Scheduler.{Platform, Environment}
import monifu.concurrent.schedulers.Timer.{clearTimeout, setTimeout}
import monifu.concurrent.{Cancelable, UncaughtExceptionReporter}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
import scala.concurrent.duration.TimeUnit
import scala.util.control.NonFatal


Expand Down Expand Up @@ -77,9 +76,6 @@ final class TrampolineScheduler private (reporter: UncaughtExceptionReporter)

override def reportFailure(t: Throwable): Unit =
reporter.reportFailure(t)

override val env =
Environment(256, Platform.JS)
}

object TrampolineScheduler {
Expand Down
32 changes: 32 additions & 0 deletions core/js/src/main/scala/monifu/internal/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2014-2015 by its authors. Some rights reserved.
* See the project homepage at: https://monifu.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monifu.internal

private[monifu] object Platform {
/**
* Returns `true` in case Monifu is running on top of Scala.js,
* or `false` otherwise.
*/
final val isJS = true

/**
* Returns `true` in case Monifu is running on top of the JVM,
* or `false` otherwise.
*/
final val isJVM = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package monifu.observers.buffers

import monifu.concurrent.Scheduler
import monifu.concurrent.atomic.padded.Atomic
import monifu.Ack.{Cancel, Continue}
import monifu.observers.BufferedSubscriber
Expand Down Expand Up @@ -47,7 +48,7 @@ private[monifu] final class BackPressuredBufferedSubscriber[-T] private
// side in order to know how many items to process and when to stop
private[this] val queue = mutable.Queue.empty[T]
// Used on the consumer side to split big synchronous workloads in batches
private[this] val batchSizeModulus = scheduler.env.batchSize - 1
private[this] val batchSizeModulus = Scheduler.recommendedBatchSize - 1

def onNext(elem: T): Future[Ack] = {
val state = stateRef.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package monifu.observers.buffers

import monifu.concurrent.Scheduler
import monifu.internal.collection.{EvictingQueue, DropHeadOnOverflowQueue, DropAllOnOverflowQueue}
import monifu.Ack.{Cancel, Continue}
import monifu.exceptions.BufferOverflowException
Expand Down Expand Up @@ -46,7 +47,7 @@ private[buffers] final class SynchronousBufferedSubscriber[-T] private
// events being dropped
private[this] var eventsDropped = 0L
// Used on the consumer side to split big synchronous workloads in batches
private[this] val batchSizeModulus = scheduler.env.batchSize - 1
private[this] val batchSizeModulus = Scheduler.recommendedBatchSize - 1

def onNext(elem: T): Ack = {
if (!upstreamIsComplete && !downstreamIsDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
package monifu.concurrent.schedulers

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import monifu.concurrent.Scheduler._
import monifu.concurrent.cancelables.BooleanCancelable
import monifu.concurrent.{Cancelable, UncaughtExceptionReporter}
import scala.concurrent.ExecutionContext


/**
* An `AsyncScheduler` schedules tasks to happen in the future with the
* given `ScheduledExecutorService` and the tasks themselves are executed on
* the given `ExecutionContext`.
*/
* An `AsyncScheduler` schedules tasks to happen in the future with the
* given `ScheduledExecutorService` and the tasks themselves are executed on
* the given `ExecutionContext`.
*/
final class AsyncScheduler private
(s: ScheduledExecutorService, ec: ExecutionContext, r: UncaughtExceptionReporter)
extends ReferenceScheduler {
Expand Down Expand Up @@ -64,8 +63,6 @@ final class AsyncScheduler private

override def reportFailure(t: Throwable): Unit =
r.reportFailure(t)

override val env = Environment(512, Platform.JVM)
}

object AsyncScheduler {
Expand Down
32 changes: 32 additions & 0 deletions core/jvm/src/main/scala/monifu/internal/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2014-2015 by its authors. Some rights reserved.
* See the project homepage at: https://monifu.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monifu.internal

private[monifu] object Platform {
/**
* Returns `true` in case Monifu is running on top of Scala.js,
* or `false` otherwise.
*/
final val isJS = false

/**
* Returns `true` in case Monifu is running on top of the JVM,
* or `false` otherwise.
*/
final val isJVM = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monifu.observers.buffers

import java.util.concurrent.ConcurrentLinkedQueue
import monifu.concurrent.Scheduler
import monifu.concurrent.atomic.padded.Atomic
import monifu.Ack.{Cancel, Continue}
import monifu.observers.BufferedSubscriber
Expand Down Expand Up @@ -47,7 +48,7 @@ private[monifu] final class BackPressuredBufferedSubscriber[-T] private
// side in order to know how many items to process and when to stop
private[this] val queue = new ConcurrentLinkedQueue[T]()
// Used on the consumer side to split big synchronous workloads in batches
private[this] val batchSizeModulus = scheduler.env.batchSize - 1
private[this] val batchSizeModulus = Scheduler.recommendedBatchSize - 1

def onNext(elem: T): Future[Ack] = {
val state = stateRef.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monifu.observers.buffers

import java.util.concurrent.ConcurrentLinkedQueue
import monifu.concurrent.Scheduler
import monifu.concurrent.atomic.padded.Atomic
import monifu.Ack.{Cancel, Continue}
import monifu.observers.{BufferedSubscriber, SynchronousSubscriber}
Expand Down Expand Up @@ -48,7 +49,7 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private
// side in order to know how many items to process and when to stop
private[this] val queue = new ConcurrentLinkedQueue[T]()
// Used on the consumer side to split big synchronous workloads in batches
private[this] val batchSizeModulus = scheduler.env.batchSize - 1
private[this] val batchSizeModulus = Scheduler.recommendedBatchSize - 1

@tailrec
def onNext(elem: T): Ack = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package monifu.observers.buffers

import monifu.concurrent.Scheduler
import monifu.internal.collection.{EvictingQueue, DropHeadOnOverflowQueue, DropAllOnOverflowQueue}
import monifu.Ack.{Cancel, Continue}
import monifu.observers.{BufferedSubscriber, SynchronousSubscriber}
Expand Down Expand Up @@ -46,7 +47,7 @@ private[buffers] final class EvictingBufferedSubscriber[-T] private
// events being dropped
private[this] var eventsDropped = 0L
// MUST only be accessed within the consumer loop
private[this] val consumerBuffer = new Array[AnyRef](scheduler.env.batchSize)
private[this] val consumerBuffer = new Array[AnyRef](Scheduler.recommendedBatchSize)

def onNext(elem: T): Ack = self.synchronized {
if (!upstreamIsComplete && !downstreamIsDone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monifu.observers.buffers

import java.util.concurrent.ConcurrentLinkedQueue

import monifu.concurrent.Scheduler
import monifu.concurrent.atomic.padded.Atomic
import monifu.Ack.{Cancel, Continue}
import monifu.exceptions.BufferOverflowException
Expand Down Expand Up @@ -47,7 +48,7 @@ private[buffers] final class SimpleBufferedSubscriber[-T] private

implicit val scheduler = underlying.scheduler
private[this] val queue = new ConcurrentLinkedQueue[T]()
private[this] val batchSizeModulus = scheduler.env.batchSize - 1
private[this] val batchSizeModulus = Scheduler.recommendedBatchSize - 1

// to be modified only in onError, before upstreamIsComplete
private[this] var errorThrown: Throwable = null
Expand Down
Loading

0 comments on commit be87ebb

Please sign in to comment.