From 653af7108cc1dd4aeeb4538113588e5220ecbe5a Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Wed, 22 May 2024 17:29:40 +0800 Subject: [PATCH] fix: Break the dead loop of downloading binlog during compaction (#33246) issue: #33245 --------- Signed-off-by: Cai Zhang --- internal/datanode/binlog_io.go | 36 +++++++++++++---------------- internal/datanode/binlog_io_test.go | 2 +- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index aa92e6afffe6..d214b6151411 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -32,20 +32,18 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/retry" ) -var ( - errUploadToBlobStorage = errors.New("upload to blob storage wrong") - errDownloadFromBlobStorage = errors.New("download from blob storage wrong") -) +var errUploadToBlobStorage = errors.New("upload to blob storage wrong") type downloader interface { // donload downloads insert-binlogs, stats-binlogs, and, delta-binlogs from blob storage for given paths. // The paths are 1 group of binlog paths generated by 1 `Serialize`. // - // errDownloadFromBlobStorage is returned if ctx is canceled from outside while a downloading is inprogress. - // Beware of the ctx here, if no timeout or cancel is applied to this ctx, this downloading may retry forever. + // throw errors promptly instead of needlessly retrying forever after v2.3.16 download(ctx context.Context, paths []string) ([]*Blob, error) } @@ -79,21 +77,19 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error for i, path := range paths { localPath := path future := getMultiReadPool().Submit(func() (any, error) { - var vs []byte - err := errStart - for err != nil { - select { - case <-ctx.Done(): - log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths)) - return nil, errDownloadFromBlobStorage - default: - if err != errStart { - time.Sleep(50 * time.Millisecond) - } - vs, err = b.Read(ctx, localPath) + var val []byte + var err error + + log.Debug("binlogIO download", zap.String("path", localPath)) + err = retry.Do(ctx, func() error { + val, err = b.Read(ctx, localPath) + if err != nil { + log.Warn("binlogIO fail to download", zap.String("path", localPath), zap.Error(err)) } - } - return vs, nil + return err + }, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr)) + + return val, err }) futures[i] = future } diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index 4aac62526427..1f6a74ef7630 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -90,7 +90,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { cancel() _, err := b.download(ctx, []string{"test"}) - assert.EqualError(t, err, errDownloadFromBlobStorage.Error()) + assert.ErrorIs(t, err, context.Canceled) } }) }