-
Notifications
You must be signed in to change notification settings - Fork 39
/
EventStream.scala
625 lines (546 loc) · 23.4 KB
/
EventStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
package reactive
import java.util.TimerTask
import reactive.logging.Logger
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, Future}
import scala.ref.WeakReference
import scala.util.DynamicVariable
object EventStream {
private object empty0 extends EventSource[Nothing] {
override def debugName = "EventStream.empty"
}
def empty[A]: EventStream[A] = empty0
}
/**
* An EventStream is a source of events (arbitrary values sent to listener functions).
* You can fire events from it, you can react to events with any behavior, and you can
* create derived EventStreams, whose events are based on the original EventStream.
* The API is modeled after the Scala standard library collections framework.
*
* An EventStream is like a collection in the sense that it consists of multiple values.
* However, unlike actual collections, the values are not available upon request; they
* occur whenever they occur. Nevertheless, many operations that apply to collections
* apply to event streams. To react to events, use foreach or foldLeft. To create derived,
* transformed EventStreams, use map, flatMap, filter, foldLeft, and the | (union) operator.
* Note that you can of course use for comprehensions as syntactic sugar for many
* of the above.
*
* Methods that return a new EventStream generally do not require an (implicit) Observing.
* Instead, the new EventStream itself holds references to the parent EventStream and
* the event function (which refers to, or is, the function passed in to the method).
* (As a result, if you derive EventStreams with a function that performs side effects,
* in order to ensure that the function is not garbage collected you must retain a reference
* to the resulting EventStream.)
*
* On the other hand, methods which do require an Observing take functions which are expected
* perform side effects, and therefore do not hold a reference to the function themselves but
* rather use the Observing for that purpose. As a result, they will remain in memory as long
* as the Observing object does.
*
* You can also create a Signal from an EventStream using hold.
* @tparam T the type of values fired as events
* @see EventSource
*/
trait EventStream[+T] extends Foreachable[T] {
/**
* Registers a listener function to run whenever
* an event is fired. The function may be held in a `WeakReference`.
* A strong reference is placed in the `Observing`, so
* if the `Observing` is garbage collected,
* the listener may be as well.
* @param f a function to be applied on every event
* @param observing the object whose gc lifetime should determine that of the function
*/
def foreach(f: T => Unit)(implicit observing: Observing): Unit = {
val subscription = subscribe(f)
observing.addSubscription(subscription)
}
/**
* Registers a listener function to run whenever
* an event is fired, and a returns a [[Subscription]]
* that can be used to remove the listener.
* The function may be held in a `WeakReference`.
* A strong reference is placed in the `Subscription`, so
* the `Subscription` is garbage collected,
* the listener may be as well.
* @param f a function to be applied on every event
*/
def subscribe(f: T => Unit): Subscription
/**
* Returns a new EventStream, that for every event that this EventStream
* fires, that one will fire an event that is the result of
* applying 'f' to this EventStream's event.
* @param f the function that transforms events fired by this EventStream
* into events to be fired by the resulting EventStream.
*/
def map[U](f: T => U): EventStream[U]
/**
* Create a new EventStream that consists of the events of
* the EventStreams returned by f. f is applied on every
* event of the original EventStream, and its returned
* EventStream is used until the next event fired by
* the original EventStream, at which time the previously
* returned EventStream is no longer used and a new one
* is used instead.
* @param f the function that is applied for every event
* to produce the next segment of the resulting EventStream.
*/
def flatMap[U](f: T => EventStream[U]): EventStream[U]
/**
* Returns a new EventStream that propagates a subset of the events that
* this EventStream fires.
* @param f the predicate function that determines which events will
* be fired by the new EventStream.
*/
def filter(f: T => Boolean): EventStream[T]
/**
* Filter and map in one step. Takes a PartialFunction.
* Whenever an event is received, if the PartialFunction
* is defined at that event, the value returned by applying
* it will be fired.
* @param pf the PartialFunction
*/
def collect[U](pf: PartialFunction[T, U]): EventStream[U]
/**
* Returns a new EventStream that propagates this EventStream's events
* until the predicate returns false.
*
* @param p the predicate function, taking an event as its argument
* and returning true if event propagation should continue
*/
def takeWhile(p: T => Boolean): EventStream[T]
/**
* Allows one, in a functional manner, to respond to an event while
* taking into account past events.
* For every event t, f is called with arguments (u, t), where u
* is initially the value of the 'initial' parameter, and subsequently
* the result of the previous application of f.
* Returns a new EventStream that, for every event t fired by
* the original EventStream, fires the result of the application of f
* (which will also be the next value of u passed to it).
* Often 'u' will be an object representing some accumulated state.
* For instance, given an EventStream[Int] named 'es',
* es.foldLeft(0)(_ + _)
* would return an EventStream that, for every (integer) event fired
* by es, would fire the sum of all events that have been fired by es.
*/
def foldLeft[U](initial: U)(f: (U, T) => U): EventStream[U]
/**
* Union of two EventStreams.
* Returns a new EventStream that consists of all events
* fired by both this EventStream and 'that.'
* @param that the other EventStream to combine in the resulting
* EventStream.
*/
def |[U >: T](that: EventStream[U]): EventStream[U]
/**
* Returns a Signal whose value is initially the 'init' parameter,
* and after every event fired by this EventStream, the value of
* that event.
* @param init the initial value of the signal
*/
def hold[U >: T](init: U): Signal[U]
/**
* Returns a derived EventStream that does not fire events
* during a prior call to fire on the same thread, thus
* preventing infinite recursion between multiple event streams
* that are mutually dependent.
*/
def nonrecursive: EventStream[T]
/**
* Returns a derived EventStream that only fires events that are not equal to the
* previous event. This can be used to prevent infinite recursion between multiple
* event streams that are mutually dependent in a consistent manner.
*/
def distinct: EventStream[T]
/**
* Returns a derived event stream in which event propagation does not happen on the thread firing
* the event, but instead is executed by the provided `ExecutionContext`.
* Chained `Future`s are used to ensure the propagation happens sequentially.
*/
def async(implicit executionContext: ExecutionContext): EventStream[T]
/**
* Returns a derived event stream in which event propagation does not happen on the thread firing
* the event, but instead is executed by the global `ExecutionContext`.
* Chained `Future`s are used to ensure the propagation happens sequentially.
*/
final def nonblocking: EventStream[T] = async(ExecutionContext.global)
/**
* Returns an EventStream whose tuple-valued events include a function for testing staleness.
* The events will be of type (T, ()=>Boolean), where T is the type of the
* parent event stream; and the tuple will contain the event fired in the parent
* as well as a function that can be used to test
* whether that event is outdated because a new event has been fired since then.
* This is especially useful in conjunction with 'nonblocking',
* because its actor implementation means that a new event cannot
* be received until the previous event is finished being handled.
* The test function is useful because it may be desirable to abort the time-consuming work
* if a new event has been fired since then.
* Example usage:
* for((v, isSuperseded) <- eventStream.zipWithStaleness) { doSomeWork(); if(!isSuperseded()) doSomeMoreWork() }
*/
def zipWithStaleness: EventStream[(T, () => Boolean)]
/**
* Returns an EventStream that only fires events that are not
* followed by another event within ''period'' milliseconds.
* For instance, if you want to display some results in response to
* the user typing, and you do not want to perform more work than
* necessary, you may want to wait until the user has not typed
* anything for a full second.
*/
def throttle(period: Long): EventStream[T]
/**
* Converts this EventStream of pairs into two EventStreams of the first
* and second half of each pair. For each event that this EventStream fires,
* the returned event streams will fire the first half and second half of
* that event, respectively.
*/
def unzip[A, B](implicit ev: T <:< (A, B)): (EventStream[A], EventStream[B]) =
map { t => ev(t)._1 } -> map { t => ev(t)._2 }
/**
* Converts this EventStream of `Either`s into two EventStreams of the
* `Left` and `Right` halves, respectively. For each `Left` event that
* this EventStream fires, the first of the returned streams will fire
* the data inside of that `Left`. Similarly, for each `Right` event
* that this EventStream fires, the second of the returned streams will
* fire the data inside of that `Right`.
*/
def uneither[A, B](implicit ev: T <:< Either[A,B]): (EventStream[A], EventStream[B]) =
collect{ case t if ev(t).isLeft => ev(t).left.get } ->
collect{ case t if ev(t).isRight => ev(t).right.get }
private[reactive] def addListener(f: T => Unit): Unit
private[reactive] def removeListener(f: T => Unit): Unit
def debugString: String
def debugName: String
}
class NamedFunction[-T, +R](name: => String, f: T => R) extends (T => R) {
def apply(in: T): R = f(in)
override def toString: String = "%s #%s" format(name, System.identityHashCode(f))
}
object NamedFunction {
def apply[T, R](name: => String)(f: T => R) = new NamedFunction(name, f)
}
/**
* A basic implementation of EventStream,
* adds fire method.
*/
//TODO perhaps EventSource = SimpleEventStream + fire
class EventSource[T] extends EventStream[T] with Logger {
case class HasListeners(listeners: List[WeakReference[T => Unit]])
case class FiringEvent(event: T, listenersCount: Int, collectedCount: Int)
case class AddingListener(listener: T => Unit)
case class AddedForeachListener(listener: T => Unit)
/**
* When n empty WeakReferences are found, purge them
*/
protected val purgeThreshold = 10
abstract class ChildEventSource[U, S](protected var state: S) extends EventSource[U] {
private val parent = EventSource.this
protected def handler: (T, S) => S
private val h = handler
protected val listener: T => Unit = NamedFunction[T, Unit](debugName + ".listener")(v => synchronized {
state = h(v, state)
})
parent addListener listener
}
class FlatMapped[U](initial: Option[T])(val f: T => EventStream[U]) extends ChildEventSource[U, Option[EventStream[U]]](initial map f) {
override def debugName: String = "%s.flatMap(%s)" format(EventSource.this.debugName, f)
val fireFunc: U => Unit = fire
state foreach { _ addListener fireFunc }
def handler: (T, Option[EventStream[U]]) => Some[EventStream[U]] = (parentEvent, lastES) => {
lastES foreach { _ removeListener fireFunc }
val newES = Some(f(parentEvent))
newES foreach { _ addListener fireFunc }
newES
}
}
class Throttled(delay: Long) extends ChildEventSource[T, (Option[T], Long)](None -> System.currentTimeMillis) {
override def debugName: String = EventSource.this.debugName + ".throttle(" + delay + ")"
private def onTimer(): Unit = {
val t1 = System.currentTimeMillis
state._1 match {
case Some(e) => fire(e)
case _ =>
}
state = None -> t1
}
var tt: TimerTask = _timer.schedule(delay)(onTimer())
def handler: (T, (Option[T], Long)) => (Some[T], Long) = {
case (event, _) =>
tt.cancel()
tt = _timer.schedule(delay)(onTimer())
Some(event) -> System.currentTimeMillis
}
}
class FoldedLeft[U](initial: U, f: (U, T) => U) extends ChildEventSource[U, U](initial) {
override def debugName: String = "%s.foldLeft(%s)(%s)" format(EventSource.this.debugName, initial, f)
def handler: (T, U) => U = (event, last) => {
val next = f(last, event)
fire(next)
next
}
}
class Collected[U](pf: PartialFunction[T, U]) extends ChildEventSource[U, Unit](()) {
override def debugName: String = "%s.collect(%s)" format(EventSource.this.debugName, pf)
private val pf0 = pf
def handler: (T, Unit) => Unit = (event, _) => {
if (pf.isDefinedAt(event))
fire(pf apply event)
}
}
private type WithVolatility[U] = (U, () => Boolean)
class AsyncEventStream(implicit executionContext: ExecutionContext) extends ChildEventSource[T, Unit](()) {
override def debugName: String = "%s.async" format EventSource.this.debugName
private val future = new AtomicRef(Future.successful(()))
def handler: (T, Unit) => Unit = {
case (parentEvent, _) =>
future.transform( _ andThen {
case _ => fire(parentEvent)
})
}
}
private var listeners: List[WeakReference[T => Unit]] = Nil
/**
* Whether this EventStream has any listeners depending on it
*/
//TODO should it return false if it has listeners that have been gc'ed?
def hasListeners: Boolean = listeners.nonEmpty //&& listeners.forall(_.get.isDefined)
def debugName: String = "(eventSource: %s #%s)".format(getClass, System.identityHashCode(this))
def debugString: String = {
"%s\n listeners%s".format(
debugName,
listeners.flatMap(_.get).mkString("\n ", "\n ", "\n")
)
}
/**
* Sends an event to all listeners.
* @param event the event to send
*/
def fire(event: T): Unit = {
trace(
FiringEvent(
event,
listeners.size,
listeners.length - listeners.count(_.get ne None)
)
)
trace(HasListeners(listeners))
var empty = 0
@tailrec def next(ls: List[WeakReference[T => Unit]]): Unit = ls match {
case Nil =>
case xs =>
val l = xs.head.get
if (l.isDefined) l.get apply event else empty += 1
next(ls.tail)
}
next(listeners)
if (empty >= purgeThreshold) listeners = listeners.filter(_.get.isDefined)
}
def flatMap[U](f: T => EventStream[U]): EventStream[U] =
new FlatMapped(None)(f)
@deprecated("Use eventStream.hold(initial).flatMap(f)", "0.2")
def flatMap[U](initial: T)(f: T => EventStream[U]): EventStream[U] =
new FlatMapped(Some(initial))(f) {
override def debugName: String = "%s.flatMap(%s)(%s)" format(EventSource.this.debugName, initial, f)
}
def collect[U](pf: PartialFunction[T, U]): EventStream[U] = new Collected(pf)
def map[U](f: T => U): EventStream[U] = {
new ChildEventSource[U, Unit](()) {
override def debugName: String = "%s.map(%s)" format(EventSource.this.debugName, f)
val f0: T => U = f
def handler: (T, Unit) => Unit = (event, _) => this.fire(f(event))
}
}
override def foreach(f: T => Unit)(implicit observing: Observing): Unit = {
super.foreach(f)(observing)
trace(AddedForeachListener(f))
}
def subscribe(f: T => Unit): Subscription = {
val subscription = new Subscription {
ref = (f, this)
def cleanUp(): Unit = removeListener(f)
}
addListener(f)
trace(HasListeners(listeners))
subscription
}
def filter(f: T => Boolean): EventStream[T] = new ChildEventSource[T, Unit](()) {
override def debugName: String = "%s.filter(%s)" format(EventSource.this.debugName, f)
val f0: T => Boolean = f
def handler: (T, Unit) => Unit = (event, _) => if (f(event)) fire(event)
}
def takeWhile(p: T => Boolean): EventStream[T] = new ChildEventSource[T, Unit](()) {
override def debugName: String = EventSource.this.debugName + ".takeWhile(" + p + ")"
def handler: (T, Unit) => Unit = (event, _) =>
if (p(event))
fire(event)
else
EventSource.this.removeListener(listener)
}
def foldLeft[U](initial: U)(f: (U, T) => U): EventStream[U] = new FoldedLeft(initial, f)
def nonrecursive: EventStream[T] = new ChildEventSource[T, Unit](()) {
override def debugName: String = "%s.nonrecursive" format EventSource.this.debugName
protected val firing = new scala.util.DynamicVariable(false)
def handler: (T, Unit) => Unit = (event, _) => if (!firing.value) firing.withValue(true) {
fire(event)
}
}
def distinct: EventStream[T] = {
val folded = new FoldedLeft[List[T]](Nil, {
case (Nil, e) => e :: Nil
case (old :: _, e) => e :: old :: Nil
})
val pf: PartialFunction[List[T], T] = {
case e :: Nil => e
case e :: old :: _ if e != old => e
}
new folded.Collected(pf) {
override def debugName: String = EventSource.this.debugName + ".distinct"
}
}
def |[U >: T](that: EventStream[U]): EventStream[U] = new EventSource[U] {
override def debugName: String = "(" + EventSource.this.debugName + " | " + that.debugName + ")"
val parent: EventSource[T] = EventSource.this
val f: U => Unit = fire
EventSource.this addListener f
that addListener f
}
def hold[U >: T](init: U): Signal[U] = new Signal[U] {
override def debugName: String = EventSource.this.debugName + ".hold(" + init + ")"
private lazy val initial: U = init
private var current: U = init
def now: U = current
val change: EventSource[T] = EventSource.this
val f: T => Unit = (v: T) => current = v
change addListener f
}
def zipWithStaleness: EventStream[(T, () => Boolean)] = new ChildEventSource[WithVolatility[T], Option[Volatility]](None) {
override def debugName: String = "%s.withStaleness" format EventSource.this.debugName
def handler: (T, Option[Volatility]) => Some[Volatility] = {
case (parentEvent, volatilityOption) =>
volatilityOption.foreach(_.stale = true)
val volatility = new Volatility
fire((parentEvent, volatility))
Some(volatility)
}
}
def throttle(period: Long): EventStream[T] = new Throttled(period)
override def async(implicit executionContext: ExecutionContext): EventStream[T] = new AsyncEventStream()(executionContext)
private[reactive] def addListener(f: T => Unit): Unit = synchronized {
trace(AddingListener(f))
listeners :+= new WeakReference(f)
}
private[reactive] def removeListener(f: T => Unit): Unit = synchronized {
//remove the last listener that is identical to f
listeners.lastIndexWhere(_.get.exists(f.eq)) match {
case -1 =>
case n =>
listeners = listeners.patch(n, Nil, 1)
}
}
}
/**
* This trait adds the ability to an event stream
* to fire an event when the first listener is
* added.
* @author nafg
*
*/
trait TracksAlive[T] extends EventSource[T] {
private val aliveVar = Var(false)
/**
* This signal indicates whether the event stream
* is being listened to
*/
val alive: Signal[Boolean] = aliveVar.map{ x => x } // read only
override def foreach(f: T => Unit)(implicit observing: Observing): Unit = {
if (!aliveVar.now) {
aliveVar() = true
}
super.foreach(f)(observing)
}
}
/**
* This EventStream allows one to block events
* from within a certain scope. This can be used
* to help prevent infinite loops when two EventStreams may depend on each other.
*/
//TODO suppressable event streams' transformed derivatives
//should also be Suppressable
trait Suppressable[T] extends EventSource[T] {
protected val suppressed = new DynamicVariable(false)
/**
* Runs code while suppressing events from being fired on the same thread.
* While running the code, calls to 'fire' on the same thread do nothing.
* @param p the code to run while suppressing events
* @return the result of evaluating p
*/
def suppressing[R](p: => R): R = suppressed.withValue(true)(p)
override def fire(event: T): Unit = if (!suppressed.value) super.fire(event)
}
/**
* This EventStream fires SeqDeltas (Seq deltas) and can batch them up.
*/
//TODO batchable event streams' transformed derivatives
//should also be Batchable
trait Batchable[A, B] extends EventSource[SeqDelta[A, B]] {
protected val batch = new DynamicVariable(List[SeqDelta[A, B]]())
private val inBatch = new DynamicVariable(false)
/**
* Runs code while batching messages.
* While the code is running, calls to 'fire' on the same
* thread will not fire messages immediately, but will collect
* them. When the code completes, the messages are wrapped in a
* single Batch which is then fired. If there is only one message
* to be fired it is not wrapped in a Batch but fired directly.
* Nested calls to batching are ignored, so all messages
* collected from within the outermost call are collected and
* they are fired in one batch at the end.
* @param p the code to run
* @return the result of evaluating p
*/
def batching[R](p: => R): R = if (batch.value.isEmpty) {
val ret = inBatch.withValue(true) { p }
batch.value match {
case Nil =>
case msg :: Nil =>
super.fire(msg)
case msgs =>
super.fire(Batch(msgs.reverse: _*))
}
batch.value = Nil
ret
} else {
p
}
override def fire(msg: SeqDelta[A, B]): Unit = {
if (inBatch.value)
batch.value ::= msg
else
super.fire(msg)
}
}
/**
* An EventStream that is implemented by delegating everything to another EventStream
*/
trait EventStreamProxy[T] extends EventStream[T] {
protected[this] def underlying: EventStream[T]
override def debugString: String = underlying.debugString
override def debugName: String = underlying.debugName
override def flatMap[U](f: T => EventStream[U]): EventStream[U] = underlying.flatMap[U](f)
override def foldLeft[U](z: U)(f: (U, T) => U): EventStream[U] = underlying.foldLeft[U](z)(f)
override def map[U](f: T => U): EventStream[U] = underlying.map[U](f)
override def subscribe(f: T => Unit): Subscription = underlying.subscribe(f)
override def |[U >: T](that: EventStream[U]): EventStream[U] = underlying.|(that)
override def filter(f: T => Boolean): EventStream[T] = underlying.filter(f)
override def collect[U](pf: PartialFunction[T, U]): EventStream[U] = underlying.collect(pf)
override def takeWhile(p: T => Boolean): EventStream[T] = underlying.takeWhile(p)
override def hold[U >: T](init: U): Signal[U] = underlying.hold(init)
override def nonrecursive: EventStream[T] = underlying.nonrecursive
override def distinct: EventStream[T] = underlying.distinct
override def async(implicit ec: ExecutionContext): EventStream[T] = underlying.async(ec)
override def zipWithStaleness: EventStream[(T, () => Boolean)] = underlying.zipWithStaleness
override def throttle(period: Long): EventStream[T] = underlying.throttle(period)
private[reactive] def addListener(f: T => Unit): Unit = underlying.addListener(f)
private[reactive] def removeListener(f: T => Unit): Unit = underlying.removeListener(f)
}