forked from biogo/biogo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
map.go
75 lines (62 loc) · 1.91 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Copyright ©2011-2013 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package concurrent
import (
"github.com/biogo/biogo/util"
"fmt"
"math"
)
// Mapper is an Operator that can subdivide itself, which lets Map split it
// into chunks and process those chunks concurrently.
type Mapper interface {
	Operator

	// Len reports the total size of the Mapper; Map uses it to compute
	// chunk boundaries.
	Len() int

	// Slice returns a Mapper for the sub-range delimited by i and j.
	// Map passes adjacent chunk boundaries (one chunk's j is the next
	// chunk's i), so implementations are presumably expected to treat the
	// range as half-open, [i, j) — confirm against concrete implementations.
	Slice(i, j int) Mapper
}
// Map processes set in chunks, potentially concurrently. The set is split
// with set.Slice into consecutive chunks and each chunk is submitted as an
// Operator to a Processor created with the requested number of threads.
//
// The chunk size is the smaller of ceil(set.Len()/threads) and maxChunkSize;
// the final chunk may be shorter. Reducing the chunk size can reduce the
// impact of divergence in processing time between chunks, but may add to
// overhead.
//
// results holds one value per chunk, in the order the values are received
// from the processor. NOTE(review): this is presumably completion order
// rather than chunk order — confirm against Processor.
//
// If a chunk reports an error, Map stops collecting, stops queuing further
// chunks, and returns the results gathered so far together with an error
// describing the failure. For a non-empty set, threads and maxChunkSize must
// both be at least 1; otherwise an error is returned without any processing.
func Map(set Mapper, threads, maxChunkSize int) (results []interface{}, err error) {
	n := set.Len()

	// A non-positive threads or maxChunkSize would give a division by zero or
	// a non-positive chunk size below, and the chunk loops would never end.
	if n > 0 && (threads < 1 || maxChunkSize < 1) {
		return nil, fmt.Errorf("concurrent: map requires threads and maxChunkSize to be positive: got %d and %d", threads, maxChunkSize)
	}

	queue := make(chan Operator, 1)
	p := NewProcessor(queue, 0, threads)
	defer p.Stop()

	chunkSize := util.Min(int(math.Ceil(float64(n)/float64(threads))), maxChunkSize)

	// quit is closed when Map returns so the feeder goroutine below can never
	// be left blocked on a send that nobody will receive.
	quit := make(chan struct{})
	defer close(quit)

	go func() {
		for s := 0; s*chunkSize < n; s++ {
			end := util.Min(chunkSize*(s+1), n)
			select {
			case <-quit:
				// return, not break: break would only leave the select.
				return
			case queue <- set.Slice(chunkSize*s, end):
			}
		}
	}()

	// Collect exactly one result per queued chunk.
	for r := 0; r*chunkSize < n; r++ {
		result := <-p.out
		if result.Err != nil {
			// Report the chunk's error; partial results are still returned.
			return results, fmt.Errorf("concurrent: map failed: %v", result.Err)
		}
		results = append(results, result.Value)
	}
	return results, nil
}
// PromiseMap is a future form of Map: it starts Map in a new goroutine and
// returns immediately with a Promise through which the outcome is delivered.
//
// When Map succeeds the Promise is fulfilled with its results. When Map
// returns an error the Promise is failed with whatever results Map returned
// alongside that error. The Promise is created with NewPromise(false, false,
// false); the meaning of those flags is defined by NewPromise.
func PromiseMap(set Mapper, threads, maxChunkSize int) *Promise {
	p := NewPromise(false, false, false)

	go func() {
		values, mapErr := Map(set, threads, maxChunkSize)
		if mapErr != nil {
			p.Fail(values, mapErr)
			return
		}
		p.Fulfill(values)
	}()

	return p
}