Skip to content
This repository has been archived by the owner on Mar 2, 2022. It is now read-only.

Commit

Permalink
Upgraded to reactor 3.3.1.RELEASE
Browse files Browse the repository at this point in the history
To mirror what was done in the core library:
* Added transformDeferred(...) to SFlux/SMono
* Deprecated compose(...) in SFlux/SMono
  • Loading branch information
rs017991 authored and sinwe committed Dec 9, 2019
1 parent dbec2f9 commit 9c2b022
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 7 deletions.
3 changes: 3 additions & 0 deletions src/main/scala/reactor/core/scala/publisher/SFlux.scala
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,11 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] with ScalaCo

final def collectSortedSeq(ordering: Ordering[T] = None.orNull): SMono[Seq[T]] = new ReactiveSMono[Seq[T]](coreFlux.collectSortedList(ordering).map((l: JList[T]) => l.asScala))

@deprecated("will be removed, use transformDeferred() instead", since="reactor-scala-extensions 0.5.0")
final def compose[V](transformer: SFlux[T] => Publisher[V]): SFlux[V] = new ReactiveSFlux[V](coreFlux.compose[V](transformer))

final def transformDeferred[V](transformer: SFlux[T] => Publisher[V]): SFlux[V] = new ReactiveSFlux[V](coreFlux.transformDeferred[V](transformer))

final def concatMapDelayError[V](mapper: T => Publisher[_ <: V], delayUntilEnd: Boolean = false, prefetch: Int = XS_BUFFER_SIZE): SFlux[V] =
new ReactiveSFlux[V](coreFlux.concatMapDelayError[V](mapper, delayUntilEnd, prefetch))

Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/reactor/core/scala/publisher/SMono.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,24 @@ trait SMono[T] extends SMonoLike[T, SMono] with MapablePublisher[T] with ScalaCo
* target [[SMono]] type. A transformation will occur for each
* [[org.reactivestreams.Subscriber]].
*
* `flux.compose(SMono::fromPublisher).subscribe()`
* `mono.transformDeferred(SMono::fromPublisher).subscribe()`
*
* @param transformer the function to immediately map this [[SMono]] into a target [[SMono]]
* @param transformer the function to lazily map this [[SMono]] into a target [[SMono]]
* instance.
* @tparam V the item type in the returned [[org.reactivestreams.Publisher]]
* @return a new [[SMono]]
* @see [[SMono.as]] for a loose conversion to an arbitrary type
*/
final def compose[V](transformer: SMono[T] => Publisher[V]): SMono[V] = {
final def transformDeferred[V](transformer: SMono[T] => Publisher[V]): SMono[V] = {
val transformerFunction = new Function[JMono[T], Publisher[V]] {
override def apply(t: JMono[T]): Publisher[V] = transformer(SMono.this)
}
coreMono.compose(transformerFunction).asScala
coreMono.transformDeferred(transformerFunction).asScala
}

@deprecated("will be removed, use transformDeferred() instead", since="reactor-scala-extensions 0.5.0")
final def compose[V](transformer: SMono[T] => Publisher[V]): SMono[V] = transformDeferred(transformer)

/**
* Concatenate emissions of this [[SMono]] with the provided [[Publisher]]
* (no interleave).
Expand Down Expand Up @@ -324,6 +327,7 @@ trait SMono[T] extends SMonoLike[T, SMono] with MapablePublisher[T] with ScalaCo
* @param afterTerminate the callback to call after [[org.reactivestreams.Subscriber.onNext]], [[org.reactivestreams.Subscriber.onComplete]] without preceding [[org.reactivestreams.Subscriber.onNext]] or [[org.reactivestreams.Subscriber.onError]]
* @return a new [[SMono]]
*/
@deprecated("prefer using `doAfterTerminate` or `doFinally`. will be removed", since="reactor-scala-extensions 0.5.0")
final def doAfterSuccessOrError(afterTerminate: Try[_ <: T] => Unit): SMono[T] = {
val biConsumer = (t: T, u: Throwable) => Option(t) match {
case Some(s) => afterTerminate(Success(s))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ class SParallelFlux[T] private(private val jParallelFlux: JParallelFlux[T]) exte
case (Some(fn), Some(fe), None, Some(fs)) => jParallelFlux.subscribe(fn, fe, null, fs)
case (Some(fn), Some(fe), None, None) => jParallelFlux.subscribe(fn, fe)
case (Some(fn), None, Some(fe), Some(fs)) => jParallelFlux.subscribe(fn, null, fe, fs)
case (Some(fn), None, Some(fe), None) => jParallelFlux.subscribe(fn, null, fe, null)
case (Some(fn), None, Some(fe), None) => jParallelFlux.subscribe(fn, null, fe)
case (Some(fn), None, None, Some(fs)) => jParallelFlux.subscribe(fn, null, null, fs)
case (Some(fn), None, None, None) => jParallelFlux.subscribe(fn)
case (None, Some(fe), Some(fc), Some(fs)) => jParallelFlux.subscribe(null, fe, fc, fs)
case (None, Some(fe), Some(fc), None) => jParallelFlux.subscribe(null, fe, fc)
case (None, Some(fe), None, Some(fs)) => jParallelFlux.subscribe(null, fe, null, fs)
case (None, Some(fe), None, None) => jParallelFlux.subscribe(null, fe)
case (None, None, Some(fc), Some(fs)) => jParallelFlux.subscribe(null, null, fc, fs)
case (None, None, Some(fc), None) => jParallelFlux.subscribe(null, null, fc, null)
case (None, None, Some(fc), None) => jParallelFlux.subscribe(null, null, fc)
case (None, None, None, Some(fs)) => jParallelFlux.subscribe(null, null, null, fs)
case (None, None, None, None) => jParallelFlux.subscribe()
}
Expand Down
5 changes: 5 additions & 0 deletions src/test/scala/reactor/core/scala/publisher/SFluxTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ class SFluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wi
.expectNext(1)
.verifyComplete()
}
".transformDeferred should defer transformation of this flux to another publisher" in {
StepVerifier.create(SFlux.just(1, 2, 3).transformDeferred(SMono.fromPublisher))
.expectNext(1)
.verifyComplete()
}

".concatMap" - {
"with mapper should map the element sequentially" in {
Expand Down
5 changes: 5 additions & 0 deletions src/test/scala/reactor/core/scala/publisher/SMonoTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ class SMonoTest extends FreeSpec with Matchers with TestSupport {
.expectNext("1")
.verifyComplete()
}
".transformDeferred should defer creating the target mono type" in {
StepVerifier.create(SMono.just(1).transformDeferred[String](m => SFlux.fromPublisher(m.map(_.toString))))
.expectNext("1")
.verifyComplete()
}

".concatWith should concatenate mono with another source" in {
StepVerifier.create(SMono.just(1).concatWith(SMono.just(2)))
Expand Down
2 changes: 1 addition & 1 deletion versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ext {
logbackVersion = "1.3.0-alpha4"
mockitoVersion = "3.1.0"
pegdownVersion = "1.6.0"
reactorVersion = "3.2.9.RELEASE"
reactorVersion = "3.3.1.RELEASE"
scalaLoggingVersion = "3.9.0"
scalatestVersion = "3.0.5-M1"
scoverageVersion = "1.4.0-M3"
Expand Down

0 comments on commit 9c2b022

Please sign in to comment.