From ee3ebceab960cf68ab9a89ee6d78c031ef5b4a4e Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Sun, 6 Nov 2016 14:31:42 +0100 Subject: [PATCH] Add Ingest API Close #381 --- client.go | 26 ++++- ingest_delete_pipeline.go | 124 ++++++++++++++++++++++++ ingest_delete_pipeline_test.go | 31 ++++++ ingest_get_pipeline.go | 118 +++++++++++++++++++++++ ingest_get_pipeline_test.go | 118 +++++++++++++++++++++++ ingest_put_pipeline.go | 152 ++++++++++++++++++++++++++++++ ingest_put_pipeline_test.go | 31 ++++++ ingest_simulate_pipeline.go | 157 +++++++++++++++++++++++++++++++ ingest_simulate_pipeline_test.go | 35 +++++++ 9 files changed, 791 insertions(+), 1 deletion(-) create mode 100644 ingest_delete_pipeline.go create mode 100644 ingest_delete_pipeline_test.go create mode 100644 ingest_get_pipeline.go create mode 100644 ingest_get_pipeline_test.go create mode 100644 ingest_put_pipeline.go create mode 100644 ingest_put_pipeline_test.go create mode 100644 ingest_simulate_pipeline.go create mode 100644 ingest_simulate_pipeline_test.go diff --git a/client.go b/client.go index 9d029e816..b73d7d14b 100644 --- a/client.go +++ b/client.go @@ -24,7 +24,7 @@ import ( const ( // Version is the current version of Elastic. - Version = "5.0.3" + Version = "5.0.4" // DefaultUrl is the default endpoint of Elasticsearch on the local machine. // It is used e.g. when initializing a new Client without a specific URL. @@ -1463,6 +1463,30 @@ func (c *Client) PutMapping() *IndicesPutMappingService { // TODO cat shards // TODO cat segments +// -- Ingest APIs -- + +// IngestPutPipeline adds pipelines and updates existing pipelines in +// the cluster. +func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService { + return NewIngestPutPipelineService(c).Id(id) +} + +// IngestGetPipeline returns pipelines based on ID. +func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService { + return NewIngestGetPipelineService(c).Id(ids...) +} + +// IngestDeletePipeline deletes a pipeline by ID. +func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService { + return NewIngestDeletePipelineService(c).Id(id) +} + +// IngestSimulatePipeline executes a specific pipeline against the set of +// documents provided in the body of the request. +func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService { + return NewIngestSimulatePipelineService(c) +} + // -- Cluster APIs -- // ClusterHealth retrieves the health of the cluster. diff --git a/ingest_delete_pipeline.go b/ingest_delete_pipeline.go new file mode 100644 index 000000000..641c1eb26 --- /dev/null +++ b/ingest_delete_pipeline.go @@ -0,0 +1,124 @@ +// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "fmt" + "net/url" + + "golang.org/x/net/context" + + "gopkg.in/olivere/elastic.v5/uritemplates" +) + +// IngestDeletePipelineService deletes pipelines by ID. +// It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/5.0/delete-pipeline-api.html. +type IngestDeletePipelineService struct { + client *Client + pretty bool + id string + masterTimeout string + timeout string +} + +// NewIngestDeletePipelineService creates a new IngestDeletePipelineService. +func NewIngestDeletePipelineService(client *Client) *IngestDeletePipelineService { + return &IngestDeletePipelineService{ + client: client, + } +} + +// Id is documented as: Pipeline ID. +func (s *IngestDeletePipelineService) Id(id string) *IngestDeletePipelineService { + s.id = id + return s +} + +// MasterTimeout is documented as: Explicit operation timeout for connection to master node. +func (s *IngestDeletePipelineService) MasterTimeout(masterTimeout string) *IngestDeletePipelineService { + s.masterTimeout = masterTimeout + return s +} + +// Timeout is documented as: Explicit operation timeout. +func (s *IngestDeletePipelineService) Timeout(timeout string) *IngestDeletePipelineService { + s.timeout = timeout + return s +} + +// Pretty indicates that the JSON response be indented and human readable. +func (s *IngestDeletePipelineService) Pretty(pretty bool) *IngestDeletePipelineService { + s.pretty = pretty + return s +} + +// buildURL builds the URL for the operation. +func (s *IngestDeletePipelineService) buildURL() (string, url.Values, error) { + // Build URL + path, err := uritemplates.Expand("/_ingest/pipeline/{id}", map[string]string{ + "id": s.id, + }) + if err != nil { + return "", url.Values{}, err + } + + // Add query string parameters + params := url.Values{} + if s.pretty { + params.Set("pretty", "1") + } + if s.masterTimeout != "" { + params.Set("master_timeout", s.masterTimeout) + } + if s.timeout != "" { + params.Set("timeout", s.timeout) + } + return path, params, nil +} + +// Validate checks if the operation is valid. +func (s *IngestDeletePipelineService) Validate() error { + var invalid []string + if s.id == "" { + invalid = append(invalid, "Id") + } + if len(invalid) > 0 { + return fmt.Errorf("missing required fields: %v", invalid) + } + return nil +} + +// Do executes the operation. +func (s *IngestDeletePipelineService) Do(ctx context.Context) (*IngestDeletePipelineResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, "DELETE", path, params, nil) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(IngestDeletePipelineResponse) + if err := json.Unmarshal(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + +// IngestDeletePipelineResponse is the response of IngestDeletePipelineService.Do. +type IngestDeletePipelineResponse struct { + Acknowledged bool `json:"acknowledged"` +} diff --git a/ingest_delete_pipeline_test.go b/ingest_delete_pipeline_test.go new file mode 100644 index 000000000..1163e0f17 --- /dev/null +++ b/ingest_delete_pipeline_test.go @@ -0,0 +1,31 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import "testing" + +func TestIngestDeletePipelineURL(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + + tests := []struct { + Id string + Expected string + }{ + { + "my-pipeline-id", + "/_ingest/pipeline/my-pipeline-id", + }, + } + + for _, test := range tests { + path, _, err := client.IngestDeletePipeline(test.Id).buildURL() + if err != nil { + t.Fatal(err) + } + if path != test.Expected { + t.Errorf("expected %q; got: %q", test.Expected, path) + } + } +} diff --git a/ingest_get_pipeline.go b/ingest_get_pipeline.go new file mode 100644 index 000000000..ecff1a862 --- /dev/null +++ b/ingest_get_pipeline.go @@ -0,0 +1,118 @@ +// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "net/url" + "strings" + + "golang.org/x/net/context" + + "gopkg.in/olivere/elastic.v5/uritemplates" +) + +// IngestGetPipelineService returns pipelines based on ID. +// See https://www.elastic.co/guide/en/elasticsearch/reference/5.0/get-pipeline-api.html +// for documentation. +type IngestGetPipelineService struct { + client *Client + pretty bool + id []string + masterTimeout string +} + +// NewIngestGetPipelineService creates a new IngestGetPipelineService. +func NewIngestGetPipelineService(client *Client) *IngestGetPipelineService { + return &IngestGetPipelineService{ + client: client, + } +} + +// Id is a list of pipeline ids. Wildcards supported. +func (s *IngestGetPipelineService) Id(id ...string) *IngestGetPipelineService { + s.id = append(s.id, id...) + return s +} + +// MasterTimeout is an explicit operation timeout for connection to master node. +func (s *IngestGetPipelineService) MasterTimeout(masterTimeout string) *IngestGetPipelineService { + s.masterTimeout = masterTimeout + return s +} + +// Pretty indicates that the JSON response be indented and human readable. +func (s *IngestGetPipelineService) Pretty(pretty bool) *IngestGetPipelineService { + s.pretty = pretty + return s +} + +// buildURL builds the URL for the operation. +func (s *IngestGetPipelineService) buildURL() (string, url.Values, error) { + var err error + var path string + + // Build URL + if len(s.id) > 0 { + path, err = uritemplates.Expand("/_ingest/pipeline/{id}", map[string]string{ + "id": strings.Join(s.id, ","), + }) + } else { + path = "/_ingest/pipeline" + } + if err != nil { + return "", url.Values{}, err + } + + // Add query string parameters + params := url.Values{} + if s.pretty { + params.Set("pretty", "1") + } + if s.masterTimeout != "" { + params.Set("master_timeout", s.masterTimeout) + } + return path, params, nil +} + +// Validate checks if the operation is valid. +func (s *IngestGetPipelineService) Validate() error { + return nil +} + +// Do executes the operation. +func (s *IngestGetPipelineService) Do(ctx context.Context) (IngestGetPipelineResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, "GET", path, params, nil) + if err != nil { + return nil, err + } + + // Return operation response + var ret IngestGetPipelineResponse + if err := json.Unmarshal(res.Body, &ret); err != nil { + return nil, err + } + return ret, nil +} + +// IngestGetPipelineResponse is the response of IngestGetPipelineService.Do. +type IngestGetPipelineResponse map[string]*IngestGetPipeline + +type IngestGetPipeline struct { + ID string `json:"id"` + Config map[string]interface{} `json:"config"` +} diff --git a/ingest_get_pipeline_test.go b/ingest_get_pipeline_test.go new file mode 100644 index 000000000..ddafe9fce --- /dev/null +++ b/ingest_get_pipeline_test.go @@ -0,0 +1,118 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "testing" +) + +func TestIngestGetPipelineURL(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + + tests := []struct { + Id []string + Expected string + }{ + { + nil, + "/_ingest/pipeline", + }, + { + []string{"my-pipeline-id"}, + "/_ingest/pipeline/my-pipeline-id", + }, + { + []string{"*"}, + "/_ingest/pipeline/%2A", + }, + { + []string{"pipeline-1", "pipeline-2"}, + "/_ingest/pipeline/pipeline-1%2Cpipeline-2", + }, + } + + for _, test := range tests { + path, _, err := client.IngestGetPipeline(test.Id...).buildURL() + if err != nil { + t.Fatal(err) + } + if path != test.Expected { + t.Errorf("expected %q; got: %q", test.Expected, path) + } + } +} + +func TestIngestLifecycle(t *testing.T) { + client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0))) + + // Get all pipelines (returns 404 that indicates an error) + getres, err := client.IngestGetPipeline().Do(context.TODO()) + if err == nil { + t.Fatal(err) + } + if getres != nil { + t.Fatalf("expected no response, got %v", getres) + } + + // Add a pipeline + pipelineDef := `{ + "description" : "reset retweets", + "processors" : [ + { + "set" : { + "field": "retweets", + "value": 0 + } + } + ] +}` + putres, err := client.IngestPutPipeline("my-pipeline").BodyString(pipelineDef).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if putres == nil { + t.Fatal("expected response, got nil") + } + if want, have := true, putres.Acknowledged; want != have { + t.Fatalf("expected ack = %v, got %v", want, have) + } + + // Get all pipelines again + getres, err = client.IngestGetPipeline().Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if want, have := 1, len(getres); want != have { + t.Fatalf("expected %d pipelines, got %d", want, have) + } + if _, found := getres["my-pipeline"]; !found { + t.Fatalf("expected to find pipline with id %q", "my-pipeline") + } + + // Get all pipeline by ID + getres, err = client.IngestGetPipeline("my-pipeline").Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if want, have := 1, len(getres); want != have { + t.Fatalf("expected %d pipelines, got %d", want, have) + } + if _, found := getres["my-pipeline"]; !found { + t.Fatalf("expected to find pipline with id %q", "my-pipeline") + } + + // Delete pipeline + delres, err := client.IngestDeletePipeline("my-pipeline").Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if delres == nil { + t.Fatal("expected response, got nil") + } + if want, have := true, delres.Acknowledged; want != have { + t.Fatalf("expected ack = %v, got %v", want, have) + } +} diff --git a/ingest_put_pipeline.go b/ingest_put_pipeline.go new file mode 100644 index 000000000..723a8ad78 --- /dev/null +++ b/ingest_put_pipeline.go @@ -0,0 +1,152 @@ +// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "fmt" + "net/url" + + "golang.org/x/net/context" + + "gopkg.in/olivere/elastic.v5/uritemplates" +) + +// IngestPutPipelineService adds pipelines and updates existing pipelines in +// the cluster. +// +// It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/5.0/put-pipeline-api.html. +type IngestPutPipelineService struct { + client *Client + pretty bool + id string + masterTimeout string + timeout string + bodyJson interface{} + bodyString string +} + +// NewIngestPutPipelineService creates a new IngestPutPipelineService. +func NewIngestPutPipelineService(client *Client) *IngestPutPipelineService { + return &IngestPutPipelineService{ + client: client, + } +} + +// Id is the pipeline ID. +func (s *IngestPutPipelineService) Id(id string) *IngestPutPipelineService { + s.id = id + return s +} + +// MasterTimeout is an explicit operation timeout for connection to master node. +func (s *IngestPutPipelineService) MasterTimeout(masterTimeout string) *IngestPutPipelineService { + s.masterTimeout = masterTimeout + return s +} + +// Timeout specifies an explicit operation timeout. +func (s *IngestPutPipelineService) Timeout(timeout string) *IngestPutPipelineService { + s.timeout = timeout + return s +} + +// Pretty indicates that the JSON response be indented and human readable. +func (s *IngestPutPipelineService) Pretty(pretty bool) *IngestPutPipelineService { + s.pretty = pretty + return s +} + +// BodyJson is the ingest definition, defined as a JSON-serializable document. +// Use e.g. a map[string]interface{} here. +func (s *IngestPutPipelineService) BodyJson(body interface{}) *IngestPutPipelineService { + s.bodyJson = body + return s +} + +// BodyString is the ingest definition, specified as a string. +func (s *IngestPutPipelineService) BodyString(body string) *IngestPutPipelineService { + s.bodyString = body + return s +} + +// buildURL builds the URL for the operation. +func (s *IngestPutPipelineService) buildURL() (string, url.Values, error) { + // Build URL + path, err := uritemplates.Expand("/_ingest/pipeline/{id}", map[string]string{ + "id": s.id, + }) + if err != nil { + return "", url.Values{}, err + } + + // Add query string parameters + params := url.Values{} + if s.pretty { + params.Set("pretty", "1") + } + if s.masterTimeout != "" { + params.Set("master_timeout", s.masterTimeout) + } + if s.timeout != "" { + params.Set("timeout", s.timeout) + } + return path, params, nil +} + +// Validate checks if the operation is valid. +func (s *IngestPutPipelineService) Validate() error { + var invalid []string + if s.id == "" { + invalid = append(invalid, "Id") + } + if s.bodyString == "" && s.bodyJson == nil { + invalid = append(invalid, "BodyJson") + } + if len(invalid) > 0 { + return fmt.Errorf("missing required fields: %v", invalid) + } + return nil +} + +// Do executes the operation. +func (s *IngestPutPipelineService) Do(ctx context.Context) (*IngestPutPipelineResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Setup HTTP request body + var body interface{} + if s.bodyJson != nil { + body = s.bodyJson + } else { + body = s.bodyString + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, "PUT", path, params, body) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(IngestPutPipelineResponse) + if err := json.Unmarshal(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + +// IngestPutPipelineResponse is the response of IngestPutPipelineService.Do. +type IngestPutPipelineResponse struct { + Acknowledged bool `json:"acknowledged"` +} diff --git a/ingest_put_pipeline_test.go b/ingest_put_pipeline_test.go new file mode 100644 index 000000000..9609f2f53 --- /dev/null +++ b/ingest_put_pipeline_test.go @@ -0,0 +1,31 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import "testing" + +func TestIngestPutPipelineURL(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + + tests := []struct { + Id string + Expected string + }{ + { + "my-pipeline-id", + "/_ingest/pipeline/my-pipeline-id", + }, + } + + for _, test := range tests { + path, _, err := client.IngestPutPipeline(test.Id).buildURL() + if err != nil { + t.Fatal(err) + } + if path != test.Expected { + t.Errorf("expected %q; got: %q", test.Expected, path) + } + } +} diff --git a/ingest_simulate_pipeline.go b/ingest_simulate_pipeline.go new file mode 100644 index 000000000..212327dfb --- /dev/null +++ b/ingest_simulate_pipeline.go @@ -0,0 +1,157 @@ +// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "fmt" + "net/url" + + "golang.org/x/net/context" + + "gopkg.in/olivere/elastic.v5/uritemplates" +) + +// IngestSimulatePipelineService executes a specific pipeline against the set of +// documents provided in the body of the request. +// +// The API is documented at +// https://www.elastic.co/guide/en/elasticsearch/reference/5.0/simulate-pipeline-api.html. +type IngestSimulatePipelineService struct { + client *Client + pretty bool + id string + verbose *bool + bodyJson interface{} + bodyString string +} + +// NewIngestSimulatePipelineService creates a new IngestSimulatePipeline. +func NewIngestSimulatePipelineService(client *Client) *IngestSimulatePipelineService { + return &IngestSimulatePipelineService{ + client: client, + } +} + +// Id specifies the pipeline ID. +func (s *IngestSimulatePipelineService) Id(id string) *IngestSimulatePipelineService { + s.id = id + return s +} + +// Verbose mode. Display data output for each processor in executed pipeline. +func (s *IngestSimulatePipelineService) Verbose(verbose bool) *IngestSimulatePipelineService { + s.verbose = &verbose + return s +} + +// Pretty indicates that the JSON response be indented and human readable. +func (s *IngestSimulatePipelineService) Pretty(pretty bool) *IngestSimulatePipelineService { + s.pretty = pretty + return s +} + +// BodyJson is the ingest definition, defined as a JSON-serializable simulate +// definition. Use e.g. a map[string]interface{} here. +func (s *IngestSimulatePipelineService) BodyJson(body interface{}) *IngestSimulatePipelineService { + s.bodyJson = body + return s +} + +// BodyString is the simulate definition, defined as a string. +func (s *IngestSimulatePipelineService) BodyString(body string) *IngestSimulatePipelineService { + s.bodyString = body + return s +} + +// buildURL builds the URL for the operation. +func (s *IngestSimulatePipelineService) buildURL() (string, url.Values, error) { + var err error + var path string + + // Build URL + if s.id != "" { + path, err = uritemplates.Expand("/_ingest/pipeline/{id}/_simulate", map[string]string{ + "id": s.id, + }) + } else { + path = "/_ingest/pipeline/_simulate" + } + if err != nil { + return "", url.Values{}, err + } + + // Add query string parameters + params := url.Values{} + if s.pretty { + params.Set("pretty", "1") + } + if s.verbose != nil { + params.Set("verbose", fmt.Sprintf("%v", *s.verbose)) + } + return path, params, nil +} + +// Validate checks if the operation is valid. +func (s *IngestSimulatePipelineService) Validate() error { + var invalid []string + if s.bodyString == "" && s.bodyJson == nil { + invalid = append(invalid, "BodyJson") + } + if len(invalid) > 0 { + return fmt.Errorf("missing required fields: %v", invalid) + } + return nil +} + +// Do executes the operation. +func (s *IngestSimulatePipelineService) Do(ctx context.Context) (*IngestSimulatePipelineResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Setup HTTP request body + var body interface{} + if s.bodyJson != nil { + body = s.bodyJson + } else { + body = s.bodyString + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, "POST", path, params, body) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(IngestSimulatePipelineResponse) + if err := json.Unmarshal(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + +// IngestSimulatePipelineResponse is the response of IngestSimulatePipeline.Do. +type IngestSimulatePipelineResponse struct { + Docs []*IngestSimulateDocumentResult `json:"docs"` +} + +type IngestSimulateDocumentResult struct { + Doc map[string]interface{} `json:"doc"` + ProcessorResults []*IngestSimulateProcessorResult `json:"processor_results"` +} + +type IngestSimulateProcessorResult struct { + ProcessorTag string `json:"tag"` + Doc map[string]interface{} `json:"doc"` +} diff --git a/ingest_simulate_pipeline_test.go b/ingest_simulate_pipeline_test.go new file mode 100644 index 000000000..a254f85ff --- /dev/null +++ b/ingest_simulate_pipeline_test.go @@ -0,0 +1,35 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import "testing" + +func TestIngestSimulatePipelineURL(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + + tests := []struct { + Id string + Expected string + }{ + { + "", + "/_ingest/pipeline/_simulate", + }, + { + "my-pipeline-id", + "/_ingest/pipeline/my-pipeline-id/_simulate", + }, + } + + for _, test := range tests { + path, _, err := client.IngestSimulatePipeline().Id(test.Id).buildURL() + if err != nil { + t.Fatal(err) + } + if path != test.Expected { + t.Errorf("expected %q; got: %q", test.Expected, path) + } + } +}