Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fromIteratorBuffered #1307

Merged
merged 5 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2014-2020 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.benchmarks

import java.util.concurrent.TimeUnit

import monix.benchmarks
import monix.execution.Ack.Continue
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import org.openjdk.jmh.annotations._

import scala.collection.immutable.IndexedSeq
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Promise}

/** To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark ChunkedMapFilterSumBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within SBT:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.ObservableIteratorBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread".
* Please note that benchmarks should be usually executed at least in
* 10 iterations (as a rule of thumb), but more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class ObservableIteratorBenchmark {
@Param(Array("1000"))
var chunkCount: Int = _

@Param(Array("1000"))
var chunkSize: Int = _

// All events that need to be streamed
var allElements: IndexedSeq[Int] = _
var chunks: IndexedSeq[Array[Int]] = _

@Setup
def setup(): Unit = {
chunks = (1 to chunkCount).map(i => Array.fill(chunkSize)(i))
allElements = chunks.flatten
}

@Benchmark
def bufferTumbling(): Int = {
val stream = Observable
.fromIteratorUnsafe(allElements.iterator)
.bufferTumbling(chunkSize)
.map(sumIntScala)

sum(stream)
}

@Benchmark
def fromIteratorBufferedUnsafe(): Int = {
val stream = Observable
.fromIteratorBufferedUnsafe(allElements.iterator, chunkSize)
.map(sumIntScala)

sum(stream)
}

def sum(stream: Observable[Int]): Int = {
val p = Promise[Int]()
stream.unsafeSubscribeFn(new Subscriber.Sync[Int] {
val scheduler = benchmarks.scheduler
private[this] var sum: Int = 0

def onError(ex: Throwable): Unit =
p.failure(ex)
def onComplete(): Unit =
p.success(sum)
def onNext(elem: Int) = {
sum += elem
Continue
}
})
Await.result(p.future, Duration.Inf)
}

def sumIntScala(seq: Iterable[Int]): Int = {
val cursor = seq.iterator
var sum = 0
while (cursor.hasNext) {
sum += cursor.next()
}
sum
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ abstract class Observable[+A] extends Serializable { self =>
* then it also emits the given element (appended to the stream).
*/
final def append[B >: A](elem: B): Observable[B] =
self appendAll Observable.now(elem)
self.appendAll(Observable.now(elem))

/** Given the source observable and another `Observable`, emits all of
* the items from the first of these Observables to emit an item
Expand Down Expand Up @@ -1261,8 +1261,7 @@ abstract class Observable[+A] extends Serializable { self =>
final def doOnEarlyStopF[F[_]](task: F[Unit])(implicit F: TaskLike[F]): Observable[A] =
doOnEarlyStop(F(task))

/**
* Executes the given callback when the connection is being cancelled,
/** Executes the given callback when the connection is being cancelled,
* via the [[monix.execution.Cancelable Cancelable]] reference returned
* on subscribing to the created observable.
*
Expand Down Expand Up @@ -1725,7 +1724,7 @@ abstract class Observable[+A] extends Serializable { self =>
* then it also emits the given elements (appended to the stream).
*/
final def endWith[B >: A](elems: Seq[B]): Observable[B] =
self appendAll Observable.fromIterable(elems)
self.appendAll(Observable.fromIterable(elems))

/** Concatenates the source with another observable.
*
Expand All @@ -1743,13 +1742,11 @@ abstract class Observable[+A] extends Serializable { self =>
*
* result: a1, a2, a3, a4, b1, b2, b3, b4
* </pre>
*
*/
final def ++[B >: A](other: => Observable[B]): Observable[B] =
appendAll(Observable.defer(other))

