Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler/Sink refactor, added Pipe trait #91

Merged
merged 12 commits into from
Nov 21, 2017
44 changes: 44 additions & 0 deletions src/main/scala/outwatch/Pipe.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package outwatch

import rxscalajs.Observable

import scala.language.implicitConversions

trait Pipe[-I, +O] {
def sink: Sink[I]

def source: Observable[O]

def mapSink[I2](f: I2 => I): Pipe[I2, O] = Pipe(sink.redirectMap(f), source)

def collectSink[I2](f: PartialFunction[I2, I]): Pipe[I2, O] = Pipe(sink.redirect(_.collect(f)), source)

def mapSource[O2](f: O => O2): Pipe[I, O2] = Pipe(sink, source.map(f))

def collectSource[O2](f: PartialFunction[O, O2]): Pipe[I, O2] = Pipe(sink, source.collect(f))

def filterSource(f: O => Boolean): Pipe[I, O] = Pipe(sink, source.filter(f))

def mapPipe[I2, O2](f: I2 => I)(g: O => O2): Pipe[I2, O2] = Pipe(sink.redirectMap(f), source.map(g))

def collectPipe[I2, O2](f: PartialFunction[I2, I])(g: PartialFunction[O, O2]): Pipe[I2, O2] = Pipe(
sink.redirect(_.collect(f)), source.collect(g)
)
}

object Pipe {
implicit def toSink[I](pipe: Pipe[I, _]): Sink[I] = pipe.sink
implicit def toSource[O](pipe: Pipe[_, O]): Observable[O] = pipe.source

private[outwatch] def apply[I, O](_sink: Sink[I], _source: Observable[O]): Pipe[I, O] = new Pipe[I, O] {
def sink: Sink[I] = _sink
def source: Observable[O] = _source
}

implicit class FilterPipe[I, +O](pipe: Pipe[I, O]) {

def filterSink(f: I => Boolean): Pipe[I, O] = Pipe(pipe.sink.redirect(_.filter(f)), pipe.source)
}
}


43 changes: 8 additions & 35 deletions src/main/scala/outwatch/Sink.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package outwatch

import cats.effect.IO
import outwatch.dom.Handler
import rxscalajs.facade.SubjectFacade
import rxscalajs.subscription.Subscription
import rxscalajs.{Observable, Observer, Subject}
Expand Down Expand Up @@ -47,13 +46,10 @@ sealed trait Sink[-T] extends Any {
}

