diff --git a/src/main/scala/outwatch/SideEffects.scala b/src/main/scala/outwatch/SideEffects.scala index 90fcbef58..965b40d28 100644 --- a/src/main/scala/outwatch/SideEffects.scala +++ b/src/main/scala/outwatch/SideEffects.scala @@ -4,7 +4,7 @@ import monix.execution.Ack.Continue import monix.execution.Scheduler trait SideEffects { - def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s) - def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s) - def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s) + def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s).unsafeRunSync() + def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s).unsafeRunSync() + def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s).unsafeRunSync() } diff --git a/src/main/scala/outwatch/Sink.scala b/src/main/scala/outwatch/Sink.scala index 96bd347ef..fdfbdc66f 100644 --- a/src/main/scala/outwatch/Sink.scala +++ b/src/main/scala/outwatch/Sink.scala @@ -79,15 +79,16 @@ object Sink { def create[T](next: T => Future[Ack], error: Throwable => Unit = _ => (), complete: () => Unit = () => () - )(implicit s: Scheduler): Sink[T] = { - val sink = ObserverSink( - new Observer[T] { - override def onNext(t: T): Future[Ack] = next(t) - override def onError(ex: Throwable): Unit = error(ex) - override def onComplete(): Unit = complete() - } - ) - sink + )(implicit s: Scheduler): IO[Sink[T]] = { + IO{ + ObserverSink( + new Observer[T] { + override def onNext(t: T): Future[Ack] = next(t) + override def onError(ex: Throwable): Unit = error(ex) + override def onComplete(): Unit = complete() + } + ) + } } diff --git a/src/main/scala/outwatch/dom/ManagedSubscriptions.scala b/src/main/scala/outwatch/dom/ManagedSubscriptions.scala index 48e57a045..4d8559032 100644 --- a/src/main/scala/outwatch/dom/ManagedSubscriptions.scala +++ b/src/main/scala/outwatch/dom/ManagedSubscriptions.scala @@ -4,16 +4,17 @@ import cats.effect.IO import monix.execution.Ack.Continue import monix.execution.cancelables.CompositeCancelable import monix.execution.{Cancelable, Scheduler} +import org.scalajs.dom import outwatch.dom.dsl.attributes.lifecycle trait ManagedSubscriptions { def managed(subscription: IO[Cancelable])(implicit s: Scheduler): VDomModifier = { subscription.flatMap { sub: Cancelable => - lifecycle.onDestroy --> Sink.create{_ => + Sink.create[dom.Element] { _ => sub.cancel() Continue - } + }.flatMap( sink => lifecycle.onDestroy --> sink) } } @@ -21,10 +22,10 @@ trait ManagedSubscriptions { (sub1 :: sub2 :: subscriptions.toList).sequence.flatMap { subs: Seq[Cancelable] => val composite = CompositeCancelable(subs: _*) - lifecycle.onDestroy --> Sink.create{_ => + Sink.create[dom.Element]{ _ => composite.cancel() Continue - } + }.flatMap(sink => lifecycle.onDestroy --> sink) } } diff --git a/src/main/scala/outwatch/util/WebSocket.scala b/src/main/scala/outwatch/util/WebSocket.scala index b510b2fc7..f0c598b61 100644 --- a/src/main/scala/outwatch/util/WebSocket.scala +++ b/src/main/scala/outwatch/util/WebSocket.scala @@ -1,5 +1,6 @@ package outwatch.util +import cats.effect.IO import monix.execution.Ack.Continue import monix.execution.{Cancelable, Scheduler} import monix.reactive.Observable @@ -8,7 +9,7 @@ import org.scalajs.dom.{CloseEvent, ErrorEvent, MessageEvent} import outwatch.Sink object WebSocket { - implicit def toSink(socket: WebSocket): Sink[String] = socket.sink + implicit def toSink(socket: WebSocket): IO[Sink[String]] = socket.sink implicit def toSource(socket: WebSocket): Observable[MessageEvent] = socket.source } diff --git a/src/test/scala/outwatch/LifecycleHookSpec.scala b/src/test/scala/outwatch/LifecycleHookSpec.scala index 4bb13228f..dd75e42f5 100644 --- a/src/test/scala/outwatch/LifecycleHookSpec.scala +++ b/src/test/scala/outwatch/LifecycleHookSpec.scala @@ -19,7 +19,9 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(onInsert --> sink) + val node = sink.flatMap { sink => + div(onInsert --> sink) + } switch shouldBe false @@ -41,7 +43,11 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(onInsert --> sink)(onInsert --> sink2) + val node = for { + sink <- sink + sink2 <- sink2 + node <- div(onInsert --> sink)(onInsert --> sink2) + } yield node switch shouldBe false switch2 shouldBe false @@ -62,7 +68,9 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd"))) + val node = sink.flatMap { sink => + div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd"))) + } switch shouldBe false @@ -85,7 +93,11 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd"))) + val node = for { + sink <- sink + sink2 <- sink2 + node <- div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd"))) + } yield node switch shouldBe false switch2 shouldBe false @@ -109,7 +121,11 @@ class LifecycleHookSpec extends JSDomSpec { } val message = PublishSubject[String] - val node = div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2) + val node = for { + sink1 <- sink1 + sink2 <- sink2 + node <- div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2) + } yield node OutWatch.renderInto("#app", node).unsafeRunSync() switch1 shouldBe false @@ -129,7 +145,9 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey"))) + val node = sink.flatMap { sink => + div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey"))) + } switch shouldBe false @@ -147,7 +165,9 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey")) + val node = sink.flatMap { sink => + div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey")) + } switch shouldBe false @@ -168,7 +188,11 @@ class LifecycleHookSpec extends JSDomSpec { Continue } val message = PublishSubject[String]() - val node = div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2) + val node = for { + sink1 <- sink1 + sink2 <- sink2 + node <- div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2) + } yield node OutWatch.renderInto("#app", node).unsafeRunSync() switch1 shouldBe false @@ -188,7 +212,9 @@ class LifecycleHookSpec extends JSDomSpec { Continue } - val node = div(child <-- Observable("message"), onPostPatch --> sink, "Hey") + val node = sink.flatMap { sink => + div(child <-- Observable("message"), onPostPatch --> sink, "Hey") + } switch shouldBe false @@ -210,7 +236,11 @@ class LifecycleHookSpec extends JSDomSpec { Continue } val message = PublishSubject[String]() - val node = div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2) + val node = for { + sink1 <- sink1 + sink2 <- sink2 + node <- div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2) + } yield node OutWatch.renderInto("#app", node).unsafeRunSync() switch1 shouldBe false @@ -248,13 +278,20 @@ class LifecycleHookSpec extends JSDomSpec { } val message = PublishSubject[String]() - val node = div(child <-- message, - onInsert --> insertSink, - onPrePatch --> prepatchSink, - onUpdate --> updateSink, - onPostPatch --> postpatchSink, - onDestroy --> destroySink - ) + val node = for { + insertSink <- insertSink + updateSink <- updateSink + destroySink <- destroySink + prepatchSink <- prepatchSink + postpatchSink <- postpatchSink + node <- div(child <-- message, + onInsert --> insertSink, + onPrePatch --> prepatchSink, + onUpdate --> updateSink, + onPostPatch --> postpatchSink, + onDestroy --> destroySink + ) + } yield node hooks shouldBe empty @@ -279,10 +316,14 @@ class LifecycleHookSpec extends JSDomSpec { } val messageList = PublishSubject[Seq[String]]() - val node = div("Hello", children <-- messageList.map(_.map(span(_))), - onInsert --> insertSink, - onUpdate --> updateSink - ) + val node = for { + insertSink <- insertSink + updateSink <- updateSink + node <- div("Hello", children <-- messageList.map(_.map(span(_))), + onInsert --> insertSink, + onUpdate --> updateSink + ) + } yield node hooks shouldBe empty @@ -307,9 +348,14 @@ class LifecycleHookSpec extends JSDomSpec { } val message = PublishSubject[String]() - val node = div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink), - child <-- message.map(span(_)) - ) + val node = for { + insertSink <- insertSink + updateSink <- updateSink + destroySink <- destroySink + node <- div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink), + child <-- message.map(span(_)) + ) + } yield node hooks shouldBe empty @@ -336,9 +382,14 @@ class LifecycleHookSpec extends JSDomSpec { } val messageList = PublishSubject[Seq[String]]() - val node = div(children <-- messageList.map(_.map(span(_))), - span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink) - ) + val node = for { + insertSink <- insertSink + updateSink <- updateSink + destroySink <- destroySink + node <- div(children <-- messageList.map(_.map(span(_))), + span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink) + ) + } yield node hooks shouldBe empty @@ -364,9 +415,11 @@ class LifecycleHookSpec extends JSDomSpec { val sub = PublishSubject[String] - val node = div(child <-- nodes.startWith(Seq( - span(managed(sink <-- sub)) - ))) + val node = sink.flatMap { sink => + div(child <-- nodes.startWith(Seq( + span(managed(sink <-- sub)) + ))) + } OutWatch.renderInto("#app", node).unsafeRunSync() @@ -393,10 +446,12 @@ class LifecycleHookSpec extends JSDomSpec { val divTagName = onInsert.map(_.tagName.toLowerCase).filter(_ == "div") - val node = div(onInsert("insert") --> sink, - div(divTagName --> sink), - span(divTagName --> sink) - ) + val node = sink.flatMap { sink => + div(onInsert("insert") --> sink, + div(divTagName --> sink), + span(divTagName --> sink) + ) + } OutWatch.renderInto("#app", node).unsafeRunSync() diff --git a/src/test/scala/outwatch/OutWatchDomSpec.scala b/src/test/scala/outwatch/OutWatchDomSpec.scala index d3acd56a1..7c497a44a 100644 --- a/src/test/scala/outwatch/OutWatchDomSpec.scala +++ b/src/test/scala/outwatch/OutWatchDomSpec.scala @@ -51,7 +51,7 @@ class OutWatchDomSpec extends JSDomSpec { Seq( div(), attributes.`class` := "blue", - attributes.onClick(1) --> Sink.create[Int](_ => Continue), + attributes.onClick(1) --> Sink.create[Int](_ => Continue).unsafeRunSync(), attributes.hidden <-- Observable(false) ).map(_.unsafeRunSync()) ),