-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblockingbatcher.go
More file actions
181 lines (147 loc) · 5.68 KB
/
blockingbatcher.go
File metadata and controls
181 lines (147 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package sebbroker
import (
"context"
"fmt"
"time"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/sebrecords"
"github.com/micvbang/simple-event-broker/seberr"
)
type (
	// Persist stores a batch of records and reports the offsets they were
	// stored at. Callers in this file expect exactly one offset per record in
	// the batch when no error is returned.
	Persist func(sebrecords.Batch) ([]uint64, error)

	// blockedAdd is a pending AddRecords call waiting for its records to be
	// persisted as part of a larger, combined batch.
	blockedAdd struct {
		// batch holds the records this caller asked to add.
		batch sebrecords.Batch
		// response is where the outcome for this caller's records is sent;
		// it is closed after the single response has been delivered.
		response chan<- addResponse
	}

	// addResponse is the outcome reported to a single blockedAdd.
	addResponse struct {
		// offsets holds one entry per record in the caller's batch; they are
		// all zero when err is non-nil.
		offsets []uint64
		// err is the error returned by Persist for the combined batch, if any.
		err error
	}
)
// BlockingBatcher is responsible for batching records before persisting them
// into topic storage. Batching is done to amortize the cost of persisting data
// to topic storage. This is helpful when the topic storage is an object store
// that is expected to have large latencies and per-call $ costs.
//
// BlockingBatcher collects records for a batch until either
// 1) the block time has elapsed
// 2) the soft maximum number of bytes has been reached
//
// The persist function will be called once the context returned by
// contextFactory() for the current batch has expired, or bytesSoftMax has been
// reached. Beware of long-lived contexts returned by contextFactory() as this
// could block all adders until the context expires!
type BlockingBatcher struct {
	// log receives debug output about batch collection and persisting.
	log logger.Logger
	// bytesSoftMax is the soft limit on combined record bytes per batch;
	// reaching it ends collection of the current batch early.
	bytesSoftMax int
	// contextFactory is called once per batch, when its first records arrive;
	// the batch is collected until the returned context is done.
	contextFactory func() context.Context
	// callers carries pending AddRecords calls to the collector goroutine.
	callers chan blockedAdd
	// persist is called with the combined records of each collected batch.
	persist Persist
}
// NewBlockingBatcher returns a BlockingBatcher that collects each batch for at
// most blockTime (or until bytesSoftMax bytes have been gathered) before
// handing it to persistRecordBatch.
func NewBlockingBatcher(log logger.Logger, blockTime time.Duration, bytesSoftMax int, persistRecordBatch Persist) *BlockingBatcher {
	deadlineFactory := NewContextFactory(blockTime)
	return NewBlockingBatcherWithConfig(log, bytesSoftMax, persistRecordBatch, deadlineFactory)
}
// NewBlockingBatcherWithConfig returns a BlockingBatcher whose batch
// collection window is controlled by contextFactory: each batch is collected
// until the context returned for it is done, or until bytesSoftMax bytes have
// been gathered. It starts the collector goroutine before returning.
func NewBlockingBatcherWithConfig(log logger.Logger, bytesSoftMax int, persist Persist, contextFactory func() context.Context) *BlockingBatcher {
	// Number of AddRecords calls that can be queued for the collector
	// goroutine before senders block.
	const pendingCallersBuffer = 32

	batcher := &BlockingBatcher{
		log:            log,
		bytesSoftMax:   bytesSoftMax,
		contextFactory: contextFactory,
		callers:        make(chan blockedAdd, pendingCallersBuffer),
		persist:        persist,
	}

	// NOTE: the collector goroutine is never stopped; it lives as long as the
	// process does.
	go batcher.collectBatches()

	return batcher
}
// AddRecords adds the records in batch to the batch currently being collected
// and blocks until that combined batch has been handed to persist and the
// outcome is known. When AddRecords returns, the records have either been
// persisted (and one offset per record is returned) or persisting failed and
// the error from persist is returned.
//
// A batch holding more than one record is rejected with
// seberr.ErrPayloadTooLarge when its data exceeds bytesSoftMax. A single
// record is always accepted, so that unexpectedly large records can still be
// added.
func (b *BlockingBatcher) AddRecords(batch sebrecords.Batch) ([]uint64, error) {
	dataLen := len(batch.Data)
	if dataLen > b.bytesSoftMax && batch.Len() > 1 {
		return nil, fmt.Errorf("%w (%d bytes), bytes max is %d", seberr.ErrPayloadTooLarge, dataLen, b.bytesSoftMax)
	}

	replyCh := make(chan addResponse)
	b.callers <- blockedAdd{batch: batch, response: replyCh}

	// Wait until the collector goroutine reports that the records have been
	// persisted (or that persisting failed).
	reply := <-replyCh

	expected := batch.Len()
	if got := len(reply.offsets); got != expected {
		// This is not supposed to happen; if it does, persist can't be trusted.
		panic(fmt.Sprintf("unexpected number of offsets returned %d, expected %d", got, expected))
	}

	return reply.offsets, reply.err
}
// collectBatches runs for the lifetime of the BlockingBatcher in its own
// goroutine. It repeatedly waits for a first AddRecords call, then collects
// further calls until the context from contextFactory is done or the combined
// data reaches bytesSoftMax, and finally persists everything as one batch and
// replies to every waiting caller.
func (b *BlockingBatcher) collectBatches() {
	for {
		blockedCallers := make([]blockedAdd, 0, 64)

		// block until there are records coming in, starting a new batch collection
		first := <-b.callers
		blockedCallers = append(blockedCallers, first)
		batchBytes := len(first.batch.Data)
		batchRecords := first.batch.Len()

		ctx, cancel := context.WithCancel(b.contextFactory())
		t0 := time.Now()

		// The first add alone may already reach the soft max (single records
		// are allowed to exceed it); don't make it wait for the full block time.
		if batchBytes >= b.bytesSoftMax {
			b.log.Debugf("batch size exceeded soft max (%d/%d), collecting", batchBytes, b.bytesSoftMax)
			cancel()
		}

	collect:
		for {
			select {
			case blockedCaller := <-b.callers:
				blockedCallers = append(blockedCallers, blockedCaller)
				batchBytes += len(blockedCaller.batch.Data)
				batchRecords += blockedCaller.batch.Len()
				b.log.Debugf("added record to batch (%d)", len(blockedCallers))

				if batchBytes >= b.bytesSoftMax {
					b.log.Debugf("batch size exceeded soft max (%d/%d), collecting", batchBytes, b.bytesSoftMax)
					// NOTE: this will not necessarily cause the ctx.Done()
					// branch of this select to be taken next; if there are more
					// adds queued on b.callers, select may keep picking those.
					cancel()
				}
			case <-ctx.Done():
				break collect
			}
		}

		// Cancel explicitly instead of deferring: this function never returns,
		// so a deferred cancel would never run and one deferred call would
		// accumulate per batch.
		cancel()

		b.log.Debugf("batch collection time: %v", time.Since(t0))
		b.persistAndNotify(blockedCallers, batchBytes, batchRecords)
	}
}

// persistAndNotify persists the records of all blockedCallers as one combined
// batch, then sends each caller its own share of the offsets (or the error)
// and closes its response channel.
func (b *BlockingBatcher) persistAndNotify(blockedCallers []blockedAdd, batchBytes, batchRecords int) {
	recordData := make([]byte, 0, batchBytes)
	recordSizes := make([]uint32, 0, batchRecords)
	for _, add := range blockedCallers {
		recordData = append(recordData, add.batch.Data...)
		recordSizes = append(recordSizes, add.batch.Sizes...)
	}

	// block until records are persisted or persisting failed
	offsets, err := b.persist(sebrecords.NewBatch(recordSizes, recordData))
	b.log.Debugf("%d records persisted (err: %v)", len(recordSizes), err)
	if err != nil {
		b.log.Debugf("reporting error to %d waiting callers", len(blockedCallers))
		// offsets should be 0 in all error responses
		offsets = make([]uint64, len(recordSizes))
	}

	// unblock callers
	offsetIndex := 0
	for _, blockedCaller := range blockedCallers {
		offsetMax := offsetIndex + blockedCaller.batch.Len()
		blockedCaller.response <- addResponse{
			// Cap each caller's slice at its own length: all callers share
			// one backing array, and an append by one caller must not
			// overwrite the next caller's offsets.
			offsets: offsets[offsetIndex:offsetMax:offsetMax],
			err:     err,
		}
		offsetIndex = offsetMax
		close(blockedCaller.response)
	}
	b.log.Debugf("done reporting results")
}
// NewContextFactory returns a function that, on each call, creates a fresh
// context which expires blockTime after that call. It is intended for use as
// the contextFactory of NewBlockingBatcherWithConfig, where it bounds how long
// each batch is collected.
func NewContextFactory(blockTime time.Duration) func() context.Context {
	return func() context.Context {
		ctx, cancel := context.WithTimeout(context.Background(), blockTime)
		// The timeout cancels ctx after blockTime on its own, but cancel must
		// still be called at some point to honour the context contract.
		// time.AfterFunc schedules that without parking a sleeping goroutine
		// for every batch.
		time.AfterFunc(2*blockTime, cancel)
		return ctx
	}
}