Skip to content
This repository
Browse code

Second cut at implementation of the DataStream monad, Duration, etc.

This version takes into account that a data stream's first value may not
be available immediately on creation.  It also defines the combinators
in ways that are, at least, less obviously wrong.  (For example, the
initial value of the stream returned by "someStream.filter( pred )"
will always satisfy the predicate, even if the most recent value of the
underlying stream, at the time of creation, does *not*.)
  • Loading branch information...
commit 216377f87851cbee6145f243bbcd56f8b9183786 1 parent 0d1498c
Robert Thau authored June 17, 2012

Showing 1 changed file with 76 additions and 32 deletions. Show diff stats Hide diff stats

  1. 108  src/main/scala/notifier/DataStream.scala
108  src/main/scala/notifier/DataStream.scala
@@ -37,22 +37,38 @@ import scala.collection.mutable.HashMap
37 37
 
38 38
 trait DataStream[T] 
39 39
 {
40  
-  /** Most recent value of this DataStream */
  40
+  /** A future for the first value of this DataStream, as of the time
  41
+    * of its creation.
  42
+    *
  43
+    * If the implementation only calls `noteNewValue` and never calls
  44
+    * `noteNewFuture` deriving from ExplicitNotificationDataStream
  45
+    * will handle the plumbing...
  46
+    */
41 47
 
42  
-  def currentValue: T
43  
-  protected var cachedCurrent = currentValue
  48
+  protected def initialFuture: Future[T]
  49
+  private [positronicnet] var cachedCurrentFuture = initialFuture
44 50
 
45  
-  /** Called by the implementation to notify whoever's listening to the
46  
-    * stream that a new value is available.
  51
+  /** May be called by the implementation to notify whoever's listening to the
  52
+    * stream that a future for a new value is available.
47 53
     */
48 54
 
49  
-  protected def notifyNewValue: Unit = {
50  
-    this.cachedCurrent = currentValue
  55
+  protected def noteNewFuture( fut: Future[T] ): Unit = {
  56
+    this.cachedCurrentFuture = fut
  57
+    for (listener <- listeners.values)
  58
+      fut.onSuccess{ listener( _ ) }
  59
+  }
  60
+
  61
+  /** May be called by the implementation to notify that a particular new
  62
+    * value has arrived.
  63
+    */
  64
+  
  65
+  protected def noteNewValue( value: T ): Unit = {
  66
+    this.cachedCurrentFuture = Future( value )
51 67
     for (listener <- listeners.values)
52  
-      listener( this.cachedCurrent )
  68
+      listener( value )
53 69
   }
54 70
 
55  
-  /** Receive this stream's current value, and further values from this
  71
+  /** Receive this stream's next available value, and further values from this
56 72
     * stream as they become available, due changes in the underlying
57 73
     * resources.
58 74
     *
@@ -61,7 +77,7 @@ trait DataStream[T]
61 77
     */
62 78
 
63 79
   def withValues( handler: T => Unit ): Unit = {
64  
-    handler( currentValue )
  80
+    cachedCurrentFuture.onSuccess{ handler( _ ) }
65 81
     addListener( new Object, handler )  // dummy tag
66 82
   }
67 83
 
@@ -112,62 +128,90 @@ trait DataStream[T]
112 128
   private[this] var listeners = new HashMap[ AnyRef, T => Unit ]
113 129
 }
114 130
 
  131
+/** Class representing a condition that starts or stops (e.g., some
  132
+  * activity being active.  In implementation, it is a DataStream of
  133
+  * element type DurationEvent, which in turn has the two distinguished
  134
+  * values DurationStart and DurationStop.
  135
+  *
  136
+  * It is conceded that the naming here sucks.
  137
+  */
  138
+
115 139
 trait Duration extends DataStream[DurationEvent]
116 140
 
117 141
 class DurationEvent
118 142
 case object DurationStart extends DurationEvent
119 143
 case object DurationStop extends DurationEvent
120 144
 
  145
+/** Useful base class (or trait) for DataStreams whose implementations
  146
+  * naturally call only `noteNewValue`, and not `notifyNewValue`.
  147
+  */
  148
+
  149
+trait ExplicitNotificationDataStream[T] extends DataStream[T]
  150
+{
  151
+  final protected override def noteNewFuture( fut: Future[T] ) =
  152
+    throw new RuntimeException( "Called noteNewFuture on a " + 
  153
+                                this.getClass.getName )
  154
+
  155
+  protected val initialFuture = new Future[T]
  156
+  private var haveInitialValue = false
  157
+
  158
+  override protected def noteNewValue( newVal: T ): Unit = {
  159
+    if (! haveInitialValue ) {
  160
+      haveInitialValue = true
  161
+      initialFuture.succeed( newVal )
  162
+    }
  163
+    super.noteNewValue( newVal )
  164
+  }
  165
+}
  166
+
121 167
 private class DurationFilteredDataStream[T]( duration: Duration,
122 168
                                              underlying: DataStream[T] )
123 169
   extends DataStream[T]
124 170
 {
125  
-  def currentValue = underlying.currentValue
  171
+  protected def initialFuture = underlying.cachedCurrentFuture
126 172
 
127  
-  duration.addListener ( this,  _ match {
  173
+  duration.withValues{  _ match {
128 174
 
129 175
     case DurationStart =>
130  
-      this.notifyNewValue
131  
-      underlying.addListener( this, dummy => this.notifyNewValue )
  176
+      this.noteNewFuture( underlying.cachedCurrentFuture )
  177
+      underlying.addListener( this, this.noteNewValue( _ ))
132 178
 
133 179
     case DurationStop =>
134 180
       underlying.removeListener( this )
135  
-  })
  181
+  }}
136 182
 }
137 183
 
138 184
 private[notifications] 
139 185
 class FlatMapProxyStream[T,V]( mapFunc: T => DataStream[V],
140 186
                                underlying: DataStream[T] ) 
141  
-  extends DataStream[V]
  187
+  extends ExplicitNotificationDataStream[V]
142 188
 {
143  
-  private[this] var vstream = mapFunc( underlying.currentValue )
144  
-
145  
-  def currentValue = vstream.currentValue
  189
+  private[this] var vstream: DataStream[V] = null
146 190
 
147  
-  vstream.addListener( this, dummy => this.notifyNewValue )
148  
-
149  
-  underlying.addListener( this, newVal => {
150  
-    vstream.removeListener( this )
  191
+  underlying.withValues{ newVal => {
  192
+    if (vstream != null) vstream.removeListener( this )
151 193
     vstream = mapFunc( newVal )
152  
-    vstream.addListener( this, dummy => this.notifyNewValue)
153  
-  })
  194
+    vstream.cachedCurrentFuture.onSuccess{ this.noteNewValue( _ ) }
  195
+    vstream.addListener( this, this.noteNewValue( _ ))
  196
+  }}
154 197
 }
155 198
 
156 199
 private[notifications] 
157 200
 class MapProxyStream[T,V]( mapFunc: T => V,
158 201
                            underlying: DataStream[T] ) 
159  
-  extends DataStream[T]
  202
+  extends ExplicitNotificationDataStream[V]
160 203
 {
161  
-  def currentValue = underlying.currentValue
162  
-  underlying.addListener( this, dummy => this.notifyNewValue )
  204
+  underlying.withValues{ value => this.noteNewValue( mapFunc( value )) }
163 205
 }
164 206
 
165 207
 private[notifications] 
166 208
 class FilterProxyStream[T,V]( pred: T => Boolean,
167 209
                               underlying: DataStream[T] ) 
168  
-  extends DataStream[T]
  210
+  extends ExplicitNotificationDataStream[T]
169 211
 {
170  
-  def currentValue = underlying.currentValue
171  
-  underlying.addListener( this, 
172  
-    newVal => if (pred(newVal)) this.notifyNewValue)
  212
+  underlying.withValues{ value =>
  213
+    if (pred( value ))
  214
+      this.noteNewValue( value )
  215
+  }
173 216
 }
  217
+

0 notes on commit 216377f

Please sign in to comment.
Something went wrong with that request. Please try again.