Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap Sink.create in IO #165

Merged
merged 1 commit into from
Jan 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/scala/outwatch/SideEffects.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import monix.execution.Ack.Continue
import monix.execution.Scheduler

trait SideEffects {
def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s)
def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s)
def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s)
def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s).unsafeRunSync()
def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s).unsafeRunSync()
def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s).unsafeRunSync()
}
19 changes: 10 additions & 9 deletions src/main/scala/outwatch/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ object Sink {
def create[T](next: T => Future[Ack],
error: Throwable => Unit = _ => (),
complete: () => Unit = () => ()
)(implicit s: Scheduler): Sink[T] = {
val sink = ObserverSink(
new Observer[T] {
override def onNext(t: T): Future[Ack] = next(t)
override def onError(ex: Throwable): Unit = error(ex)
override def onComplete(): Unit = complete()
}
)
sink
)(implicit s: Scheduler): IO[Sink[T]] = {
IO{
ObserverSink(
new Observer[T] {
override def onNext(t: T): Future[Ack] = next(t)
override def onError(ex: Throwable): Unit = error(ex)
override def onComplete(): Unit = complete()
}
)
}
}


Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/outwatch/dom/ManagedSubscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,28 @@ import cats.effect.IO
import monix.execution.Ack.Continue
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{Cancelable, Scheduler}
import org.scalajs.dom
import outwatch.dom.dsl.attributes.lifecycle

trait ManagedSubscriptions {

def managed(subscription: IO[Cancelable])(implicit s: Scheduler): VDomModifier = {
subscription.flatMap { sub: Cancelable =>
lifecycle.onDestroy --> Sink.create{_ =>
Sink.create[dom.Element] { _ =>
sub.cancel()
Continue
}
}.flatMap( sink => lifecycle.onDestroy --> sink)
}
}

def managed(sub1: IO[Cancelable], sub2: IO[Cancelable], subscriptions: IO[Cancelable]*)(implicit s: Scheduler): VDomModifier = {

(sub1 :: sub2 :: subscriptions.toList).sequence.flatMap { subs: Seq[Cancelable] =>
val composite = CompositeCancelable(subs: _*)
lifecycle.onDestroy --> Sink.create{_ =>
Sink.create[dom.Element]{ _ =>
composite.cancel()
Continue
}
}.flatMap(sink => lifecycle.onDestroy --> sink)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/outwatch/util/WebSocket.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package outwatch.util

import cats.effect.IO
import monix.execution.Ack.Continue
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.OverflowStrategy.Unbounded
Expand All @@ -8,7 +9,7 @@ import outwatch.Sink
import outwatch.dom.Observable

object WebSocket {
implicit def toSink(socket: WebSocket): Sink[String] = socket.sink
implicit def toSink(socket: WebSocket): IO[Sink[String]] = socket.sink
implicit def toSource(socket: WebSocket): Observable[MessageEvent] = socket.source
}

Expand Down
123 changes: 89 additions & 34 deletions src/test/scala/outwatch/LifecycleHookSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(onInsert --> sink)
val node = sink.flatMap { sink =>
div(onInsert --> sink)
}

switch shouldBe false

Expand All @@ -41,7 +43,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(onInsert --> sink)(onInsert --> sink2)
val node = for {
sink <- sink
sink2 <- sink2
node <- div(onInsert --> sink)(onInsert --> sink2)
} yield node

switch shouldBe false
switch2 shouldBe false
Expand All @@ -62,7 +68,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd")))
val node = sink.flatMap { sink =>
div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd")))
}

switch shouldBe false

Expand All @@ -85,7 +93,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd")))
val node = for {
sink <- sink
sink2 <- sink2
node <- div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd")))
} yield node

switch shouldBe false
switch2 shouldBe false
Expand All @@ -109,7 +121,11 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]
val node = div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand All @@ -129,7 +145,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey")))
val node = sink.flatMap { sink =>
div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey")))
}

switch shouldBe false

Expand All @@ -147,7 +165,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey"))
val node = sink.flatMap { sink =>
div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey"))
}

switch shouldBe false

Expand All @@ -168,7 +188,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}
val message = PublishSubject[String]()
val node = div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand All @@ -188,7 +212,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable("message"), onPostPatch --> sink, "Hey")
val node = sink.flatMap { sink =>
div(child <-- Observable("message"), onPostPatch --> sink, "Hey")
}

switch shouldBe false

Expand All @@ -210,7 +236,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}
val message = PublishSubject[String]()
val node = div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand Down Expand Up @@ -248,13 +278,20 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]()
val node = div(child <-- message,
onInsert --> insertSink,
onPrePatch --> prepatchSink,
onUpdate --> updateSink,
onPostPatch --> postpatchSink,
onDestroy --> destroySink
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
prepatchSink <- prepatchSink
postpatchSink <- postpatchSink
node <- div(child <-- message,
onInsert --> insertSink,
onPrePatch --> prepatchSink,
onUpdate --> updateSink,
onPostPatch --> postpatchSink,
onDestroy --> destroySink
)
} yield node

hooks shouldBe empty

Expand All @@ -279,10 +316,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val messageList = PublishSubject[Seq[String]]()
val node = div("Hello", children <-- messageList.map(_.map(span(_))),
onInsert --> insertSink,
onUpdate --> updateSink
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
node <- div("Hello", children <-- messageList.map(_.map(span(_))),
onInsert --> insertSink,
onUpdate --> updateSink
)
} yield node

hooks shouldBe empty

Expand All @@ -307,9 +348,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]()
val node = div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink),
child <-- message.map(span(_))
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
node <- div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink),
child <-- message.map(span(_))
)
} yield node

hooks shouldBe empty

Expand All @@ -336,9 +382,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val messageList = PublishSubject[Seq[String]]()
val node = div(children <-- messageList.map(_.map(span(_))),
span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink)
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
node <- div(children <-- messageList.map(_.map(span(_))),
span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink)
)
} yield node

hooks shouldBe empty

Expand All @@ -364,9 +415,11 @@ class LifecycleHookSpec extends JSDomSpec {

val sub = PublishSubject[String]

val node = div(child <-- nodes.startWith(Seq(
span(managed(sink <-- sub))
)))
val node = sink.flatMap { sink =>
div(child <-- nodes.startWith(Seq(
span(managed(sink <-- sub))
)))
}

OutWatch.renderInto("#app", node).unsafeRunSync()

Expand All @@ -393,10 +446,12 @@ class LifecycleHookSpec extends JSDomSpec {

val divTagName = onInsert.map(_.tagName.toLowerCase).filter(_ == "div")

val node = div(onInsert("insert") --> sink,
div(divTagName --> sink),
span(divTagName --> sink)
)
val node = sink.flatMap { sink =>
div(onInsert("insert") --> sink,
div(divTagName --> sink),
span(divTagName --> sink)
)
}

OutWatch.renderInto("#app", node).unsafeRunSync()

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/outwatch/OutWatchDomSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OutWatchDomSpec extends JSDomSpec {
Seq(
div(),
attributes.`class` := "blue",
attributes.onClick(1) --> Sink.create[Int](_ => Continue),
attributes.onClick(1) --> Sink.create[Int](_ => Continue).unsafeRunSync(),
attributes.hidden <-- Observable(false)
).map(_.unsafeRunSync())
),
Expand Down