forked from gomods/athens
/
with_singleflight.go
67 lines (56 loc) · 1.4 KB
/
with_singleflight.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package stash
import (
"context"
"sync"
"github.com/gomods/athens/pkg/config"
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
)
// WithSingleflight returns a singleflight stasher.
// This two clients make two subsequent
// requests to stash a module, then
// it will only do it once and give the first
// response to both the first and the second client.
func WithSingleflight(s Stasher) Stasher {
sf := &withsf{}
sf.stasher = s
sf.subs = map[string][]chan *sfResp{}
return sf
}
type sfResp struct {
newVer string
err error
}
type withsf struct {
stasher Stasher
mu sync.Mutex
subs map[string][]chan *sfResp
}
func (s *withsf) process(ctx context.Context, mod, ver string) {
mv := config.FmtModVer(mod, ver)
newVer, err := s.stasher.Stash(ctx, mod, ver)
s.mu.Lock()
defer s.mu.Unlock()
for _, ch := range s.subs[mv] {
ch <- &sfResp{newVer, err}
}
delete(s.subs, mv)
}
func (s *withsf) Stash(ctx context.Context, mod, ver string) (string, error) {
const op errors.Op = "singleflight.Stash"
ctx, span := observ.StartSpan(ctx, op.String())
defer span.End()
mv := config.FmtModVer(mod, ver)
s.mu.Lock()
subCh := make(chan *sfResp, 1)
_, inFlight := s.subs[mv]
if !inFlight {
s.subs[mv] = []chan *sfResp{subCh}
go s.process(ctx, mod, ver)
} else {
s.subs[mv] = append(s.subs[mv], subCh)
}
s.mu.Unlock()
resp := <-subCh
return resp.newVer, resp.err
}