object Sink {
private final case class SubjectSink[T]() extends Subject[T](new SubjectFacade) with Sink[T] {

private[outwatch] final case class SubjectSink[T]() extends Subject[T](new SubjectFacade) with Sink[T] {
override private[outwatch] def observer = this
}
private final case class ObservableSink[T]
(oldSink: Sink[T], stream: Observable[T]) extends Observable[T](stream.inner) with Sink[T] {
override private[outwatch] def observer = oldSink.observer
}

/**
* Creates a new Sink from Scratch.
Expand All @@ -78,34 +74,11 @@ object Sink {
sink
}

/**
* Creates a Handler that is both Observable and Sink.
* An Observable with Sink is an Observable that can also receive Events, i.e. it’s both a Source and a Sink of events.
* If you’re familiar with Rx, they’re very similar to Subjects.
* This function also allows you to create initial values for your newly created Handler.
* This is equivalent to calling `startWithMany` with the given values.
* @param seeds a sequence of initial values that the Handler will emit.
* @tparam T the type parameter of the elements
* @return the newly created Handler.
*/
def createHandler[T](seeds: T*): IO[Handler[T]] = IO {
val handler = new SubjectSink[T]

if (seeds.nonEmpty) {
ObservableSink[T](handler, handler.startWithMany(seeds: _*))
}
else {
handler
}
}


private def completionObservable[T](sink: Sink[T]): Option[Observable[Unit]] = {
sink match {
case subject@SubjectSink() =>
Some(subject.ignoreElements.defaultIfEmpty(()))
case observable@ObservableSink(_, _) =>
Some(observable.ignoreElements.defaultIfEmpty(()))
case ObserverSink(_) =>
None
}
Expand All @@ -122,7 +95,7 @@ object Sink {
* @return the resulting sink, that will forward the values
*/
def redirect[T,R](sink: Sink[T])(project: Observable[R] => Observable[T]): Sink[R] = {
val forward = Sink.createHandler[R]().unsafeRunSync()
val forward = SubjectSink[R]()

completionObservable(sink)
.fold(project(forward))(completed => project(forward).takeUntil(completed))
Expand All @@ -144,8 +117,8 @@ object Sink {
* @return the two resulting sinks, that will forward the values
*/
def redirect2[T,U,R](sink: Sink[T])(project: (Observable[R], Observable[U]) => Observable[T]): (Sink[R], Sink[U]) = {
val r = Sink.createHandler[R]().unsafeRunSync()
val u = Sink.createHandler[U]().unsafeRunSync()
val r = SubjectSink[R]()
val u = SubjectSink[U]()

completionObservable(sink)
.fold(project(r, u))(completed => project(r, u).takeUntil(completed))
Expand All @@ -170,9 +143,9 @@ object Sink {
def redirect3[T,U,V,R](sink: Sink[T])
(project: (Observable[R], Observable[U], Observable[V]) => Observable[T])
:(Sink[R], Sink[U], Sink[V]) = {
val r = Sink.createHandler[R]().unsafeRunSync()
val u = Sink.createHandler[U]().unsafeRunSync()
val v = Sink.createHandler[V]().unsafeRunSync()
val r = SubjectSink[R]()
val u = SubjectSink[U]()
val v = SubjectSink[V]()

completionObservable(sink)
.fold(project(r, u, v))(completed => project(r, u, v).takeUntil(completed))
Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/outwatch/advanced/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package outwatch

import rxscalajs.Observable

package object advanced {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need an advanced package, or how these methods could be unsafe. I find these methods especially useful to write correct, bug-free code.

Do you have an example of an unsafe usage?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safe as long the original Observable is transformed, but a different Observable could be returned.

For example, let's say one would like to emit two elements every time a button is clicked and writes this (incorrect) code:

Handler.create[Int] { handler =>
  val many = handler.transformSink[MouseEvent](_ => Observable.of(1,2))
  button(click --> many)
}

However, this doesn't work as expected because the handler's observable is closed immediately on creation and the click handler doesn't do anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is that unsafe? The whole observable is replaced by another one. We cannot protect the user to use the wrong operators. Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Safe was not the correct term to use. What I was trying to say is that in the above example the button click doesn't do anything because the observable is completed after emitting (1, 2) , which is probably not what is intended, instead it should have been:

Handler.create[Int] { handler =>
  val many = handler.transformSink[MouseEvent](_.concatMap(Observable.of(1,2)))
  button(click --> many)
}

Often the transform... methods will require the use of higher order observables. There's nothing wrong with that, but they are easier to use incorrectly, which is why I placed them under the outwatch.advanced._ import. It also aligns with one of the Outwatch goal: "Removing or restricting the need for Higher Order Observables".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But aren't we already importing higher-order Observables from https://lukajcb.github.io/rxscala-js/latest/api/rxscalajs/Observable.html?

@LukaJCB ?

Copy link
Collaborator Author

@mariusmuja mariusmuja Nov 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've actually changed my mind. We should place the transform... methods in the HandlerOps trait. They are no more "advanced" than Sink.redirect, which are available to the user.


implicit class TransformPipe[-I, +O](pipe: Pipe[I, O]) {

def transformSink[I2](f: Observable[I2] => Observable[I]): Pipe[I2, O] = Pipe(pipe.sink.redirect(f), pipe.source)

def transformSource[O2](f: Observable[O] => Observable[O2]): Pipe[I, O2] = Pipe(pipe.sink, f(pipe.source))

def transformPipe[I2, O2](f: Observable[I2] => Observable[I])(g: Observable[O] => Observable[O2]): Pipe[I2, O2] =
Pipe(pipe.sink.redirect(f), g(pipe.source))
}

}
7 changes: 2 additions & 5 deletions src/main/scala/outwatch/dom/Handlers.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package outwatch.dom

import cats.effect.IO
import org.scalajs.dom.MouseEvent
import org.scalajs.dom.raw.{ClipboardEvent, DragEvent, KeyboardEvent}
import outwatch.Sink
import outwatch.dom.helpers.InputEvent
import rxscalajs.Observable
import cats.effect.IO

/**
* Trait containing event handlers, so they can be mixed in to other objects if needed.
*/
trait Handlers {
type Handler[A] = Observable[A] with Sink[A]

def createInputHandler() = createHandler[InputEvent]()
def createMouseHandler() = createHandler[MouseEvent]()
Expand All @@ -22,7 +19,7 @@ trait Handlers {
def createBoolHandler(defaultValues: Boolean*) = createHandler[Boolean](defaultValues: _*)
def createNumberHandler(defaultValues: Double*) = createHandler[Double](defaultValues: _*)

def createHandler[T](defaultValues: T*): IO[Handler[T]] = Sink.createHandler[T](defaultValues: _*)
def createHandler[T](defaultValues: T*): IO[Handler[T]] = Handler.create[T](defaultValues: _*)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know, this was there before. but why have two methods for creating a handler? This is Handler.create and createHandler.

Copy link
Collaborator Author

@mariusmuja mariusmuja Nov 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added Handler.create because it think the place to create a handler should be in Handler (somewhat analogous to Sink.create, Observable.create...). I kept Handlers.createHandler for backwards compatibility, maybe it should be deprecated?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Deprecating the old one sounds like a good idea. What about the createNumberHandler, createBoolHandler, ...? Not sure whether, they really add value or whether something like Handler.create[Boolean] is good enough.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think something like Handler.create[Boolean] is good enough. The ones that add some value is the ones taking event types, such as createInputHandler, createMouseHandler as one doesn't need to import the respective events.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add them to Handler then? Makes them more consistent.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think prefer Handler completely generic. Plus Handler is in the outwatch package (not in outwatch.dom package), so it shouldn't "know" about dom types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right!

}

object Handlers extends Handlers
2 changes: 1 addition & 1 deletion src/main/scala/outwatch/dom/VDomModifier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.IO
import org.scalajs.dom._
import outwatch.dom.helpers.DomUtils
import scala.scalajs.js.|
import rxscalajs.{Observable, Observer}
import rxscalajs.Observer
import snabbdom.{VNodeProxy, h}

import scala.scalajs.js
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/outwatch/dom/helpers/EmitterBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package outwatch.dom.helpers

import cats.effect.IO
import org.scalajs.dom._
import outwatch.Sink
import outwatch.dom.{DestroyHook, Emitter, Hook, InsertHook, UpdateHook}
import outwatch.{Pipe, Sink}
import rxscalajs.Observable


Expand All @@ -13,6 +13,8 @@ trait EmitterBuilder[E <: Event, O] extends Any {

def apply[T](value: T): TransformingEmitterBuilder[E, T] = map(_ => value)

def apply[T](latest: Pipe[_, T]): TransformingEmitterBuilder[E, T] = apply(latest.source)

def apply[T](latest: Observable[T]): TransformingEmitterBuilder[E, T] = transform(_.withLatestFromWith(latest)((_, u) => u))

@deprecated("Deprecated, use '.map' instead", "0.11.0")
Expand Down
9 changes: 9 additions & 0 deletions src/main/scala/outwatch/dom/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ package object dom extends Attributes with Tags with Handlers {
type VNode = IO[VNode_]
type VDomModifier = IO[VDomModifier_]

type Observable[+A] = rxscalajs.Observable[A]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the direction of outwatch becoming frp-backend agnostic. 👍

val Observable = rxscalajs.Observable

type Sink[-A] = outwatch.Sink[A]
val Sink = outwatch.Sink

type Handler[A] = outwatch.Handler[A]
val Handler = outwatch.Handler

implicit def stringNode(string: String): VDomModifier = IO.pure(StringNode(string))

implicit def optionIsEmptyModifier(opt: Option[VDomModifier]): VDomModifier = opt getOrElse IO.pure(EmptyVDomModifier)
Expand Down
35 changes: 35 additions & 0 deletions src/main/scala/outwatch/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import cats.effect.IO
import outwatch.Sink.SubjectSink
import outwatch.advanced._

package object outwatch {

type >-->[-I, +O] = Pipe[I, O]
val >--> = Pipe

type Handler[A] = Pipe[A, A]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this, we can call mapSink on a Handler and get back a Pipe[A, B] not sure if that makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see what you mean. How do you feel about changing Handler to have two type parameters? I shouldn't break almost any client code, Handler type is not typically used much in client code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I think the solution below is probably the best :)


object Handler {

/**
* This function also allows you to create initial values for your newly created Handler.
* This is equivalent to calling `startWithMany` with the given values.
* @param seeds a sequence of initial values that the Handler will emit.
* @tparam T the type parameter of the elements
* @return the newly created Handler.
*/
def create[T](seeds: T*): IO[Handler[T]] = create[T].map { handler =>
if (seeds.nonEmpty) {
handler.transformSource(_.startWithMany(seeds: _*))
}
else {
handler
}
}

def create[T]: IO[Handler[T]] = IO {
val sink = SubjectSink[T]()
Pipe(sink, sink)
}
}
}
12 changes: 4 additions & 8 deletions src/main/scala/outwatch/util/Store.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package outwatch.util

import cats.effect.IO
import outwatch.Sink
import outwatch.{Handler, Pipe, Sink}
import outwatch.dom._
import outwatch.dom.helpers.STRef
import rxscalajs.Observable
import rxscalajs.subscription.Subscription

import scala.language.implicitConversions


final case class Store[State, Action](initialState: State,
reducer: (State, Action) => (State, Option[IO[Action]]),
handler: Observable[Action] with Sink[Action]) {
handler: Handler[Action]) extends Pipe[Action, State] {
val sink: Sink[Action] = handler
val source: Observable[State] = handler
.scan(initialState)(fold)
Expand All @@ -35,19 +33,17 @@ final case class Store[State, Action](initialState: State,
}

object Store {
implicit def toSink[Action](store: Store[_, Action]): Sink[Action] = store.sink
implicit def toSource[State](store: Store[State, _]): Observable[State] = store.source

private val storeRef = STRef.empty

def renderWithStore[S, A](initialState: S, reducer: (S, A) => (S, Option[IO[A]]), selector: String, root: VNode): IO[Unit] = for {
handler <- createHandler[A]()
handler <- Handler.create[A]()
store <- IO(Store(initialState, reducer, handler))
_ <- storeRef.asInstanceOf[STRef[Store[S, A]]].put(store)
_ <- OutWatch.render(selector, root)
} yield ()

def getStore[S, A]: IO[Store[S, A]] =
def getStore[S, A]: IO[Store[S, A]] =
storeRef.asInstanceOf[STRef[Store[S, A]]].getOrThrow(NoStoreException)

private object NoStoreException extends
Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/outwatch/DomEventSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.scalajs.dom._
import org.scalajs.dom.raw.{HTMLInputElement, MouseEvent}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.prop.PropertyChecks
import rxscalajs.{Observable, Subject}
import rxscalajs.Subject

class DomEventSpec extends UnitSpec with BeforeAndAfterEach with PropertyChecks {

Expand Down Expand Up @@ -87,7 +87,7 @@ class DomEventSpec extends UnitSpec with BeforeAndAfterEach with PropertyChecks
document.getElementById("child").innerHTML shouldBe ""

val firstMessage = "First"
messages.asInstanceOf[Subject[String]].next(firstMessage)
messages.sink.observer.next(firstMessage)

val event = document.createEvent("Events")
event.initEvent("click", canBubbleArg = true, cancelableArg = false)
Expand All @@ -101,7 +101,7 @@ class DomEventSpec extends UnitSpec with BeforeAndAfterEach with PropertyChecks
document.getElementById("child").innerHTML shouldBe firstMessage

val secondMessage = "Second"
messages.asInstanceOf[Subject[String]].next(secondMessage)
messages.sink.observer.next(secondMessage)

document.getElementById("click").dispatchEvent(event)

Expand Down Expand Up @@ -296,7 +296,7 @@ class DomEventSpec extends UnitSpec with BeforeAndAfterEach with PropertyChecks
val node = createBoolHandler().flatMap { stream =>
div(
button(id := "input", tpe := "checkbox", click(true) --> stream),
span(id := "toggled", stream ?= (className := someClass))
span(id := "toggled", stream.source ?= (className := someClass))
)
}

Expand Down