From 4506df7a8cfc35454d09aaa7ff6824046b7a6581 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 25 Jun 2024 21:14:39 +0800 Subject: [PATCH] fix error ignore in compactor Signed-off-by: wayblink --- internal/datanode/compaction/clustering_compactor.go | 1 + internal/datanode/compaction/mix_compactor.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 713af63997ac..1fb6b8c3acb7 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -771,6 +771,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return err } for fID, path := range partialBinlogs { diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index f46ef87724da..8a4ff77f4047 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -151,6 +151,7 @@ func (t *mixCompactionTask) merge( allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) + return nil, err } downloadTimeCost += time.Since(downloadStart) @@ -205,6 +206,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart) mergeFieldBinlogs(allBinlogs, partialBinlogs) @@ -226,6 +228,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart)