From b77d01d3674ea0eccf3001e60ab476eb10b8765e Mon Sep 17 00:00:00 2001 From: wayblink Date: Wed, 26 Jun 2024 10:24:03 +0800 Subject: [PATCH] fix: fix error ignore in compactor (#34169) #34170 Signed-off-by: wayblink --- internal/datanode/compaction/clustering_compactor.go | 1 + internal/datanode/compaction/mix_compactor.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 713af63997ac..1fb6b8c3acb7 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -771,6 +771,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return err } for fID, path := range partialBinlogs { diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index f46ef87724da..8a4ff77f4047 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -151,6 +151,7 @@ func (t *mixCompactionTask) merge( allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) + return nil, err } downloadTimeCost += time.Since(downloadStart) @@ -205,6 +206,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart) mergeFieldBinlogs(allBinlogs, partialBinlogs) @@ -226,6 +228,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart)