additional check for errors
Elasticsearch would return an HTTP 200 even if the bulk request failed.
This is a first patch to mitigate the "losing documents" issue
(#5).

We abort the indexing if we encounter a single failed response and
suggest that the user lower the number of workers (-w) or increase
thread_pool.bulk.queue_size in the Elasticsearch configuration.

Future versions of esbulk might transparently retry a failed operation
some number of times, but that requires a detailed inspection of the
error response and is therefore left out of this fix.
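
For illustration: Elasticsearch reports per-item failures in the body of
a bulk reply, so the errors flag has to be checked even when the status
is 200. Below is a minimal, runnable sketch that decodes such a reply
with types like the ones this commit introduces; the payload is made up,
and the json:"index" tag on the inner struct is an assumption of the
sketch, not necessarily part of the commit.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Local copies of the types added in this commit. The json tag on the
// outer field is an assumption of this sketch, so that items decode.
type Item struct {
	IndexAction struct {
		Index  string `json:"_index"`
		Type   string `json:"_type"`
		ID     string `json:"_id"`
		Status int    `json:"status"`
		Error  struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"error"`
	} `json:"index"`
}

// BulkResponse is a response to a bulk request.
type BulkResponse struct {
	Took      int    `json:"took"`
	HasErrors bool   `json:"errors"`
	Items     []Item `json:"items"`
}

func main() {
	// Hypothetical reply: the HTTP status was 200, yet the single item
	// was rejected because the bulk queue was full.
	payload := `{"took": 3, "errors": true, "items": [{"index": {
		"_index": "test", "_type": "default", "_id": "1", "status": 429,
		"error": {"type": "es_rejected_execution_exception",
		"reason": "rejected execution of bulk item"}}}]}`

	var br BulkResponse
	if err := json.Unmarshal([]byte(payload), &br); err != nil {
		log.Fatal(err)
	}
	if br.HasErrors {
		item := br.Items[0].IndexAction
		fmt.Printf("bulk failed: %d %s\n", item.Status, item.Error.Reason)
	}
}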
miku committed Nov 28, 2016
1 parent 1eeea56 commit b16722f
Showing 1 changed file with 45 additions and 2 deletions.
common.go
@@ -1,6 +1,7 @@
 package esbulk
 
 import (
+	"bytes"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -32,6 +33,27 @@ type Options struct {
 	Scheme string
 }
 
+// Item represents a bulk action.
+type Item struct {
+	IndexAction struct {
+		Index  string `json:"_index"`
+		Type   string `json:"_type"`
+		ID     string `json:"_id"`
+		Status int    `json:"status"`
+		Error  struct {
+			Type   string `json:"type"`
+			Reason string `json:"reason"`
+		} `json:"error"`
+	}
+}
+
+// BulkResponse is a response to a bulk request.
+type BulkResponse struct {
+	Took      int    `json:"took"`
+	HasErrors bool   `json:"errors"`
+	Items     []Item `json:"items"`
+}
+
 // SetServer parses out host and port for a string and sets the option values.
 func (o *Options) SetServer(s string) error {
 	locator, err := url.Parse(s)
@@ -123,15 +145,36 @@ func BulkIndex(docs []string, options Options) error {
 		lines = append(lines, header)
 		lines = append(lines, doc)
 	}
+
 	body := fmt.Sprintf("%s\n", strings.Join(lines, "\n"))
+
+	// There are multiple ways indexing can fail, e.g. connection errors
+	// or bad requests. Even with an HTTP 200, the bulk request could
+	// still have failed: for that we need to decode the elasticsearch
+	// response.
	response, err := http.Post(link, "application/json", strings.NewReader(body))
 	if err != nil {
 		return err
 	}
+	defer response.Body.Close()
+
 	if response.StatusCode >= 400 {
-		return fmt.Errorf("indexing failed with %d %s", response.StatusCode, http.StatusText(response.StatusCode))
+		var buf bytes.Buffer
+		if _, err := io.Copy(&buf, response.Body); err != nil {
+			return err
+		}
+		return fmt.Errorf("indexing failed with %d %s: %s",
+			response.StatusCode, http.StatusText(response.StatusCode), buf.String())
 	}
-	return response.Body.Close()
+
+	var br BulkResponse
+	if err := json.NewDecoder(response.Body).Decode(&br); err != nil {
+		return err
+	}
+	if br.HasErrors {
+		return fmt.Errorf("error during bulk operation, try a lower -w value or increase thread_pool.bulk.queue_size in your nodes")
+	}
+	return nil
 }
 
 // Worker will batch index documents that come in on the lines channel.
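
For reference, the body posted above is newline-delimited JSON: each
action header line is followed by its document line, and the payload
ends with a newline, which the _bulk endpoint requires. A standalone
sketch with made-up values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// A hypothetical action header followed by its document, interleaved
	// as in BulkIndex; the final newline is required by the _bulk endpoint.
	lines := []string{
		`{"index": {"_index": "test", "_type": "default"}}`,
		`{"name": "esbulk"}`,
	}
	body := fmt.Sprintf("%s\n", strings.Join(lines, "\n"))
	fmt.Print(body)
}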
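The transparent retry deferred in the commit message could look roughly
like this sketch (hypothetical, not part of this commit); a refined
version would inspect the per-item errors and resend only the failed
documents:

package esbulk

import "time"

// bulkIndexRetry is a hypothetical wrapper around BulkIndex (not part
// of this commit): it retries the whole batch with exponential backoff,
// giving a saturated thread_pool.bulk queue time to drain.
func bulkIndexRetry(docs []string, options Options, attempts int) (err error) {
	for i := 0; i < attempts; i++ {
		if err = BulkIndex(docs, options); err == nil {
			return nil
		}
		// Back off: 100ms, 200ms, 400ms, ...
		time.Sleep(time.Duration(100<<uint(i)) * time.Millisecond)
	}
	return err
}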
