Skip to content

Commit

Permalink
fixed bug ConnectableSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
Maximiliano Rey committed Jan 16, 2020
1 parent 566ead7 commit 2d9ab10
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.iohk.scalanet.monix_subject


import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.{Observable, Observer}
import monix.reactive.observables.ConnectableObservable
Expand All @@ -11,7 +12,8 @@ import scala.concurrent.Future
class ConnectableSubject[T](source: Subject[T, T], subject: Subject[T, T])(implicit s: Scheduler)
extends ConnectableObservable[T]
with Observer[T] {
private val in = ConnectableObservable.cacheUntilConnect(source, subject)
private val in = ConnectableSubject.cacheUntilConnect(source, subject)


def connect(): Cancelable = in.connect()

Expand All @@ -20,6 +22,7 @@ class ConnectableSubject[T](source: Subject[T, T], subject: Subject[T, T])(impli
override def onError(ex: Throwable): Unit = source.onError(ex)

override def onComplete(): Unit = source.onComplete()


override def unsafeSubscribeFn(subscriber: Subscriber[T]): Cancelable = {
in.unsafeSubscribeFn(subscriber)
Expand Down

0 comments on commit 2d9ab10

Please sign in to comment.