Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change overflow-strategy onOverflow handling #203

Merged
merged 1 commit into from Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions build.sbt
Expand Up @@ -106,6 +106,9 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
Resolver.sonatypeRepo("releases")
),

// https://github.com/sbt/sbt/issues/2654
incOptions := incOptions.value.withLogRecompileOnMacro(false),

// -- Settings meant for deployment on oss.sonatype.org

useGpg := true,
Expand Down Expand Up @@ -260,9 +263,7 @@ lazy val typesJS = project.in(file("monix-types/js"))
.settings(scalaJSSettings)

lazy val executionCommon = crossVersionSharedSources ++ Seq(
name := "monix-execution",
// https://github.com/sbt/sbt/issues/2654
incOptions := incOptions.value.withLogRecompileOnMacro(false)
name := "monix-execution"
)

lazy val executionJVM = project.in(file("monix-execution/jvm"))
Expand Down
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
* [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy.
*/
private[buffers] final class SyncBufferedSubscriber[-T] private
(underlying: Subscriber[T], buffer: EvictingQueue[T], onOverflow: Long => T = null)
(underlying: Subscriber[T], buffer: EvictingQueue[T], onOverflow: Long => Option[T] = null)
extends BufferedSubscriber[T] with Subscriber.Sync[T] {

implicit val scheduler = underlying.scheduler
Expand Down Expand Up @@ -107,11 +107,14 @@ private[buffers] final class SyncBufferedSubscriber[-T] private
val nextEvent =
if (eventsDropped > 0 && onOverflow != null) {
try {
val message = onOverflow(eventsDropped).asInstanceOf[AnyRef]
eventsDropped = 0
message
}
catch {
onOverflow(eventsDropped) match {
case Some(message) =>
eventsDropped = 0
message.asInstanceOf[AnyRef]
case None =>
buffer.poll()
}
} catch {
case NonFatal(ex) =>
errorThrown = ex
upstreamIsComplete = true
Expand Down Expand Up @@ -212,7 +215,7 @@ private[monix] object SyncBufferedSubscriber {
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflow strategy.
*/
def dropNewAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = {
def dropNewAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = {
require(bufferSize > 1,
"bufferSize must be a strictly positive number, bigger than 1")

Expand All @@ -239,7 +242,7 @@ private[monix] object SyncBufferedSubscriber {
* overflow strategy, with signaling of the number of events that
* were dropped.
*/
def dropOldAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = {
def dropOldAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = {
require(bufferSize > 1,
"bufferSize must be a strictly positive number, bigger than 1")

Expand All @@ -266,7 +269,7 @@ private[monix] object SyncBufferedSubscriber {
* overflow strategy, with signaling of the number of events that
* were dropped.
*/
def clearBufferAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = {
def clearBufferAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = {
require(bufferSize > 1,
"bufferSize must be a strictly positive number, bigger than 1")

Expand Down
Expand Up @@ -27,13 +27,12 @@ import scala.annotation.tailrec
import scala.concurrent.Future
import scala.util.control.NonFatal

/**
* A high-performance and non-blocking [[BufferedSubscriber]] implementation
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflow strategy.
*/
/** A high-performance and non-blocking [[BufferedSubscriber]] implementation
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflow strategy.
*/
private[buffers] final class DropNewBufferedSubscriber[-T] private
(underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T = null)
(underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T] = null)
extends BufferedSubscriber[T] with Subscriber.Sync[T] { self =>

require(bufferSize > 0, "bufferSize must be a strictly positive number")
Expand Down Expand Up @@ -75,12 +74,14 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private
// composing the overflow message; we've got to do error handling
// because this is a user supplied function
val shouldContinue = try {
val message = onOverflow(state.eventsDropped)
queue.offer(message)
pushToConsumer(update)
onOverflow(state.eventsDropped) match {
case None => ()
case Some(message) =>
queue.offer(message)
pushToConsumer(update)
}
true
}
catch {
} catch {
case NonFatal(ex) =>
onError(ex)
false
Expand Down Expand Up @@ -147,12 +148,12 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private
// composing the overflow message; we've got to do error handling
// because this is a user supplied function
val ex = try {
val message = onOverflow(state.eventsDropped)
queue.offer(message)
pushToConsumer(state)
for (message <- onOverflow(state.eventsDropped)) {
queue.offer(message)
pushToConsumer(state)
}
null
}
catch {
} catch {
case NonFatal(ref) => ref
}

Expand Down Expand Up @@ -291,21 +292,19 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private
}

private[monix] object DropNewBufferedSubscriber {
/**
* Returns an instance of a [[DropNewBufferedSubscriber]]
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflowStrategy.
*/
/** Returns an instance of a [[DropNewBufferedSubscriber]]
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflowStrategy.
*/
def simple[T](underlying: Subscriber[T], bufferSize: Int): Subscriber.Sync[T] = {
new DropNewBufferedSubscriber[T](underlying, bufferSize, null)
}

/**
* Returns an instance of a [[DropNewBufferedSubscriber]]
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflowStrategy.
*/
def withSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = {
/** Returns an instance of a [[DropNewBufferedSubscriber]]
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflowStrategy.
*/
def withSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = {
new DropNewBufferedSubscriber[T](underlying, bufferSize, onOverflow)
}

Expand All @@ -318,24 +317,19 @@ private[monix] object DropNewBufferedSubscriber {
isDoneInProgress: Boolean = false,
errorThrown: Throwable = null) {

def upstreamShouldStop: Boolean = {
def upstreamShouldStop: Boolean =
upstreamIsComplete || downstreamIsDone || isDoneInProgress
}

def downstreamComplete: State = {
def downstreamComplete: State =
copy(itemsToPush = 0, downstreamIsDone = true)
}

def declareProcessed(processed: Int): State = {
def declareProcessed(processed: Int): State =
copy(itemsToPush = itemsToPush - processed)
}

def incrementDropped: State = {
def incrementDropped: State =
copy(eventsDropped = eventsDropped + 1)
}

def incrementItemsToPush: State = {
def incrementItemsToPush: State =
copy(itemsToPush = itemsToPush + 1)
}
}
}
Expand Up @@ -26,12 +26,11 @@ import scala.annotation.tailrec
import scala.util.Failure
import scala.util.control.NonFatal

/**
* A [[BufferedSubscriber]] implementation for the
* [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy.
*/
/** A [[BufferedSubscriber]] implementation for the
* [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy.
*/
private[buffers] final class EvictingBufferedSubscriber[-T] private
(underlying: Subscriber[T], buffer: EvictingQueue[AnyRef], onOverflow: Long => T = null)
(underlying: Subscriber[T], buffer: EvictingQueue[AnyRef], onOverflow: Long => Option[T] = null)
extends BufferedSubscriber[T] with Subscriber.Sync[T] { self =>

implicit val scheduler = underlying.scheduler
Expand Down Expand Up @@ -95,12 +94,15 @@ private[buffers] final class EvictingBufferedSubscriber[-T] private
val count =
if (eventsDropped > 0 && onOverflow != null) {
try {
val message = onOverflow(eventsDropped).asInstanceOf[AnyRef]
eventsDropped = 0
consumerBuffer(0) = message
1 + buffer.pollMany(consumerBuffer, 1)
}
catch {
onOverflow(eventsDropped) match {
case Some(message) =>
eventsDropped = 0
consumerBuffer(0) = message.asInstanceOf[AnyRef]
1 + buffer.pollMany(consumerBuffer, 1)
case None =>
buffer.pollMany(consumerBuffer)
}
} catch {
case NonFatal(ex) =>
errorThrown = ex
upstreamIsComplete = true
Expand Down Expand Up @@ -221,7 +223,7 @@ private[monix] object EvictingBufferedSubscriber {
* were dropped.
*/
def dropOldAndSignal[A](underlying: Subscriber[A],
bufferSize: Int, onOverflow: Long => A): Subscriber.Sync[A] = {
bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = {

require(bufferSize > 1,
"bufferSize must be a strictly positive number, bigger than 1")
Expand Down Expand Up @@ -250,7 +252,7 @@ private[monix] object EvictingBufferedSubscriber {
* were dropped.
*/
def clearBufferAndSignal[A](underlying: Subscriber[A],
bufferSize: Int, onOverflow: Long => A): Subscriber.Sync[A] = {
bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = {

require(bufferSize > 1,
"bufferSize must be a strictly positive number, bigger than 1")
Expand Down
Expand Up @@ -37,18 +37,18 @@ object BufferDropNewAndSignalConcurrencySuite
}

def buildNewForInt(bufferSize: Int, underlying: Observer[Int])(implicit s: Scheduler) = {
BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr.toInt))
BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr.toInt)))
}

def buildNewForLong(bufferSize: Int, underlying: Observer[Long])(implicit s: Scheduler) = {
BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr))
BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr)))
}

test("merge test should work") { implicit s =>
val num = 100000
val source = Observable.repeat(1L).take(num)
val f = Observable.fromIterable(Seq(source, source, source))
.mergeMap(x => x)(DropNewAndSignal(1000, dropped => dropped))
.mergeMap(x => x)(DropNewAndSignal(1000, dropped => Some(dropped)))
.sumF
.runAsyncGetFirst

Expand Down
Expand Up @@ -90,13 +90,20 @@ object OverflowStrategy {
* the pipeline should begin dropping incoming events until the buffer
* has room in it again and is free to process more elements.
*
* The given `onOverflow` function get be used for logging the event
* and for sending a message to the downstream consumers to inform
* them of dropped messages. The function can return `None` in which
* case no message is sent and thus you can use it just to log a warning.
*
* @param bufferSize specifies how many events our buffer can hold
* before overflowing.
*
* @param onOverflow is a function that can get called on overflow with
* a number of messages that were dropped, a function that builds
* a new message that will be sent to downstream.
* a new message that will be sent to downstream. If it returns
* `None`, then no message gets sent to downstream.
*/
final case class DropNewAndSignal[A](bufferSize: Int, onOverflow: Long => A)
final case class DropNewAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A])
extends Evicted[A] {

require(bufferSize > 1, "bufferSize should be strictly greater than 1")
Expand All @@ -119,13 +126,20 @@ object OverflowStrategy {
* the currently buffered events should start being dropped in a FIFO order,
* so the oldest events from the buffer will be dropped first.
*
* The given `onOverflow` function get be used for logging the event
* and for sending a message to the downstream consumers to inform
* them of dropped messages. The function can return `None` in which
* case no message is sent and thus you can use it just to log a warning.
*
* @param bufferSize specifies how many events our buffer can hold
* before overflowing
*
* @param onOverflow is a function that can get called on overflow with
* a number of messages that were dropped, a function that builds
* a new message that will be sent to downstream.
* a new message that will be sent to downstream. If it returns
* `None`, then no message gets sent to downstream.
*/
final case class DropOldAndSignal[A](bufferSize: Int, onOverflow: Long => A)
final case class DropOldAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A])
extends Evicted[A] {

require(bufferSize > 1, "bufferSize should be strictly greater than 1")
Expand All @@ -148,19 +162,24 @@ object OverflowStrategy {
* the current buffer should be dropped completely to make room for
* new events.
*
* The given `onOverflow` function get be used for logging the event
* and for sending a message to the downstream consumers to inform
* them of dropped messages. The function can return `None` in which
* case no message is sent and thus you can use it just to log a warning.
*
* @param bufferSize specifies how many events our buffer can hold
* before overflowing
*
* @param onOverflow is a function that can get called on overflow with
* a number of messages that were dropped, a function that builds
* a new message that will be sent to downstream.
*/
final case class ClearBufferAndSignal[A](bufferSize: Int, onOverflow: Long => A)
final case class ClearBufferAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A])
extends Evicted[A] {

require(bufferSize > 1, "bufferSize should be strictly greater than 1")
}


/** A category of [[OverflowStrategy]] for buffers that can be used
* synchronously, without worrying about back-pressure concerns.
*/
Expand Down