Skip to content

Commit

Permalink
Add FetchSourceContext to Update API
Browse files Browse the repository at this point in the history
This commit adds the ability to fetch the source of the updated
response. It also refactors the FetchSourceContext to include the
required data that is sent over to Elasticsearch.
  • Loading branch information
olivere committed Jul 18, 2017
1 parent 4c8608a commit 9577055
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 25 deletions.
58 changes: 37 additions & 21 deletions fetch_source_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ import (
"strings"
)

// FetchSourceContext enables source filtering, i.e. it allows control
// over how the _source field is returned with every hit. It is used
// with various endpoints, e.g. when searching for documents, retrieving
// individual documents, or even updating documents.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.5/search-request-source-filtering.html
// for details.
type FetchSourceContext struct {
fetchSource bool
transformSource bool
includes []string
excludes []string
fetchSource bool
includes []string
excludes []string
}

// NewFetchSourceContext returns a new FetchSourceContext.
func NewFetchSourceContext(fetchSource bool) *FetchSourceContext {
return &FetchSourceContext{
fetchSource: fetchSource,
Expand All @@ -24,51 +31,60 @@ func NewFetchSourceContext(fetchSource bool) *FetchSourceContext {
}
}

// FetchSource indicates whether to return the _source.
func (fsc *FetchSourceContext) FetchSource() bool {
return fsc.fetchSource
}

// SetFetchSource specifies whether to return the _source.
func (fsc *FetchSourceContext) SetFetchSource(fetchSource bool) {
fsc.fetchSource = fetchSource
}

// Include indicates to return specific parts of the _source.
// Wildcards are allowed here.
func (fsc *FetchSourceContext) Include(includes ...string) *FetchSourceContext {
fsc.includes = append(fsc.includes, includes...)
return fsc
}

// Exclude indicates to exclude specific parts of the _source.
// Wildcards are allowed here.
func (fsc *FetchSourceContext) Exclude(excludes ...string) *FetchSourceContext {
fsc.excludes = append(fsc.excludes, excludes...)
return fsc
}

func (fsc *FetchSourceContext) TransformSource(transformSource bool) *FetchSourceContext {
fsc.transformSource = transformSource
return fsc
}

// Source returns the JSON-serializable data to be used in a body.
func (fsc *FetchSourceContext) Source() (interface{}, error) {
if !fsc.fetchSource {
return false, nil
}
return map[string]interface{}{
"includes": fsc.includes,
"excludes": fsc.excludes,
}, nil
if len(fsc.includes) == 0 && len(fsc.excludes) == 0 {
return true, nil
}
src := make(map[string]interface{})
if len(fsc.includes) > 0 {
src["includes"] = fsc.includes
}
if len(fsc.excludes) > 0 {
src["excludes"] = fsc.excludes
}
return src, nil
}

// Query returns the parameters in a form suitable for a URL query string.
func (fsc *FetchSourceContext) Query() url.Values {
params := url.Values{}
if !fsc.fetchSource {
if fsc.fetchSource {
if len(fsc.includes) > 0 {
params.Add("_source_include", strings.Join(fsc.includes, ","))
}
if len(fsc.excludes) > 0 {
params.Add("_source_exclude", strings.Join(fsc.excludes, ","))
}
} else {
params.Add("_source", "false")
return params
}
if len(fsc.includes) > 0 {
params.Add("_source_include", strings.Join(fsc.includes, ","))
}
if len(fsc.excludes) > 0 {
params.Add("_source_exclude", strings.Join(fsc.excludes, ","))
}
return params
}
4 changes: 2 additions & 2 deletions fetch_source_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestFetchSourceContextFetchSource(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"excludes":[],"includes":[]}`
expected := `true`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
Expand All @@ -71,7 +71,7 @@ func TestFetchSourceContextFetchSourceWithIncludesOnly(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"excludes":[],"includes":["a","b"]}`
expected := `{"includes":["a","b"]}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
Expand Down
2 changes: 1 addition & 1 deletion get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGet(t *testing.T) {
}

func TestGetWithSourceFiltering(t *testing.T) {
client := setupTestClientAndCreateIndex(t)
client := setupTestClientAndCreateIndex(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))

tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."}
_, err := client.Index().Index(testIndexName).Type("tweet").Id("1").BodyJson(&tweet1).Do(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion search_aggs_metrics_top_hits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestTopHitsAggregation(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_hits":{"_source":{"excludes":[],"includes":["title"]},"size":1,"sort":[{"last_activity_date":{"order":"desc"}}]}}`
expected := `{"top_hits":{"_source":{"includes":["title"]},"size":1,"sort":[{"last_activity_date":{"order":"desc"}}]}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
Expand Down
25 changes: 25 additions & 0 deletions update.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type UpdateService struct {
parent string
script *Script
fields []string
fsc *FetchSourceContext
version *int64
versionType string
retryOnConflict *int
Expand Down Expand Up @@ -172,6 +173,23 @@ func (b *UpdateService) Pretty(pretty bool) *UpdateService {
return b
}

// FetchSource asks Elasticsearch to return the updated _source in the response.
func (s *UpdateService) FetchSource(fetchSource bool) *UpdateService {
if s.fsc == nil {
s.fsc = NewFetchSourceContext(fetchSource)
} else {
s.fsc.SetFetchSource(fetchSource)
}
return s
}

// FetchSourceContext indicates that _source should be returned in the response,
// allowing wildcard patterns to be defined via FetchSourceContext.
func (s *UpdateService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *UpdateService {
s.fsc = fetchSourceContext
return s
}

// url returns the URL part of the document request.
func (b *UpdateService) url() (string, url.Values, error) {
// Build url
Expand Down Expand Up @@ -250,6 +268,13 @@ func (b *UpdateService) body() (interface{}, error) {
if b.detectNoop != nil {
source["detect_noop"] = *b.detectNoop
}
if b.fsc != nil {
src, err := b.fsc.Source()
if err != nil {
return nil, err
}
source["_source"] = src
}

return source, nil
}
Expand Down
68 changes: 68 additions & 0 deletions update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elastic

import (
"context"
"encoding/json"
"net/url"
"testing"
Expand Down Expand Up @@ -231,3 +232,70 @@ func TestUpdateViaDocAndUpsert(t *testing.T) {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

func TestUpdateViaDocAndUpsertAndFetchSource(t *testing.T) {
client := setupTestClient(t)
update := client.Update().
Index("test").Type("type1").Id("1").
Doc(map[string]interface{}{"name": "new_name"}).
DocAsUpsert(true).
Timeout("1s").
Refresh("true").
FetchSource(true)
path, params, err := update.url()
if err != nil {
t.Fatalf("expected to return URL, got: %v", err)
}
expectedPath := `/test/type1/1/_update`
if expectedPath != path {
t.Errorf("expected URL path\n%s\ngot:\n%s", expectedPath, path)
}
expectedParams := url.Values{
"refresh": []string{"true"},
"timeout": []string{"1s"},
}
if expectedParams.Encode() != params.Encode() {
t.Errorf("expected URL parameters\n%s\ngot:\n%s", expectedParams.Encode(), params.Encode())
}
body, err := update.body()
if err != nil {
t.Fatalf("expected to return body, got: %v", err)
}
data, err := json.Marshal(body)
if err != nil {
t.Fatalf("expected to marshal body as JSON, got: %v", err)
}
got := string(data)
expected := `{"_source":true,"doc":{"name":"new_name"},"doc_as_upsert":true}`
if got != expected {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

func TestUpdateAndFetchSource(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t) // , SetTraceLog(log.New(os.Stdout, "", 0)))
res, err := client.Update().
Index(testIndexName).Type("tweet").Id("1").
Doc(map[string]interface{}{"user": "sandrae"}).
DetectNoop(true).
FetchSource(true).
Do(context.Background())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected response != nil")
}
if res.GetResult == nil {
t.Fatal("expected GetResult != nil")
}
data, err := json.Marshal(res.GetResult.Source)
if err != nil {
t.Fatalf("expected to marshal body as JSON, got: %v", err)
}
got := string(data)
expected := `{"user":"sandrae","message":"Welcome to Golang and Elasticsearch.","retweets":0,"created":"0001-01-01T00:00:00Z"}`
if got != expected {
t.Errorf("expected\n%s\ngot:\n%s", expected, got)
}
}

0 comments on commit 9577055

Please sign in to comment.