Skip to content

Commit

Permalink
fix: Break the dead loop of downloading binlog during compaction (#33246)
Browse files Browse the repository at this point in the history

issue: #33245

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed May 22, 2024
1 parent 1007e6d commit 653af71
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
36 changes: 16 additions & 20 deletions internal/datanode/binlog_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,18 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/retry"
)

var (
errUploadToBlobStorage = errors.New("upload to blob storage wrong")
errDownloadFromBlobStorage = errors.New("download from blob storage wrong")
)
var errUploadToBlobStorage = errors.New("upload to blob storage wrong")

// downloader is the interface for fetching binlogs from blob storage.
type downloader interface {
// download downloads insert-binlogs, stats-binlogs, and delta-binlogs from blob storage for the given paths.
// The paths are one group of binlog paths generated by a single `Serialize`.
//
// NOTE(review): the next two sentences appear to describe the behavior before v2.3.16 — confirm against the implementation.
// errDownloadFromBlobStorage is returned if ctx is canceled from outside while a download is in progress.
// Beware of the ctx here: if no timeout or cancel is applied to this ctx, the download may retry forever.
//
// Since v2.3.16, errors are returned promptly instead of needlessly retrying forever.
download(ctx context.Context, paths []string) ([]*Blob, error)
}

Expand Down Expand Up @@ -79,21 +77,19 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
for i, path := range paths {
localPath := path
future := getMultiReadPool().Submit(func() (any, error) {
var vs []byte
err := errStart
for err != nil {
select {
case <-ctx.Done():
log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
return nil, errDownloadFromBlobStorage
default:
if err != errStart {
time.Sleep(50 * time.Millisecond)
}
vs, err = b.Read(ctx, localPath)
var val []byte
var err error

log.Debug("binlogIO download", zap.String("path", localPath))
err = retry.Do(ctx, func() error {
val, err = b.Read(ctx, localPath)
if err != nil {
log.Warn("binlogIO fail to download", zap.String("path", localPath), zap.Error(err))
}
}
return vs, nil
return err
}, retry.Attempts(3), retry.RetryErr(merr.IsRetryableErr))

return val, err
})
futures[i] = future
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/binlog_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
cancel()

_, err := b.download(ctx, []string{"test"})
assert.EqualError(t, err, errDownloadFromBlobStorage.Error())
assert.ErrorIs(t, err, context.Canceled)
}
})
}
Expand Down

0 comments on commit 653af71

Please sign in to comment.