From 71ce680008d514ff4ad133084772888eb2cbe6ad Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 11 Jul 2023 13:52:31 +0800 Subject: [PATCH] Fix: check if bulk response has errors (#592) --- pkg/sink/elasticsearch/client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/sink/elasticsearch/client.go b/pkg/sink/elasticsearch/client.go index ee6b9d47e..b9c27cf5c 100644 --- a/pkg/sink/elasticsearch/client.go +++ b/pkg/sink/elasticsearch/client.go @@ -177,19 +177,19 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error { c.cli.Bulk.WithParameters(c.config.Params), c.cli.Bulk.WithHeader(c.config.Headers)) if err != nil { - return err + return errors.WithMessagef(err, "request to elasticsearch bulk failed") } if resp.Body != nil { defer resp.Body.Close() } - if resp.IsError() { - blkResp := BulkIndexerResponse{} - err := json.NewDecoder(resp.Body).Decode(&blkResp) - if err != nil { - out, _ := json.Marshal(resp.Body) - return errors.Errorf("elasticsearch response error: %s", out) - } + blkResp := BulkIndexerResponse{} + if err := json.NewDecoder(resp.Body).Decode(&blkResp); err != nil { + out, _ := json.Marshal(resp.Body) + return errors.Errorf("elasticsearch response error: %s", out) + } + + if blkResp.HasErrors { failed := blkResp.Failed() failedCount := len(failed) // to avoid too many error messages