-
Notifications
You must be signed in to change notification settings - Fork 1
/
singleflight.go
109 lines (96 loc) · 2.22 KB
/
singleflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package helpers
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
)
// Package-level state shared by SingleDo and SingleDoChan.
var (
	// singleFlightGroup is the one group through which every keyed call is
	// routed, so keys are shared across all callers in the process.
	singleFlightGroup = &singleflight.Group{}

	// waitTime is the first duration handed to JitterBackoff when computing
	// the pause between retries (presumably the base delay — confirm against
	// JitterBackoff).
	waitTime = 100 * time.Millisecond

	// maxWaitTime is the second duration handed to JitterBackoff (presumably
	// the upper bound on the pause — confirm against JitterBackoff).
	maxWaitTime = 2000 * time.Millisecond
)
// SingleDo executes call through the package-level singleflight group under
// key and returns the outcome converted back to T.
//
// Callers that pass the same key while an execution is in flight share that
// execution's outcome. Inside the execution, a failing call is retried up to
// retry more times (retry+1 attempts in total), pausing between attempts for
// the duration returned by JitterBackoff. If ctx (the ctx of the caller whose
// closure is actually running) is done during a pause, retrying stops and the
// error from the most recent attempt is returned.
//
// When ttl is supplied only ttl[0] is used: a timer calls Forget(key) on the
// group after that duration; the timer is stopped when the execution returns.
//
// No recover is installed here, so a panic raised by call is left to
// singleflight.Group.Do.
//
// An error is returned when every attempt fails, or when the shared result is
// not a T (for example when one key is used with different type arguments).
func SingleDo[T any](ctx context.Context, key string, call func() (T, error), retry uint, ttl ...time.Duration) (data T, err error) {
	result, e, _ := singleFlightGroup.Do(key, func() (any, error) {
		if len(ttl) > 0 {
			forgetTimer := time.AfterFunc(ttl[0], func() {
				singleFlightGroup.Forget(key)
			})
			defer forgetTimer.Stop()
		}

		var lastErr error
		for i := 0; i <= int(retry); i++ {
			var val T
			val, lastErr = call()
			if lastErr == nil {
				return val, nil
			}
			if i == int(retry) {
				// Attempts exhausted: surface the final failure.
				return nil, lastErr
			}

			// Use an explicit timer so it can be released immediately when
			// ctx ends the wait, instead of lingering like time.After.
			delay := JitterBackoff(waitTime, maxWaitTime, i)
			timer := time.NewTimer(delay)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return nil, lastErr
			}
		}
		return nil, lastErr
	})
	if e != nil {
		return data, e
	}

	// A nil T of interface type is boxed as a nil interface value, which a
	// type assertion always rejects. any(data) is nil only when T itself is an
	// interface type (data is still the zero value here), so this accepts a
	// legitimate nil result without masking a real type mismatch.
	if result == nil && any(data) == nil {
		return data, nil
	}

	val, ok := result.(T)
	if !ok {
		return data, errors.WithStack(fmt.Errorf("expected type %T but got type %T", data, result))
	}
	return val, nil
}
// SingleDoChan executes call through the package-level singleflight group
// under key using DoChan, then waits for either the shared result or ctx.
//
// Callers that pass the same key while an execution is in flight share that
// execution's outcome. Inside the execution, a failing call is retried up to
// retry more times (retry+1 attempts in total), pausing between attempts for
// the duration returned by JitterBackoff. If the executing caller's ctx is
// done during a pause, retrying stops and the error from the most recent
// attempt is returned. A panic raised inside the execution is recovered and
// reported as an error carrying the panic value's text.
//
// When ttl is supplied only ttl[0] is used: a timer calls Forget(key) on the
// group after that duration; the timer is stopped when the execution returns.
//
// If ctx is done before a result arrives, this caller stops waiting and
// returns ctx.Err() wrapped with a stack trace. Otherwise an error is
// returned when every attempt fails, or when the shared result is not a T
// (for example when one key is used with different type arguments).
func SingleDoChan[T any](ctx context.Context, key string, call func() (T, error), retry uint, ttl ...time.Duration) (data T, err error) {
	resultCh := singleFlightGroup.DoChan(key, func() (result any, err error) {
		// Named results let the deferred recover turn a panic into an error.
		defer func() {
			if p := recover(); p != nil {
				result = nil
				err = errors.WithStack(fmt.Errorf("%v", p))
			}
		}()

		if len(ttl) > 0 {
			forgetTimer := time.AfterFunc(ttl[0], func() {
				singleFlightGroup.Forget(key)
			})
			defer forgetTimer.Stop()
		}

		var lastErr error
		for i := 0; i <= int(retry); i++ {
			var val T
			val, lastErr = call()
			if lastErr == nil {
				return val, nil
			}
			if i == int(retry) {
				// Attempts exhausted: surface the final failure.
				return nil, lastErr
			}

			// Use an explicit timer so it can be released immediately when
			// ctx ends the wait, instead of lingering like time.After.
			delay := JitterBackoff(waitTime, maxWaitTime, i)
			timer := time.NewTimer(delay)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return nil, lastErr
			}
		}
		return nil, lastErr
	})

	select {
	case r := <-resultCh:
		if r.Err != nil {
			return data, r.Err
		}

		// A nil T of interface type is boxed as a nil interface value, which
		// a type assertion always rejects. any(data) is nil only when T
		// itself is an interface type (data is still the zero value here), so
		// this accepts a legitimate nil result without masking a mismatch.
		if r.Val == nil && any(data) == nil {
			return data, nil
		}

		val, ok := r.Val.(T)
		if !ok {
			return data, errors.WithStack(fmt.Errorf("expected type %T but got type %T", data, r.Val))
		}
		return val, nil
	case <-ctx.Done():
		return data, errors.WithStack(ctx.Err())
	}
}