-
Notifications
You must be signed in to change notification settings - Fork 0
/
batcher.go
76 lines (64 loc) · 1.24 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package batcher
import (
"context"
"sync"
"time"
)
// batcher collects keys passed to AddKey into the current batch and swaps in a
// fresh batch whenever the current one signals on its channel.
type batcher struct {
// batchSize and timeout are stored by New; they are not read in this part of
// the file (presumably consumed by newBatch/batch — TODO confirm there).
batchSize int
timeout time.Duration
// ch is closed by Close; waitBatchClose returns once it becomes readable.
ch chan struct{}
// storage is the backend handed to New.
storage storage
// mu is held by AddKey while it adds a key to the current batch.
mu sync.Mutex
// batch is the batch AddKey currently adds keys to; waitBatchClose replaces
// it with a new one.
batch *batch
}
// storage is the backend dependency injected into the batcher through New.
//
//go:generate mockery --name=storage --structname=Storage
type storage interface {
// Get takes a context and a slice of keys and returns a slice of values or
// an error.
// NOTE(review): ordering/length of the result relative to keys is not
// visible here — confirm against implementations.
Get(ctx context.Context, keys []string) ([]any, error)
// KeyByValue returns a string key for the value v (presumably used to match
// values returned by Get back to the requested keys — TODO confirm).
KeyByValue(v any) string
}
// New builds a batcher configured with the given batch size, timeout and
// storage backend. It creates the stop channel and the first batch, and starts
// a goroutine (waitBatchClose) that watches that batch before returning.
func New(batchSize int, timeout time.Duration, s storage) *batcher {
	bt := new(batcher)
	bt.batchSize = batchSize
	bt.timeout = timeout
	bt.storage = s
	bt.ch = make(chan struct{})

	// The configuration fields are populated before the first batch is
	// created, exactly as when they are set through a composite literal.
	first := bt.newBatch()
	bt.batch = first
	go bt.waitBatchClose(first)

	return bt
}
// waitBatchClose blocks until either the given batch's ch channel becomes
// readable or the batcher's own ch channel does (Close closes it).
//
// When the batch signals first, a new batch is created, installed as s.batch,
// and a new waitBatchClose goroutine is started for it. When the batcher's
// channel is readable, the goroutine just returns.
func (s *batcher) waitBatchClose(b *batch) {
	select {
	case <-b.ch:
		// Build the replacement before taking the lock so the critical
		// section is only the pointer swap.
		next := s.newBatch()

		// AddKey reads s.batch while holding s.mu, so the write has to happen
		// under the same mutex; an unsynchronized write is a data race.
		s.mu.Lock()
		s.batch = next
		s.mu.Unlock()

		go s.waitBatchClose(next)
	case <-s.ch:
	}
}
// Close processes the current batch and then closes s.ch, which lets the
// waitBatchClose goroutine return.
//
// ctx is currently unused. Close must be called at most once: a second call
// would close s.ch again and panic.
func (s *batcher) Close(ctx context.Context) {
	// s.batch is reassigned from the waitBatchClose goroutine, so take a
	// snapshot of the pointer under the mutex instead of reading the field
	// unsynchronized.
	s.mu.Lock()
	current := s.batch
	s.mu.Unlock()

	// process runs outside the lock so AddKey callers are not blocked on it.
	current.process()
	close(s.ch)
}
// AddKey adds key to the current batch and blocks until a result or an error
// for it arrives, or until ctx is done.
//
// If addKeyToBatch reports true, processing of the batch is started in a
// separate goroutine.
//
// It returns the value received on the result channel, or the error received
// on the error channel, or ctx.Err() if ctx is done first.
func (s *batcher) AddKey(ctx context.Context, key string) (any, error) {
	// Capacity 1 so that delivering the outcome never blocks on a caller that
	// has already returned because ctx was done; with unbuffered channels such
	// a send would hang forever.
	// NOTE(review): assumes the batch sends at most one value per key on
	// these channels — confirm in the batch implementation.
	resCh := make(chan any, 1)
	errCh := make(chan error, 1)

	s.mu.Lock()
	if s.batch.addKeyToBatch(key, resCh, errCh) {
		go s.batch.process()
	}
	s.mu.Unlock()

	// Wait outside the lock so other callers can keep adding keys.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case res := <-resCh:
		return res, nil
	case err := <-errCh:
		return nil, err
	}
}