-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Monix port #130
Monix port #130
Conversation
830048c
to
fd49a6d
Compare
} | ||
_ <- Task { | ||
list.childElementCount shouldBe 1 | ||
}.executeWithFork |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because Monix uses by default a batched execution model, so we need to "fork" the assertion to the end of the javascript execution queue, otherwise it might execute before all the effects of the event dispatch execute.
fd49a6d
to
eddc673
Compare
eddc673
to
20a2282
Compare
Would it be worth it to port directly to a preliminary Monix 3 Release? |
All that's needed to upgrade to Monix 3 is to update the dependency version, there seem to be no breaking changes in the API used. |
Interesting. Are |
They are! |
attributeReceivers.combineLatest(childStreamReceivers) | ||
attributeReceivers.startWith(Seq(Seq.empty)).combineLatest( | ||
childStreamReceivers.startWith(Seq(Seq.empty)) | ||
).dropWhile { case (a, c) => a.isEmpty && c.isEmpty } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this dropWhile
good for in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without doing startwith(Seq(Seq.empty))
and dropWhile
, the combinedLatest
would emit an element only when both the attributeReceivers
and childStreamReceivers
emit. This makes it emit when either of them emits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the behavior of combineLatest is different in monix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I thought too, but from reading the documentation now, it seems that combineLatest
in rxjs should behave the same as in Monix. Let me look into why I had to do this (or why it's not needed with rxjs/rxscala).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference is between the Observable.combineLatest
rxscala companion object method and the Monix Observable.combineLatestList
. The Monix combineLatestList
doesn't emit an empty list if the argument list is empty, rxscala does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have a test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now the code doesn't do .startWith(Seq(Seq.empty))....dropWhile { case (a, c) => a.isEmpty && c.isEmpty }
any more, instead it emits an Observable(Seq.empty[Attribute])
if the attributeReceivers
list is empty (just like in the rxscala version). There are several tests that fail if we don't do that.
|
||
import scala.concurrent.Future | ||
|
||
|
||
|
||
final case class Store[State, Action](initialState: State, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
STRef
is only used in Store. Is it possible to remove it entirely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
STRef
is used in cases where a global variable with a referential transparent API is wanted (such as the store). It's possible to remove it, but without it the store would need to be passed as an argument to all components that use it.
import scala.scalajs.js | ||
import scala.scalajs.js.typedarray.ArrayBuffer | ||
import scala.scalajs.js.| | ||
|
||
|
||
object Http { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like here is more going on than a simple port. Could you explain what you did here? I'm also not using the Http
module myself, so this should be reviewed by someone who does.
If its too different it should be a separate commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rxscala
has an Observable.ajax
method. Monix doesn't have that, so essentially I had to re-implement that functionality (based on org.scalajs.dom.ext.Ajax
).
@@ -418,8 +419,11 @@ class DomEventSpec extends JSDomSpec { | |||
|
|||
document.getElementById("input").dispatchEvent(inputEvt) | |||
|
|||
winClicked shouldBe true | |||
docClicked shouldBe true | |||
Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please explain, why this is neccessary here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because Monix uses by default a batched execution model, so we need to "fork" the assertion to the end of the javascript execution queue, otherwise it might execute before all the effects of the event dispatch execute.
src/main/scala/outwatch/Sink.scala
Outdated
import rxscalajs.facade.SubjectFacade | ||
import rxscalajs.subscription.Subscription | ||
import rxscalajs.{Observable, Observer, Subject} | ||
import monix.execution.Scheduler.Implicits.global |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to import the global execution scheduler everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is ok, but it makes the batched execution model the default everywhere. It would be better to have the scheduler an implicit argument where it's needed, but that would have been a PR with more changes vs the rxscala version (which essentially doesn't have an option of picking a different Scheduler).
document.getElementById("plus").dispatchEvent(event) | ||
document.getElementById("counter").innerHTML shouldBe i.toString | ||
} | ||
Task { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole block looks complicated compared to the old one. Is there a simpler way to write it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The simpler way would be to have the scheduler injected and use one with a synchronous execution model. Then the test code would be identical as before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the simplest way would be to have the synchronous scheduler everywhere (then it would be the same execution model as rxjs and the test could have stayed the same). But I don't like that, the batched execution model is a better choice to keep the page responsive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does the test fail with the batched execution model?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it was not the batched execution model that was causing the async boundary here, it was the Observable.merge
that uses a buffered subscriber to properly implement the back-pressure protocol. I've replaced it with a simple mergeSync
, so the test stays unchanged.
@mariusmuja do you know how the |
Because of the dead code elimination that Scala.js does, it's difficult to say precisely. But assuming that no other features of Monix are used in addition to what the library (and the tests) are using, the bundled size seems to be smaller. I did a comparison of the test bundle size a while ago and posted it in #96 , here it is: With RxJS:
Monix port:
|
I'm also a bit concerned that the users now have to deal with both Monix Tasks and IO. You said that they are compatible? But could outwatch be used with one of them only? How about IO and the batched execution context? |
f11cfd0
to
1776f41
Compare
Users don't have to deal with both. For example, |
The Client code will require an implicit |
…olinedScheduler in tests where we want synchronous behaviour.
dfee735
to
3a29dd0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I finished porting my project to this branch, and so far its working very well. Thank you for this PR!
@@ -16,16 +18,16 @@ object Pipe { | |||
* @tparam T the type parameter of the elements | |||
* @return the newly created Pipe. | |||
*/ | |||
def create[T](seeds: T*): IO[Pipe[T, T]] = create[T].map { pipe => | |||
def create[T](seeds: T*)(implicit s: Scheduler): IO[Pipe[T, T]] = create[T].map { pipe => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder, since we now have both IO
and Task
in our project as dependencies, if we should continue using one or the other, or abstract over them with the type classes found in cats-effect
, i.e.
def create[F[_]: Sync, T](seeds: T*): F[Pipe[T, T]]
What do you guys think? :)
On an unrelated note, I also think we could get rid of the varargs here, I don't think it really ever makes sense to prepend items to a stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
I'm in favor of abstracting over IO and Task, this would also allow using outwatch without IO Monads again.
-
One seed item is definitely enough for state-management, which is actually what we need for reactive UIs ;) (Make Outwatch FRP-backend agnostic #96)
@LukaJCB could you please open PRs/Issues for this? I'll merge now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on abstracting over IO
and Task
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a small note, wanting to hear your opinion, overall I'm super happy with this, it looks really great!
} | ||
|
||
private[outwatch] final case class SubjectSink[T]() extends Subject[T](new SubjectFacade) with Sink[T] { | ||
override private[outwatch] def observer = this | ||
private[outwatch] final case class SubjectSink[T]()(implicit scheduler: Scheduler) extends Observable[T] with Sink[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why exactly do we need the scheduler here? Could we not add it to the methods? This would unclutter our code a bit, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, why don't we do:
def observer: Observer[T]
? So we wouldn't need the scheduler at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need it to create the Subscriber
inside the SubjectSink
(the Sink.observer
is actually a Subscriber
now) . If I would have left it an Observer
, all the Sink.redirect
operations would have required a Scheduler
(and also many operations in PipeOps
). This way they just use the Scheduler
from the source Sink
's Subscriber
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that makes sense, however, I think creating a new Handler
or Pipe
is going to be a much much more common operation, then redirect
, so I think it'd be better if the individual methods required the implicit evidence, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite, EmitterBuilder
's -->
would require an implicit Scheduler
in that case.
This is a port of Outwatch to the Monix library.