/**
* A strict variant of [[++]].
/** A strict variant of [[++]].
*/
final def appendAll[B >: A](other: Observable[B]): Observable[B] =
new ConcatObservable[B](self, other)
Expand Down Expand Up @@ -2944,8 +2941,7 @@ abstract class Observable[+A] extends Serializable { self =>
final def scan[S](seed: => S)(op: (S, A) => S): Observable[S] =
new ScanObservable[A, S](self, () => seed, op)

/**
* Applies a binary operator to a start value and all elements of
/** Applies a binary operator to a start value and all elements of
* this Observable, going left to right and returns a new
* Observable that emits on each step the result element of
* the applied function.
Expand Down Expand Up @@ -3170,7 +3166,7 @@ abstract class Observable[+A] extends Serializable { self =>
* it also emits the events of the source (prepend operation).
*/
final def startWith[B >: A](elems: Seq[B]): Observable[B] =
Observable.fromIterable(elems) appendAll self
Observable.fromIterable(elems).appendAll(self)

/** Returns a new Observable that uses the specified `Scheduler` for
* initiating the subscription.
Expand Down Expand Up @@ -4912,8 +4908,7 @@ object Observable extends ObservableDeprecatedBuilders {
def from[F[_], A](fa: F[A])(implicit F: ObservableLike[F]): Observable[A] =
F.apply(fa)

/**
* Converts any `Iterable` into an [[Observable]].
/** Converts any `Iterable` into an [[Observable]].
*/
def fromIterable[A](iterable: Iterable[A]): Observable[A] =
new builders.IterableAsObservable[A](iterable)
Expand Down Expand Up @@ -4991,8 +4986,66 @@ object Observable extends ObservableDeprecatedBuilders {
def fromIteratorUnsafe[A](iterator: Iterator[A]): Observable[A] =
new builders.IteratorAsObservable[A](iterator)

/**
* Transforms any `cats.effect.Resource` into an [[Observable]].
/** Wraps a [[scala.Iterator]] into an `Observable` that emits events in `chunkSize` batches.
*
* This function uses [[monix.eval.Task Task]] in order to suspend
* the creation of the `Iterator`, because reading from an `Iterator`
* is a destructive process. The `Task` is being used as a "factory",
* in pace of [[scala.Iterable]].
*
* Example:
* {{{
* import monix.eval.Task
*
* Observable.fromIteratorBuffered(Task(Iterator.from(1)), 2)
* }}}
*
* @see [[fromIterable]]
*
* @see [[fromIteratorBuffered[A](resource* fromIteratorBuffered(Resource)]] for a version
* that uses `cats.effect.Resource`
*
* @see [[fromIteratorBufferedUnsafe]] for the unsafe version that can wrap an
* iterator directly
*/
def fromIteratorBuffered[A](task: Task[Iterator[A]], bufferSize: Int): Observable[Seq[A]] =
Observable.fromTask(task.map(fromIteratorBufferedUnsafe(_, bufferSize))).flatten

/** Wraps a [[scala.Iterator]] into an `Observable` in the context of a
* [[https://typelevel.org/cats-effect/datatypes/resource.html cats.effect.Resource]],
* which allows for specifying a finalizer.
*
* @see [[fromIterable]]
*
* @see [[fromIteratorBuffered[A](task* fromIteratorBuffered(task)]] for a version
* that uses [[monix.eval.Task Task]] for suspending side effects
*
* @see [[fromIteratorBufferedUnsafe]] for the unsafe version that can wrap an
* iterator directly
*/
def fromIteratorBuffered[A](resource: Resource[Task, Iterator[A]], bufferSize: Int): Observable[Seq[A]] =
Observable.fromResource(resource).flatMap(fromIteratorBufferedUnsafe(_, bufferSize))

/** Converts any `Iterator` into an observable that emits events in `bufferSize` batches.
*
* '''UNSAFE WARNING''': reading from an `Iterator` is a destructive process.
* Therefore only a single subscriber is supported, the result being
* a single-subscriber observable. If multiple subscribers are attempted,
* all subscribers, except for the first one, will be terminated with a
* [[monix.execution.exceptions.APIContractViolationException APIContractViolationException]].
*
* @see [[fromIteratorBuffered[A](task* fromIteratorBuffered(task)]] or
* [[fromIteratorBuffered[A](resource* fromIteratorBuffered(resource)]]
* for safe alternatives
*
* @param iterator to transform into an observable
*/
@UnsafeProtocol
@UnsafeBecauseImpure
def fromIteratorBufferedUnsafe[A](iterator: Iterator[A], bufferSize: Int): Observable[Seq[A]] =
new builders.BufferedIteratorAsObservable[A](iterator, bufferSize)

/** Transforms any `cats.effect.Resource` into an [[Observable]].
*
* See the
* [[https://typelevel.org/cats-effect/datatypes/resource.html documentation for Resource]].
Expand Down Expand Up @@ -5298,17 +5351,15 @@ object Observable extends ObservableDeprecatedBuilders {
case Failure(e) => Observable.raiseError(e)
}

/**
* Builds an `Observable` instance out of a Scala `Either`.
/** Builds an `Observable` instance out of a Scala `Either`.
*/
def fromEither[E <: Throwable, A](a: Either[E, A]): Observable[A] =
a match {
case Right(v) => Observable.now(v)
case Left(ex) => Observable.raiseError(ex)
}

/**
* Builds a [[Observable]] instance out of a Scala `Either`.
/** Builds a [[Observable]] instance out of a Scala `Either`.
*/
def fromEither[E, A](f: E => Throwable)(a: Either[E, A]): Observable[A] =
a match {
Expand Down Expand Up @@ -5369,8 +5420,7 @@ object Observable extends ObservableDeprecatedBuilders {
def fromTask[A](task: Task[A]): Observable[A] =
new builders.TaskAsObservable(task)

/**
* Returns a `F ~> Coeval` (`FunctionK`) for transforming any
/** Returns a `F ~> Coeval` (`FunctionK`) for transforming any
* supported data-type into [[Observable]].
*/
def liftFrom[F[_]](implicit F: ObservableLike[F]): F ~> Observable = F
Expand Down Expand Up @@ -5483,7 +5533,6 @@ object Observable extends ObservableDeprecatedBuilders {

/** Repeats the evaluation of given effectful value, emitting
* the results indefinitely.
*
*/
def repeatEvalF[F[_], A](fa: F[A])(implicit F: TaskLike[F]): Observable[A] =
repeat(()).mapEvalF(_ => fa)(F)
Expand Down Expand Up @@ -6154,7 +6203,7 @@ object Observable extends ObservableDeprecatedBuilders {
override def pure[A](a: A): Observable[A] =
Observable.now(a)
override def combineK[A](x: Observable[A], y: Observable[A]): Observable[A] =
x appendAll y
x.appendAll(y)
override def flatMap[A, B](fa: Observable[A])(f: (A) => Observable[B]): Observable[B] =
fa.flatMap(f)
override def flatten[A](ffa: Observable[Observable[A]]): Observable[A] =
Expand Down Expand Up @@ -6224,8 +6273,7 @@ object Observable extends ObservableDeprecatedBuilders {
}
}

/**
* Exposes extension methods for deprecated [[Observable]] methods.
/** Exposes extension methods for deprecated [[Observable]] methods.
*/
implicit final class DeprecatedExtensions[+A](val self: Observable[A])
extends AnyVal with ObservableDeprecatedMethods[A]
Expand Down
Loading