Skip to content

Commit

Permalink
Have Sink.create return IO
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusmuja committed Jan 22, 2018
1 parent af73639 commit e279219
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 52 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/outwatch/SideEffects.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import monix.execution.Ack.Continue
import monix.execution.Scheduler

trait SideEffects {
def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s)
def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s)
def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s)
def sideEffect[T](f: T => Unit)(implicit s: Scheduler): Sink[T] = Sink.create[T] { e => f(e); Continue }(s).unsafeRunSync()
def sideEffect[S, T](f: (S, T) => Unit)(implicit s: Scheduler): Sink[(S, T)] = Sink.create[(S, T)] { e => f(e._1, e._2); Continue }(s).unsafeRunSync()
def sideEffect(f: => Unit)(implicit s: Scheduler): Sink[Any] = Sink.create[Any] { e => f; Continue }(s).unsafeRunSync()
}
19 changes: 10 additions & 9 deletions src/main/scala/outwatch/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,16 @@ object Sink {
def create[T](next: T => Future[Ack],
error: Throwable => Unit = _ => (),
complete: () => Unit = () => ()
)(implicit s: Scheduler): Sink[T] = {
val sink = ObserverSink(
new Observer[T] {
override def onNext(t: T): Future[Ack] = next(t)
override def onError(ex: Throwable): Unit = error(ex)
override def onComplete(): Unit = complete()
}
)
sink
)(implicit s: Scheduler): IO[Sink[T]] = {
IO{
ObserverSink(
new Observer[T] {
override def onNext(t: T): Future[Ack] = next(t)
override def onError(ex: Throwable): Unit = error(ex)
override def onComplete(): Unit = complete()
}
)
}
}


Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/outwatch/dom/ManagedSubscriptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,28 @@ import cats.effect.IO
import monix.execution.Ack.Continue
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{Cancelable, Scheduler}
import org.scalajs.dom
import outwatch.dom.dsl.attributes.lifecycle

trait ManagedSubscriptions {

def managed(subscription: IO[Cancelable])(implicit s: Scheduler): VDomModifier = {
subscription.flatMap { sub: Cancelable =>
lifecycle.onDestroy --> Sink.create{_ =>
Sink.create[dom.Element] { _ =>
sub.cancel()
Continue
}
}.flatMap( sink => lifecycle.onDestroy --> sink)
}
}

def managed(sub1: IO[Cancelable], sub2: IO[Cancelable], subscriptions: IO[Cancelable]*)(implicit s: Scheduler): VDomModifier = {

(sub1 :: sub2 :: subscriptions.toList).sequence.flatMap { subs: Seq[Cancelable] =>
val composite = CompositeCancelable(subs: _*)
lifecycle.onDestroy --> Sink.create{_ =>
Sink.create[dom.Element]{ _ =>
composite.cancel()
Continue
}
}.flatMap(sink => lifecycle.onDestroy --> sink)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/outwatch/util/WebSocket.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package outwatch.util

import cats.effect.IO
import monix.execution.Ack.Continue
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.Observable
Expand All @@ -8,7 +9,7 @@ import org.scalajs.dom.{CloseEvent, ErrorEvent, MessageEvent}
import outwatch.Sink

object WebSocket {
implicit def toSink(socket: WebSocket): Sink[String] = socket.sink
implicit def toSink(socket: WebSocket): IO[Sink[String]] = socket.sink
implicit def toSource(socket: WebSocket): Observable[MessageEvent] = socket.source
}

Expand Down
123 changes: 89 additions & 34 deletions src/test/scala/outwatch/LifecycleHookSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(onInsert --> sink)
val node = sink.flatMap { sink =>
div(onInsert --> sink)
}

switch shouldBe false

Expand All @@ -41,7 +43,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(onInsert --> sink)(onInsert --> sink2)
val node = for {
sink <- sink
sink2 <- sink2
node <- div(onInsert --> sink)(onInsert --> sink2)
} yield node

switch shouldBe false
switch2 shouldBe false
Expand All @@ -62,7 +68,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd")))
val node = sink.flatMap { sink =>
div(child <-- Observable(span(onDestroy --> sink), div("Hasdasd")))
}

switch shouldBe false

Expand All @@ -85,7 +93,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd")))
val node = for {
sink <- sink
sink2 <- sink2
node <- div(child <-- Observable(span(onDestroy --> sink)(onDestroy --> sink2), div("Hasdasd")))
} yield node

switch shouldBe false
switch2 shouldBe false
Expand All @@ -109,7 +121,11 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]
val node = div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onUpdate --> sink1)(onUpdate --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand All @@ -129,7 +145,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey")))
val node = sink.flatMap { sink =>
div(child <-- Observable(span(onUpdate --> sink, "Hello"), span(onUpdate --> sink, "Hey")))
}

switch shouldBe false

Expand All @@ -147,7 +165,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey"))
val node = sink.flatMap { sink =>
div(child <-- Observable(span("Hello")), span(attributes.key := "1", onPrePatch --> sink, "Hey"))
}

switch shouldBe false

Expand All @@ -168,7 +188,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}
val message = PublishSubject[String]()
val node = div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onPrePatch --> sink1)(onPrePatch --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand All @@ -188,7 +212,9 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}

val node = div(child <-- Observable("message"), onPostPatch --> sink, "Hey")
val node = sink.flatMap { sink =>
div(child <-- Observable("message"), onPostPatch --> sink, "Hey")
}

switch shouldBe false

Expand All @@ -210,7 +236,11 @@ class LifecycleHookSpec extends JSDomSpec {
Continue
}
val message = PublishSubject[String]()
val node = div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2)
val node = for {
sink1 <- sink1
sink2 <- sink2
node <- div(child <-- message, onPostPatch --> sink1)(onPostPatch --> sink2)
} yield node

