diff --git a/build.sbt b/build.sbt index 42ee59bcb..31a110e1b 100644 --- a/build.sbt +++ b/build.sbt @@ -106,6 +106,9 @@ lazy val sharedSettings = warnUnusedImport ++ Seq( Resolver.sonatypeRepo("releases") ), + // https://github.com/sbt/sbt/issues/2654 + incOptions := incOptions.value.withLogRecompileOnMacro(false), + // -- Settings meant for deployment on oss.sonatype.org useGpg := true, @@ -260,9 +263,7 @@ lazy val typesJS = project.in(file("monix-types/js")) .settings(scalaJSSettings) lazy val executionCommon = crossVersionSharedSources ++ Seq( - name := "monix-execution", - // https://github.com/sbt/sbt/issues/2654 - incOptions := incOptions.value.withLogRecompileOnMacro(false) + name := "monix-execution" ) lazy val executionJVM = project.in(file("monix-execution/jvm")) diff --git a/monix-reactive/js/src/main/scala/monix/reactive/observers/buffers/SyncBufferedSubscriber.scala b/monix-reactive/js/src/main/scala/monix/reactive/observers/buffers/SyncBufferedSubscriber.scala index 2731fc506..f95c9655c 100644 --- a/monix-reactive/js/src/main/scala/monix/reactive/observers/buffers/SyncBufferedSubscriber.scala +++ b/monix-reactive/js/src/main/scala/monix/reactive/observers/buffers/SyncBufferedSubscriber.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal * [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy. */ private[buffers] final class SyncBufferedSubscriber[-T] private - (underlying: Subscriber[T], buffer: EvictingQueue[T], onOverflow: Long => T = null) + (underlying: Subscriber[T], buffer: EvictingQueue[T], onOverflow: Long => Option[T] = null) extends BufferedSubscriber[T] with Subscriber.Sync[T] { implicit val scheduler = underlying.scheduler @@ -107,11 +107,14 @@ private[buffers] final class SyncBufferedSubscriber[-T] private val nextEvent = if (eventsDropped > 0 && onOverflow != null) { try { - val message = onOverflow(eventsDropped).asInstanceOf[AnyRef] - eventsDropped = 0 - message - } - catch { + onOverflow(eventsDropped) match { + case Some(message) => + eventsDropped = 0 + message.asInstanceOf[AnyRef] + case None => + buffer.poll() + } + } catch { case NonFatal(ex) => errorThrown = ex upstreamIsComplete = true @@ -212,7 +215,7 @@ private[monix] object SyncBufferedSubscriber { * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] * overflow strategy. */ - def dropNewAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = { + def dropNewAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = { require(bufferSize > 1, "bufferSize must be a strictly positive number, bigger than 1") @@ -239,7 +242,7 @@ private[monix] object SyncBufferedSubscriber { * overflow strategy, with signaling of the number of events that * were dropped. */ - def dropOldAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = { + def dropOldAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = { require(bufferSize > 1, "bufferSize must be a strictly positive number, bigger than 1") @@ -266,7 +269,7 @@ private[monix] object SyncBufferedSubscriber { * overflow strategy, with signaling of the number of events that * were dropped. */ - def clearBufferAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = { + def clearBufferAndSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = { require(bufferSize > 1, "bufferSize must be a strictly positive number, bigger than 1") diff --git a/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/DropNewBufferedSubscriber.scala b/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/DropNewBufferedSubscriber.scala index 4d4d4d29e..9e7f82d75 100644 --- a/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/DropNewBufferedSubscriber.scala +++ b/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/DropNewBufferedSubscriber.scala @@ -27,13 +27,12 @@ import scala.annotation.tailrec import scala.concurrent.Future import scala.util.control.NonFatal -/** - * A high-performance and non-blocking [[BufferedSubscriber]] implementation - * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] - * overflow strategy. - */ +/** A high-performance and non-blocking [[BufferedSubscriber]] implementation + * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] + * overflow strategy. + */ private[buffers] final class DropNewBufferedSubscriber[-T] private - (underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T = null) + (underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T] = null) extends BufferedSubscriber[T] with Subscriber.Sync[T] { self => require(bufferSize > 0, "bufferSize must be a strictly positive number") @@ -75,12 +74,14 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private // composing the overflow message; we've got to do error handling // because this is a user supplied function val shouldContinue = try { - val message = onOverflow(state.eventsDropped) - queue.offer(message) - pushToConsumer(update) + onOverflow(state.eventsDropped) match { + case None => () + case Some(message) => + queue.offer(message) + pushToConsumer(update) + } true - } - catch { + } catch { case NonFatal(ex) => onError(ex) false @@ -147,12 +148,12 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private // composing the overflow message; we've got to do error handling // because this is a user supplied function val ex = try { - val message = onOverflow(state.eventsDropped) - queue.offer(message) - pushToConsumer(state) + for (message <- onOverflow(state.eventsDropped)) { + queue.offer(message) + pushToConsumer(state) + } null - } - catch { + } catch { case NonFatal(ref) => ref } @@ -291,21 +292,19 @@ private[buffers] final class DropNewBufferedSubscriber[-T] private } private[monix] object DropNewBufferedSubscriber { - /** - * Returns an instance of a [[DropNewBufferedSubscriber]] - * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] - * overflowStrategy. - */ + /** Returns an instance of a [[DropNewBufferedSubscriber]] + * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] + * overflowStrategy. + */ def simple[T](underlying: Subscriber[T], bufferSize: Int): Subscriber.Sync[T] = { new DropNewBufferedSubscriber[T](underlying, bufferSize, null) } - /** - * Returns an instance of a [[DropNewBufferedSubscriber]] - * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] - * overflowStrategy. - */ - def withSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => T): Subscriber.Sync[T] = { + /** Returns an instance of a [[DropNewBufferedSubscriber]] + * for the [[monix.reactive.OverflowStrategy.DropNew DropNew]] + * overflowStrategy. + */ + def withSignal[T](underlying: Subscriber[T], bufferSize: Int, onOverflow: Long => Option[T]): Subscriber.Sync[T] = { new DropNewBufferedSubscriber[T](underlying, bufferSize, onOverflow) } @@ -318,24 +317,19 @@ private[monix] object DropNewBufferedSubscriber { isDoneInProgress: Boolean = false, errorThrown: Throwable = null) { - def upstreamShouldStop: Boolean = { + def upstreamShouldStop: Boolean = upstreamIsComplete || downstreamIsDone || isDoneInProgress - } - def downstreamComplete: State = { + def downstreamComplete: State = copy(itemsToPush = 0, downstreamIsDone = true) - } - def declareProcessed(processed: Int): State = { + def declareProcessed(processed: Int): State = copy(itemsToPush = itemsToPush - processed) - } - def incrementDropped: State = { + def incrementDropped: State = copy(eventsDropped = eventsDropped + 1) - } - def incrementItemsToPush: State = { + def incrementItemsToPush: State = copy(itemsToPush = itemsToPush + 1) - } } } diff --git a/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/EvictingBufferedSubscriber.scala b/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/EvictingBufferedSubscriber.scala index e4a21b7a7..f81fd03fe 100644 --- a/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/EvictingBufferedSubscriber.scala +++ b/monix-reactive/jvm/src/main/scala/monix/reactive/observers/buffers/EvictingBufferedSubscriber.scala @@ -26,12 +26,11 @@ import scala.annotation.tailrec import scala.util.Failure import scala.util.control.NonFatal -/** - * A [[BufferedSubscriber]] implementation for the - * [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy. - */ +/** A [[BufferedSubscriber]] implementation for the + * [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy. + */ private[buffers] final class EvictingBufferedSubscriber[-T] private - (underlying: Subscriber[T], buffer: EvictingQueue[AnyRef], onOverflow: Long => T = null) + (underlying: Subscriber[T], buffer: EvictingQueue[AnyRef], onOverflow: Long => Option[T] = null) extends BufferedSubscriber[T] with Subscriber.Sync[T] { self => implicit val scheduler = underlying.scheduler @@ -95,12 +94,15 @@ private[buffers] final class EvictingBufferedSubscriber[-T] private val count = if (eventsDropped > 0 && onOverflow != null) { try { - val message = onOverflow(eventsDropped).asInstanceOf[AnyRef] - eventsDropped = 0 - consumerBuffer(0) = message - 1 + buffer.pollMany(consumerBuffer, 1) - } - catch { + onOverflow(eventsDropped) match { + case Some(message) => + eventsDropped = 0 + consumerBuffer(0) = message.asInstanceOf[AnyRef] + 1 + buffer.pollMany(consumerBuffer, 1) + case None => + buffer.pollMany(consumerBuffer) + } + } catch { case NonFatal(ex) => errorThrown = ex upstreamIsComplete = true @@ -221,7 +223,7 @@ private[monix] object EvictingBufferedSubscriber { * were dropped. */ def dropOldAndSignal[A](underlying: Subscriber[A], - bufferSize: Int, onOverflow: Long => A): Subscriber.Sync[A] = { + bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = { require(bufferSize > 1, "bufferSize must be a strictly positive number, bigger than 1") @@ -250,7 +252,7 @@ private[monix] object EvictingBufferedSubscriber { * were dropped. */ def clearBufferAndSignal[A](underlying: Subscriber[A], - bufferSize: Int, onOverflow: Long => A): Subscriber.Sync[A] = { + bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = { require(bufferSize > 1, "bufferSize must be a strictly positive number, bigger than 1") diff --git a/monix-reactive/jvm/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalConcurrencySuite.scala b/monix-reactive/jvm/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalConcurrencySuite.scala index e5a09adfe..7c408ff7d 100644 --- a/monix-reactive/jvm/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalConcurrencySuite.scala +++ b/monix-reactive/jvm/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalConcurrencySuite.scala @@ -37,18 +37,18 @@ object BufferDropNewAndSignalConcurrencySuite } def buildNewForInt(bufferSize: Int, underlying: Observer[Int])(implicit s: Scheduler) = { - BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr.toInt)) + BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr.toInt))) } def buildNewForLong(bufferSize: Int, underlying: Observer[Long])(implicit s: Scheduler) = { - BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr)) + BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr))) } test("merge test should work") { implicit s => val num = 100000 val source = Observable.repeat(1L).take(num) val f = Observable.fromIterable(Seq(source, source, source)) - .mergeMap(x => x)(DropNewAndSignal(1000, dropped => dropped)) + .mergeMap(x => x)(DropNewAndSignal(1000, dropped => Some(dropped))) .sumF .runAsyncGetFirst diff --git a/monix-reactive/shared/src/main/scala/monix/reactive/OverflowStrategy.scala b/monix-reactive/shared/src/main/scala/monix/reactive/OverflowStrategy.scala index a94a81478..23907ee54 100644 --- a/monix-reactive/shared/src/main/scala/monix/reactive/OverflowStrategy.scala +++ b/monix-reactive/shared/src/main/scala/monix/reactive/OverflowStrategy.scala @@ -90,13 +90,20 @@ object OverflowStrategy { * the pipeline should begin dropping incoming events until the buffer * has room in it again and is free to process more elements. * + * The given `onOverflow` function get be used for logging the event + * and for sending a message to the downstream consumers to inform + * them of dropped messages. The function can return `None` in which + * case no message is sent and thus you can use it just to log a warning. + * * @param bufferSize specifies how many events our buffer can hold * before overflowing. + * * @param onOverflow is a function that can get called on overflow with * a number of messages that were dropped, a function that builds - * a new message that will be sent to downstream. + * a new message that will be sent to downstream. If it returns + * `None`, then no message gets sent to downstream. */ - final case class DropNewAndSignal[A](bufferSize: Int, onOverflow: Long => A) + final case class DropNewAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A]) extends Evicted[A] { require(bufferSize > 1, "bufferSize should be strictly greater than 1") @@ -119,13 +126,20 @@ object OverflowStrategy { * the currently buffered events should start being dropped in a FIFO order, * so the oldest events from the buffer will be dropped first. * + * The given `onOverflow` function get be used for logging the event + * and for sending a message to the downstream consumers to inform + * them of dropped messages. The function can return `None` in which + * case no message is sent and thus you can use it just to log a warning. + * * @param bufferSize specifies how many events our buffer can hold * before overflowing + * * @param onOverflow is a function that can get called on overflow with * a number of messages that were dropped, a function that builds - * a new message that will be sent to downstream. + * a new message that will be sent to downstream. If it returns + * `None`, then no message gets sent to downstream. */ - final case class DropOldAndSignal[A](bufferSize: Int, onOverflow: Long => A) + final case class DropOldAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A]) extends Evicted[A] { require(bufferSize > 1, "bufferSize should be strictly greater than 1") @@ -148,19 +162,24 @@ object OverflowStrategy { * the current buffer should be dropped completely to make room for * new events. * + * The given `onOverflow` function get be used for logging the event + * and for sending a message to the downstream consumers to inform + * them of dropped messages. The function can return `None` in which + * case no message is sent and thus you can use it just to log a warning. + * * @param bufferSize specifies how many events our buffer can hold * before overflowing + * * @param onOverflow is a function that can get called on overflow with * a number of messages that were dropped, a function that builds * a new message that will be sent to downstream. */ - final case class ClearBufferAndSignal[A](bufferSize: Int, onOverflow: Long => A) + final case class ClearBufferAndSignal[A](bufferSize: Int, onOverflow: Long => Option[A]) extends Evicted[A] { require(bufferSize > 1, "bufferSize should be strictly greater than 1") } - /** A category of [[OverflowStrategy]] for buffers that can be used * synchronously, without worrying about back-pressure concerns. */ diff --git a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferClearBufferThenSignalSuite.scala b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferClearBufferThenSignalSuite.scala index 216c35a0e..8f5812b30 100644 --- a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferClearBufferThenSignalSuite.scala +++ b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferClearBufferThenSignalSuite.scala @@ -18,8 +18,9 @@ package monix.reactive.observers import minitest.TestSuite -import monix.execution.Ack.{Stop, Continue} -import monix.execution.internal.{RunnableAction, Platform} +import monix.execution.Ack.{Continue, Stop} +import monix.execution.atomic.AtomicLong +import monix.execution.internal.{Platform, RunnableAction} import monix.execution.schedulers.TestScheduler import monix.execution.{Ack, Scheduler} import monix.reactive.Observer @@ -34,8 +35,18 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { "TestScheduler should have no pending tasks") } - def buildNew(bufferSize: Int, underlying: Observer[Int])(implicit s: Scheduler) = { - BufferedSubscriber(Subscriber(underlying, s), ClearBufferAndSignal(bufferSize, nr => nr.toInt)) + def buildNewWithSignal(bufferSize: Int, underlying: Observer[Int]) + (implicit s: Scheduler) = { + + BufferedSubscriber(Subscriber(underlying, s), + ClearBufferAndSignal(bufferSize, nr => Some(nr.toInt))) + } + + def buildNewWithLog(bufferSize: Int, underlying: Observer[Int], log: AtomicLong) + (implicit s: Scheduler) = { + + BufferedSubscriber[Int](Subscriber(underlying, s), + ClearBufferAndSignal(bufferSize, { nr => log.set(nr); None })) } test("should not lose events, test 1") { implicit s => @@ -57,7 +68,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNew(1000, underlying) + val buffer = buildNewWithSignal(1000, underlying) for (i <- 0 until 1000) buffer.onNext(i) buffer.onComplete() @@ -86,7 +97,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNew(1000, underlying) + val buffer = buildNewWithSignal(1000, underlying) def loop(n: Int): Unit = if (n > 0) @@ -103,7 +114,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { assertEquals(number, 10000) } - test("should drop old events when over capacity") { implicit s => + test("should drop old events when over capacity and signal") { implicit s => var received = 0 var wasCompleted = false val promise = Promise[Ack]() @@ -115,13 +126,12 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { } def onError(ex: Throwable) = () - def onComplete() = { wasCompleted = true } } - val buffer = buildNew(5, underlying) + val buffer = buildNewWithSignal(5, underlying) for (i <- 1 to 7) assertEquals(buffer.onNext(i), Continue) s.tick() @@ -140,9 +150,50 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { assert(wasCompleted, "wasCompleted should be true") } + test("should drop old events when over capacity and log") { implicit s => + var received = 0 + var wasCompleted = false + val promise = Promise[Ack]() + + val underlying = new Observer[Int] { + def onNext(elem: Int) = { + received += elem + if (elem < 7) Continue else promise.future + } + + def onError(ex: Throwable) = () + def onComplete() = { + wasCompleted = true + } + } + + val log = AtomicLong(0) + val buffer = buildNewWithLog(5, underlying, log) + + for (i <- 1 to 7) assertEquals(buffer.onNext(i), Continue) + s.tick() + assertEquals(received, 28) + + for (i <- 0 to 150) { + assertEquals(buffer.onNext(100 + i), Continue) + s.tick() + } + + assertEquals(received, 28) + promise.success(Continue); s.tick() + assertEquals(received, 28 + (247 to 250).sum) + assertEquals(log.get, 147) + + buffer.onNext(10); s.tick() + assertEquals(received, 28 + (247 to 250).sum + 10) + + buffer.onComplete(); s.tick() + assert(wasCompleted, "wasCompleted should be true") + } + test("should send onError when empty") { implicit s => var errorThrown: Throwable = null - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -161,7 +212,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { test("should send onError when in flight") { implicit s => var errorThrown: Throwable = null - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -180,7 +231,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val promise = Promise[Ack]() - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -202,7 +253,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var wasCompleted = false val startConsuming = Promise[Continue]() - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem startConsuming.future @@ -224,7 +275,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var wasCompleted = false - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem Continue @@ -246,7 +297,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val startConsuming = Promise[Continue]() - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem startConsuming.future @@ -268,7 +319,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var errorThrown: Throwable = null - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem Continue @@ -289,7 +340,7 @@ object BufferClearBufferThenSignalSuite extends TestSuite[TestScheduler] { var received = 0L var wasCompleted = false - val buffer = buildNew(Platform.recommendedBatchSize * 3, new Observer[Int] { + val buffer = buildNewWithSignal(Platform.recommendedBatchSize * 3, new Observer[Int] { def onNext(elem: Int) = { received += 1 Continue diff --git a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropAllSuite.scala b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropAllSuite.scala index 2d46c0f7b..ca44aec90 100644 --- a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropAllSuite.scala +++ b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropAllSuite.scala @@ -298,7 +298,7 @@ object BufferDropAllSuite extends TestSuite[TestScheduler] { var received = 0L var wasCompleted = false - val buffer = buildNew(Platform.recommendedBatchSize * 3, new Observer[Int] { + val buffer = buildNewWithSignal(Platform.recommendedBatchSize * 3, new Observer[Int] { def onNext(elem: Int) = { received += 1 Continue diff --git a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalSuite.scala b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalSuite.scala index a056e2811..8e24595ed 100644 --- a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalSuite.scala +++ b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewAndSignalSuite.scala @@ -18,8 +18,9 @@ package monix.reactive.observers import minitest.TestSuite -import monix.execution.Ack.{Stop, Continue} -import monix.execution.internal.{RunnableAction, Platform} +import monix.execution.Ack.{Continue, Stop} +import monix.execution.atomic.AtomicLong +import monix.execution.internal.{Platform, RunnableAction} import monix.execution.schedulers.TestScheduler import monix.execution.{Ack, Scheduler} import monix.reactive.Observer @@ -34,12 +35,16 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { "TestScheduler should have no pending tasks") } - def buildNewForInt(bufferSize: Int, underlying: Observer[Int])(implicit s: Scheduler) = { - BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr.toInt)) - } + def buildNewForIntWithSignal(bufferSize: Int, underlying: Observer[Int]) + (implicit s: Scheduler) = + BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr.toInt))) + + def buildNewForIntWithLog(bufferSize: Int, underlying: Observer[Int], log: AtomicLong) + (implicit s: Scheduler) = + BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal[Int](bufferSize, { nr => log.set(nr); None })) - def buildNewForLong(bufferSize: Int, underlying: Observer[Long])(implicit s: Scheduler) = { - BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => nr)) + def buildNewForLongWithSignal(bufferSize: Int, underlying: Observer[Long])(implicit s: Scheduler) = { + BufferedSubscriber(Subscriber(underlying, s), DropNewAndSignal(bufferSize, nr => Some(nr))) } test("should not lose events, test 1") { implicit s => @@ -61,7 +66,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNewForInt(1000, underlying) + val buffer = buildNewForIntWithSignal(1000, underlying) for (i <- 0 until 1000) buffer.onNext(i) buffer.onComplete() @@ -90,7 +95,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNewForInt(1000, underlying) + val buffer = buildNewForIntWithSignal(1000, underlying) def loop(n: Int): Unit = if (n > 0) @@ -107,7 +112,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { assertEquals(number, 10000) } - test("should drop incoming when over capacity") { implicit s => + test("should drop incoming when over capacity and signal") { implicit s => var received = 0 var wasCompleted = false val promise = Promise[Ack]() @@ -125,7 +130,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNewForInt(5, underlying) + val buffer = buildNewForIntWithSignal(5, underlying) assertEquals(buffer.onNext(1), Continue) assertEquals(buffer.onNext(2), Continue) @@ -155,9 +160,60 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { assert(wasCompleted, "wasCompleted should be true") } + test("should drop incoming when over capacity and log") { implicit s => + var received = 0 + var wasCompleted = false + val promise = Promise[Ack]() + + val underlying = new Observer[Int] { + def onNext(elem: Int) = { + received += elem + promise.future + } + + def onError(ex: Throwable) = () + + def onComplete() = { + wasCompleted = true + } + } + + val log = AtomicLong(0) + val buffer = buildNewForIntWithLog(5, underlying, log) + + assertEquals(buffer.onNext(1), Continue) + assertEquals(buffer.onNext(2), Continue) + assertEquals(buffer.onNext(3), Continue) + assertEquals(buffer.onNext(4), Continue) + assertEquals(buffer.onNext(5), Continue) + + s.tick() + assertEquals(received, 1) + + for (i <- 0 until 10) + assertEquals(buffer.onNext(6 + i), Continue) + + s.tick() + assertEquals(received, 1) + + promise.success(Continue); s.tick() + assert(received >= 15) + + for (i <- 0 until 4) assertEquals(buffer.onNext(6 + i), Continue) + + s.tick() + + assert(log.get > 0, "log should have happened") + assert(received >= 45 && received <= 60, + s"received should be either 45 or 60, but got $received") + + buffer.onComplete(); s.tick() + assert(wasCompleted, "wasCompleted should be true") + } + test("should send onError when empty") { implicit s => var errorThrown: Throwable = null - val buffer = buildNewForInt(5, new Observer[Int] { + val buffer = buildNewForIntWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -176,7 +232,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { test("should send onError when in flight") { implicit s => var errorThrown: Throwable = null - val buffer = buildNewForInt(5, new Observer[Int] { + val buffer = buildNewForIntWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -195,7 +251,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val promise = Promise[Ack]() - val buffer = buildNewForInt(5, new Observer[Int] { + val buffer = buildNewForIntWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -221,7 +277,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var wasCompleted = false val startConsuming = Promise[Continue]() - val buffer = buildNewForLong(10000, new Observer[Long] { + val buffer = buildNewForLongWithSignal(10000, new Observer[Long] { def onNext(elem: Long) = { sum += elem startConsuming.future @@ -243,7 +299,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var wasCompleted = false - val buffer = buildNewForLong(10000, new Observer[Long] { + val buffer = buildNewForLongWithSignal(10000, new Observer[Long] { def onNext(elem: Long) = { sum += elem Continue @@ -265,7 +321,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val startConsuming = Promise[Continue]() - val buffer = buildNewForLong(10000, new Observer[Long] { + val buffer = buildNewForLongWithSignal(10000, new Observer[Long] { def onNext(elem: Long) = { sum += elem startConsuming.future @@ -287,7 +343,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var errorThrown: Throwable = null - val buffer = buildNewForLong(10000, new Observer[Long] { + val buffer = buildNewForLongWithSignal(10000, new Observer[Long] { def onNext(elem: Long) = { sum += elem Continue @@ -308,7 +364,7 @@ object BufferDropNewAndSignalSuite extends TestSuite[TestScheduler] { var received = 0L var wasCompleted = false - val buffer = buildNewForInt(Platform.recommendedBatchSize * 3, new Observer[Int] { + val buffer = buildNewForIntWithSignal(Platform.recommendedBatchSize * 3, new Observer[Int] { def onNext(elem: Int) = { received += 1 Continue diff --git a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewSuite.scala b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewSuite.scala index dde421974..dc3e235d8 100644 --- a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewSuite.scala +++ b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropNewSuite.scala @@ -312,7 +312,7 @@ object BufferDropNewSuite extends TestSuite[TestScheduler] { var received = 0L var wasCompleted = false - val buffer = buildNew(Platform.recommendedBatchSize * 3, new Observer[Int] { + val buffer = buildNewWithSignal(Platform.recommendedBatchSize * 3, new Observer[Int] { def onNext(elem: Int) = { received += 1 Continue diff --git a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropOldThenSignalSuite.scala b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropOldThenSignalSuite.scala index c12337686..96dde26dc 100644 --- a/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropOldThenSignalSuite.scala +++ b/monix-reactive/shared/src/test/scala/monix/reactive/observers/BufferDropOldThenSignalSuite.scala @@ -18,8 +18,9 @@ package monix.reactive.observers import minitest.TestSuite -import monix.execution.Ack.{Stop, Continue} -import monix.execution.internal.{RunnableAction, Platform} +import monix.execution.Ack.{Continue, Stop} +import monix.execution.atomic.AtomicLong +import monix.execution.internal.{Platform, RunnableAction} import monix.execution.schedulers.TestScheduler import monix.execution.{Ack, Scheduler} import monix.reactive.Observer @@ -34,9 +35,18 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { "TestScheduler should have no pending tasks") } - def buildNew(bufferSize: Int, underlying: Observer[Int])(implicit s: Scheduler) = { + def buildNewWithSignal(bufferSize: Int, underlying: Observer[Int]) + (implicit s: Scheduler) = { + BufferedSubscriber.synchronous( - Subscriber(underlying, s), DropOldAndSignal(bufferSize, nr => nr.toInt)) + Subscriber(underlying, s), DropOldAndSignal(bufferSize, nr => Some(nr.toInt))) + } + + def buildNewWithLog(bufferSize: Int, underlying: Observer[Int], log: AtomicLong) + (implicit s: Scheduler) = { + + BufferedSubscriber.synchronous[Int]( + Subscriber(underlying, s), DropOldAndSignal(bufferSize, { nr => log.set(nr); None })) } test("should not lose events, test 1") { implicit s => @@ -58,7 +68,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNew(1000, underlying) + val buffer = buildNewWithSignal(1000, underlying) for (i <- 0 until 1000) buffer.onNext(i) buffer.onComplete() @@ -87,7 +97,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNew(1000, underlying) + val buffer = buildNewWithSignal(1000, underlying) def loop(n: Int): Unit = if (n > 0) @@ -104,7 +114,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { assertEquals(number, 10000) } - test("should drop old events when over capacity") { implicit s => + test("should drop old events when over capacity and signal") { implicit s => var received = 0 var wasCompleted = false val promise = Promise[Ack]() @@ -122,7 +132,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { } } - val buffer = buildNew(5, underlying) + val buffer = buildNewWithSignal(5, underlying) for (i <- 1 to 7) assertEquals(buffer.onNext(i), Continue) s.tick() @@ -139,9 +149,46 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { assert(wasCompleted, "wasCompleted should be true") } + test("should drop old events when over capacity and log") { implicit s => + var received = 0 + var wasCompleted = false + val promise = Promise[Ack]() + + val underlying = new Observer[Int] { + def onNext(elem: Int) = { + received += elem + if (elem < 7) Continue else promise.future + } + + def onError(ex: Throwable) = () + + def onComplete() = { + wasCompleted = true + } + } + + val log = AtomicLong(0) + val buffer = buildNewWithLog(5, underlying, log) + + for (i <- 1 to 7) assertEquals(buffer.onNext(i), Continue) + s.tick() + assertEquals(received, 28) + + for (i <- 0 to 1000) assertEquals(buffer.onNext(100 + i), Continue) + s.tick() + assertEquals(received, 28) + + promise.success(Continue); s.tick() + assertEquals(received, (1094 to 1100).sum + 28) + assertEquals(log.get, 994) + + buffer.onComplete(); s.tick() + assert(wasCompleted, "wasCompleted should be true") + } + test("should send onError when empty") { implicit s => var errorThrown: Throwable = null - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -160,7 +207,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { test("should send onError when in flight") { implicit s => var errorThrown: Throwable = null - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -179,7 +226,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val promise = Promise[Ack]() - val buffer = buildNew(5, new Observer[Int] { + val buffer = buildNewWithSignal(5, new Observer[Int] { def onError(ex: Throwable) = { errorThrown = ex } @@ -201,7 +248,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var wasCompleted = false val startConsuming = Promise[Continue]() - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem startConsuming.future @@ -223,7 +270,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var wasCompleted = false - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem Continue @@ -245,7 +292,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var errorThrown: Throwable = null val startConsuming = Promise[Continue]() - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem startConsuming.future @@ -267,7 +314,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var sum = 0L var errorThrown: Throwable = null - val buffer = buildNew(10000, new Observer[Int] { + val buffer = buildNewWithSignal(10000, new Observer[Int] { def onNext(elem: Int) = { sum += elem Continue @@ -288,7 +335,7 @@ object BufferDropOldThenSignalSuite extends TestSuite[TestScheduler] { var received = 0L var wasCompleted = false - val buffer = buildNew(Platform.recommendedBatchSize * 3, new Observer[Int] { + val buffer = buildNewWithSignal(Platform.recommendedBatchSize * 3, new Observer[Int] { def onNext(elem: Int) = { received += 1 Continue