New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Var type #279

Closed
cranst0n opened this Issue Dec 14, 2016 · 6 comments

Comments

Projects
None yet
4 participants
@cranst0n
Contributor

cranst0n commented Dec 14, 2016

Having a Var data type may be worth including in monix. Something along the lines of:

import monix.execution.{ Ack, Cancelable, Scheduler }
import monix.reactive.{ Observable, OverflowStrategy }
import monix.reactive.observers.Subscriber
import monix.reactive.subjects.ConcurrentSubject

final class Var[A] private (
  initial: A,
  strategy: OverflowStrategy.Synchronous[A]
)(implicit scheduler: Scheduler)
    extends Observable[A] { self =>

  private[this] var value: A   = initial
  private[this] val underlying = ConcurrentSubject.behavior(initial, strategy)

  def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable =
    underlying.unsafeSubscribeFn(subscriber)

  def apply(): A =
    self.synchronized(value)

  def `:=`(update: A): Ack =
    self.synchronized {
      value = update
      underlying.onNext(update)
    }

}

object Var {
  def apply[A](
    initial: A,
    strategy: OverflowStrategy.Synchronous[A] = OverflowStrategy.Unbounded
  )(implicit scheduler: Scheduler): Var[A] = {
    new Var[A](initial, strategy)
  }
}
@cranst0n

This comment has been minimized.

Contributor

cranst0n commented Jan 9, 2018

@alexandru Any chance this might make it into 3.0?

@alexandru alexandru added this to the 3.0.0 milestone Jan 10, 2018

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 10, 2018

Yes, but I'd appreciate a pull request.

Some considerations:

  1. it should be in monix.reactive.subjects
  2. we can't have a configurable OverflowStrategy unfortunately — we can't have dropped messages because the current value needs to match what is being sent over the wire, otherwise it isn't intuitive imo, therefore the strategy used needs to be Unbounded always
  3. I like code to come with nice ScalaDocs and tests 😀

cranst0n added a commit to cranst0n/monix that referenced this issue Jan 10, 2018

@alexandru alexandru closed this in 7a3d8d0 Jan 11, 2018

@joschuafink

This comment has been minimized.

joschuafink commented Jan 24, 2018

I'm a bit late to the party, but in my eyes it does make sense to introduce a intermediary read-only type and derive Var from that:

trait Stateful[A] extends Observable[A] {
  def apply(): A
}

class Var[A] extends Stateful[A] // [...]

I'm just not sure about an appropriate name. In Scala.Rx the Rx class does that. In reactify Vals hold that functionality. I like Val as name, but it implies immutability, which is not actually the case. If there is interest, I'll send a pull request.

@alexandru

This comment has been minimized.

Member

alexandru commented Jan 24, 2018

@joschuafink I don't like the semantics and the existence of Var is already pushing it. For example:

val subject = ConcurrentSubject.behavior[Int](DropNew(bufferSize = 128))

out.delayOnNext(1.second).dump("O").subscribe()

for (i <- 1 to 99999) out.onNext(i)

In this context, due to one consumer being slow and due to the buffering policy chosen, about ~99870 events will get dropped on the floor, with that one subscriber seeing only the first 128 events or so.

After that, when you subscribe to our subject, you'll get the last successfully emitted value, which will be number 128 (or something like that). Yet the value of Stateful will be ... 99999, a value that the downstream has not and will never see.

Consequently this is why I decided for Var to have unbounded buffering without the possibility of tuning the buffering policy on input. You can still set a buffer per subscriber though. But this model is pushing it, because for a BehaviorSubject (which is what Var is), there's always going to be a tension on what the "current" value is ...

  • is it the value that was just sent on the producer side?
  • or the last value that was successfully streamed to subscribers, given the back-pressure protocol
  • and if the later, when will it become visible? Is that before or after acknowledgement from all subscribers?

Var was accepted due to popular demand and seeing that Rx.NET has something similar. But both Rx.NET and Scala.Rx has a disability in the fact that they don't do back-pressure and I don't want us to develop further in this direction, because it's not a good path to take.

@joschuafink

This comment has been minimized.

joschuafink commented Jan 24, 2018

Thank you for your very detailed response. You are right, Monix focuses more on asynchronity and throughput, while libraries like Scala.Rx focus from their syntax more on propagating changes of state. It is probably good if things stay that way :-)

@fdietze

This comment has been minimized.

fdietze commented Mar 20, 2018

Be aware that when using Var for reactive state management, you will run into frp-glitches like mentioned in #467.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment