-
Notifications
You must be signed in to change notification settings - Fork 269
/
redo_log_advancer.go
259 lines (239 loc) · 9.15 KB
/
redo_log_advancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package sinkmanager
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/memquota"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/sorter"
"github.com/pingcap/tiflow/cdc/redo"
"go.uber.org/zap"
)
// redoLogAdvancer buffers row changed events for a single redo task, emits
// them to the redo DML manager, and keeps track of the memory quota and the
// commit-ts progress of that work.
type redoLogAdvancer struct {
	// NOTICE: This task is immutable, so please never modify it.
	task *redoTask
	// redoDMLManager is used to write the redo log and to update its resolved ts.
	redoDMLManager redo.DMLManager
	// memQuota is the quota from which memory for the redo log writer is
	// acquired and to which it is refunded.
	memQuota *memquota.MemQuota
	// availableMem is the amount of memory quota this advancer currently holds.
	// NOTICE: The first time the task runs, the memory quota for the table has
	// already been initialized. It is defaultRequestMemSize.
	availableMem uint64
	// usedMem is how much memory we have used; it grows by the size passed to
	// appendEvents.
	// It is compared with availableMem to decide how much memory to acquire:
	// more quota is requested once usedMem reaches or exceeds availableMem.
	usedMem uint64
	// lastPos records the last written position.
	// We need to use it to update the lower bound of the table sink.
	lastPos sorter.Position
	// events buffers the events to be written to the redo log.
	events []*model.RowChangedEvent
	// emittedCommitTs records the last commit ts that has been reported to the
	// redo DML manager as resolved ts.
	emittedCommitTs uint64
	// lastTxnCommitTs records the commit ts of the latest finished transaction.
	// advance reports it as resolved ts when it is ahead of emittedCommitTs.
	lastTxnCommitTs uint64
	// pendingTxnSize records the size of the events that have been appended
	// but not yet emitted to the redo DML manager.
	pendingTxnSize uint64
	// currTxnCommitTs records the commit ts of the current transaction.
	currTxnCommitTs uint64
}
// newRedoLogAdvancer creates an advancer for the given task.
// availableMem is the memory quota the caller has already acquired on behalf
// of the task. The event buffer starts empty with a capacity of bufferSize.
func newRedoLogAdvancer(
	task *redoTask,
	memQuota *memquota.MemQuota,
	availableMem uint64,
	redoDMLManager redo.DMLManager,
) *redoLogAdvancer {
	advancer := new(redoLogAdvancer)
	advancer.task = task
	advancer.redoDMLManager = redoDMLManager
	advancer.memQuota = memQuota
	advancer.availableMem = availableMem
	advancer.events = make([]*model.RowChangedEvent, 0, bufferSize)
	return advancer
}
// advance flushes the buffered events to the redo DML manager and then, if a
// newer transaction has finished since the last report, pushes the resolved
// ts of the redo DML manager forward to lastTxnCommitTs.
// Any error from the redo DML manager is returned wrapped with errors.Trace.
func (a *redoLogAdvancer) advance(ctx context.Context) error {
	if len(a.events) != 0 {
		// onWritten is handed to the redo DML manager as a callback; it gives
		// the quota of the flushed events back. It stays nil when there is
		// nothing to refund.
		var onWritten func()
		if size := a.pendingTxnSize; size != 0 {
			onWritten = func() {
				a.memQuota.Refund(size)
				log.Debug("MemoryQuotaTracing: refund memory for redo log task",
					zap.String("namespace", a.task.tableSink.changefeed.Namespace),
					zap.String("changefeed", a.task.tableSink.changefeed.ID),
					zap.Stringer("span", &a.task.span),
					zap.Uint64("memory", size))
			}
		}

		err := a.redoDMLManager.EmitRowChangedEvents(ctx, a.task.span, onWritten, a.events...)
		if err != nil {
			return errors.Trace(err)
		}

		// Reuse the buffer unless it has grown beyond bufferSize, in which
		// case start over with a fresh one of the default capacity.
		if cap(a.events) > bufferSize {
			a.events = make([]*model.RowChangedEvent, 0, bufferSize)
		} else {
			a.events = a.events[:0]
		}
		a.pendingTxnSize = 0
	}

	// Nothing new has finished since the last resolved ts update.
	if a.lastTxnCommitTs <= a.emittedCommitTs {
		return nil
	}

	resolvedTs := a.lastTxnCommitTs
	if err := a.redoDMLManager.UpdateResolvedTs(ctx, a.task.span, resolvedTs); err != nil {
		return errors.Trace(err)
	}
	log.Debug("update resolved ts to redo",
		zap.String("namespace", a.task.tableSink.changefeed.Namespace),
		zap.String("changefeed", a.task.tableSink.changefeed.ID),
		zap.Stringer("span", &a.task.span),
		zap.Uint64("resolvedTs", resolvedTs))
	a.emittedCommitTs = resolvedTs
	return nil
}
// tryAdvanceAndAcquireMem settles the memory accounting, flushes to the redo
// log when needed, and requests more memory quota for further fetching.
//
// allFetched reports that every event has been fetched; in that case the
// buffered events are always flushed and no further quota is requested.
// txnFinished reports whether the current transaction is complete. When it
// is, failing to get more quota is acceptable because the next round of the
// task can continue. When it is not, the call blocks until quota is granted
// so that the redo log can advance at least to the current transaction.
func (a *redoLogAdvancer) tryAdvanceAndAcquireMem(
	ctx context.Context,
	allFetched bool,
	txnFinished bool,
) error {
	// A single event may push usedMem beyond availableMem. For example, with
	// 100MB available and 90MB used, fetching a 20MB event overshoots the
	// quota temporarily. Force acquire the difference so that the quota
	// accounting neither leaks nor under-counts.
	overBudget := a.usedMem > a.availableMem
	if overBudget {
		shortfall := a.usedMem - a.availableMem
		a.memQuota.ForceAcquire(shortfall)
		log.Debug("MemoryQuotaTracing: force acquire memory for redo log task",
			zap.String("namespace", a.task.tableSink.changefeed.Namespace),
			zap.String("changefeed", a.task.tableSink.changefeed.ID),
			zap.Stringer("span", &a.task.span),
			zap.Uint64("memory", shortfall))
		a.availableMem = a.usedMem
	}

	// Emit when any of the following holds:
	// 1. more memory was used than had been acquired;
	// 2. the pending batch size has reached maxUpdateIntervalSize;
	// 3. all events have been received.
	shouldEmit := overBudget || allFetched || a.pendingTxnSize >= maxUpdateIntervalSize
	if shouldEmit {
		if err := a.advance(ctx); err != nil {
			return errors.Trace(err)
		}
	}

	// No more quota is needed once everything is fetched, or while there is
	// still headroom left.
	if allFetched || a.usedMem < a.availableMem {
		return nil
	}

	if !txnFinished {
		// NOTE: it's not required to use `forceAcquire` even if splitTxn is false.
		// It's because memory will finally be `refund` after redo-logs are written.
		if err := a.memQuota.BlockAcquire(requestMemSize); err != nil {
			return errors.Trace(err)
		}
		a.availableMem += requestMemSize
		log.Debug("MemoryQuotaTracing: block acquire memory for redo log task",
			zap.String("namespace", a.task.tableSink.changefeed.Namespace),
			zap.String("changefeed", a.task.tableSink.changefeed.ID),
			zap.Stringer("span", &a.task.span),
			zap.Uint64("memory", requestMemSize))
		return nil
	}

	// The transaction is finished, so a best-effort attempt is enough.
	if a.memQuota.TryAcquire(requestMemSize) {
		a.availableMem += requestMemSize
		log.Debug("MemoryQuotaTracing: try acquire memory for redo log task",
			zap.String("namespace", a.task.tableSink.changefeed.Namespace),
			zap.String("changefeed", a.task.tableSink.changefeed.ID),
			zap.Stringer("span", &a.task.span),
			zap.Uint64("memory", requestMemSize))
	}
	return nil
}
// finish marks upperBound as the last position and its commit ts as the last
// finished transaction, then performs a final advance with both allFetched
// and txnFinished set, so buffered events are flushed and no additional
// memory quota is requested.
func (a *redoLogAdvancer) finish(
	ctx context.Context,
	upperBound sorter.Position,
) error {
	a.lastPos, a.lastTxnCommitTs = upperBound, upperBound.CommitTs
	return a.tryAdvanceAndAcquireMem(ctx, true, true)
}
// tryMoveToNextTxn updates the transaction bookkeeping for an event with the
// given commitTs at the given position.
//  1. A commitTs that differs from the current transaction's means the
//     current transaction is finished, so it becomes the last finished
//     transaction and commitTs becomes the current one.
//  2. A position that is a commit fence means the current transaction is
//     already finished, so it can be recorded as the last finished
//     transaction early, which helps advance the redo log manager.
func (a *redoLogAdvancer) tryMoveToNextTxn(commitTs model.Ts, pos sorter.Position) {
	if commitTs != a.currTxnCommitTs {
		// The right-hand side is evaluated first, so lastTxnCommitTs
		// receives the previous current commit ts.
		a.lastTxnCommitTs, a.currTxnCommitTs = a.currTxnCommitTs, commitTs
	}

	// NOTICE: Keep this check separate from the one above; both may apply to
	// the same event. For example:
	// 1. current commitTs is 10
	// 2. commitTs is 11
	// 3. pos is a commit fence (10,11)
	// The first check moves to transaction 11, and the fence then marks it
	// as finished, so lastTxnCommitTs ends up as 11, not 10.
	if pos.IsCommitFence() {
		a.lastTxnCommitTs = a.currTxnCommitTs
	}
}
// appendEvents adds events to the buffer and charges size to both the total
// memory used and the amount that is still waiting to be flushed to the redo
// log manager.
func (a *redoLogAdvancer) appendEvents(events []*model.RowChangedEvent, size uint64) {
	// Account for the memory first: once as overall usage, once as pending.
	a.usedMem += size
	a.pendingTxnSize += size

	a.events = append(a.events, events...)
}
// hasEnoughMem reports whether the used memory is still strictly below the
// memory quota held by the redo log task, i.e. whether it may continue.
func (a *redoLogAdvancer) hasEnoughMem() bool {
	return a.usedMem < a.availableMem
}
// cleanup gives back the part of the acquired memory quota that was never
// used. It does nothing when all of the held quota has been used.
func (a *redoLogAdvancer) cleanup() {
	if a.availableMem <= a.usedMem {
		return
	}

	unused := a.availableMem - a.usedMem
	a.memQuota.Refund(unused)
	log.Debug("MemoryQuotaTracing: refund memory for redo log task",
		zap.String("namespace", a.task.tableSink.changefeed.Namespace),
		zap.String("changefeed", a.task.tableSink.changefeed.ID),
		zap.Stringer("span", &a.task.span),
		zap.Uint64("memory", unused))
}