Skip to content

Commit

Permalink
Allow to retrieve _source from updated document
Browse files Browse the repository at this point in the history
The Bulk API now allows to fetch the source of the updated document.

See #677
  • Loading branch information
olivere committed Jan 18, 2018
1 parent d334c06 commit ba99e50
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 6 deletions.
1 change: 1 addition & 0 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ type BulkResponseItem struct {
ForcedRefresh bool `json:"forced_refresh,omitempty"`
Found bool `json:"found,omitempty"`
Error *ErrorDetails `json:"error,omitempty"`
GetResult *GetResult `json:"get,omitempty"`
}

// Indexed returns all bulk request results of "index" actions.
Expand Down
1 change: 0 additions & 1 deletion bulk_delete_request_easyjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion bulk_index_request_easyjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 32 additions & 3 deletions bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ func TestBulkWithIndexSetOnClient(t *testing.T) {
}
}

func TestBulkRequestsSerialization(t *testing.T) {
func TestBulkIndexDeleteUpdate(t *testing.T) {
client := setupTestClientAndCreateIndex(t)
//client := setupTestClientAndCreateIndexAndLog(t)

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
tweet2 := tweet{User: "sandrae", Message: "Dancing all night long. Yeah."}
Expand All @@ -212,6 +213,7 @@ func TestBulkRequestsSerialization(t *testing.T) {
index2Req := NewBulkIndexRequest().OpType("create").Index(testIndexName).Type("tweet").Id("2").Doc(tweet2)
delete1Req := NewBulkDeleteRequest().Index(testIndexName).Type("tweet").Id("1")
update2Req := NewBulkUpdateRequest().Index(testIndexName).Type("tweet").Id("2").
ReturnSource(true).
Doc(struct {
Retweets int `json:"retweets"`
}{
Expand All @@ -234,7 +236,7 @@ func TestBulkRequestsSerialization(t *testing.T) {
{"user":"sandrae","message":"Dancing all night long. Yeah.","retweets":0,"created":"0001-01-01T00:00:00Z"}
{"delete":{"_id":"1","_index":"` + testIndexName + `","_type":"tweet"}}
{"update":{"_id":"2","_index":"` + testIndexName + `","_type":"tweet"}}
{"doc":{"retweets":42}}
{"doc":{"retweets":42},"_source":true}
`
got, err := bulkRequest.bodyAsString()
if err != nil {
Expand All @@ -245,7 +247,7 @@ func TestBulkRequestsSerialization(t *testing.T) {
}

// Run the bulk request
bulkResponse, err := bulkRequest.Do(context.TODO())
bulkResponse, err := bulkRequest.Pretty(true).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -291,6 +293,9 @@ func TestBulkRequestsSerialization(t *testing.T) {
if created[0].Status != 201 {
t.Errorf("expected created[0].Status == %d; got %d", 201, created[0].Status)
}
if want, have := "created", created[0].Result; want != have {
t.Errorf("expected created[0].Result == %q; got %q", want, have)
}

// Deleted actions
deleted := bulkResponse.Deleted()
Expand All @@ -309,6 +314,9 @@ func TestBulkRequestsSerialization(t *testing.T) {
if !deleted[0].Found {
t.Errorf("expected deleted[0].Found == %v; got %v", true, deleted[0].Found)
}
if want, have := "deleted", deleted[0].Result; want != have {
t.Errorf("expected deleted[0].Result == %q; got %q", want, have)
}

// Updated actions
updated := bulkResponse.Updated()
Expand All @@ -327,6 +335,25 @@ func TestBulkRequestsSerialization(t *testing.T) {
if updated[0].Version != 2 {
t.Errorf("expected updated[0].Version == %d; got %d", 2, updated[0].Version)
}
if want, have := "updated", updated[0].Result; want != have {
t.Errorf("expected updated[0].Result == %q; got %q", want, have)
}
if updated[0].GetResult == nil {
t.Fatalf("expected updated[0].GetResult to be != nil; got nil")
}
if updated[0].GetResult.Source == nil {
t.Fatalf("expected updated[0].GetResult.Source to be != nil; got nil")
}
if want, have := true, updated[0].GetResult.Found; want != have {
t.Fatalf("expected updated[0].GetResult.Found to be != %v; got %v", want, have)
}
var doc tweet
if err := json.Unmarshal(*updated[0].GetResult.Source, &doc); err != nil {
t.Fatalf("expected to unmarshal updated[0].GetResult.Source; got %v", err)
}
if want, have := 42, doc.Retweets; want != have {
t.Fatalf("expected updated tweet to have Retweets = %v; got %v", want, have)
}

// Succeeded actions
succeeded := bulkResponse.Succeeded()
Expand Down Expand Up @@ -534,6 +561,8 @@ func TestBulkContentType(t *testing.T) {
}
}

// -- Benchmarks --

func BenchmarkBulkAllocs(b *testing.B) {
b.Run("1000 docs with 64 byte", func(b *testing.B) { benchmarkBulkAllocs(b, 64, 1000) })
b.Run("1000 docs with 1 KiB", func(b *testing.B) { benchmarkBulkAllocs(b, 1024, 1000) })
Expand Down
12 changes: 12 additions & 0 deletions bulk_update_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BulkUpdateRequest struct {
docAsUpsert *bool
detectNoop *bool
doc interface{}
returnSource *bool

source []string

Expand Down Expand Up @@ -62,6 +63,7 @@ type bulkUpdateRequestCommandData struct {
Upsert interface{} `json:"upsert,omitempty"`
Script interface{} `json:"script,omitempty"`
ScriptedUpsert *bool `json:"scripted_upsert,omitempty"`
Source *bool `json:"_source,omitempty"`
}

// NewBulkUpdateRequest returns a new BulkUpdateRequest.
Expand Down Expand Up @@ -194,6 +196,15 @@ func (r *BulkUpdateRequest) Upsert(doc interface{}) *BulkUpdateRequest {
return r
}

// ReturnSource specifies whether Elasticsearch should return the source
// after the update. In the request, this responds to the `_source` field.
// It is false by default.
func (r *BulkUpdateRequest) ReturnSource(source bool) *BulkUpdateRequest {
r.returnSource = &source
r.source = nil
return r
}

// String returns the on-wire representation of the update request,
// concatenated as a single string.
func (r *BulkUpdateRequest) String() string {
Expand Down Expand Up @@ -258,6 +269,7 @@ func (r *BulkUpdateRequest) Source() ([]string, error) {
Upsert: r.upsert,
ScriptedUpsert: r.scriptedUpsert,
Doc: r.doc,
Source: r.returnSource,
}
if r.script != nil {
script, err := r.script.Source()
Expand Down
21 changes: 20 additions & 1 deletion bulk_update_request_easyjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ba99e50

Please sign in to comment.