Permalink
Browse files

adding interval

  • Loading branch information...
1 parent 88cf2f9 commit 6204a4c05789cadd3c47eb5fc3b4d8fbaa54b722 @kassens committed Jan 13, 2011
Showing with 25 additions and 7 deletions.
  1. +9 −7 src/main/scala/CEP.scala
  2. +16 −0 src/test/scala/MySuite.scala
View
@@ -10,22 +10,23 @@ object time {
*/
protected[events] def now = current
- def skip(seconds: Int) {
+ def add(seconds: Int) {
assert(seconds > 0)
current += seconds
}
}
class JointEventNode[T, U, V](ev1: Event[T], ev2: => Event[U], interval: Int, merge: (T, U) => V) extends EventNode[V] {
- var events1: List[T] = Nil
- var events2: List[U] = Nil
+ var events1: List[(Int, T)] = Nil
+ var events2: List[(Int, U)] = Nil
/*
* Reaction to event1
*/
lazy val onEvt1 = (id: Int, v1: T, reacts: ListBuffer[() => Unit]) => {
- events1 = v1 :: events1
- for (v2 <- events2.reverse) {
+ events1 = (time.now + interval, v1) :: events1
+ events2 = events2.filter(_._1 >= time.now)
+ for ((_, v2) <- events2.reverse) {
reactions(id, merge(v1, v2), reacts)
}
}
@@ -34,8 +35,9 @@ class JointEventNode[T, U, V](ev1: Event[T], ev2: => Event[U], interval: Int, me
* Reaction to event2
*/
lazy val onEvt2 = (id: Int, v2: U, reacts: ListBuffer[() => Unit]) => {
- events2 = v2 :: events2
- for (v1 <- events1.reverse) {
+ events2 = (time.now + interval, v2) :: events2
+ events1 = events1.filter(_._1 >= time.now)
+ for ((_, v1) <- events1.reverse) {
reactions(id, merge(v1, v2), reacts)
}
}
@@ -104,4 +104,20 @@ class MySuite extends FunSuite with OneInstancePerTest {
assertLogged("bs")((1, 3), (2, 3), (1, 4), (2, 4))
}
+
+ test("join with interval") {
+ val buyJoinSell = buy join (sell, 10, (a: Int, b: Int) => (a, b))
+ buyJoinSell += logger("bs")
+
+ buy(1)
+ sell(2)
+ sell(3)
+ sell(4)
+ time.add(11)
+ buy(5)
+ sell(6)
+ sell(7)
+
+ assertLogged("bs")((1, 2), (1, 3), (1, 4), (5, 6), (5, 7))
+ }
}

0 comments on commit 6204a4c

Please sign in to comment.