Commit 865730d
dxf: retry failed write with the same TS instead of accumulating meter data to avoid duplicate data (#65115)
ref #61702
1 parent 6540abd commit 865730d
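
The fix is easiest to see with a toy model. Below is a minimal, illustrative Go sketch (not code from this commit; the in-memory sink and integer deltas are assumptions): because the metering store treats every write as an incremental record, a payload that failed to flush has to be re-sent under its original timestamp, whereas merging it into the next flush and writing it under a new TS would count the same usage twice.

// Illustrative sketch only, not code from this commit: it shows why a failed
// flush is retried under its original timestamp rather than folded into the
// next flush. The "sink" here is a hypothetical in-memory map keyed by
// timestamp; the real code writes JSON items through the pingcap metering SDK.
package main

import (
	"errors"
	"fmt"
)

// sink accepts incremental usage records keyed by timestamp; the reader side
// sums every record, so sending the same usage twice double-counts it.
type sink struct {
	byTS     map[int64]int64
	failOnce bool // simulate one transient write failure
}

func (s *sink) write(ts, delta int64) error {
	if s.failOnce {
		s.failOnce = false
		return errors.New("transient write error")
	}
	s.byTS[ts] += delta
	return nil
}

func main() {
	s := &sink{byTS: map[int64]int64{}, failOnce: true}
	pending := map[int64]int64{} // ts -> payload that failed to write

	// Flush at ts=1 fails: remember the payload under its own TS (what the
	// commit's pendingRetryData does) instead of adding it to the next flush.
	if err := s.write(1, 10); err != nil {
		pending[1] = 10
	}

	// Flush at ts=2 sends only the new delta.
	_ = s.write(2, 5)

	// Retry loop re-sends failed payloads with their original TS.
	for ts, delta := range pending {
		if err := s.write(ts, delta); err == nil {
			delete(pending, ts)
		}
	}

	total := int64(0)
	for _, v := range s.byTS {
		total += v
	}
	fmt.Println("total metered:", total) // 15, no duplication
}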

File tree

3 files changed: +326 -31 lines changed
pkg/disttask/framework/metering/BUILD.bazel

Lines changed: 3 additions & 1 deletion
@@ -13,6 +13,7 @@ go_library(
         "//br/pkg/storage/recording",
         "//pkg/config/kerneltype",
         "//pkg/disttask/framework/proto",
+        "//pkg/util",
         "//pkg/util/logutil",
         "@com_github_docker_go_units//:go-units",
         "@com_github_google_uuid//:uuid",
@@ -37,10 +38,11 @@ go_test(
     ],
     embed = [":metering"],
     flaky = True,
-    shard_count = 10,
+    shard_count = 12,
     deps = [
         "//pkg/config/kerneltype",
         "//pkg/disttask/framework/proto",
+        "//pkg/util",
         "@com_github_pingcap_metering_sdk//common",
         "@com_github_pingcap_metering_sdk//config",
         "@com_github_pingcap_metering_sdk//reader/metering",

pkg/disttask/framework/metering/metering.go

Lines changed: 133 additions & 15 deletions
@@ -16,6 +16,7 @@ package metering
 
 import (
 	"context"
+	"maps"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -31,6 +32,7 @@ import (
 	meteringwriter "github.com/pingcap/metering_sdk/writer/metering"
 	"github.com/pingcap/tidb/pkg/config/kerneltype"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
+	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 )
@@ -39,6 +41,10 @@ const (
 	// The timeout can not be too long because the pod grace termination period is fixed.
 	writeTimeout = 10 * time.Second
 	category     = "dxf"
+	// maxRetryCount defines the maximum retry count for writing metering data.
+	// if the write still fails after maxRetryCount, those data will be dropped.
+	maxRetryCount = 10
+	retryInterval = 5 * time.Second
 )
 
 var (
@@ -93,17 +99,60 @@ type wrappedRecorder struct {
 	unregistered bool
 }
 
+type writeFailData struct {
+	ts       int64
+	retryCnt int
+	items    []map[string]any
+}
+
+type retryData struct {
+	mu sync.Mutex
+	// TS -> write failed data
+	data map[int64]*writeFailData
+}
+
+func (d *retryData) addFailedData(ts int64, items []map[string]any) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.data == nil {
+		d.data = make(map[int64]*writeFailData)
+	}
+	d.data[ts] = &writeFailData{ts: ts, items: items}
+}
+
+func (d *retryData) getDataClone() map[int64]*writeFailData {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	return maps.Clone(d.data)
+}
+
+func (d *retryData) remove(needRemove []*writeFailData) {
+	if len(needRemove) == 0 {
+		return
+	}
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	for _, wd := range needRemove {
+		delete(d.data, wd.ts)
+	}
+}
+
 // Meter is responsible for recording and reporting metering data.
 type Meter struct {
-	sync.Mutex
+	mu        sync.Mutex
 	recorders map[int64]*wrappedRecorder
 	// taskID -> last flushed data
 	// when flushing, we scrape the latest data from recorders and calculate the
 	// delta and write to the metering storage.
+	// we will store the latest data here regardless of whether the flush is
+	// successful or not,
 	lastFlushedData map[int64]*Data
-	uuid            string
-	writer          meteringwriterapi.MeteringWriter
-	logger          *zap.Logger
+	// pendingRetryData is the data that failed to write and need to retry.
+	pendingRetryData retryData
+	uuid             string
+	writer           meteringwriterapi.MeteringWriter
+	logger           *zap.Logger
+	wg               util.WaitGroupWrapper
 }
 
 // NewMeter creates a new Meter instance.
@@ -139,8 +188,8 @@ func newMeterWithWriter(logger *zap.Logger, writer meteringwriterapi.MeteringWri
 }
 
 func (m *Meter) getOrRegisterRecorder(r *Recorder) *Recorder {
-	m.Lock()
-	defer m.Unlock()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	if old, ok := m.recorders[r.taskID]; ok {
 		// each task might have different steps, it's possible for below sequence
 		// - step 1 get recorder
@@ -157,8 +206,8 @@ func (m *Meter) getOrRegisterRecorder(r *Recorder) *Recorder {
 
 // UnregisterRecorder unregisters a recorder.
 func (m *Meter) unregisterRecorder(taskID int64) {
-	m.Lock()
-	defer m.Unlock()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	// we still need to flush for the unregistered recorder once more, so we only
 	// mark it here, and delete when it's flushed.
 	if r, ok := m.recorders[taskID]; ok {
@@ -168,8 +217,8 @@ func (m *Meter) unregisterRecorder(taskID int64) {
 
 func (m *Meter) cleanupUnregisteredRecorders() []*Recorder {
 	removed := make([]*Recorder, 0, 1)
-	m.Lock()
-	defer m.Unlock()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	for taskID, r := range m.recorders {
 		if !r.unregistered {
 			continue
@@ -194,7 +243,7 @@ func (m *Meter) cleanupUnregisteredRecorders() []*Recorder {
 	return removed
 }
 
-func (m *Meter) onSuccessFlush(flushedData map[int64]*Data) {
+func (m *Meter) afterFlush(flushedData map[int64]*Data) {
 	m.lastFlushedData = flushedData
 	removedRecorders := m.cleanupUnregisteredRecorders()
 	for _, r := range removedRecorders {
@@ -206,8 +255,8 @@ func (m *Meter) onSuccessFlush(flushedData map[int64]*Data) {
 }
 
 func (m *Meter) scrapeCurrData() map[int64]*Data {
-	m.Lock()
-	defer m.Unlock()
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	data := make(map[int64]*Data, len(m.recorders))
 	for taskID, r := range m.recorders {
 		data[taskID] = r.currData()
@@ -231,6 +280,71 @@ func (m *Meter) calculateDataItems(currData map[int64]*Data) []map[string]any {
 
 // StartFlushLoop creates a flush loop.
 func (m *Meter) StartFlushLoop(ctx context.Context) {
+	m.wg.RunWithLog(func() {
+		m.flushLoop(ctx)
+	})
+	m.wg.RunWithLog(func() {
+		m.retryLoop(ctx)
+	})
+	m.wg.Wait()
+}
+
+func (m *Meter) retryLoop(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(retryInterval):
+		}
+
+		m.retryWrite(ctx)
+	}
+}
+
+func (m *Meter) retryWrite(ctx context.Context) {
+	data := m.pendingRetryData.getDataClone()
+	if len(data) == 0 {
+		return
+	}
+
+	var (
+		firstErr   error
+		needRemove = make([]*writeFailData, 0, len(data))
+	)
+	for ts, wd := range data {
+		err := m.WriteMeterData(ctx, ts, m.uuid, wd.items)
+		if err == nil {
+			m.logger.Info("succeed to write metering data after retry",
+				zap.Int64("timestamp", ts), zap.Int("retry-count", wd.retryCnt),
+				zap.Any("data", wd.items))
+			needRemove = append(needRemove, wd)
+			continue
+		}
+
+		if ctx.Err() != nil {
+			break
+		}
+		if firstErr == nil {
+			firstErr = err
+		}
+
+		wd.retryCnt++
+		if wd.retryCnt >= maxRetryCount {
+			m.logger.Warn("dropping metering data after max retry count reached",
+				zap.Int64("timestamp", ts), zap.Int("retry-count", wd.retryCnt),
+				zap.Any("data", wd.items), zap.Error(err))
+			needRemove = append(needRemove, wd)
+		}
+	}
+
+	if firstErr != nil {
+		m.logger.Warn("failed to retry writing some metering data", zap.Error(firstErr))
+	}
+
+	m.pendingRetryData.remove(needRemove)
+}
+
+func (m *Meter) flushLoop(ctx context.Context) {
 	// Control the writing timestamp accurately enough so that the previous round won't be overwritten by the next round.
 	curTime := time.Now()
 	nextTime := curTime.Truncate(FlushInterval).Add(FlushInterval)
@@ -259,7 +373,7 @@ func (m *Meter) flush(ctx context.Context, ts int64) {
 	if len(items) == 0 {
 		logger.Info("no metering data to flush", zap.Int("recorder-count", len(currData)),
 			zap.Duration("duration", time.Since(startTime)))
-		m.onSuccessFlush(currData)
+		m.afterFlush(currData)
 		return
 	}
 
@@ -268,12 +382,16 @@
 		logger.Warn("failed to write metering data", zap.Error(err),
 			zap.Duration("duration", time.Since(startTime)),
 			zap.Any("data", items))
+		// metering expect incremental data. due to the case described in NewMeter,
+		// we can only retry the data with given TS, and cannot accumulate with
+		// new data and send with new TS as this will cause data duplication.
+		m.pendingRetryData.addFailedData(ts, items)
 	} else {
 		logger.Info("succeed to write metering data",
			zap.Duration("duration", time.Since(startTime)),
 			zap.Any("data", items))
-		m.onSuccessFlush(currData)
 	}
+	m.afterFlush(currData)
 }
 
 // WriteMeterData writes the metering data.
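
For reference, here is a standalone sketch of how the retryData bookkeeping added above behaves (the types are copied from the diff for illustration; the main function and its sample values are assumptions, this is not the package's test): failed payloads are stored per flush TS, a retry pass walks a clone of the map, and entries are removed on success or once retryCnt reaches maxRetryCount.

package main

import (
	"fmt"
	"maps"
	"sync"
)

const maxRetryCount = 10

type writeFailData struct {
	ts       int64
	retryCnt int
	items    []map[string]any
}

type retryData struct {
	mu sync.Mutex
	// TS -> write failed data
	data map[int64]*writeFailData
}

func (d *retryData) addFailedData(ts int64, items []map[string]any) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.data == nil {
		d.data = make(map[int64]*writeFailData)
	}
	d.data[ts] = &writeFailData{ts: ts, items: items}
}

func (d *retryData) getDataClone() map[int64]*writeFailData {
	d.mu.Lock()
	defer d.mu.Unlock()
	return maps.Clone(d.data)
}

func (d *retryData) remove(needRemove []*writeFailData) {
	if len(needRemove) == 0 {
		return
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, wd := range needRemove {
		delete(d.data, wd.ts)
	}
}

func main() {
	var rd retryData
	// A flush at ts=1700000000 failed; keep its items under that TS.
	rd.addFailedData(1700000000, []map[string]any{{"task_id": int64(1), "ru": 42}})

	// One retry pass: iterate a clone so the write happens outside the lock.
	done := make([]*writeFailData, 0, 1)
	for ts, wd := range rd.getDataClone() {
		// Pretend WriteMeterData(ctx, ts, uuid, wd.items) succeeded here.
		fmt.Println("re-sent payload for ts", ts, "retry count", wd.retryCnt)
		done = append(done, wd)
	}
	rd.remove(done)
	fmt.Println("pending after retry:", len(rd.getDataClone())) // 0
}

Note that maps.Clone is a shallow copy, so bumping retryCnt on an entry obtained from the clone updates the same *writeFailData the pending map still points to; only the map iteration itself happens outside the lock.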