OutWatch.renderInto("#app", node).unsafeRunSync()
switch1 shouldBe false
Expand Down Expand Up @@ -248,13 +278,20 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]()
val node = div(child <-- message,
onInsert --> insertSink,
onPrePatch --> prepatchSink,
onUpdate --> updateSink,
onPostPatch --> postpatchSink,
onDestroy --> destroySink
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
prepatchSink <- prepatchSink
postpatchSink <- postpatchSink
node <- div(child <-- message,
onInsert --> insertSink,
onPrePatch --> prepatchSink,
onUpdate --> updateSink,
onPostPatch --> postpatchSink,
onDestroy --> destroySink
)
} yield node

hooks shouldBe empty

Expand All @@ -279,10 +316,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val messageList = PublishSubject[Seq[String]]()
val node = div("Hello", children <-- messageList.map(_.map(span(_))),
onInsert --> insertSink,
onUpdate --> updateSink
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
node <- div("Hello", children <-- messageList.map(_.map(span(_))),
onInsert --> insertSink,
onUpdate --> updateSink
)
} yield node

hooks shouldBe empty

Expand All @@ -307,9 +348,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val message = PublishSubject[String]()
val node = div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink),
child <-- message.map(span(_))
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
node <- div(span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink),
child <-- message.map(span(_))
)
} yield node

hooks shouldBe empty

Expand All @@ -336,9 +382,14 @@ class LifecycleHookSpec extends JSDomSpec {
}

val messageList = PublishSubject[Seq[String]]()
val node = div(children <-- messageList.map(_.map(span(_))),
span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink)
)
val node = for {
insertSink <- insertSink
updateSink <- updateSink
destroySink <- destroySink
node <- div(children <-- messageList.map(_.map(span(_))),
span("Hello", onInsert --> insertSink, onUpdate --> updateSink, onDestroy --> destroySink)
)
} yield node

hooks shouldBe empty

Expand All @@ -364,9 +415,11 @@ class LifecycleHookSpec extends JSDomSpec {

val sub = PublishSubject[String]

val node = div(child <-- nodes.startWith(Seq(
span(managed(sink <-- sub))
)))
val node = sink.flatMap { sink =>
div(child <-- nodes.startWith(Seq(
span(managed(sink <-- sub))
)))
}

OutWatch.renderInto("#app", node).unsafeRunSync()

Expand All @@ -393,10 +446,12 @@ class LifecycleHookSpec extends JSDomSpec {

val divTagName = onInsert.map(_.tagName.toLowerCase).filter(_ == "div")

val node = div(onInsert("insert") --> sink,
div(divTagName --> sink),
span(divTagName --> sink)
)
val node = sink.flatMap { sink =>
div(onInsert("insert") --> sink,
div(divTagName --> sink),
span(divTagName --> sink)
)
}

OutWatch.renderInto("#app", node).unsafeRunSync()

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/outwatch/OutWatchDomSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OutWatchDomSpec extends JSDomSpec {
Seq(
div(),
attributes.`class` := "blue",
attributes.onClick(1) --> Sink.create[Int](_ => Continue),
attributes.onClick(1) --> Sink.create[Int](_ => Continue).unsafeRunSync(),
attributes.hidden <-- Observable(false)
).map(_.unsafeRunSync())
),
Expand Down

0 comments on commit e279219

Please sign in to comment.