/
worker.go
159 lines (137 loc) · 4.16 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package modules
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/safing/portbase/log"
)
// Worker Default Configuration
const (
// DefaultBackoffDuration is the backoff duration that runServiceWorker
// substitutes when it is given a backoffDuration of 0. The delay before a
// restart is this value multiplied by the current fail count.
DefaultBackoffDuration = 2 * time.Second
)
var (
// ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart.
// It is matched with errors.Is, and a matching error skips the backoff delay.
ErrRestartNow = errors.New("requested restart")
// errNoModule is returned by RunWorker when it is called on a nil *Module.
errNoModule = errors.New("missing module (is nil!)")
)
// StartWorker directly starts a generic worker that does not fit to be a Task
// or MicroTask, such as long running (and possibly mostly idle) sessions.
// A call to StartWorker starts a new goroutine and returns immediately.
//
// If the worker returns an error (or panics, which is turned into an error),
// the failure is logged as a warning. Calling StartWorker on a nil module
// logs an error and starts nothing.
func (m *Module) StartWorker(name string, fn func(context.Context) error) {
	// Guard here, before spawning the goroutine: the failure log below reads
	// m.Name, which would dereference a nil pointer and crash the process.
	if m == nil {
		log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
		return
	}

	go func() {
		if err := m.RunWorker(name, fn); err != nil {
			log.Warningf("%s: worker %s failed: %s", m.Name, name, err)
		}
	}()
}
// RunWorker directly runs a generic worker that does not fit to be a Task or
// MicroTask, such as long running (and possibly mostly idle) sessions.
// A call to RunWorker blocks until the worker is finished.
//
// It returns errNoModule when called on a nil module; otherwise it returns
// whatever the worker function returned, or the panic error if it panicked.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
	if m == nil {
		log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
		return errNoModule
	}

	// Account for this worker on the module while it is running.
	atomic.AddInt32(m.workerCnt, 1)
	m.waitGroup.Add(1)

	// Undo the accounting as soon as the worker has returned.
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.waitGroup.Done()
	}()

	return m.runWorker(name, fn)
}
// StartServiceWorker starts a generic worker, which is automatically restarted
// in case of an error. A call to StartServiceWorker runs the service-worker in
// a new goroutine and returns immediately.
//
// `backoffDuration` specifies how long to wait before restarts, multiplied by
// the number of failed attempts. Pass `0` for the default backoff duration.
//
// For custom error remediation functionality, build your own error handling
// procedure using calls to RunWorker.
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	// Without a module there is nothing to run the worker on.
	if m == nil {
		log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
		return
	}

	go m.runServiceWorker(name, backoffDuration, fn)
}
// runServiceWorker runs fn in a loop until it returns without error, the
// module is stopping, or the module context is canceled during a backoff.
//
// After a failure, the next attempt is delayed by the fail count times
// backoffDuration (DefaultBackoffDuration if 0 was passed). The fail count
// starts over when the previous failure is more than five minutes old.
// An error matching ErrRestartNow restarts the worker immediately and does
// not touch the fail count.
func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	// Account for this worker on the module for as long as the loop runs.
	atomic.AddInt32(m.workerCnt, 1)
	m.waitGroup.Add(1)
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.waitGroup.Done()
	}()

	if backoffDuration == 0 {
		backoffDuration = DefaultBackoffDuration
	}

	failures := 0
	lastFailure := time.Now()

	for !m.IsStopping() {
		err := m.runWorker(name, fn)
		if err == nil {
			// A clean return means the worker is done for good.
			return
		}

		if errors.Is(err, ErrRestartNow) {
			log.Infof("%s: service-worker %s %s - restarting now", m.Name, name, err)
			continue
		}

		// Forget earlier failures if the last one is long enough ago.
		if time.Since(lastFailure) > 5*time.Minute {
			failures = 0
		}

		// Record this failure.
		failures++
		lastFailure = time.Now()

		// Announce the restart and wait out the backoff, unless the module
		// context is canceled first.
		delay := time.Duration(failures) * backoffDuration
		log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failures, err, delay)

		select {
		case <-m.Ctx.Done():
			return
		case <-time.After(delay):
			// Backoff is over, go around and restart.
		}
	}
}
// runWorker calls fn with the module context and returns its error.
// A panic inside fn is recovered, converted with NewPanicError, reported,
// and handed back to the caller as the returned error.
func (m *Module) runWorker(name string, fn func(context.Context) error) (err error) {
	defer func() {
		// Turn a panic into a regular error via the named result.
		if r := recover(); r != nil {
			panicErr := m.NewPanicError(name, "worker", r)
			panicErr.Report()
			err = panicErr
		}
	}()

	return fn(m.Ctx)
}
// runCtrlFnWithTimeout runs fn through runCtrlFn in a separate goroutine and
// waits for its result for at most timeout.
//
// It returns the error produced by runCtrlFn, or a "timed out" error if fn
// has not finished in time. fn is not interrupted by the timeout; it keeps
// running in the background and its eventual result is discarded.
func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
	// The buffer of one lets the goroutine deliver its result and exit even
	// after the timeout fired and nobody receives anymore. An unbuffered
	// channel would leave it blocked on the send forever.
	ctrlFnErr := make(chan error, 1)
	go func() {
		ctrlFnErr <- m.runCtrlFn(name, fn)
	}()

	// Use a stoppable timer so it is released as soon as fn finishes first.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// wait for results
	select {
	case err := <-ctrlFnErr:
		return err
	case <-timer.C:
		return fmt.Errorf("timed out (%s)", timeout)
	}
}
// runCtrlFn calls the module control function fn and returns its error.
// A nil fn is a no-op that returns nil. A panic inside fn is recovered,
// converted with NewPanicError, reported, and returned as the error.
func (m *Module) runCtrlFn(name string, fn func() error) (err error) {
	// Nothing to run.
	if fn == nil {
		return nil
	}

	defer func() {
		// Turn a panic into a regular error via the named result.
		if r := recover(); r != nil {
			panicErr := m.NewPanicError(name, "module-control", r)
			panicErr.Report()
			err = panicErr
		}
	}()

	return fn()
}