-
Notifications
You must be signed in to change notification settings - Fork 3
/
tasks.go
105 lines (92 loc) · 2.19 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package statusemail
import (
"context"
"errors"
"sync"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/vanti-dev/sc-bos/pkg/auto/statusemail/config"
"github.com/vanti-dev/sc-bos/pkg/gen"
"github.com/vanti-dev/sc-bos/pkg/task"
"github.com/vanti-dev/sc-bos/pkg/util/chans"
"github.com/vanti-dev/sc-bos/pkg/util/pull"
)
func tasksForSource(source config.Source, statusClient gen.StatusApiClient, c chan<- change, logger *zap.Logger) []task.Task {
return []task.Task{
func(ctx context.Context) (task.Next, error) {
messages := make(chan *gen.StatusLog)
group, ctx := errgroup.WithContext(ctx)
// fetch
group.Go(func() error {
defer close(messages)
return pullFrom(ctx, source.Name, statusClient, messages, pull.WithLogger(logger))
})
// forward messages
group.Go(func() error {
for msg := range messages {
if err := chans.SendContext(ctx, c, change{log: msg, source: source}); err != nil {
return err
}
}
return nil
})
return task.Normal, group.Wait()
},
}
}
type namedTasks struct {
mu sync.Mutex
stopByName map[string]taskRuntime
}
var (
ErrAlreadyRunning = errors.New("already running")
ErrNotRunning = errors.New("not running")
)
func (s *namedTasks) Run(ctx context.Context, name string, tasks []task.Task, opts ...task.Option) error {
ctx, stop := context.WithCancel(ctx)
defer stop()
id := &ctx
s.mu.Lock()
if s.stopByName == nil {
s.stopByName = make(map[string]taskRuntime)
}
_, ok := s.stopByName[name]
if ok {
s.mu.Unlock()
return ErrAlreadyRunning
}
s.stopByName[name] = taskRuntime{stop, id}
s.mu.Unlock()
defer func() {
// cleanup
s.mu.Lock()
defer s.mu.Unlock()
rt, ok := s.stopByName[name]
if ok && rt.id == id {
delete(s.stopByName, name)
}
}()
group, ctx := errgroup.WithContext(ctx)
for _, t := range tasks {
t := t
group.Go(func() error {
return task.Run(ctx, t, opts...)
})
}
return group.Wait()
}
func (s *namedTasks) Stop(name string) error {
s.mu.Lock()
defer s.mu.Unlock()
rt, ok := s.stopByName[name]
if !ok {
return ErrNotRunning
}
rt.stop()
delete(s.stopByName, name)
return nil
}
type taskRuntime struct {
stop func()
id *context.Context
}