Skip to content

Commit

Permalink
Channel: Implement throttle()
Browse files Browse the repository at this point in the history
Fixes #7
  • Loading branch information
tindzk committed Oct 5, 2015
1 parent 15e2d06 commit 4cc7930
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 0 deletions.
36 changes: 36 additions & 0 deletions js/src/main/scala/pl/metastack/metarx/AsyncScheduler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pl.metastack.metarx

import scala.concurrent.duration._

import scala.scalajs.js

class AsyncScheduler extends Scheduler {
type Timeout = js.Dynamic
type Interval = js.Dynamic

def setTimeout(delayMillis: Long, r: Runnable): Timeout = {
val lambda: js.Function = () => r.run()
js.Dynamic.global.setTimeout(lambda, delayMillis)
}

def setInterval(intervalMillis: Long, r: Runnable): Interval = {
val lambda: js.Function = () => r.run()
js.Dynamic.global.setInterval(lambda, intervalMillis)
}

def clearTimeout(task: Timeout): Unit =
js.Dynamic.global.clearTimeout(task)

def clearInterval(task: Interval): Unit =
js.Dynamic.global.clearInterval(task)

def schedule(interval: FiniteDuration, r: Runnable): Cancelable = {
val task = setInterval(interval.toMillis, r)
Cancelable(clearInterval(task))
}

def scheduleOnce(initialDelay: FiniteDuration, r: Runnable): Cancelable = {
val task = setTimeout(initialDelay.toMillis, r)
Cancelable(clearTimeout(task))
}
}
5 changes: 5 additions & 0 deletions js/src/main/scala/pl/metastack/metarx/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package pl.metastack.metarx

object Platform {
implicit lazy val DefaultScheduler: Scheduler = new AsyncScheduler
}
32 changes: 32 additions & 0 deletions jvm/src/main/scala/pl/metastack/metarx/AsyncScheduler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pl.metastack.metarx

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

class AsyncScheduler(s: ScheduledExecutorService,
ec: ExecutionContext) extends Scheduler {
def schedule(interval: FiniteDuration, r: Runnable): Cancelable =
schedule(interval.length, interval.unit, r)

def schedule(interval: Long, unit: TimeUnit, r: Runnable): Cancelable = {
require(interval > 0)
val initialDelay = interval
val task = s.scheduleAtFixedRate(r, initialDelay, interval, unit)
Cancelable(task.cancel(true))
}

def scheduleOnce(initialDelay: FiniteDuration, r: Runnable): Cancelable =
scheduleOnce(initialDelay.length, initialDelay.unit, r)

def scheduleOnce(initialDelay: Long, unit: TimeUnit, r: Runnable): Cancelable = {
if (initialDelay <= 0) {
ec.execute(r)
Cancelable()
} else {
val task = s.schedule(r, initialDelay, unit)
Cancelable(task.cancel(true))
}
}
}
11 changes: 11 additions & 0 deletions jvm/src/main/scala/pl/metastack/metarx/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package pl.metastack.metarx

import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

object Platform {
implicit lazy val DefaultScheduler: Scheduler = new AsyncScheduler(
Executors.newSingleThreadScheduledExecutor(),

This comment has been minimized.

Copy link
@mkotsbak

mkotsbak Oct 9, 2015

Contributor

What is the reasoning about hard coding the executor, rather than letting the user supply it using implicit?

ExecutionContext.Implicits.global)
}
15 changes: 15 additions & 0 deletions shared/src/main/scala/pl/metastack/metarx/Channel.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.metastack.metarx

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

object Channel {
Expand Down Expand Up @@ -328,6 +329,20 @@ trait ReadChannel[T]
}
}, cur)
}

def throttle(interval: FiniteDuration)
(implicit scheduler: Scheduler): ReadChannel[T] = {
val intervalMs = interval.toMillis
var next = 0L
forkUni { t =>
val time = scheduler.currentTimeMillis()
if (next > time) Result.Next()
else {
next = time + intervalMs
Result.Next(t)
}
}
}
}

trait WriteChannel[T]
Expand Down
44 changes: 44 additions & 0 deletions shared/src/main/scala/pl/metastack/metarx/Scheduler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pl.metastack.metarx

import scala.concurrent.duration._

/**
* Inspired from Monifu's scheduling code.
*/
trait Scheduler {
def schedule(interval: FiniteDuration, r: Runnable): Cancelable
def scheduleOnce(initialDelay: FiniteDuration, action: Runnable): Cancelable

def schedule(interval: FiniteDuration)(action: => Unit): Cancelable =
schedule(interval, new Runnable {
def run(): Unit = action
})

def scheduleOnce(initialDelay: FiniteDuration)(action: => Unit): Cancelable =
scheduleOnce(initialDelay, new Runnable {
def run(): Unit = action
})

def currentTimeMillis(): Long = System.currentTimeMillis()
}

trait Cancelable {
def cancel(): Boolean
}

object Cancelable {
def apply(): Cancelable = apply({})

def apply(callback: => Unit): Cancelable =
new Cancelable {
var isCanceled = false

def cancel(): Boolean =
if (isCanceled) false
else {
isCanceled = true
callback
true
}
}
}
24 changes: 24 additions & 0 deletions shared/src/test/scala/pl/metastack/metarx/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pl.metastack.metarx
import minitest._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -996,4 +997,27 @@ object ChannelTest extends SimpleTestSuite {
val ch4 = ch1 + test2
assertEquals(ch4.cache.get, Some(concated))
}

test("throttle()") {
import scala.concurrent.duration._
import Platform.DefaultScheduler
val scheduler = implicitly[Scheduler]

val ch = Channel[Int]()
val throttled = ch.throttle(600.millis)

var i = 0
val task = scheduler.schedule(500.millis) {
ch := i
i += 1
}

val collected = ArrayBuffer.empty[Int]
throttled.attach(collected += _)

scheduler.scheduleOnce(2000.millis) {
assertEquals(collected, Seq(0, 2))
task.cancel()
}
}
}
29 changes: 29 additions & 0 deletions shared/src/test/scala/pl/metastack/metarx/SchedulerTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.metastack.metarx

import minitest.SimpleTestSuite

import scala.collection.mutable.ArrayBuffer

object SchedulerTest extends SimpleTestSuite {

test("schedule()") {
import scala.concurrent.duration._
val ch = Channel[Int]()

val scheduler: Scheduler = Platform.DefaultScheduler
var i = 0
val task = scheduler.schedule(500.millis) {
ch := i
i += 1
}

val collected = ArrayBuffer.empty[Int]
ch.attach(collected += _)

scheduler.scheduleOnce(2100.millis) {
assertEquals(collected, Seq(0, 1, 2, 3))
task.cancel()
}
}

}

0 comments on commit 4cc7930

Please sign in to comment.