Skip to content

Commit

Permalink
Add observable drop while inclusive
Browse files Browse the repository at this point in the history
  • Loading branch information
allantl committed Apr 29, 2019
1 parent 6a09cdd commit 9f7b623
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 3 deletions.
Expand Up @@ -1556,7 +1556,14 @@ abstract class Observable[+A] extends Serializable { self =>
* predicate and returns a new observable that emits the rest.
*/
final def dropWhile(p: A => Boolean): Observable[A] =
self.liftByOperator(new DropByPredicateOperator(p))
self.liftByOperator(new DropByPredicateOperator(p, inclusive = false))

/** Drops the longest prefix of elements that satisfy the given
* predicate, inclusive of the value that caused `predicate` to return `false` and
* returns a new observable that emits the rest.
*/
final def dropWhileInclusive(p: A => Boolean): Observable[A] =
self.liftByOperator(new DropByPredicateOperator(p, inclusive = true))

/** Drops the longest prefix of elements that satisfy the given
* function and returns a new observable that emits the rest. In
Expand Down
Expand Up @@ -24,7 +24,7 @@ import monix.reactive.Observable.Operator
import monix.reactive.observers.Subscriber
import scala.concurrent.Future

private[reactive] final class DropByPredicateOperator[A](p: A => Boolean)
private[reactive] final class DropByPredicateOperator[A](p: A => Boolean, inclusive: Boolean)
extends Operator[A, A] {

def apply(out: Subscriber[A]): Subscriber[A] =
Expand All @@ -46,7 +46,10 @@ private[reactive] final class DropByPredicateOperator[A](p: A => Boolean)
Continue
else {
continueDropping = false
out.onNext(elem)
if (inclusive)
Continue
else
out.onNext(elem)
}
} catch {
case NonFatal(ex) if streamError =>
Expand Down
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2014-2019 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.reactive.internal.operators

import monix.reactive.Observable
import scala.concurrent.duration._

object DropByPredicateInclusiveSuite extends BaseOperatorSuite {
def createObservable(sourceCount: Int) = {
require(sourceCount > 0, "sourceCount should be strictly positive")
Some {
val o = Observable.range(1, sourceCount * 2).dropWhileInclusive(_ < (sourceCount - 1))
Sample(o, count(sourceCount), sum(sourceCount), 0.seconds, 0.seconds)
}
}

def sum(sourceCount: Int): Long =
(1 until sourceCount * 2).drop(sourceCount-1).sum

def count(sourceCount: Int) =
sourceCount

def observableInError(sourceCount: Int, ex: Throwable) = {
require(sourceCount > 0, "sourceCount should be strictly positive")
Some {
val o = createObservableEndingInError(Observable.range(1, sourceCount + 2), ex)
.dropWhileInclusive(_ < 1)

Sample(o, count(sourceCount), sum(sourceCount), 0.seconds, 0.seconds)
}
}

def brokenUserCodeObservable(sourceCount: Int, ex: Throwable) = Some {
val o = Observable.range(1, sourceCount * 2).dropWhileInclusive { elem =>
if (elem < sourceCount) true else throw ex
}

Sample(o, 0, 0, 0.seconds, 0.seconds)
}

override def cancelableObservables() = {
val o = Observable.range(1, 1000).delayOnNext(1.second).dropWhileInclusive(_ < 100)
Seq(Sample(o, 0, 0, 0.seconds, 0.seconds))
}
}

0 comments on commit 9f7b623

Please sign in to comment.