Skip to content

Commit

Permalink
Concurrency seems to work well; cleaned up seq a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
zot committed Nov 9, 2010
1 parent b8cbe62 commit b0cd64e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 41 deletions.
2 changes: 2 additions & 0 deletions seq/bugreports/bug-20101109-1426
@@ -1,3 +1,5 @@
This seems to be bug 909

#sets: 0
#Attempts: 20
results...
Expand Down
9 changes: 0 additions & 9 deletions seq/examples/dice.go
Expand Up @@ -39,15 +39,6 @@ func main() {
rank := map[Seq]int{d4:0, d6:1, d8:2, d10:3}
sets := map[string]int{}
//attempts is [[label, [score, ...]]...]
//println("dice...")
//CDo(Product(From(dice,dice)), func(el El){Prettyln(el, names)})
//println("done")
//Prettyln(FlatMap(From(1,2,3,4), func(el El)Seq{
// return From("a", el)
// }))
//Names = names
//Prettyln(From(dice,dice,dice), names)
//Prettyln(Product(From(dice,dice,dice)), names)
attempts := Map(Filter(Product(From(dice, dice, dice)), func(d El)bool{
oldRank := -1
result := true
Expand Down
66 changes: 34 additions & 32 deletions seq/seq.go
Expand Up @@ -12,7 +12,7 @@ import "container/vector"
type El interface{}
type Seq interface {
// core methods
While(f func(i El)bool)
Find(f func(i El)bool) El
Rest() Seq
Len() int
// wrappers that keep the current type
Expand All @@ -38,10 +38,10 @@ func Sequential(s Seq) *SequentialSeq {
func FirstN(s Seq, n int) []interface{} {
r := make([]interface{}, n)
x := 0
While(s, func(el El)bool{
Find(s, func(el El)bool{
r[x] = el
x++
return x < n
return x == n
})
return r
}
Expand Down Expand Up @@ -78,40 +78,43 @@ func IsSeq(s interface{}) bool {

func First(s Seq) interface{} {
var result interface{}
s.While(func(el El)bool{
s.Find(func(el El)bool{
result = el
return false
return true
})
return result
}

func IsEmpty(s Seq) bool {
empty := true
s.While(func(el El)bool{
s.Find(func(el El)bool{
empty = false
return false
return true
})
return empty
}

func While(s Seq, f func(el El) bool) {s.While(f)}
func Find(s Seq, f func(el El) bool) El {return s.Find(f)}

func While(s Seq, f func(el El) bool) {s.Find(func(el El)bool{return !f(el)})}

func Do(s Seq, f func(el El)) {
s.While(func(el El)bool{
s.Find(func(el El)bool{
f(el)
return true
return false
})
}

// CDo -- do f concurrently on each element of s, in any order
func CDo(s Seq, f func(el El)) {Do(CMap(s, func(el El)El{f(el); return nil}), func(el El){})}
func CDo(s Seq, f func(el El)) {
c := CMap(s, func(el El)El{f(el); return nil})()
for <- c; !closed(c); <- c {}
}

func Len(s Seq) int {return s.Len()}

func Output(s Seq, c SeqChan) {
Do(s, func(el El){
c <- el
})
Do(s, func(el El){c <- el})
}

func Rest(s Seq) Seq {return s.Rest()}
Expand All @@ -129,7 +132,6 @@ func SAppend(s Seq, s2 Seq) *SequentialSeq {
vec := make(vector.Vector, 0, quickLen(s, 8) + quickLen(s2, 8))
AppendToVector(s, &vec)
AppendToVector(s2, &vec)
//print("SAppend ");Prettyln(s);print(" + ");Prettyln(s2);println(" = ");Prettyln((*SequentialSeq)(&vec))
return (*SequentialSeq)(&vec)
}

Expand Down Expand Up @@ -226,21 +228,19 @@ func (r *SlidingWindow) Set(index int, value interface{}) bool {
// spawn a goroutine that does the following for each value, with up to size pending at a time:
// spawn a goroutine to apply f to the value and send the result back in a channel
// send the results in order to the ouput channel as they are completed
func CMap(s Seq, f func(el El) El, sizePowerOpt... uint) Seq {
func CMap(s Seq, f func(el El) El, sizePowerOpt... uint) ConcurrentSeq {
sizePower := uint(6)
if len(sizePowerOpt) > 0 {sizePower = sizePowerOpt[0]}
size := 1 << sizePower
return Gen(func(output SeqChan){
//println("spawn")
//punt and convert sequence to concurrent
//maybe someday we'll handle SequentialSequences separately
input := Concurrent(s)()
//println("spawn")
window := NewSlidingWindow(sizePower)
replyChannel := make(chan reply)
inputCount, pendingInput, pendingOutput := 0, 0, 0
inputClosed := false
//println("START")
defer close(replyChannel)
for !inputClosed || pendingInput > 0 || pendingOutput > 0 {
first, hasFirst := window.GetFirst()
ic, oc, rc := input, output, replyChannel
Expand All @@ -256,6 +256,7 @@ func CMap(s Seq, f func(el El) El, sizePowerOpt... uint) Seq {
inputClosed = true
} else {
go func(index int, value interface{}) {
SpawnCount++
replyChannel <- reply{index, f(value)}
}(inputCount, inputElement)
inputCount++
Expand All @@ -267,23 +268,21 @@ func CMap(s Seq, f func(el El) El, sizePowerOpt... uint) Seq {
pendingOutput++
}
}
//println("DONE AFTER", inputCount, "ITEMS")
close(replyChannel)
})
}

func FlatMap(s Seq, f func(el El) Seq) Seq {return s.FlatMap(f)}

func SFlatMap(s Seq, f func(i El) Seq) Seq {
func SFlatMap(s Seq, f func(i El) Seq) *SequentialSeq {
vec := make(vector.Vector, 0, quickLen(s, 8))
Do(s, func(e El){Do(f(e).(Seq), func(sub El){vec.Push(sub)})})
return (*SequentialSeq)(&vec)
}

func CFlatMap(s Seq, f func(i El) Seq, sizeOpt... uint) Seq {
func CFlatMap(s Seq, f func(i El) Seq, sizeOpt... uint) ConcurrentSeq {
return Gen(func(c SeqChan){
Do(CMap(s, func(e El)El{return f(e)}, sizeOpt...), func(sub El){
Do(sub.(Seq), func(el El){c <- el})
Output(sub.(Seq), c)
})
})
}
Expand All @@ -306,11 +305,8 @@ func Combinations(s Seq, number int) Seq {
//returns the product of the Seqs contained in sequences
func Product(sequences Seq) Seq {
return Fold(sequences, From(From()), func(result, each El)El{
//fmt.Print("folding: ");Pretty(each, Names);fmt.Print(" into ");Prettyln(result, Names)
return result.(Seq).FlatMap(func(seq El)Seq{
//fmt.Print("flat map with: ");Prettyln(seq, Names)
return each.(Seq).Map(func(i El) El {
//fmt.Print("map with: ");Prettyln(i, Names)
return seq.(Seq).Append(From(i))
})
})
Expand All @@ -335,7 +331,7 @@ func Pretty(s interface{}, args... interface{}) io.Writer {
return writer
}

//This is pretty ugly :)
//This pretty is ugly :)
func prettyLevel(s interface{}, level int, names map[interface{}]string, w io.Writer) {
name, hasName := names[s]
if hasName {
Expand Down Expand Up @@ -402,10 +398,13 @@ func CUpto(limit int) ConcurrentSeq {
})
}

func (s ConcurrentSeq) While(f func(el El)bool) {
func (s ConcurrentSeq) Find(f func(el El)bool) El {
c := s()
defer close(c)
for el := <- c; !closed(c) && f(el); el = <- c {}
for el := <- c; !closed(c) ; el = <- c {
if f(el) {return el}
}
return nil
}

func (s ConcurrentSeq) Rest() Seq {
Expand Down Expand Up @@ -458,8 +457,11 @@ func AUpto(limit int) *SequentialSeq {
return (*SequentialSeq)(&a)
}

func (s *SequentialSeq) While(f func(el El)bool) {
for i := 0; i < len(*s) && f((*s)[i]); i++ {}
func (s *SequentialSeq) Find(f func(el El)bool) El {
for i := 0; i < len(*s); i++ {
if f((*s)[i]) {return (*s)[i]}
}
return nil
}

func (s *SequentialSeq) Rest() Seq {
Expand Down

0 comments on commit b0cd64e

Please sign in to comment.