laforge49 edited this page Oct 28, 2011 · 9 revisions

The IntersectionSeq[K,V] class is used to create an intersection of multiple sequences. The sequences are joined by their keys, and only keys that are present in every sequence are included. The value for each key in the intersection is a list of all the values associated with that key.
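To pin down what the result looks like, here is a minimal sketch that models only the *output* of an intersection using plain Scala collections, with no actors or mailboxes. `IntersectionModel.intersect` is a hypothetical helper, not part of the library, and it assumes each input has unique keys in ascending order.

```scala
// Hypothetical model of the result of an intersection (not the library's code).
// Each input is a list of (key, value) pairs with unique keys in ascending order.
object IntersectionModel {
  def intersect[K, V](seqs: List[List[(K, V)]]): List[(K, List[V])] = {
    // Index every input by key so membership checks and lookups are cheap.
    val maps: List[Map[K, V]] = seqs.map(_.toMap)
    // Candidate keys come from the first input, which is already in ascending order.
    val candidates: List[K] = seqs.headOption.getOrElse(Nil).map(_._1)
    // Keep a key only if every input contains it; its value is the list of
    // matching values, one per input, in input order.
    candidates
      .filter(k => maps.forall(_.contains(k)))
      .map(k => (k, maps.map(m => m(k))))
  }
}
```

Fed the multiples of 2, 3 and 5 below 200 (each value equal to its key), it yields the same keys as the test output on this page: 30, 60, 90, 120, 150 and 180, each paired with three copies of the key.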

Because the algorithm issues requests in parallel, the conditional tailrec pattern (see Recursion) had to be extended, so the implementation is worth a look. But first, let's see how to use IntersectionSeq.

Synchronous test code.

```scala
val seqs = new java.util.ArrayList[Sequence[Int, Int]]
val range = new Range(1, 200)
seqs.add(new FilterSeq(range, (x: Int) => x % 2 == 0))
seqs.add(new FilterSeq(range, (x: Int) => x % 3 == 0))
seqs.add(new FilterSeq(range, (x: Int) => x % 5 == 0))
val intersection = new IntersectionSeq(null, seqs)
Future(intersection, Loop((key: Int, value: java.util.List[Int]) => println(key+" "+value)))
```

Output.

```
30 [30, 30, 30]
60 [60, 60, 60]
90 [90, 90, 90]
120 [120, 120, 120]
150 [150, 150, 150]
180 [180, 180, 180]
```

Asynchronous test code.

```scala
val seqs = new java.util.ArrayList[Sequence[Int, Int]]
val range2 = new Range(1, 200)
range2.setMailbox(new ReactorMailbox)
val range3 = new Range(1, 200)
range3.setMailbox(new ReactorMailbox)
val range5 = new Range(1, 200)
range5.setMailbox(new ReactorMailbox)
seqs.add(new FilterSeq(range2, (x: Int) => x % 2 == 0))
seqs.add(new FilterSeq(range3, (x: Int) => x % 3 == 0))
seqs.add(new FilterSeq(range5, (x: Int) => x % 5 == 0))
val intersection = new IntersectionSeq(new ReactorMailbox, seqs)
Future(intersection, Loop((key: Int, value: java.util.List[Int]) => println(key+" "+value)))
```

(Output is the same as the above.)

IntersectionSeqTest

## Cursor

IntersectionSeq uses the Cursor class as a wrapper for the sequences it operates on. A Cursor tracks the last KVPair returned by its sequence, and Cursor objects are comparable by the key of that last KVPair.
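The idea of a cursor that remembers its last pair and orders by that pair's key can be sketched with a plain iterator standing in for the wrapped sequence. `KeyCursor`, `next`, `seek` and `last` are hypothetical names chosen for illustration; this is not the library's Cursor class, which wraps an actor and works through its mailbox.

```scala
// Hypothetical stand-in for Cursor: wraps a sequence (modelled as an Iterator of
// key/value pairs in ascending key order), remembers the last pair it returned,
// and compares with other cursors by the key of that pair.
final class KeyCursor[K, V](source: Iterator[(K, V)])(implicit ord: Ordering[K])
    extends Ordered[KeyCursor[K, V]] {

  // The last pair returned; None before the first call or after exhaustion.
  private var lastPair: Option[(K, V)] = None
  def last: Option[(K, V)] = lastPair

  // Returns the next pair (if any) and records it.
  def next(): Option[(K, V)] = {
    lastPair = if (source.hasNext) Some(source.next()) else None
    lastPair
  }

  // Returns the first remaining pair whose key is >= minKey and records it.
  def seek(minKey: K): Option[(K, V)] = {
    lastPair = source.find { case (k, _) => ord.gteq(k, minKey) }
    lastPair
  }

  // Cursors compare by the key of their last pair; callers should only compare
  // cursors that currently hold a pair (otherwise .get throws).
  def compare(that: KeyCursor[K, V]): Int =
    ord.compare(this.last.get._1, that.last.get._1)
}
```

Because a cursor is mutable, the sketch only makes sense when a single thread (or a single actor's mailbox) drives it, which matches the same-mailbox arrangement described for Cursor.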

Cursor objects are created with the same mailbox as the IntersectionSeq object, allowing the IntersectionSeq object to directly access the state of the Cursor objects.

Because Cursor objects carry state, they could complicate the use of IntersectionSeq objects. This is avoided, however, by creating new Cursor objects for every request an IntersectionSeq object receives. Fortunately, Cursor is fairly lightweight, and this approach keeps IntersectionSeq an immutable actor.

Cursor

## IntersectionSeq

The tricky case for IntersectionSeq arises when it receives a KVPair response whose key is larger than the keys in the other responses. It then sends a request to each of the other sequences for the key/value pair with the smallest key that is equal to or greater than that larger key. IntersectionSeq therefore receives several responses at once, and these can be any mix of synchronous and asynchronous responses. So the conditional tailrec pattern needed to be extended a bit.
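The "catch up to the largest key" step can be sketched synchronously, with iterators in place of actor sequences. This is an illustration of the general leapfrog-style intersection over sorted inputs, not the library's implementation: `LeapfrogIntersection` and `run` are hypothetical names, and each source is assumed to yield strictly ascending keys. Like IntersectionSeq, it creates fresh per-request state (new iterators) on every call, so the object itself holds nothing mutable.

```scala
// Hypothetical, synchronous sketch of a sorted-key intersection. Each source is a
// factory for an iterator of (key, value) pairs in strictly ascending key order.
final class LeapfrogIntersection[K, V](sources: Seq[() => Iterator[(K, V)]])(
    implicit ord: Ordering[K]) {

  def run(): List[(K, List[V])] = {
    // Fresh iterators per call, playing the role of per-request Cursors.
    val its: Seq[Iterator[(K, V)]] = sources.map(f => f())

    // Next pair of one source, or None when it is exhausted.
    def advance(it: Iterator[(K, V)]): Option[(K, V)] =
      if (it.hasNext) Some(it.next()) else None

    // First remaining pair of one source whose key is >= minKey.
    def seek(it: Iterator[(K, V)], minKey: K): Option[(K, V)] =
      it.find { case (k, _) => ord.gteq(k, minKey) }

    val out = List.newBuilder[(K, List[V])]
    var current: Seq[Option[(K, V)]] = its.map(advance)
    // Stop as soon as any source is exhausted: no further key can be common.
    while (current.nonEmpty && current.forall(_.isDefined)) {
      val pairs = current.map(_.get)
      val maxKey = pairs.map(_._1).max
      if (pairs.forall { case (k, _) => ord.equiv(k, maxKey) }) {
        // All sources agree on the key: emit it and advance every source.
        out += ((maxKey, pairs.map(_._2).toList))
        current = its.map(advance)
      } else {
        // Some source is ahead: ask only the lagging ones for a key >= maxKey.
        current = its.zip(pairs).map { case (it, p) =>
          if (ord.equiv(p._1, maxKey)) Some(p) else seek(it, maxKey)
        }
      }
    }
    out.result()
  }
}
```

With three sources built from the multiples of 2, 3 and 5 below 200, `run()` returns the keys 30 through 180 in steps of 30. Iterators always answer synchronously, which is exactly what real sequences do not guarantee: the catch-up requests in the `else` branch are the ones whose responses may come back as a mix of synchronous and asynchronous.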

We still use the async flag, but have replaced the sync flag with a list of synchronously received responses. It's a small change, but it is enough to handle a mix of synchronous and asynchronous responses.
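The following is a minimal simulation of that idea, written under stated assumptions rather than taken from the library: `SimMailbox`, `Request` and `runRounds` are hypothetical names, a "request" is just a function that invokes its callback either before returning (synchronous) or later when the simulated mailbox is drained (asynchronous), and each response is assumed to arrive exactly once. Each round sends one request per source. Callbacks that fire during the send loop are recorded in `syncResponses`. If all of them arrived that way, the `while` loop simply continues, so there is no recursion and no stack growth. Otherwise the `async` flag is set, and the last late response completes the round and re-enters `loop`.

```scala
import scala.collection.mutable

// Simulated mailbox: posted tasks run later, when drain() is called.
final class SimMailbox {
  private val queue = mutable.Queue.empty[() => Unit]
  def post(task: () => Unit): Unit = queue.enqueue(task)
  def drain(): Unit = while (queue.nonEmpty) queue.dequeue().apply()
}

object ExtendedTailrec {
  // A request takes a callback and may invoke it before returning (synchronous)
  // or at some later time (asynchronous).
  type Request = (Int => Unit) => Unit

  // Runs `rounds` rounds. Each round sends every request and, once all of that
  // round's responses are in, passes them to onRound in request order.
  def runRounds(requests: IndexedSeq[Request], rounds: Int)(onRound: List[Int] => Unit): Unit = {
    def loop(start: Int): Unit = {
      var remaining = start
      var keepLooping = true
      while (keepLooping && remaining > 0) {
        val n = requests.size
        val next = remaining - 1
        val responses = new Array[Int](n)
        // Indices of responses that arrived while the send loop was still running.
        val syncResponses = mutable.ListBuffer.empty[Int]
        var async = false // set once the send loop ends with responses outstanding
        var asyncCount = 0
        for (i <- 0 until n) {
          val request = requests(i)
          request { v =>
            responses(i) = v
            if (!async) syncResponses += i
            else {
              asyncCount += 1
              // The last late response finishes the round and restarts the loop.
              if (syncResponses.size + asyncCount == n) {
                onRound(responses.toList)
                loop(next)
              }
            }
          }
        }
        if (syncResponses.size == n) {
          // Everything arrived synchronously: iterate instead of recursing.
          onRound(responses.toList)
          remaining = next
        } else {
          // Some responses are still outstanding; their callbacks take over.
          async = true
          keepLooping = false
        }
      }
    }
    loop(rounds)
  }
}
```

In a real actor system the late callbacks are delivered through the same mailbox after the current message has been handled, which is why the sketch can update `async`, `asyncCount` and `syncResponses` without locks; `SimMailbox` only mimics that ordering.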

IntersectionSeq

Tutorial
