Recursive Flowgraphs
Go supports multiple readers and writers on a single channel. For each transmission the runtime pairs one ready sender with one ready receiver, and spreads transmissions across all of the waiting candidates. The first step in demonstrating recursive flowgraphs is to leverage this capability to make a Pool of Node's, all of which share one channel for input data and one channel for output data.
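The channel-sharing idea can be illustrated without the flowgraph package. The following is a minimal sketch using only the standard library; the function name squareAll and the squaring work are placeholders invented for illustration, and nothing here is the Pool implementation itself.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper: it fans the values in `in` out to
// `workers` goroutines that all receive from one shared input channel and
// all send on one shared output channel.
func squareAll(in []int, workers int) map[int]int {
	src := make(chan int)    // shared input channel
	dst := make(chan [2]int) // shared output channel

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each value on src is received by exactly one worker.
			for v := range src {
				dst <- [2]int{v, v * v}
			}
		}()
	}

	// A single upstream writer feeds the shared input channel.
	go func() {
		for _, v := range in {
			src <- v
		}
		close(src)
	}()

	// Close the shared output channel once every worker has finished.
	go func() {
		wg.Wait()
		close(dst)
	}()

	// A single downstream reader drains the shared output channel.
	out := make(map[int]int, len(in))
	for r := range dst {
		out[r[0]] = r[1]
	}
	return out
}

func main() {
	res := squareAll([]int{1, 2, 3, 4, 5}, 3)
	fmt.Println(len(res), res[4]) // prints "5 16"
}
```

Which worker handles which value is not determined by the program, so the sketch keys results by input value rather than relying on arrival order.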
// FuncQsort recursively implements a quicksort with goroutines
// (x=qsort(a)).
func FuncQsort(a, x Edge, poolSz, poolReserve int32) *Pool {
	// Make a pool of qsort nodes that can be dynamically used.
	PoolQsort = MakePool(poolSz, poolReserve, "qsort",
		[]Edge{a}, []Edge{x}, nil, qsortFire)
	return &PoolQsort
}
Then this Pool of quicksort Node's is embedded in a pipeline with a single Node upstream and a single Node downstream. All the quicksort Node's share the same source and destination Edge's:
e, n := flowgraph.MakeGraph(2, poolSz+2)
n[0] = tbi(e[0], pow2)
n[1] = tbo(e[1])
p := flowgraph.FuncQsort(e[0], e[1], poolSz, 1)
copy(n[2:poolSz+2], p.Nodes())
The input test bench (tbi) uses an extension of [sort.Interface](https://golang.org/pkg/sort/#Interface) called RecursiveSort to pack up test vectors and pass them to qsort. SubSlice is the only additional method required to do the sort; the other methods are for testing and monitoring the sort:
type RecursiveSort interface {
	sort.Interface

	// SubSlice returns a sub-slice.
	SubSlice(n, m int) Datum

	// Slice returns current slice.
	Slice() []int

	// Original returns original slice.
	Original() []int

	// Depth returns the depth of a recursive sort.
	Depth() int64

	// ID returns a unique ID for the object.
	ID() int64
}
An unexported struct bushel is declared that implements the RecursiveSort interface (see the tbqsort.go example in the flowgraph_test repository for method detail):
type bushel struct {
	Slic     []int
	Orig     []int
	depth    int64
	bushelID int64
}
A bushel is initialized with a random-length vector of random integers and sent downstream to the pool of sort Node's:
func tbiRand(pow2 uint) RecursiveSort {
	var s bushel
	s.bushelID = bushelCnt
	bushelCnt++
	// rand.Intn panics on a non-positive argument, so keep n >= 1.
	n := rand.Intn(1<<pow2) + 1
	l := rand.Intn(n)
	for i := 0; i < l; i++ {
		s.Orig = append(s.Orig, rand.Intn(l))
	}
	s.Slic = s.Orig
	return s
}

func tbi(x flowgraph.Edge, pow2 uint) flowgraph.Node {
	node := flowgraph.MakeNode("tbi", nil, []*flowgraph.Edge{&x}, nil,
		func(n *flowgraph.Node) { x.Val = tbiRand(pow2) })
	return node
}
The input test bench writes a bushel value to the data channel shared by all qsort Node's in the pool. One of the Node's that is ready (waiting on select) will be chosen to read the data channel. That Node can ack back right away on the single acknowledge channel, so that the next bushel flowing into the qsort pool can proceed, if and only if there is another Node available in the Pool:
func qsortFire(n *Node) {
	a := n.Srcs[0]
	x := n.Dsts[0]
	p := &PoolQsort

	// Ack early if Node available for upstream use.
	ackEarly := p.Alloc(n, 1)
	if ackEarly {
		a.Flow = true
		a.SendAck(n)
		a.NoOut = true
	}
An acknowledge from a Pool does not imply that the downstream Node is ready for more data. Instead it implies that the Pool itself is ready for more data.
This allocation of a Pool Node must be balanced by a freeing of a Pool Node when leaving qsortFire via a deferred func. Any Node allocated for recursion fan-out must be freed as well.
	// If upstream is a Node from PoolQsort.
	recursed := n.Recursed()

	// Return the right number of Node's to the Pool.
	defer func() {
		m := 0
		if ackEarly {
			m++
		}
		if recursed {
			m++
		}
		if m != 0 {
			p.Free(n, m)
		}
	}()
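The balance between allocation and the deferred free can be seen in isolation with a plain counter. The sketch below is a hypothetical stand-in, not the flowgraph Pool: the nodePool type, its fields, and the rule that an allocation must leave at least `reserve` nodes free are all assumptions made for illustration, and the Node argument of the real Alloc and Free is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// nodePool is a hypothetical stand-in for Pool bookkeeping. It only tracks
// how many nodes are free and how many must stay in reserve.
type nodePool struct {
	mu      sync.Mutex
	free    int
	reserve int
}

// Alloc claims m nodes if doing so leaves at least `reserve` nodes free.
func (p *nodePool) Alloc(m int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.free-m < p.reserve {
		return false
	}
	p.free -= m
	return true
}

// Free returns m nodes to the pool.
func (p *nodePool) Free(m int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.free += m
}

// fire mimics the allocate-then-deferred-free shape of a firing function.
// recursed means an upstream pool node already allocated a node for this call.
func fire(p *nodePool, recursed bool) (ackEarly bool) {
	ackEarly = p.Alloc(1)
	defer func() {
		m := 0
		if ackEarly {
			m++
		}
		if recursed {
			m++
		}
		if m != 0 {
			p.Free(m)
		}
	}()
	// The sorting work would happen here.
	return ackEarly
}

func main() {
	p := &nodePool{free: 4, reserve: 1}
	p.Alloc(1)               // upstream allocates a node for the recursion
	fmt.Println(fire(p, true)) // prints "true"
	fmt.Println(p.free)      // prints "4": every allocation was returned
}
```

Putting the free in a deferred func means every return path out of the firing function restores the count, including early returns on error.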
The bushel is extracted from the Datum, but only the RecursiveSort interface is visible; it can be used for displaying state:
	av := a.SrcGet()
	d, ok := av.(RecursiveSort)
	if !ok {
		n.LogError("not of type RecursiveSort (%T)\n", a.Val)
		return
	}
	n.Tracef("Original(%p) sorted %t, Sliced sorted %t, depth=%d, id=%d, len=%d, poolsz=%d\n",
		d.Original(), d.OriginalSorted(), d.SliceSorted(), d.Depth(), d.ID(), d.Len(), p.size)
If the slice to be sorted is small enough that it no longer makes sense to distribute the work through recursion to other goroutines, the sort is done locally. The sort is also done locally if there are not enough free Node's in the Pool to recurse into. In this manner the Pool will never completely empty, because there is always a reserved number of Node's for each possible input (each upstream Node with a copy of the data channel).
n.NodeWrap(d) wraps the result with a Node pointer, so the downstream Node can steer its ack back to the right Node. x.NoOut set to true inhibits further output on this Edge this time around.
	l := d.Len()
	if l <= 4096 || !p.Alloc(n, 2) {
		sort.Sort(d)
		x.DstPut(n.NodeWrap(d))
		x.SendData(n)
		return
	}
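The same decision (sort locally when the slice is small or no worker is free, otherwise split and hand off) can be sketched with plain goroutines. This is not the flowgraph implementation: the buffered channel used as a counting semaphore stands in for the Pool, the Lomuto partition stands in for doPivot, and the names psort, partition, and sortedAfter are invented for this example. Only the 4096 threshold is taken from the code above.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

const threshold = 4096 // at or below this length, sort locally

// partition does a Lomuto partition around the last element and returns
// the pivot's final index.
func partition(a []int) int {
	last := len(a) - 1
	p := a[last]
	i := 0
	for j := 0; j < last; j++ {
		if a[j] < p {
			a[i], a[j] = a[j], a[i]
			i++
		}
	}
	a[i], a[last] = a[last], a[i]
	return i
}

// psort sorts a in place. tokens is a buffered channel used as a counting
// semaphore that limits how many extra goroutines run at once.
func psort(a []int, tokens chan struct{}) {
	if len(a) <= threshold {
		sort.Ints(a)
		return
	}
	select {
	case tokens <- struct{}{}: // a worker is free: recurse in parallel
	default: // no worker free: finish the sort locally
		sort.Ints(a)
		return
	}
	m := partition(a)
	done := make(chan struct{})
	go func() {
		defer close(done)
		psort(a[:m], tokens)
		<-tokens // hand the worker back
	}()
	psort(a[m+1:], tokens)
	<-done
}

// sortedAfter fills a slice with seeded pseudo-random values, sorts it with
// psort, and reports whether the result is in order.
func sortedAfter(n, workers int) bool {
	r := rand.New(rand.NewSource(1))
	a := make([]int, n)
	for i := range a {
		a[i] = r.Intn(1 << 20)
	}
	psort(a, make(chan struct{}, workers))
	return sort.IntsAreSorted(a)
}

func main() {
	fmt.Println(sortedAfter(100000, 4)) // prints "true"
}
```

Because the fallback path is an ordinary local sort, running out of workers only reduces parallelism; it never blocks the caller.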
If the rest of the quicksort can be dealt with recursively, the first step is to do the pivot operation on the entire slice at this point. doPivot returns the low and high indices of the quicksort pivot and partitions all values into those below and those above this pivot.
doPivot in the flowgraph package is a verbatim copy of the unexported sort.doPivot (the GO-LICENSE is distributed with the package). In the event sort.doPivot becomes sort.DoPivot this code redundancy will go away. In the event sort.doPivot disappears this example will keep working.
	mlo, mhi := doPivot(d, 0, l)
Knowing where to recurse, the slice undergoing sort is split into above and below the pivot, and each is sent back into the quicksort pool. This is accomplished by modifying the output Edge x so that it reuses the data channel from the input Edge a. Once again NodeWrap is used to wrap the newly created slice Datum with the current Node, so that the downstream Node (which will also be a quicksort Node) can ack back. Notice that if the high or low sub-slice is of length 0 or 1, the Node allocated for it is freed.
	// Make a substitute output Edge to point back to the Pool.
	xBack := x.PoolEdge(a)

	var lo, hi Datum
	if mlo > 1 {
		lo = n.NodeWrap(d.SubSlice(0, mlo))
		xBack.DstPut(lo)
		xBack.SendData(n)
		x.RdyCnt++
	} else {
		p.Free(n, 1)
	}
	if l-mhi > 1 {
		hi = n.NodeWrap(d.SubSlice(mhi, l))
		xBack.DstPut(hi)
		xBack.SendData(n)
		x.RdyCnt++
	} else {
		p.Free(n, 1)
	}
Finally, pack both outputs into the actual output Edge x so the trace system can output them both (separated by a "|"), and leave any further output for x disabled (by not using Edge.DstPut to set Edge.Flow).
	x.Val = DoubleDatum{lo, hi} // for tracing as lo|hi.
}
For the full example see the tbqsort Makefile target in http://github.com/vectaport/flowgraph_test.