/
executors.go
93 lines (78 loc) · 1.59 KB
/
executors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package util
import (
"fmt"
"sync"
)
//
// Executor
//
type Executor[T any] interface {
Queue(task T)
Wait() []error
Close()
}
//
// ParallelExecutor
//
type ParallelExecutor[T any] struct {
PanicAsError string // when non-empty, wil capture panics as errors under this name
processor func(task T) error
tasks chan T
wg sync.WaitGroup
errs []error
errsLock sync.Mutex
}
func NewParallelExecutor[T any](bufferSize int, processor func(task T) error) *ParallelExecutor[T] {
self := ParallelExecutor[T]{
processor: processor,
tasks: make(chan T, bufferSize),
}
return &self
}
func (self *ParallelExecutor[T]) Start(workers int) {
for range workers {
go self.worker()
}
}
// ([Executor] interface)
func (self *ParallelExecutor[T]) Queue(task T) {
self.wg.Add(1)
self.tasks <- task
}
// ([Executor] interface)
func (self *ParallelExecutor[T]) Wait() []error {
self.wg.Wait()
close(self.tasks)
return self.errs
}
// ([Executor] interface)
func (self *ParallelExecutor[T]) Close() {
close(self.tasks)
}
func (self *ParallelExecutor[T]) worker() {
for {
select {
case task, ok := <-self.tasks:
if ok {
if err := self.process(task); err != nil {
self.errsLock.Lock()
self.errs = append(self.errs, err)
self.errsLock.Unlock()
}
self.wg.Done()
} else {
break
}
}
}
}
func (self *ParallelExecutor[T]) process(task T) (rerr error) {
if self.PanicAsError != "" {
defer func() {
if err := recover(); err != nil {
rerr = fmt.Errorf("panic during %s: %v", self.PanicAsError, err)
}
}()
}
return self.processor(task)
}