-
Notifications
You must be signed in to change notification settings - Fork 17
/
callretry.go
119 lines (103 loc) · 2.03 KB
/
callretry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package ctxutil
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"
)
// Call runs fn in its own goroutine and blocks until either fn returns or ctx
// is done, whichever is observed first.
//
// If fn has finished by the time Call wakes up, fn's result is returned: nil
// on success, or fn's error prefixed with prefix. Otherwise ctx was done
// before fn returned; Call then returns the context error prefixed with
// prefix and fn is left running. When that late fn eventually returns, lateFn
// (if non-nil) is invoked from fn's goroutine with fn's result (nil, or the
// prefixed error); with a nil lateFn the late result is discarded.
//
// Returned errors wrap the underlying error, so callers can match them with
// errors.Is / errors.As (e.g. against context.Canceled).
func Call(ctx context.Context, prefix string, fn func() error, lateFn func(error)) error {
	// buildErr prefixes e with prefix; %w keeps e in the error chain.
	buildErr := func(e error) error {
		return fmt.Errorf("%v: %w", prefix, e)
	}

	// d is the state shared between the caller and the goroutine running fn.
	var d struct {
		sync.Mutex
		fn struct {
			done bool
			err  error
		}
		ctxDone bool
	}

	// ctx2 is done when either the parent ctx is done or fn has returned
	// (the goroutine cancels it), so a single receive covers both cases.
	ctx2, cancel := context.WithCancel(ctx)
	id := addCall(prefix) // keep track of fn()
	go func() {
		defer doneCall(id) // keep track of fn()
		defer cancel()

		err := fn()

		d.Lock()
		defer d.Unlock()
		d.fn.done = true
		if err != nil {
			err = buildErr(err)
		}
		d.fn.err = err

		// The caller already gave up waiting: hand the result to lateFn.
		// Without a lateFn the result is lost.
		if d.ctxDone && lateFn != nil {
			lateFn(err)
		}
	}()

	<-ctx2.Done()

	d.Lock()
	defer d.Unlock()
	// Mark that the caller stopped waiting; the goroutine reads this flag
	// under the same lock to decide whether its result is "late".
	d.ctxDone = true
	if d.fn.done {
		return d.fn.err
	}
	// context was canceled and fn has not returned yet
	return buildErr(ctx2.Err())
}
//----------
// Retry invokes Call(ctx, prefix, fn, lateFn) repeatedly until it succeeds or
// ctx is done, pausing retryPause between attempts.
//
// It returns nil as soon as one attempt succeeds. If ctx is done after a
// failed attempt, or becomes done during the pause, Retry returns the error
// of the last attempt without starting another one.
func Retry(ctx context.Context, retryPause time.Duration, prefix string, fn func() error, lateFn func(error)) error {
	for {
		err := Call(ctx, prefix, fn, lateFn)
		if err == nil {
			return nil // done
		}

		// Give up right away if the context is already done.
		if ctx.Err() != nil {
			return err // err is non-nil
		}

		// Pause before the next attempt, but wake up early on cancellation so
		// a done context neither delays the return nor triggers another fn run.
		timer := time.NewTimer(retryPause)
		select {
		case <-ctx.Done():
			timer.Stop()
			return err // err is non-nil
		case <-timer.C:
		}
	}
}
//----------
// cdata is the bookkeeping record stored in callm for one registered call.
type cdata struct {
	t time.Time // time at which the call was registered (set in addCall)
	s string // label of the call (the string passed to addCall)
}
// Registry of calls that have been registered with addCall and not yet
// removed with doneCall.
var (
	// cmu is held by addCall, doneCall and CallsState while they touch
	// callm and ci.
	cmu sync.Mutex
	// callm maps a call id to its bookkeeping record.
	callm = make(map[int]*cdata)
	// ci is the most recently assigned call id; addCall increments it.
	ci int
)
// addCall registers a new call labelled s, stamped with the current time, and
// returns the id under which it was stored. The id is later passed to
// doneCall to remove the entry.
func addCall(s string) int {
	cmu.Lock()
	defer cmu.Unlock()

	ci++
	id := ci
	callm[id] = &cdata{t: time.Now(), s: s}
	return id
}
// doneCall removes the entry with id v from the call registry. Removing an id
// that is not present is a no-op.
func doneCall(v int) {
	cmu.Lock()
	delete(callm, v)
	cmu.Unlock()
}
// CallsState returns a human-readable report of the calls currently in the
// registry: a header line "<n> entries" followed by one line per call in the
// form "<label>: <elapsed> ago".
//
// Lines are ordered by call id. Ids are handed out by an incrementing
// counter, so this lists calls in the order they were registered and makes
// the output stable between invocations (map iteration order is random).
func CallsState() string {
	cmu.Lock()
	defer cmu.Unlock()

	// Collect and sort the ids to get a deterministic order.
	ids := make([]int, 0, len(callm))
	for id := range callm {
		ids = append(ids, id)
	}
	sort.Ints(ids)

	// Use one reference time so all elapsed durations are comparable.
	now := time.Now()
	lines := make([]string, 0, len(ids))
	for _, id := range ids {
		d := callm[id]
		lines = append(lines, fmt.Sprintf("%v: %v ago", d.s, now.Sub(d.t)))
	}
	return fmt.Sprintf("%v entries\n%v\n", len(lines), strings.Join(lines, "\n"))
}