Airstream is a small state propagation and streaming library for Scala.js. Primary differences from other solutions:
- Mandatory ownership of leaky resources – it is impossible to create a subscription without specifying when it shall be destroyed. This helps prevent memory leaks and unexpected behaviour.
- No FRP glitches – neither observables themselves nor their observers will ever see inconsistent state within a transaction, at no runtime cost.
- One integrated system for two core types of observables:
  - EventStream for events (lazy, no current value)
  - Signal for state (lazy, has current value, only state-safe operators)
  - Seamless interop between the two types
- Small size, simple implementation – easy to understand, easy to create custom observables. Does not bloat your Scala.js bundle size.
Airstream has a very generic design, but is primarily intended to serve as a reactive layer for unidirectional dataflow architecture in UI components. As such, it is not burdened by features that cause more problems than they solve in frontend development, such as backpressure and typed effects.
I created Airstream because I found existing solutions were not suitable for building reactive UI components. My original need for Airstream was to replace the previous reactive layer of Laminar, but I'll be happy to see it used by other reactive UI libraries as well. Another piece of Laminar you can reuse is Scala DOM Types.
"com.raquo" %%% "airstream" % "<version>" // Requires Scala.js 1.16.0+
- Community
- Contributing
- Documentation
- Limitations
- My Related Projects
- Discord for chat and random questions (Airstream shares this server with Laminar)
- Github discussions for more in-depth discussions
- Github issues for bugs, feature requests
Please run sbt +test and sbt scalafmtAll locally before submitting the PR.
Note that existing tests print this compiler warning in Scala 3:
- [E029] Pattern Match Exhaustivity Warning: /Users/raquo/code/scala/airstream/src/test/scala-3/com/raquo/airstream/split/SplitMatchOneSpec.scala
This is expected. Ideally I would assert that this warning exists instead of printing it, but I don't think that's possible. I don't want to hide such warnings wholesale, but suggestions for improvement are welcome.
This documentation explains not only the functionality that Airstream offers, but also how it works, and the design tradeoffs involved. Nevertheless, if you need a primer on reactive programming using streams, consider this guide by André Staltz or its video adaptation.
This documentation is intended to be read top to bottom, sections further down the line assume knowledge of concepts and behaviours introduced in earlier sections.
For examples of Airstream usage, see Laminar Demo, Laminar source code, as well as Laminar's and Airstream's test suites.
EventStream is a reactive variable that represents a stream of discrete events.
EventStream has no concept of "current value". It is a stream of discrete events, and there is no such thing as a "current event".
EventStream is a lazy observable. That means that it will not receive or process events unless it has at least one Observer listening to it (more on this below).
Generally, when you add an Observer to a stream, it starts to send events to the observer from that point on.
The result of calling observable.addObserver(observer)(owner) or observable.foreach(onNext)(owner) is a Subscription. To remove the observer manually, you can call subscription.kill(), but usually it's the owner's job to do that. Hold that thought for now, read about owners later in the Ownership section.
Before exploring Signals, the other kind of Observable, let's outline how exactly laziness works in Airstream. All Airstream Observables are lazy, but we will use EventStream-s here to make our explanation less abstract.
Every Observable has zero or more observers – both "external" observers that you add manually using the addObserver or foreach methods, and InternalObserver-s representing dependent observables. More on those soon.
When a stream acquires its first observer (does not matter if external or internal), it is said to be started. So when you call addObserver on a stream for the first time, you start the stream. Airstream will then call the stream.onStart method, which must ensure that this stream wakes up and starts working. Someone started observing (caring about the output of) this stream – and so the stream must ensure that the events start coming in.
Usually the stream accomplishes that by adding an InternalObserver to the parent (upstream) stream – the stream on which this one depends. For example, let's consider this scenario:
val foo: EventStream[Foo] = ???
val bar: EventStream[Bar] = foo.map(fooToBar)
val baz: EventStream[Baz] = bar.map(barToBaz)
val qux: EventStream[Qux] = baz.map(bazToQux)
val rap: EventStream[Rap] = qux.map(quxToRap)
baz.addObserver(bazObserver)
Until baz.addObserver(bazObserver) was called, these streams would not be receiving or emitting any events because they have no observers, internal or external. After baz.addObserver is called, an external observer bazObserver is added to baz, starting it. Then, baz.onStart is called, adding baz as an InternalObserver to bar. baz will now receive and process any events emitted by bar.
But this means that bar just got its first observer – even if an internal one, it still matters – someone started caring, even if indirectly. So its onStart method is called, and it adds bar as the first InternalObserver to foo. Now, foo is started as well, and its onStart method does something to ensure that foo will now start sending out events. We don't actually know what foo's onStart method does because we didn't define foo's implementation. For example, it could be adding a DOM listener to a DOM element.
Now we see how adding an observer resulted in a chain of activations of all upstream streams that were required, directly or indirectly, to get the events out of the stream we actually wanted to observe. The onStart method ensured – recursively – that the observed stream is now running.
Adding another observer to the now already running streams – foo or bar or baz – would not need to cause such a chain reaction because the stream to which it is being added already has observers (internal or external).
Lastly, notice that the qux and rap streams are untouched by all this. No one cares for their output yet, so those streams will not receive any events, and neither bazToQux nor quxToRap will ever run (well, not until we add observers that need them to run, directly or indirectly).
On a lower level, how exactly is it that qux will not run? Put simply, it needs to be getting events from baz to process them and produce its own events, but it's getting nothing from baz simply because at this point baz does not know that qux exists. baz sends out its events to all of its observers, but so far nothing added qux as an observer to baz.
For extra clarity, while the stream rap does depend on qux, rap itself has no observers, so it is stopped. Nothing started it yet, and so nothing triggered its onStart method, which would have added rap as an InternalObserver to qux, starting qux recursively as described above.
Just like Observers can be added to streams, they can also be removed, e.g. with subscription.kill(). When you remove the last observer (internal or external) from a stream, the stream is said to be stopped. The same domino effect as when starting streams applies, except the onStop method recursively undoes everything that was done by onStart – instead of adding an InternalObserver to the parent stream, we remove it, and if that causes the grand-parent stream to be stopped, we call its onStop method, and the chain continues upstream.
When the dust settles, streams that are now without observers (internal or external) will be stopped, and those that still have observers will otherwise be untouched, except they will stop referencing the now-stopped observables in their lists of internal observers.
Very often, an observable is started, used for a while, then stopped, and is discarded afterwards, never to be used again. However, Airstream does support restarting previously stopped observables, and Laminar has good use cases for that. Restarting Observables is an advanced topic that you can read about after you get a good understanding of other Airstream concepts like transactions.
Every observable that depends on another – its parent, or upstream, observable – always has a reference to that parent, regardless of whether it's started or stopped.
However, the parent/upstream observable has no references to its child/downstream observable(s) until the child observable is started. Only then does the parent obtain a reference to the child, adding it to the list of its internal observers.
This has straightforward memory management implications: nothing in Airstream is keeping references to stopped observables. So, if you don't have any of your own references to a stopped Observable, it will be garbage collected, as expected.
However, a started observable has additional references to it from:
- The parent/upstream observable on which this observable depends (via the parent's list of internal observers)
- The Subscription objects created by addObserver or foreach calls on this observable, if this observable has external observers. Those subscriptions are in turn referenced by their Owner-s (more on those later)
Remember that if a given observable is started, its parent is also guaranteed to be started, and so on. This creates a potentially long chain of observables that typically terminates with external observers on the downstream end, and some kind of event producer on the upstream end. All of these reference each other, directly or indirectly, and so will not be garbage collected unless there are no more references in your program to any observable or observer in this whole graph.
Now imagine that, in the chain of activated observables mentioned above, the most downstream observable is related to a UI component that has since been destroyed. You would want that now-irrelevant observable to be stopped so that it can be garbage collected, since it's not needed anymore, but it will continue to run for as long as it has its observer. And if you forgot to remove that observer when you destroyed the UI component it related to, you've got yourself a memory leak.
This is a common memory management pattern for almost all streaming libraries out there, so this should come as no surprise to anyone familiar with event streams.
Some reactive UI libraries such as Outwatch give you a way to bind the lifecycle of subscriptions to the lifecycle of corresponding UI components, and that automatically kills the subscription (removes the observer) when the UI component it relates to is destroyed. However, the underlying streaming libraries that such UI libraries use have no concept of such binding, and so in those libraries you can manually call stream.addObserver and create a subscription that will not be automatically killed when the UI component that it conceptually relates to is unmounted.
What makes Airstream special is a built-in concept of ownership. When creating a leaky resource, e.g. when calling addObserver, you have to also provide a reference to an Owner who will eventually kill the subscription. For example, that owner could be a UI component to which the subscription relates, and it could automatically kill all subscriptions that it owns when it is destroyed, allowing the now-irrelevant observables to be stopped and garbage collected. This is essentially how Laminar's ReactiveElement works. For more details, see the Ownership section.
Signal is a reactive variable that represents a time-varying value, or an accumulated value. In other words, "state".
Similar to EventStream, Signal is lazy, so everything in the Laziness section applies to Signals as well.
Unlike EventStream, a Signal always has a current value. For instance, you could create a Signal by calling val signal = eventStream.startWith(initialValue). In that example, signal's current value would first equal initialValue, and then any time eventStream emits a value, signal's current value would be updated to the emitted value, and then signal would emit this new current value.
However, all of that would only happen if signal had one or more observers (because of Laziness). If signal had no observers, its current value would be stuck at the last current value it saved while it had observers, or at initialValue if it never had observers.
When adding an Observer to a Signal, the observer will immediately receive the signal's current value, as well as any future values. If you don't want the observer to receive the current value, observe the stream signal.changes instead.
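For illustration, here is a minimal sketch of the difference (not from the original docs; assumes you already have an owner in scope – see the Ownership section below):
val clicks: EventStream[Int] = ???
val countSignal: Signal[Int] = clicks.startWith(0)
// This observer immediately receives the current value (0), and then all future values:
countSignal.addObserver(Observer[Int](n => println(s"count: $n")))(owner)
// This observer only receives subsequent updates, not the current value:
countSignal.changes.addObserver(Observer[Int](n => println(s"changed to: $n")))(owner)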
Note: Signal's initial value is evaluated lazily. For example:
val fooStream: EventStream[Foo] = ???
val fooSignal: Signal[Foo] = fooStream.startWith(myFoo)
val barSignal: Signal[Bar] = fooSignal.map(fooToBar)
In this example, barSignal's initial value would be equal to fooToBar(myFoo), but that expression will not be evaluated until it is needed (i.e. until barSignal acquires an observer). And once evaluated, it will not be re-evaluated again.
Similarly, the myFoo expression will not be evaluated immediately as it is passed by name. It will only be evaluated if and when it is needed (e.g. to pass it down to an observer of barSignal).
Note: before Airstream 15, Signal only fired an event when its next value was different from its current value. The comparison was made using Scala's == operator. If you see references to "signals' == checks" in past issues / discussions, this is what they're talking about. In v15.0.0, this built-in auto-distinction filter was eliminated (see blog post), and you need to explicitly use one of the distinction operators to achieve such behaviour.
See relevant RFC: signal.peekNow()
Signal's laziness means that its current value might get stale / inconsistent in the absence of observers. Airstream therefore does not allow you to access a Signal's current value without proving that it has observers.
You can use stream.withCurrentValueOf(signal).mapN((lastStreamEvent, signalCurrentValue) => ???) to access signal's current value. The resulting stream will still be lazy, but this way the processing of currentValue is just as lazy as currentValue itself, so there is no risk of looking at a stale currentValue.
If you don't need lastStreamEvent, use stream.sample(signal).map(signalCurrentValue => ???) instead. Note: both of these output streams will emit only when stream emits, as documented in the code. If you want updates from the signal to also trigger an event, look into the combineWith operator.
Note: the withCurrentValueOf and sample operators are also available on signals, not just streams.
If you want to get a Signal's current value without the complications of sampling, or even if you just want to make sure that a Signal is started, just call observe on it. That will add a noop observer to the signal, and return an OwnedSignal instance which, being a StrictSignal, does expose now() and tryNow() methods that safely provide you with its current value. However, you will need to provide an Owner to do that. More on those later.
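For example, something like this (a sketch – someSignal is a made-up name, and ManualOwner is used only for brevity; in real code the owner usually comes from your UI library):
val someSignal: Signal[Int] = ???
val owner = new ManualOwner
val observedSignal = someSignal.observe(owner) // OwnedSignal – adds a noop observer, starting the signal
println(observedSignal.now()) // safe: the signal is guaranteed to be running
owner.killSubscriptions() // release the noop observer when you're done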
Signals and EventStreams are distinct concepts with different use cases as described above, but both are Observables.
You can scanLeft(initialValue)(fn) an EventStream into a Signal, or make a Signal directly with stream.startWith(initialValue), or stream.startWithNone (which creates a "weak" signal, one that initially starts out with None, and has events wrapped in Some).
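For instance (a small sketch with made-up names):
val increments: EventStream[Int] = ???
// Accumulate events into state:
val totalSignal: Signal[Int] = increments.scanLeft(0)(_ + _)
// Or just remember the latest event:
val latestSignal: Signal[Option[Int]] = increments.startWithNone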
You can get an EventStream of changes from a Signal – signal.changes – this stream will re-emit whatever the parent signal emits (subject to laziness of the stream), minus the Signal's initial value.
If you have an observable, you can refine it to a Signal with Observable#toWeakSignal or Observable#toSignalIfStream(ifStream = streamToSignal), and to a Stream with Observable#toStreamIfSignal(ifSignal = signalToStream). For example, if you want to convert Observable[String] into Signal[String] with an empty string as the initial value in case this Observable is a stream, use observable.toSignalIfStream(_.startWith("")).
See also: Sources & Sinks
Observer[A] is a modest wrapper around an onNext: A => Unit callback that represents an external observer (see sections above for the distinction with InternalObserver-s). Observers have no knowledge of which observables, if any, they're observing; they have no power to choose whether they want to observe a given observable, etc.
Observers are intended to contain side effects, and to trigger evaluation of observables by their presence (remember, all Observables are lazy).
You usually create observers with Observer.apply or myObservable.foreach. There are a few more methods on Observer that support error handling.
Observers have a few convenience methods:
- def contramap[B](project: B => A): Observer[B] – This is useful for separation of concerns. For example your Ajax service might expose an Observer[Request], but you don't want a simple UserProfile component to know about your Ajax implementation details (Request), so you can instead provide it with requestObserver.contramap(makeUpdateRequest), which is an Observer[User]. (See the sketch after this list.)
- def filter(passes: A => Boolean): Observer[A] – useful if you have an Observable that you need to observe while filtering out some events (there is no Observable.filter method, only EventStream.filter).
- def contramapSome is just an easy way to get Observer[A] from Observer[Option[A]].
- def contracollect[B](pf: PartialFunction[B, A]): Observer[B] – when you want to both contramap and filter at once.
- def contracollectOpt[B](project: B => Option[A]): Observer[B] – like contracollect but designed for APIs that return Options, such as NonEmptyList.fromList.
- delay(ms: Int) – creates an observer that calls the original observer after the specified delay (for both events and errors).
- Observer.combine[A](observers: Observer[A]) creates an observer that triggers all of the observers provided to it. Unlike Observer[A](nextValue => observers.foreach(_.onNext(nextValue))), the combined observer will also trigger its child observers in case of .onError (more about that in Error Handling).
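Here is a rough sketch of a few of these methods together. Request, User, makeUpdateRequest and auditObserver are hypothetical names for illustration, not part of Airstream:
val requestObserver: Observer[Request] = ???
// A component that only knows about User can still report to the Ajax service:
val userObserver: Observer[User] = requestObserver.contramap(makeUpdateRequest)
// Only pass through events that match a predicate:
val adminObserver: Observer[User] = userObserver.filter(_.isAdmin)
// Trigger several observers at once (errors are propagated to all of them too):
val combinedObserver: Observer[User] = Observer.combine(userObserver, auditObserver)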
Alright, this is it. By now you've read enough to have many questions about how ownership works. This assumes you've read all the docs above, but to recap the core problem that ownership solves:
- Adding an Observer to a lazily evaluated Observable is a leaky operation. That is, these resources will not be garbage collected even if the observable and the observer are both unreachable to user code. This is because the observable's parent observables will keep an internal reference to it for as long as it has observers.
- Therefore, without Ownership you would need to remember to remove the observers you added once they are no longer needed.
- But doing that manually is insane – you will eventually forget and cause memory leaks and undesired behaviour. You should not need to take out your own garbage in a garbage collected language.
If any of the above does not make sense, the rest of this section might be confusing. Make sure you at least understand the entirety of the Laziness section before proceeding.
Without further ado:
Subscription is a resource that must be killed in order to release memory or prevent some other leak. You can get it by calling observable.addObserver(observer), writeBus.addSource(stream), or other similar methods that all take an implicit owner param.
Every Subscription has an Owner. An Owner is an object that keeps track of its subscriptions and knows when to kill them, and kills them when it's time (determined at its sole discretion). Airstream does not offer any concrete Owner classes (aside from the very basic ManualOwner), just the base trait. Unless you use Dynamic Ownership, you need to instantiate (and thus implement) your own Owner-s.
For example, until v0.8, my reactive UI library Laminar's ReactiveElement (a wrapper class for managing a JS DOM Element) used to implement Owner. When a ReactiveElement was discarded (unmounted from the DOM), it would kill all of its subscriptions, i.e. all the Subscriptions that were bound to its lifetime. That would remove the observers that those subscriptions installed on the observables, stopping them if they have no other observers. Note: Laminar switched to Dynamic Ownership in v0.8 (more on that later).
When creating a Subscription, you can perform whatever leaky operations you want, and just provide the cleanup method to perform any required cleanup.
Subscriptions are bound to a specific Owner upon the creation of the Subscription, and this link stays unchanged for the lifetime of the Subscription.
Subscriptions are normally killed by their Owner, but you can also .kill() the subscription manually. The Owner will be notified about this via owner.onKilledExternally(subscription) so that it can drop the reference to the killed subscription from its list.
Killing the same Subscription more than once throws an exception – don't do it.
Built-in Owner carefully tracks a list of its subscriptions, making sure to call the right hooks, and create and dispose the right references for memory management. If you extend Owner and change that logic, memory management of that owner and its subscriptions is on you. Generally you shouldn't need to mess with any of that logic though, just make sure to call killSubscriptions when it's time.
In broad terms, ownership solves memory leaks by tying the lifecycle of Subscriptions, which would otherwise be hard to track manually, to the lifecycle of an Owner, which is expected to be tracked automatically by a UI library like Laminar.
In practice, Airstream's memory management has no magic to it. It uses Javascript's standard garbage collection, same as the rest of your Scala.js code. You just need to understand what references what, and the documentation here explains it.
For example, a Subscription created by the observable.addObserver method keeps references to both the Observable and the Observer (via the function passed as its cleanup param). That means that if you're keeping a reference to a Subscription, you're also keeping those references. Given that the Subscription has a kill method that lets you remove the observer from the observable, the presence of these references should be obvious. So like I said – no magic, you just need to internalize the basic ideas of lazy observables, just like you've already internalized the basic ideas of classes and functions.
The basic Owner trait provides a high degree of flexibility, and therefore lacks some behaviour that you might expect in Owners.
For example, you can kill an Owner multiple times. Every time you do, its subscriptions will be killed, and the list of subscriptions cleared, but the Owner will remain usable after that, letting you add more subscriptions and kill them again later.
If you want an Owner that can only be killed once, and does not let you add subscriptions to it after it was killed, use the OneTimeOwner class instead. DynamicOwner (described below) uses OneTimeOwner, and that is how Laminar provides element Owners that can not be used after the element is unmounted and its owner is killed.
If you try to create a subscription using a OneTimeOwner that was already killed, the subscription will be killed immediately, and the OneTimeOwner's onAccessAfterKilled callback will be fired. If you throw in that callback, then subscription initialization will throw too.
Note that the subscription itself does not contain any activation logic (i.e. what needs to happen when the subscription is activated); that user-provided logic is external to subscription initialization, and is typically run before the subscription is initialized, so before OneTimeOwner can prevent that from happening. So when you try to use a dead OneTimeOwner, unless you take special measures, the effective payload of the subscription will still execute, but the subscription will be cancelled and cleaned up immediately. And if the subscription's payload was to e.g. make a network request, you can't put that back in the bottle.
Bottom line, you should not be deliberately sending events to dead OneTimeOwner-s. They just fix what otherwise could be a memory leak, not completely prevent your code from running. They report the error so that you can fix your code that's doing this.
The basic Owner trait also doesn't allow external code to kill it, because some owners are supposed to manage themselves. All you need to overcome that is expose the killSubscriptions method to the public, or just use the ManualOwner class that does this.
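For instance, the simplest possible setup looks roughly like this (a sketch):
val stream: EventStream[Int] = ???
val owner = new ManualOwner
val subscription = stream.addObserver(Observer[Int](println))(owner)
// ... later, when none of this owner's subscriptions are needed anymore:
owner.killSubscriptions()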
Dynamic Ownership is not a replacement for standard Ownership described above. Rather, it is a self-contained feature built on top of regular Ownership. No APIs in Airstream itself require Dynamic Ownership, it is intended to be consumed by the user or by other libraries depending on Airstream.
The premise of Dynamic Ownership is similar to that of regular ownership: you can create DynamicSubscription-s owned by DynamicOwner-s. Here is what's different:
Regular Subscription-s can never recover from being kill()-ed, whereas DynamicSubscription-s can be activated and deactivated, and then activated again, and so on, as many times as their DynamicOwner wants. For example:
val stream: EventStream[Int] = ???
val observer: Observer[Int] = ???
val dynOwner = new DynamicOwner
val dynSub = DynamicSubscription.unsafe(
dynOwner,
activate = (owner: Owner) => stream.addObserver(observer)(owner)
)
// Run dynSub's activate method and save the resulting non-dynamic Subscription
dynOwner.activate()
// Kill the regular Subscription that we saved
dynOwner.deactivate()
// Run dynSub's activate method again, obtaining a new Subscription
dynOwner.activate()
// Kill the new Subscription that we saved
dynOwner.deactivate()
Every time a DynamicOwner is activate()-d, it creates a new OneTimeOwner, and uses it to activate every DynamicSubscription that it owns. It saves the resulting non-dynamic Subscription, which the DynamicOwner later kill()-s when it's deactivate()-d.
Now you can see how this integrates with regular ownership. Anything that requires a non-dynamic Owner produces a Subscription. So to create a DynamicSubscription you need to provide an activate method that does this. That could be a call to addObserver, addSource, etc.
As a result, we have a dynamic owner that can add or remove observer from stream at any time. In Laminar, starting with v0.8, every ReactiveElement has a DynamicOwner. When the element is mounted, that owner is activated, activating all dynamic subscriptions using a newly created non-dynamic owner. Then when the element is later unmounted, those subscriptions are deactivated, removing observers from observables, sources from event buses, etc.
Previously, in Laminar v0.7, every ReactiveElement used to extend the non-dynamic Owner, so once it was unmounted, all its subscriptions were killed forever, and if the user re-mounted that element, its subscriptions would not come back to life. But now that Laminar uses Dynamic Ownership, you can re-mount previously unmounted elements, and their dynamic subscriptions will spring back to life.
Note that a DynamicSubscription is not automatically activated upon creation. Its DynamicOwner controls its activation and deactivation. You can still permanently kill() a DynamicSubscription manually – it will be deactivated if it's currently active, and removed from its DynamicOwner.
I created Dynamic Ownership specifically to solve this long-standing Laminar memory management issue: if a non-dynamic Subscription is created when ReactiveElement is initialized, and is killed when that element is unmounted, what happens to elements that get initialized but are never mounted into the DOM? That's right, their subscriptions are never killed (because they are technically never unmounted) and so they are essentially never garbage collected.
Laminar v0.8 had to fix this by creating Subscription-s every time the element is mounted, and killing them when the element is unmounted. Long story short, Dynamic Ownership is exactly this, slightly generalized for wider use.
There is really nothing special in Dynamic Ownership memory management. It's just a helper to create and destroy subscriptions repeatedly. In practice DynamicSubscription's activate method generally contains the same references that Subscription's cleanup method would, so it's all the same considerations as before.
What, a helper for subscription helpers? Yes, indeed. This one behaves like a DynamicSubscription that lets you transfer it from one active DynamicOwner to another active DynamicOwner without deactivating and re-activating the subscription.
The API is simple:
class TransferableSubscription(
activate: () => Unit,
deactivate: () => Unit
) {
def setOwner(nextOwner: DynamicOwner): Unit
def clearOwner(): Unit
}
Note that you don't get access to Owner in activate. This is the tradeoff required to achieve this flexibility safely. TransferableSubscription is useful in very specific cases when you only care about continuity of active ownership, such as when moving an element from one mounted parent to another mounted parent in Laminar (you wouldn't expect Unmount / Mount events to fire in this case).
We now understand how events propagate through streams and signals, but the events in Airstream have to originate somewhere, right?
EventStream.fromFuture[A] creates a stream that emits the value that the future completes with, when that happens.
- The event is emitted asynchronously relative to the future's completion
- Creating a stream from an already completed future results in a stream that emits the future's value when it starts.
Signal.fromFuture[A] creates a Signal of Option[A] that emits the value that the future completes with, wrapped in Some().
- The initial value of this signal is always equal to None – even if the future has already completed when the initial value was evaluated. In that case, the initial None will be quickly (but asynchronously) followed by Some(resolvedValue).
- If the Signal was created from a not yet completed future, the completion event is emitted asynchronously relative to when the future completes, because that is how future.onComplete works.
- Being a StrictSignal, this signal exposes now and tryNow methods that provide its current value. However, note that there is a short asynchronous delay between the completion of the Future and this signal's current value updating, as explained above.
Signal.fromFuture(future, initialValue) is a variation of this method that returns a Signal[A] instead of Signal[Option[A]]. Otherwise, it behaves just as described above, with the initial None replaced by initialValue.
Note that all observables created from futures fire their events in a new transaction because they don't have a parent observable to be synchronous with.
If you have an Observable[Future[A]], you can flatten it into Observable[A] in a few ways, see Flattening Observables.
A failed future results in an error (see Error Handling).
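A small sketch of the above (User and the anonymous default are made-up names; depending on your setup you may need an ExecutionContext in scope):
import scala.concurrent.Future
val userFuture: Future[User] = ???
// Starts out as None, then asynchronously becomes Some(user) once the future completes:
val userSignal: Signal[Option[User]] = Signal.fromFuture(userFuture)
// Same, but with a user-provided initial value instead of None:
val eagerUserSignal: Signal[User] = Signal.fromFuture(userFuture, anonymousUser)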
EventStream.fromPublisher[A] creates a stream that subscribes to a Flow.Publisher, and emits the values that it produces.
Flow.Publisher is a Java Reactive Streams interface that is useful for interoperating between streaming APIs. For example, you can transform an FS2 Stream[IO, A] into an Airstream EventStream[A].
The resulting EventStream creates a new Flow.Subscriber and subscribes it to the publisher every time the EventStream is started, and cancels the subscription when the stream is stopped.
import cats.effect.unsafe.implicits._ // imports implicit IORuntime
EventStream.fromPublisher(fs2Stream.unsafeToPublisher())
These behave the same as fromFuture above, but accept a js.Promise instead. Useful for integration with JS libraries and APIs.
object EventStream {
def fromSeq[A](events: Seq[A]): EventStream[A] = ...
...
}
This method creates an event stream that synchronously emits events from the provided sequence to any newly added observer.
Each event is emitted in a separate transaction, meaning that the propagation of the previous event will fully complete before the propagation of the new event starts.
Note: you should avoid using this factory, at least with multiple events. You generally shouldn't need to emit more than one event at the same time like this stream does. If you do, I think your model is likely abusing the concept of "event". This method is provided as a kludge until I can make a more confident determination.
Like EventStream.fromSeq (see right above), but only allows for a single event.
Like EventStream.fromValue (see right above), but also allows an error.
Fires a Unit (or another value, if provided) ms milliseconds after the stream is started.
An event stream that emits events at an interval. EventStream.periodic emits the index of the event, starting with 0 for the initial event that's emitted without delay. If you want to skip the initial event, use .drop(1). The resetOnStop option (false by default) determines whether the index will be reset to 0 when the stream is stopped due to lack of observers. You can also reset the stream to any index manually by calling resetTo(value) on it. This will immediately emit this new index.
The underlying PeriodicStream class offers more functionality, including the ability to emit values other than the index, set a custom interval for every subsequent event, and stop the stream while it still has observers.
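For example, a simple ticker (a sketch):
val tick: EventStream[Int] = EventStream.periodic(1000)
// Emits 0 immediately when started, then 1, 2, 3, ... every second.
val laterTicks: EventStream[Int] = tick.drop(1) // skip the initial 0 if you don't want it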
A stream that never emits any events.
EventStream.withCallback[A] creates and returns a tuple of a stream and an A => Unit callback that, when called, passes the callback's parameter to that stream. Of course, as streams are lazy, the stream will only emit if it has observers.
val (stream, callback) = EventStream.withCallback[Int]
callback(1) // nothing happens because stream has no observers
stream.foreach(println)
callback(2) // `2` will be printed
EventStream.withJsCallback[A] works similarly except it returns a js.Function for easier integration with Javascript libraries.
EventStream.withUnitCallback works similarly except it provides a callback that accepts no arguments, and a stream that emits Unit.
EventStream.withObserver[A] works similarly but creates an observer, which among other conveniences passes the errors that it receives into the stream.
new EventBus[MyEvent] is a more powerful way to create a stream on which you can manually trigger events. The resulting EventBus exposes two properties:
- events is the stream of events emitted by the EventBus.
- writer is a WriteBus object that lets you trigger EventBus events in a few ways.
WriteBus extends Observer, so you can call onNext(newEventValue) on it, or pass it as an observer to another stream's addObserver method. This will cause the event bus to emit newEventValue in a new transaction.
Or you can just call eventBus.emit(newEvent) for the same effect.
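A basic round trip looks like this (a sketch; assumes an owner is available for addObserver):
val bus = new EventBus[Int]
bus.events.addObserver(Observer[Int](n => println(s"got $n")))(owner)
bus.emit(1)          // prints "got 1"
bus.writer.onNext(2) // prints "got 2" – writer is an Observer, so it composes like one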
What sets EventBus apart from e.g. EventStream.withObserver is that you can also call eventBus.addSource(otherStream)(owner), and the event bus will re-emit every event emitted by that stream. This is somewhat similar to adding writer as an observer to otherStream, except this will not cause otherStream to be started unless/until the EventBus's own stream is started (see Laziness).
You've probably noticed that addSource takes owner as an implicit param – this is for memory management purposes. You would typically pass a WriteBus to a child component if you want the child to send any events to the parent. Thus, we want addSource to be automatically undone when said child is discarded (see Ownership), even if writer.stream is still being observed.
Note: if using Laminar, you can create an EventBus and send events into it with source --> eventBus – that way you don't need to manage owners manually, the parent element of this --> call will effectively be the owner.
An EventBus can have multiple sources simultaneously. In that case it will emit events from all of those sources in the order in which they come in. EventBus always emits every event in a new Transaction. Note that EventBus lets you create loops of Observables. It is up to you to make sure that a propagation of an event through such loops eventually terminates (via a proper .filter(passes) gate for example, or a distinction operator on a Signal).
You can manually remove a previously added source stream by calling kill() on the Subscription object returned by the addSource call.
EventBus is particularly useful to get a single stream of events from a dynamic list of child components. You basically pass down the writer to every child component, and inside the child component you can add a source stream to it, or add the writer as an observer to some stream. Then when any given child component is discarded (i.e. its owner kills its subscriptions), its connection to the event bus will also be severed.
Typically you don't pass EventBus itself down to child components as it provides both read and write access. Instead, you pass down either the writer or the event stream, depending on what is needed. This separation of concerns is the reason why EventBus doesn't just extend WriteBus and EventStream, by the way.
WriteBus comes with a few ways to create new writers. Consider this:
val eventBus = new EventBus[Foo]
val barWriter: WriteBus[Bar] = eventBus.writer
.filterWriter(isGoodFoo)
.contramapWriter(barToFoo)
Now you can send Bar events to barWriter, and they will appear in eventBus processed with barToFoo and filtered by isGoodFoo. This is useful when you want to get events from a child component, but the child component does not or should not know what Foo is. Generally if you don't need such separation of concerns, you can just map/filter the stream that's feeding the EventBus instead.
WriteBus also offers a powerful contracomposeWriter method, which is like contramapWriter but with compose rather than map as the underlying transformation.
EventBus emits every event in a new transaction. However, similar to Var batch updates, you can call EventBus.emit or EventBus.emitTry to send values into several EventBus-es simultaneously, within the same transaction, to avoid glitches downstream.
val valuesEventBus = new EventBus[Int]
val labelsEventBus = new EventBus[String]
EventBus.emit(
valuesEventBus -> 100,
labelsEventBus -> "users"
)
Similar to Vars, you can't emit more than one event into the same EventBus in the same transaction. Airstream will throw if you attempt to do this, so you can't have duplicate inputs like EventBus.emit(bus1 -> ev1, bus1 -> ev2, bus2 -> ev3). If you need to emit more than one event into the same EventBus, just call the method twice, and they will be sent in separate transactions.
Var is a reactive variable that you can update manually, and that exposes its current value at all times, as well as a .signal of its current value.
Creating a Var is straightforward: Var(initialValue) or Var.fromTry(tryValue).
You can update a Var using one of its methods: set(value), setTry(Try(value)), update(currentValue => nextValue), tryUpdate(currentValueTry => Try(nextValue)). Note that update will send a VarError into unhandled errors if the Var's current value is an error. Use set* or tryUpdate methods to update failed Vars.
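For example (a sketch; see the transaction caveats below for when now() reflects the update):
val countVar = Var(0)
countVar.set(10)
countVar.update(_ + 1)
println(countVar.now()) // 11, assuming this code runs outside of an ongoing transaction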
Every Var provides a writer, which is an Observer that writes the values it receives into the Var.
In addition to writer, Var also offers updater-s, making it easy to create an Observer that updates the Var based on both the Observer's input value and the Var's current value:
val v = Var(List(1, 2, 3))
val adder = v.updater[Int]((currValue, nextInput) => currValue :+ nextInput)
adder.onNext(4)
v.now() // List(1, 2, 3, 4)
val inputStream: EventStream[Int] = ???
inputStream.foreach(adder)
div(inputStream --> adder) // Laminar syntax
updater will send a VarError into unhandled errors if you ask it to update a Var that is in a failed state. In such cases, use writer or tryUpdater instead.
Vars of Options, i.e. Var[Option[A]], also offer someWriter: Observer[A] for convenience.
You can get the Var's current value using now() and tryNow(). now throws if the current value is an error. Var also exposes a signal of its values.
SourceVar, i.e. any Var that you create with Var(...), follows strict (not lazy) execution – it will update its current value as instructed even if its signal has no observers. Unlike most other signals, the Var's signal is also strict – its current value matches the Var's current value at all times regardless of whether it has observers. Of course, any downstream observables that depend on the Var's signal are still lazy as usual.
Being a StrictSignal, the Var's signal also exposes now and tryNow methods, so if you need to provide your code with read-only access to a Var, sharing only its signal is the way to go.
Var emits every event in a new transaction. This has important ramifications when writing to and reading from a Var. Consider the following code:
val myVar = Var(0)
println("Start")
myVar.set(1)
println(s"After set: ${myVar.now()}")
myVar.update(_ + 1)
println(s"After update: ${myVar.now()}")
Transaction { _ =>
println(s"After trx: ${myVar.now()}")
}
println("Done")
If you put this code in your app's main method or inside a setTimeout callback, it will print:
Start
After set: 1
After update: 2
After trx: 2
Done
But if you try to run this same code while another transaction is being executed, for example inside one of your observers, in response to an incoming stream event, this is what will be printed:
Start
Done
After set: 0
After update: 0
After trx: 2
Why the difference? Var's current value exposed by now() only updates when the Var emits the updated value, and as we now know, this always happens in a new transaction. But we ran our code inside an observer, that is, while another transaction was running. And the new transaction will only run after the current transaction has finished propagating, so the Var's current value will not update until then.
This is why reading myVar.now() after calling myVar.set(1) gives you a stale value in this case. Var tries very hard to do the right thing though. While you can't expect to see the new value in now(), the update method does provide the updated value, 1, as the input to its callback. This is because the update callback is also scheduled for a new transaction, and so it is executed after the transaction in which the Var's value was set to 1 has finished propagating.
Finally, in both cases the code prints "After trx: 2". This is because that println is only executed in a new transaction. Similar to the update callback, this only gets run when the previously scheduled transactions have finished propagating, so it will always see the final Var value.
So there you have it, you have two ways to read the Var's new current value: either call now() inside a new transaction, or use update. And of course you can also listen to the Var's signal.
Keep in mind that transaction scheduling is fully synchronous, we do not introduce an asynchronous delay anywhere, we merely order the execution chunks to make the maximum amount of sense possible. Read more about transaction scheduling in the Transactions section.
If you have a Var[A], you can get a zoomed / derived Var[B] by providing a lens: A => B and (A, B) => A. The result is a LazyDerivedVar, essentially a combination of var.signal.map and writer.contramap packaged in a Var.
The value of the derived var is linked two-way to its parent var. Updating one updates the other.
Example:
case class FormData(num: Int, str: String)
val formDataVar = Var(FormData(0, "a"))
val strVar = formDataVar.zoomLazy(_.str)((formData, newStr) => formData.copy(str = newStr))
// strVar.now() == "a"
formDataVar.update(_.copy(str = "b"))
// formDataVar.now() == FormData(0, "b")
// strVar.now() == "b"
strVar.set("c")
// formDataVar.now() == FormData(0, "c")
// strVar.now() == "c"
As the name implies, LazyDerivedVar is evaluated lazily, unlike other Vars. That is, the zoomIn function you provide (A => B) will not be called until and unless you actually read the value from this Var (whether by calling .now() or subscribing to its signal). Generally it's not a problem as zoomIn is usually just a pure field selection function (e.g. it's just _.str in the example above).
Before the introduction of zoomLazy, Airstream also offered a strict zoom method, which is now considered inferior, because it requires an Owner. Note that derived vars created with the old zoom method could only be updated if their owner remained active, or if they had any other subscribers. Otherwise, attempting to update the var would cause Airstream to emit an unhandled error. The old zoom method will be deprecated in 18.0.0.
bimap is an isomorphic (one-to-one, reversible) transformation of a Var. For example, if you have val fooVar: Var[Foo], you could create a Var with the JSON representation of the Foo in that Var, and as with any other derived Var, the values in these two vars would stay synced:
val jsonVar: Var[String] = fooVar.bimap(getThis = Foo.toJson)(getParent = Foo.fromJson)
Remember that updates to derived vars such as this jsonVar are routed via the parent Var. So, if you say jsonVar.set(newJsonStr), we don't directly set this value to jsonVar, we do more or less the following:
// Real implementation does proper error handling
val newFoo = Foo.fromJson(newJsonStr)
Var.set(
fooVar -> newFoo,
jsonVar -> Foo.toJson(newFoo)
)
We may optimize this in the future to avoid calling Foo.toJson, but for now this is the simplest implementation using the same mechanism that we used for zoomLazy. This indirection should not be observable in practice – as long as your getThis / getParent callbacks are pure and don't throw. If they do throw, errors will be propagated as expected, given the indirection.
Just as we can filter values emitted by observables by distinct-ness (from the last emitted value), we can filter Vars in a similar manner:
case class Foo(id: Int, label: String)
val fooVar: Var[Foo] = Var(Foo(1, "hello"))
val distinctFooVar: Var[Foo] = fooVar.distinct
In this code snippet, distinctFooVar is derived from fooVar, and matches its value exactly (much like Var.bimap(identity)(identity) would), unless fooVar emits a value that is not distinct from its current value – in that case, only fooVar emits the update event, and distinctFooVar does not emit – it retains its previous value.
The default .distinct filter uses a == check, but other operators like .distinctBy(_.id) are also available.
So far this works just like these distinction operators would work on signals – you could achieve the same with e.g. fooVar.signal.distinct. What's special about distinctFooVar is that you can also write into it, and the writes are also filtered for distinctness. For example, if you try to write Foo(1, "hello") into distinctFooVar (same as its current value), it will not emit anything, and neither will this update be propagated to fooVar. On the other hand, if you write Foo(2, "bye") (different value), then both vars would get updated.
Distinct Vars may be useful when you're working with state that you always want distinct-ed – then you simply discard the original Var, and always use the distinct Var, e.g.:
val selectedId = Var(1).distinct
selectedId.set(2) // new value – updated
selectedId.set(2) // value same as current – ignored
Similar to EventBus, Var emits each event in a new transaction. And, similar to EventBus.emit, you can put values into multiple Vars "at the same time", in the same transaction, to avoid glitches downstream. To do that, use the set / setTry / update / tryUpdate methods on the Var companion object. For example:
val value = Var(1)
val isEven = Var(false)
val sumSignal = value.signal.combineWith(isEven.signal)
// batch updates!
Var.set(value -> 2, isEven -> true)
With such a batched update, sumSignal will only emit (1, false) and (2, true). It will not emit an inconsistent value like (1, true) or (2, false).
Batch updates are also atomic in the following ways:
- update and tryUpdate will only execute the provided mods when the transaction is actually executed, not immediately as it's scheduled. This ensures that the mods operate on the latest available Var state.
- Similar to Var#update, Var.update sends an error into unhandled if you try to apply mod to a failed Var. In the batch case none of the input Vars will be updated, although some of the mod functions will be executed. For this reason, mod functions should be free of side effects. Use tryUpdate when you need more control over error handling.
- Similar to Var#tryUpdate, Var.tryUpdate sends an error into unhandled if any of the provided mods throw. None of the Vars will update in this case. You should return a Failure() from your mod instead of throwing if this is not what you want.
Also, since an Airstream observable can't emit more than once per transaction, the inputs to batch Var methods must have no duplicate vars. For example, you can't do this: Var.set(var1 -> 1, var1 -> 2, var2 -> 3). Airstream will detect that you're attempting to put two events into var1 in the same transaction, and will send an error into unhandled. Use two separate calls if you want to send two updates into the same Var.
Keep in mind that derived vars count as the underlying source vars for duplicate detection purposes, so you can't update vars var1 and var1.zoom(fa)(fb) in the same transaction.
Those are the only ways in which setting / updating a Var can trigger an error. If any of those happen when batch-updating Var values, none of the involved Vars will be updated – they will keep their current values.
Remember that this atomicity guarantee only applies to failures which would have caused an individual update / tryUpdate call to throw. For example, if the mod function provided to update throws, update will not throw, it will instead successfully set that Var to Failure(err).
For extra clarity, note that "sending an error into unhandled" simply reports the error and cancels the update of the Var; it does not stop the execution of the program like a real throw could.
Val(value) / Val.fromTry(tryValue) is a Signal "constant" – a Signal that never changes its value. Unlike other Signals, its value is evaluated immediately upon creation, and is exposed in public now() and tryNow() methods.
Val is useful when a component wants to accept either a Signal or a constant value as input. You can just wrap your constant in a Val, and make the component accept a Signal (or a StrictSignal) instead.
Airstream has a convenient interface to make network requests using the modern Fetch browser API:
FetchStream.get(
url,
_.redirect(_.follow),
_.referrerPolicy(_.`no-referrer`),
_.abortStream(...)
) // EventStream[String] of response body
You can also get a stream of raw dom.Response-s, or use a custom codec for requests and responses, all with the same API:
FetchStream.raw.get(url) // EventStream[dom.Response]
val Fetch = FetchStream.withCodec(encodeRequest, decodeResponse)
Fetch.post(url, _.body(myRequest)) // EventStream[MyResponse]
Ajax (XMLHttpRequest) is a legacy web technology that was largely replaced by the Fetch API (see above). Nevertheless, Airstream has a built-in way to perform Ajax requests:
AjaxStream
.get("/api/kittens") // EventStream[dom.XMLHttpRequest]
.map(req => req.responseText) // EventStream[String]
Methods for POST, PUT, PATCH, and DELETE are also available.
The request is made every time the stream is started. If the stream is stopped while the request is pending, the old request will not be cancelled, but its result will be discarded.
If the request times out, is aborted, returns an HTTP status code that isn't 2xx or 304, or fails in any other way, the stream will emit an AjaxStreamError.
If you want a stream that never fails – a stream that emits an event regardless of all those errors – call .completeEvents on your ajax stream.
You can listen for progress or readyStateChange events by passing in the corresponding observers to AjaxEventStream.get et al, for example:
val (progressObserver, progressS) = EventStream.withObserver[(dom.XMLHttpRequest, dom.ProgressEvent)]
val requestS = AjaxEventStream.get(
url = "/api/kittens",
progressObserver = progressObserver
)
val bytesLoadedS = progressS.mapN((xhr, ev) => ev.loaded)
In a similar manner, you can pass a requestObserver that will be called with the newly created dom.XMLHttpRequest just before the request is sent. This way you can save the pending request into a Var and e.g. abort() it if needed.
Warning: dom.XMLHttpRequest is an ugly, imperative JS construct. We set event callbacks for onload, onerror, onabort, ontimeout, and if requested, also for onprogress and onreadystatechange. Make sure you don't override Airstream's listeners in your own code, or this stream will not work properly.
Local Storage is a browser API that lets you persist data to a key-value client-side storage. This storage is shared between and is available to all tabs and frames from the same origin within the same browser.
Airstream offers persistent Vars backed by LocalStorage, accessed via WebStorageVar.localStorage:
val themeVar: WebStorageVar[String] = WebStorageVar
.localStorage(key = "themeName", syncOwner = None)
.text(default = "light")
val tabIxVar: WebStorageVar[Int] = WebStorageVar
.localStorage(key = "selectedTabIndex", syncOwner = None)
.int(default = 0)
val showSidebarVar: WebStorageVar[Boolean] = WebStorageVar
.localStorage(key = "showSidebar", syncOwner = None)
.bool(default = true)
See live LocalStorage Var demo in the laminar demo project.
As the underlying LocalStorage API can only store string values, Airstream Vars offer an easy way to specify custom encoding/decoding functions, so that you can e.g. JSON-encode your case classes:
val fooVar: WebStorageVar[Foo] = WebStorageVar
.localStorage(key = "foo", syncOwner = None)
.withCodec(
encode = Foo.toJson,
decode = Foo.fromJson,
default = Success(Foo(1, "name"))
)
With JSON encoding, be careful to keep the schema compatible over time. As your code evolves, at a minimum, your new code should always be able to parse JSON strings written to LocalStorage by your old code. Your JSON library may help with that, with optional field encodings etc. You can also amend your decode function to reset the user to the default value instead of returning a Failure – rough as that would be, it often would be better than breaking the app with a failed Var.
Please note that you should have at most one Var managing a given localStorage key in a given document. If you have multiple instances of Var in the same document / browser tab, both looking at the same e.g. key = "foo", they will go out of sync – updates to one of these Var-s will not propagate to the other Var.
However! It is perfectly fine to have two documents in separate browser tabs managing the same LocalStorage key, using one Var each, if you specify some syncOwner. In most cases, you will want to make those Var-s global in your code, such that they never need to be garbage collected (until the tab is closed). In such cases, simply specify syncOwner = Some(unsafeWindowOwner) (from Laminar), and your Var-s will magically sync across the tabs – you update the Var in one tab, and the Var in the other tab will immediately update as well. For example, switching the theme from light to dark across multiple tabs can work this way.
When do you need to use a different, non-global syncOwner? In short – when you're creating ephemeral Var-s that need to be garbage-collected at some point. For example, if you are rendering a list of items, and for each item, you want to remember its isExpanded state in a separate LocalStorage key (e.g. item_<id>_isExpanded) – then you will want to use an element-specific owner provided by Laminar's onMount* callbacks, so that the Var's syncing resources are released when you unmount the element that the Var is related to (e.g. because you stopped rendering that particular item).
This may be inconvenient, as you may not have the owner by the time you need the Var, so you can specify syncOwner = None to create the Var, and then call syncFromExternalUpdates on it from inside onMountCallback:
def renderItem(item: Item): Div = {
val isExpandedVar = WebStorageVar
    .localStorage(key = s"item_${item.id}_isExpanded", syncOwner = None)
.bool(default = false)
div(
onMountCallback { ctx => isExpandedVar.syncFromExternalUpdates(ctx.owner) },
div(
onClick.mapToUnit --> isExpandedVar.invertWriter,
s"Item ${item.id}: ${item.label}"
),
div(
cls("-details"),
display <-- isExpandedVar.signal.map(expanded => if (expanded) "block" else "none"),
"..."
)
)
}
As always, needing to mess with custom owners manually should give you a hint that there is likely a better way to accomplish your goal. Consider that instead of having N local Var-s (one for each item id) whose lifetimes need to be individually managed like in the snippet above, you could just have one global LocalStorage Var for key = "isExpanded"
, containing a list of item IDs that were expanded – that one you could just use with unsafeWindowOwner
.
To be extra clear about memory management – just as with the usual Var-s, creating a WebStorageVar
with syncOwner = None
does not require cleanup – such a Var would be garbage-collected when it goes out of scope. It's the syncing part that needs cleanup, if you want to discard the Var before the user closes the browser tab.
The user's browser configuration may not allow you to use LocalStorage and SessionStorage. For example, if the user disabled cookies and site data, you will not be able to read or write to LocalStorage.
In such cases, the WebStorageVar will not throw an error, but will default to working as a regular non-persisted Var.
If you need the storage to work, you can check whether LocalStorage is enabled with WebStorageVar.isLocalStorageAvailable()
and similar methods, and ask the user to enable it if it's disabled. This method will attempt to write-then-delete a small piece of data to LocalStorage, and will report whether that succeeds.
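For example, a minimal availability check might look like this (a sketch – the warning message and fallback behaviour are up to you):
if (!WebStorageVar.isLocalStorageAvailable()) {
  // The Var will still work, but its values will not survive page reloads
  dom.console.warn("LocalStorage is not available – settings will not persist.")
}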
- When creating the Var, it will try to read the current value from the underlying LocalStorage key. If the key was not yet set, it will initialize to the provided default value, and write that to LocalStorage as well.
- encode and decode functions must not throw. If the decode function returns a Failure, the Var will be set to its error value. But:
  - Whenever the Var is set to an error value, the underlying LocalStorage will not be updated, as we have no way to encode arbitrary exceptions. Thus, error states are not synced between tabs.
- When using withCodec with syncOwner, we de-duplicate Var updates coming from the other tabs using == to prevent an infinite loop of two tabs re-sending the same update to each other. You can specify a custom isSame(v1, v2) function by passing it as the syncDistinctByFn param to withCodec.
- Additional values and methods on WebStorageVar for more complex use cases: the externalUpdates stream, the rawStorageValues signal, pullOnce and setFromStoredValue. See scaladoc for those.
Session Storage is a data persistence API that is very similar to Local Storage, but is more ephemeral. Its data is only available within one tab's session (roughly speaking, each tab gets its own session storage), so typically you would use it without syncing – just with syncOwner = None
.
How is SessionStorage Var different from a regular Airstream Var? Unlike simple JS variables that are discarded when you close or reload the current document, a SessionStorage Var's value survives page reloads, browser navigation, etc. You can see a real-life example of such persistence on GitHub when entering a PR comment. If you accidentally navigate away from that page, you can press the browser's back button, and your comment draft will still be there.
Airstream's SessionStorage Vars have exactly the same API as LocalStorage Vars:
val themeVar: WebStorageVar[String] = WebStorageVar
.sessionStorage(key = "comment", syncOwner = None)
.text(default = "")
See live SessionStorage Var demo in the laminar demo project.
While SessionStorage is not shared across multiple tabs, it is still shared across multiple frames of the same origin within one tab, so if some of your web app's content is isolated in an <iframe>
, and that iframe runs its own JS script with its own instance of Airstream, you can use the syncing functionality to sync the SessionStorage Vars across that boundary.
For more details on using the WebStorageVar, see LocalStorage section right above.
Airstream has no official websockets integration yet.
For several users' implementations, search the old Laminar gitter room, and the issues in this repo.
val element: dom.Element = ???
DomEventStream[dom.MouseEvent](element, "click") // EventStream[dom.MouseEvent]
This stream, when started, registers a click
event listener on element
, and emits all events the listener receives until the stream is stopped, at which point the listener is removed.
Airstream does not know the names & types of DOM events, so you need to specify both manually. You can look those up on MDN, or get them programmatically from event props such as onClick
available in Laminar.
DomEventStream
works not just on elements but on any dom.EventTarget
. However, make sure to check browser compatibility for weird EventTarget-s such as XMLHttpRequest.
If simpler event sources (see above) do not suit your needs, consider using CustomSource
. This mechanism lets you create a custom stream or signal as long as it does not depend on other Airstream observables. So, it's good for bringing third party sources of events into Airstream.
You can create custom event sources using EventStream.fromCustomSource
and Signal.fromCustomSource
, which are convenience wrappers over the underlying CustomEventSource
and CustomSignalSource
classes. This section will explain how to use those underlying classes, and after that the understanding of fromCustomSource
methods should come naturally.
Airstream's DomEventStream.apply
creates a stream of events by wrapping the DOM API into CustomStreamSource
. Let's see how it works:
def apply[Ev <: dom.Event](
eventTarget: dom.EventTarget,
eventKey: String,
useCapture: Boolean = false
): EventStream[Ev] = {
CustomStreamSource[Ev]( (fireValue, fireError, getStartIndex, getIsStarted) => {
val eventHandler: js.Function1[Ev, Unit] = fireValue
CustomSource.Config(
onStart = () => {
eventTarget.addEventListener(eventKey, eventHandler, useCapture)
},
onStop = () => {
eventTarget.removeEventListener(eventKey, eventHandler, useCapture)
}
)
})
}
When we create a CustomStreamSource
, we need to provide a callback that accepts some useful arguments and returns an instance of CustomSource.Config
, which is essentially a bundle of two callbacks: onStart
which fires when your stream is started, and onStop
which fires when your stream is stopped (see Laziness).
Here we see that DomEventStream registers fireValue
as an event listener on the DOM element when the stream starts, and unregisters that listener when the stream stops. This way the resulting stream will properly clean up its resources.
Side note: val eventHandler
is cached to avoid implicitly creating a new instance of js.Function1
. We need to keep this exact reference to be able to unregister the listener. Just a bit of Scala-vs-js friction here.
Let's look at the methods that CustomStreamSource
makes available to us:
- fireValue - call this with a value to make the custom stream emit that value in a new transaction
- fireError - call this with a Throwable to make the custom stream emit an error (see Error Handling)
- getStartIndex – call this to check how many times the custom stream has been started. Airstream uses this for the
emitOnce
param in streams likeEventStream.fromSeq
. - getIsStarted – call this to check if the custom stream is currently started
CustomSource.Config
instances have a when(passes: () => Boolean)
method that returns a config that, when the predicate does not pass, will not call your onStart
callback when the stream starts, and will not call your onStop
callback when the stream is subsequently stopped (we assume that your onStop
code cleans up after your onStart
code). To clarify, the predicate is evaluated when the custom stream is about to start. And the stream will actually start – you can't break this part of the Airstream contract – the predicate only controls whether your callbacks defined in the config will be run. You can see this predicate being useful to implement the emitOnce
param in streams like EventStream.fromSeq
.
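To illustrate the same pattern outside of DOM events, here is a sketch of a custom stream that emits the current timestamp at a fixed interval. periodicTimestamps is not an Airstream method (for real periodic events use EventStream.periodic) – this is just to show the onStart / onStop lifecycle:
import scala.scalajs.js
import scala.scalajs.js.timers

def periodicTimestamps(intervalMs: Double): EventStream[Double] = {
  CustomStreamSource[Double]( (fireValue, fireError, getStartIndex, getIsStarted) => {
    // Remember the interval handle so that onStop can clear it
    var handle: Option[timers.SetIntervalHandle] = None
    CustomSource.Config(
      onStart = () => {
        handle = Some(timers.setInterval(intervalMs) { fireValue(js.Date.now()) })
      },
      onStop = () => {
        handle.foreach(timers.clearInterval)
        handle = None
      }
    )
  })
}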
CustomSignalSource is the Signal version of CustomStreamSource, and works similarly, just with a slightly different set of params:
class CustomSignalSource[A] (
getInitialValue: => Try[A],
makeConfig: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => CustomSource.Config
)
fireValue
and fireError
are merged into one setCurrentValue
callback that expects a Try[A]
, and this being a Signal, we also provide a getCurrentValue
param to check the custom signal's current value.
Generally signals need to be started in order for their current value to update. Stopped signals generally don't update without listeners, unless they are a StrictSignal
like Var#signal
. CustomSignalSource
is not a StrictSignal
so there is no expectation for it to keep updating its value when it's stopped. Users should keep listening to signals that they care about.
Your use case and stylistic preferences may call for creating a bona fide Var
rather than a CustomSignalSource
as shown above. In those cases, you can simply subclass SourceVar
– all you need is to provide the initial value to its constructor. You can add your own methods that update the Var's value in special ways, or provide custom streams / signals, etc.
For inspiration, see Airstream's own WebStorageVar – it's a Var backed by LocalStorage.
If you don't want to expose the underlying Var, you can also try to create a derived Var – see how Var's zoomLazy
/ bimap
/ distinct
methods work, for example. But that requires some understanding of Airstream internals.
Alternatively, you could also create a custom class that hides the underlying Var, and only exposes custom signals / observers / etc. For a nicer integration with Laminar and Airstream, such a class could extend SignalSource
and Sink
Airstream traits (see Sources & Sinks).
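For example, a rough sketch of such a wrapper class (the class and member names here are made up for illustration, and we assume SignalSource and Sink only require toObservable / toObserver, as described in Sources & Sinks):
class CounterModel extends SignalSource[Int] with Sink[Int] {
  private val countVar = Var(0)

  // A purpose-built observer: increments the count on every Unit event
  val incrementObserver: Observer[Unit] = countVar.updater[Unit]((n, _) => n + 1)

  // These members let Laminar's <-- and --> accept CounterModel directly
  override def toObservable: Signal[Int] = countVar.signal
  override def toObserver: Observer[Int] = countVar.writer
}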
If you need a custom observable that depends on another Airstream observable, you can subclass WritableEventStream
or WritableSignal
. See existing classes for inspiration, such as MapSignal
and MapEventStream
.
You will likely want to mix in either SingleParentSignal with InternalTryObserver
or SingleParentStream with InternalNextErrorObserver
. Then you will just need to implement onTry
(for signals) or onNext
/ onError
(for streams) methods, which will be triggered when the parent observable emits. In turn, those methods should call fireValue
, fireError
or fireTry
to make your custom observable emit its own value. Also, for signals you will need to implement initialValue
which you should derive from the parent observable's current value (NOT from the parent observable's initialValue
).
If you want to put asynchronous logic in your observable, make sure to have a good understanding of Airstream transactions and topoRank, and consult with other asynchronous observables implementations such as DelayEventStream
.
If your custom observable does not depend on any Airstream observables, e.g. if you're writing a compatibility layer for a third party library, you generally should be able to use the simpler Custom Sources API.
Some values and methods that you might want to access on observables are protected
. That means that the compiler will only let you access those values and methods on the same instance. So, you can read this.topoRank
, but you can't read parentObservable.topoRank
. To get around this, use the Protected
object: Protected.topoRank(parentObservable)
.
Aside from topoRank
, you will need to access tryNow()
and now()
this way, e.g. when implementing a custom signal's initialValue
. These methods require an implicit evidence of type Protected
, which is automatically in scope if you're calling these methods from inside your custom observable. You're not supposed to access a signal's current value from the outside, without proving that the signal is running (e.g. by subscribing to it), otherwise you might get a stale value.
Honestly all this "protected" business smells funny to me, but I couldn't figure out a better way to allow third party extensions without making these protected members public.
A Source[A]
in Airstream is something that exposes a toObservable
method, something that can be (explicitly, not implicitly) converted into an Observable[A]
. For example, the observables themselves are Sources, but so are EventBus-es (def toObservable = this.events
) and Var-s (def toObservable = this.signal
).
Source is further subtyped into – EventSource
(EventStream, EventBus) and SignalSource
(Signal, Var). Predictably, eventSource.toObservable
returns an EventStream, whereas signalSource.toObservable
returns a Signal.
These types are useful when you want to create a method that can accept "anything that you can get a stream from". For example, it's used in Laminar:
val textBus = new EventBus[String]
div(value <-- textBus.events)
div(value <-- textBus) // Also works because this <-- accepts Source[String]
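You can write such methods yourself too. For example, a hypothetical helper that accepts anything that can produce a stream (a sketch; logAll is not an Airstream method):
def logAll[A](source: EventSource[A])(implicit owner: Owner): Subscription =
  source.toObservable.foreach(ev => println(ev.toString))

// Works with streams and EventBus-es alike:
// logAll(textBus)
// logAll(textBus.events)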
The counterparty to Source
in Airstream is Sink
. Sink[A]
is something that exposes a toObserver
method that explicitly (not implicitly) converts it into an Observer. So Observers are sinks, as are EventBus-es and Var-s, and even js.Function1[A, Unit]
has an implicit conversion to Sink[A]
.
However, there is no implicit conversion from A => Unit
to Sink
because unfortunately Scala requires a lambda's type param to have a type ascription to implicitly convert it into a Sink[A], so syntax like div(value <-- (_ => println("x")))
would not be possible with such an implicit defined. In Laminar we get around this by overloading the <--
method to accept either a Sink[A]
or A => Unit
. If you need this conversion, just wrap your function in Observer()
. You'll still need to ascribe the types though.
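For example (a sketch):
// Wrapping a plain function in an Observer – the String type still needs to be spelled out
val printObserver: Observer[String] = Observer[String](str => println(str))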
Speaking of implicits, why don't we have EventBus extend both EventStream[A]
and Observer[A]
instead of having separate Source and Sink types? On a technical level simply because Observable and Observer have overlapping methods defined, such as filter
and delay
, but more importantly, it would just be confusing. The whole point of Source and Sink is to not expose any methods other than toObservable, so that these types are only used as input types to methods that the developer wants to be flexible.
A glitch in Functional Reactive Programming is a situation where inconsistent state is allowed to exist and exposed to either an observable or an observer. For example, consider the typical diamond case:
val numbers: EventStream[Int] = ???
val isPositive: EventStream[Boolean] = numbers.map(_ > 0)
val doubledNumbers: EventStream[Int] = numbers.map(_ * 2)
val combinedStream: EventStream[(Int, Boolean)] = doubledNumbers.combineWith(isPositive)
combinedStream.addObserver(combinedStreamObserver)(owner)
Now, without thinking too hard, what do you think combinedStream
will emit when numbers
emits 1
, assuming -1
was previously emitted? You might expect that isPositive
would emit true
, doubledNumbers
would emit 2
, and then combinedStream would emit a tuple (2, true)
. That would make sense, and this is how Airstream works at no cost to you, and yet this is not how most streaming and state propagation libraries behave.
Most streaming libraries will introduce a glitch in this scenario, as they are implemented with unconditional depth-first propagation. So in other libraries when the event from numbers
(1
) propagates, it goes to isPositive
(true
), then to combinedStream
((-1, true)
). And that's a glitch. (-1, true)
is not a valid state, as -1 is not a positive number. Immediately afterwards, doubledNumbers
will emit 2
, and finally combinedStream would emit (2, true)
, the correct event.
Such behaviour is problematic in a few ways – first, you are now propagating two events on equal standing. Any observables (and in most other libraries, even observers!) downstream of combinedStream
will see two events come in, the first one carrying invalid/incorrect state, and they will probably perform incorrect calculations or side effects because of that.
In general, glitches happen when you have an observable that synchronously depends on multiple observables that synchronously depend on a common ancestor or one of themselves. I'm using the term synchronously depends
to describe a situation where emitting an event to a parent observable might result in the child observable also emitting it – synchronously. So map
and filter
would fall into this category, but delay
wouldn't.
In the diamond-combine case described above Airstream avoids a glitch because CombineObservable-s (those created using the combineWith
method) do not propagate downstream immediately. Instead, they are put into a pendingObservables
queue in the current Transaction (we'll get to those soon). When the rest of the propagation within a transaction finishes, the propagation of the first pending observable is resumed. When that is finished, we propagate the first remaining pending observable, and so on.
So in our example, what happens in Airstream: after isPositive
emits true
, combinedStream
is notified that one of its parents emitted a new event. Instead of emitting its own event, it adds itself to the list of pending observables. Then, as the isPositive
branch finished propagating (for now), doubledNumbers
emits 2
, and then again notifies combinedStream
about this. combinedStream
is already pending, so it just grabs and remembers the new value from this parent. At this point the propagation of numbers
is complete (assuming no other branches exist), and Airstream checks pendingObservables
on the current transaction. It finds only one – combinedStream
, and re-starts the propagation from there. The only thing left to do in our example is to send the new event – (2, true)
to combinedStreamObserver
.
Now, only this simple example could work with such logic. The important bit that makes this work for complex observable graphs is topological rank. Topological rank in Airstream is defined as follows: if observable A synchronously depends (see definition above) on observable B, its topological rank will be greater than that of B. In practical terms, doubledNumbers.topoRank = numbers.topoRank + 1
and combinedStream.topoRank == max(isPositive.topoRank, doubledNumbers.topoRank) + 1
.
In case of combineWith
, Airstream uses topological rank for one thing – to determine which of the pending observables to resolve first. So when I said that Airstream continues the propagation of the "first" pending observable, I meant the one with the lowest topoRank
among pending observables. This ensures that if you have more than one combined observable pending, the one that doesn't depend on the other will be propagated first.
So this is how Airstream avoids the glitch in the diamond-combine case.
Before we dive into other kinds of glitches (ha! you thought that was it!?), we need to know what a Transaction is.
Philosophically, a Transaction in Airstream encapsulates a part of the propagation that 1) happens synchronously, and 2) contains no loops of observables. Within the confines of a single Transaction Airstream guarantees a) no glitches, and b) that no observable will emit more than once.
Async streams such as stream.delay(500)
emit their events in a new transaction because Airstream executes transactions sequentially – and there is no sense in keeping other transactions blocked until some Promise or Future decides to resolve itself.
Events that come from outside of Airstream – see Sources of Events – each come in a new Transaction, and those source observables have a topoRank
of 1. I guess it makes sense why EventStream.periodic
would behave that way, but why wouldn't EventBus
reuse the transaction of whatever event came in from one of its source streams?
And the answer is the limitation of our topological ranking approach: it does not work for loops of observables. A topoRank is a property of an observable, not of the event coming in. And an observable's topoRank is static, determined at its creation. EventBus on its creation has no sources, and allows you to fire events into it manually, so its stream needs to emit all those events in a new Transaction because there is no way to guarantee correct topological ranking to avoid glitches.
That said, in practice this is not a big deal because the events that an EventBus receives from different sources should be usually independent of each other because they are coming from different child components or from different browser events.
Apart from EventBus there is another way to create a loop – the eventStream.flatten
method. And that one too, produces an event stream that emits all events in a new transaction, for all the same reasons.
Loops and potentially-loopy constructs necessarily require a new transaction as a tradeoff. Some other libraries do some kinds of dynamic topological sorting which is less predictable and whose performance worsens as your observables graph gets more complicated, but with Airstream there are no such costs. The only – and tiny – cost is when Airstream inserts a CombineObservable into the list of pending observables – that list is sorted by a static topoRank
field, so it takes O(n) where n is the number of currently pending observables, which is usually zero or not much more than that.
Lastly, keep in mind that emitting events inside Observer-s will necessarily happen in a new transaction as you will need to use EventBus / Var APIs that create new transactions. Observers are generally intended for side effects. Those effects might be emitting other events, but in that case we consider them independent events, not a continuation of the current transaction. Philosophically, Observers should not know what they're observing (and they can observe multiple things at a time).
Consider this:
val numbers: EventStream[Int] = ???
val tens: EventStream[Int] = numbers.map(_ * 10)
val hundreds: EventStream[Int] = tens.map(_ * 10)
val multiples: EventStream[Int] = EventStream.merge(hundreds, tens)
multiples.addObserver(multiplesObserver)(owner)
What do you expect multiples
to emit when numbers
emits 1
? I expect it to emit 10
, and then 100
. Two important considerations here:
-
On a high level, the order of output events is determined by the order of input events:
hundreds
emits 100
after tens
emits 10
, so the merged stream does the same. On a technical level, the order of events emitted in the same transaction is determined by the parent observables' topological rank. -
The merged stream can, by design, emit multiple events per one origination event (the
1
event), as shown in our example above. This means that it can't always emit all of the events in the same incoming transaction, because any observable can only emit one event per transaction. At the same time, in cases when the merge stream depends only on mutually unrelated observables (that never emit in the same transaction), we don't want to force the merge stream to fire all of its events in a new transaction, as this could cause FRP glitches down the road. And so, the merge stream takes a compromise: in every transaction, it emits the first parent observable's event as-is, but if any other parent observable also emits in the same transaction (liketens
andhundreds
do in our example), the merge stream re-emits that event in a new transaction. So, in our example, it would emit10
in the same transaction as the original event, and then emit100
in a new transaction.
Such handling of transactions might seem arbitrary, but it actually matches the semantics of merge streams. As a result, such a mechanism produces desired behaviour. Even though we're emitting some events in new transactions, which would normally increase the chance of FRP glitches downstream, we only do it when it's necessary (when the merge stream emits more than one event per one originating event), and so in practice we don't see glitches. In fact, any other behaviour is guaranteed to cause glitches (unexpected behaviour). This might sound handwavy, but there's actually a lot of real life experience and unit tests behind that principle. See Operators vs Transactions for more on that.
When you call methods like Var#set
, EventBus.emit
, etc. we create a new transaction. If another transaction is currently executing, which is often the case (e.g. if you're doing this inside a stream.foreach
callback), this transaction will not be executed immediately, but will be scheduled to be executed later, because to avoid glitches, the current transaction needs to finish first before any other transaction can put more events onto the observable graph.
So if you set a Var's value, you will not be able to read it in the same transaction, because this instruction will only be executed after the current transaction finishes:
val logVar: Var[List[Event]] = ???
stream.foreach { ev =>
logVar.set(logVar.now() :+ ev)
logVar.set(logVar.now() :+ ev)
println(logVar.now())
// NONE of the logVar.now() calls here will contain any `ev`
// because they are all executed before the .set transaction executes.
// Because of this, after all of the transactions are executed,
// logVar will only contain one instance of `ev`, not two.
}
If you need to read a Var after writing to it, you can use Var#update
, which will evaluate its mod only when its transaction runs, so it will always look at the freshest state of the Var:
val logVar = Var(List[Event]())
stream.foreach { ev =>
logVar.update(_ :+ ev)
logVar.update(_ :+ ev)
// After both transactions execute, logVar will have two `ev`-s in it
}
Let's expand our example above:
val bus = new EventBus[Event]
val logVar = Var(List[Event]())
val countVar = Var(0)
bus.events.foreach { ev =>
logVar.update(_ :+ ev)
logVar.update(_ :+ ev)
// After both transactions execute, logVar will have two `ev`-s in it
}
logVar.signal.foreach { log =>
sideEffect(log.size)
countVar.update(_ + 1)
}
Let's say you fire an event into bus
, and its transaction A starts executing. The callback provided to bus.events.foreach
will schedule two transactions to update logVar
, B and C. After that, transaction A will finish as there are no other listeners.
Transaction B will immediately start executing. ev
will be appended to logVar
state, then this new state will be propagated to logVar.signal
. sideEffect(1)
will be called, and another transaction D to update countVar
will be scheduled. After that, transaction B will finish as there are no other listeners.
Now, which transaction will execute next, C (the second update to logVar
), or D (update to countVar
)? Since Airstream v0.11.0, D will execute next, because it's considered to be a child of the transaction B that just finished, since it was scheduled while transaction B was running. After a transaction finishes, Airstream first executes any pending transactions that were scheduled while it was running, in the order in which they were scheduled. This is recursive, so effectively we iterate over a hierarchy of transactions in a depth-first manner.
In practice, this makes sense: in the code, the first logVar.update(_ :+ ev)
is seen before the second logVar.update(_ :+ ev)
, so the first transaction will completely finish, including any descendant transactions it creates, before we hand over control to its sibling transaction.
Remember that all of this happens synchronously. There can be no async boundaries within a transaction. Any event fired after an async delay is necessarily emitted in a new transaction that is initialized / scheduled after the async delay, so it's not part of the pending transaction queue until the async delay resolves, and when it does, it's guaranteed that there are no pending transactions in the queue, as Javascript is single-threaded.
Airstream offers standard observable operators like map
/ filter
/ collect
/ compose
/ combineWith
etc. You will need to read the API docs or the actual code, or use IDE autocompletion, to discover those that aren't documented here or in other sections of the documentation. In the code, see BaseObservable
, Observable
, EventStream
, and Signal
traits and their companion objects.
Some of the more interesting / non-standard operators are documented below:
These operators get current / latest values from several observables at once.
The standard combineWith
operator emits updates that are the tuples of the latest available value from each of the parent observables. In that sense it is quite similar to the combineLatest RX operator. This is the canonical way to combine two observables in Airstream (and not flatMap).
For example, signalA.combineWith(signalB)
emits the latest available (A, B)
value whenever signalA
or signalB
emits. If both signals emit simultaneously, i.e. in the same transaction, then the combined signal will emit only once, avoiding the common FRP glitch. See also: Topological rank.
For streams (streamA.combineWith(streamB)
), the combined stream emits its first event when it has observed all of its parent streams to have emitted at least one event. Since it emits (A, B)
, it needs to wait for both A
and B
to become available.
combineWith
can only be used with either signals, or streams. You can't mix them. You can however convert e.g. your stream to a signal before handing it off to signal.combineWith
, using stream operators like toWeakSignal
, startWith(initial)
, scanLeft
, etc.
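For example, a sketch of combining a signal with a stream by converting the stream first (Foo and Bar are hypothetical types):
val fooSignal: Signal[Foo] = ???
val barStream: EventStream[Bar] = ???

// toWeakSignal starts with None until barStream emits its first event
val combined: Signal[(Foo, Option[Bar])] = fooSignal.combineWith(barStream.toWeakSignal)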
combineWith
has several arity helpers. See N-arity Operators.
- You can combine more than two observables at once, e.g.
stream1.combineWith(stream2, stream3, ...)
combineWith
auto-flattens nested tuples, i.e. streamA.combineWith(streamB).combineWith(streamC)
will emit events of (A, B, C)
, not the inconvenient ((A, B), C)
combineWith
has several other variations:
combineWithFn
lets you specify an alternative combining function instead of tupling. EventStream.combine(stream1, stream2, ...)
and Signal.combine(signal1, signal2, ...)
helpers.
This operator, defined for both signals and streams, lets you read the current value of another signal every time a certain observable emits an update. For example, stream.withCurrentValueOf(signal)
will emit (event, <currentSignalValue>)
whenever stream
emits event
. For convenience, you can also read the current value of Var-s this way, although for Var-s, you can always just call .now()
.
See Getting Signal's current value.
You can read the values of multiple signals and/or Vars at once: observable.withCurrentValueOf(signal1, signal2, var3)
.
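For example (a sketch; Filters is a hypothetical type):
val queryStream: EventStream[String] = ???
val filtersSignal: Signal[Filters] = ???

// Emits (query, currentFilters) every time queryStream emits
val searchRequests: EventStream[(String, Filters)] = queryStream.withCurrentValueOf(filtersSignal)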
This operator is exactly like withCurrentValueOf
(see right above), but it discards the event
itself. So, stream.withCurrentValueOf(signal)
will emit <currentSignalValue>
whenever stream
emits an event. So the stream
is basically acting as a timing / trigger for sampling other signals and/or Vars (yes, you can sample multiple at the same time, similar to withCurrentValueOf)
.
See Getting Signal's current value.
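For example, using a click stream purely as a sampling trigger (a sketch):
val refreshClicks: EventStream[dom.MouseEvent] = ???
val selectedIdSignal: Signal[String] = ???

// Emits the currently selected id whenever the user clicks refresh; the click event itself is discarded
val idsToRefresh: EventStream[String] = refreshClicks.sample(selectedIdSignal)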
These operators re-emit events from each of their parent streams.
stream1.mergeWith(stream2, stream3, ...)
emits all of the events that stream1
, stream2
, stream3
, etc. emit. This operator only accepts streams of the same event type, and returns a stream of that same type.
Aliases / helpers:
EventStream.merge(stream1, stream2, stream3, ...)
EventStream.mergeSeq(seqOfStreams)
See also:
mergeWith
works for merging static, known-in-advance sets of streams, but if you want to merge a set of streams that varies over time, you can use flatMapMerge or flattenMerge, EventBus.addSource, or, in Laminar, you can create an EventBus and -->
events into it, to avoid dealing with the owners manually with addSource
.
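For example (a sketch; Command is a hypothetical type):
val saveCommands: EventStream[Command] = ???
val deleteCommands: EventStream[Command] = ???

// Emits every event from either parent stream, preserving their relative order
val allCommands: EventStream[Command] = saveCommands.mergeWith(deleteCommands)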
Both streams and signals have various distinct*
operators to filter updates using ==
or other comparisons. These can be used to make your signals behave like they did prior to v15.0.0 (see blog post), or to achieve different, custom logic:
signal.distinct // performs `==` checks, similar to pre-15.0.0 behaviour
signal.distinctBy(_.id) // performs `==` checks on a certain key
signal.distinctByRef // performs reference equality checks
signal.distinctByFn((prevValue, nextValue) => isSame) // custom checks
signal.distinctErrors((prevErr, nextErr) => isSame) // filter errors in the error channel
signal.distinctTry((prevTryValue, nextTryValue) => isSame) // one comparator for both event and error channels
The same operators are available on streams too.
Note that all distinct
operators assume that the values you pass through them are not mutated. Internally, distinct
compares every new value to the last received value, and it remembers the latter by reference, so if you're always emitting the same instance (e.g. of js.Array
) that you're mutating upstream, the distinct
operator will never be able to detect those mutations, so it will filter them all out.
Airstream offers several methods and operators that work on up to 9 observables or tuples up to Tuple9:
mapN((a, b, ...) => ???)
Available on observables of (A, B, ...)
tuples
filterN((a, b, ...) => ???)
Available on observables of (A, B, ...)
tuples
observableA.combineWith(observableB, observableC, ...)
There is a bit of magic to this method for convenience. streamOfA.combineWith(streamOfB)
returns a stream of (A, B)
tuples only if neither A nor B are tuple types. Otherwise, combineWith
flattens the tuple types, so for example both streamOfA.combineWith(streamOfB).combineWith(streamOfC)
and streamOfA.combineWith(streamOfB, streamOfC)
return a stream of (A, B, C)
, not ((A, B), C)
. We achieve this using implicit Composition
instances provided by the tuplez library.
observableA.combineWithFn(observableB, ...)((a, b, ...) => ???)
Similar to combineWith
, but you get to provide the combinator instead of relying on tuples. For example: streamOfX.combineWithFn(streamOfY)(Point)
where Point is case class Point(x: Int, y: Int)
.
EventStream.combine(streamA, streamB, ...) et al.
N-arity combine
and combineWithFn
methods are also available on EventStream and Signal companion objects.
observableA.withCurrentValueOf(signalB, signalC, ...)
Same auto-flattening of tuples as combineWith
.
observable.sample(signalA, signalB, ...)
Returns an observable of (A, B, ...)
tuples
Some operators are available only on Event Streams, not Signals. This is by design. For example, filter
is not applicable to Signals because a Signal can't exist without a current value, so signal.filter(_ => false)
would not make any sense. Similarly, you can't delay(ms)
a signal because you can't delay its initial value.
However, you can still use those operators with Signals, you just need to be explicit that you're applying them only to the Signal's changes, not to the initial value of the Signal:
val signal: Signal[Int] = ???
val delayedSignal = signal.composeChanges(changes => changes.delay(1000)) // all updates delayed by one second
val filteredSignal = signal.composeChanges(_.filter(_ % 2 == 0)) // only allows changes with even numbers (initial value can still be odd)
For more advanced transformations, composeAll
operator lets you transform the Signal's initial value as well.
Suppose you have two streams that emit in the same Transaction. Generally you don't know in which order they will emit, unless one of them depends on the other.
If this order matters to you, you can use delaySync
operator to establish the desired order:
val stream1: EventStream[Int] = ???
val stream2: EventStream[Int] = ???
val stream1synced = stream1.delaySync(after = stream2)
stream1synced
synchronously re-emits all values that stream1
feeds into it. Its only guarantee is that if stream1
and stream2
emit in the same transaction, stream1synced
will emit AFTER stream2
(assuming it has observers of course, or it won't emit at all, as usual). Otherwise, stream2
does not affect stream1synced
in any way. Don't confuse this with the sample
operator.
Note: delaySync
is better than a simple delay
because it does not introduce an asynchronous boundary. delaySync
does not use a setTimeout
under the hood. In Airstream terms, stream1synced
synchronously depends on stream1
, so all events in stream1synced
fire in the same transaction as stream1
, which is not the case with stream1.delay(1000)
– those events would fire in a separate Transaction, and at an async delay.
Under the hood delaySync
uses the same pendingObservables
machinery as the combineWith
operator – see Topological Rank docs for an explanation.
Airstream offers a powerful split
operator that splits an observable of M[Input]
into an observable of M[Output]
based on Input => Key
. The functionality of this operator is very generic, so we will explore its properties by diving into concrete examples.
Note: These operators are available on qualifying streams and signals by means of SplittableSignal
and SplittableEventStream
value classes.
This operator is particularly hard to put into words, at least on my first try. You might want to read the split signal into signals
test in SplitEventStreamSpec.scala
And hey, don't be a stranger, remember we have Discord for chat.
If you are familiar with Laminar, consider skipping to the second example
Suppose you have a Signal[List[Foo]]
, and you want to get Signal[Map[String, Signal[Foo]]]
where the keys of the map are Foo ids, and the values of the map are signals of the latest version of a Foo with that id.
The important part here is the desire to obtain individual signals of Foo by id, not to transform a List
into a Map
. Here is how we could do this:
case class Foo(id: String, version: Int)
val inputSignal: Signal[List[Foo]] = ???
val outputSignal: Signal[List[(String, Signal[Foo])]] = inputSignal.split(
key = _.id
)(
project = (key, initialFoo, thisFooSignal) => (key, thisFooSignal)
)
val resultSignal: Signal[Map[String, Signal[Foo]]] = outputSignal.map(list => Map(list: _*))
Let's unpack all this.
In this example our input is a signal of a list of Foo-s, and we split
it into a signal of a list of (fooId, fooSignal)
pairs. In each of those pairs, fooSignal
is a signal that emits a new Foo
whenever inputSignal
emits a value that contains a Foo such that foo.id == fooId
.
So essentially each of the pairs in outputSignal
contains a foo id and a signal of the latest version of the Foo with that id, as found in inputSignal
.
Finally, in resultSignal
we trivially transform outputSignal
to convert a list to a map.
Suppose you want to render a list of Foo
-s into a list of elements. You know how to render an in