Skip to content

Commit

Permalink
Add Ingest API
Browse files Browse the repository at this point in the history
Close #381
  • Loading branch information
olivere committed Nov 6, 2016
1 parent a5168d4 commit ee3ebce
Show file tree
Hide file tree
Showing 9 changed files with 791 additions and 1 deletion.
26 changes: 25 additions & 1 deletion client.go
Expand Up @@ -24,7 +24,7 @@ import (

const (
// Version is the current version of Elastic.
Version = "5.0.3"
Version = "5.0.4"

// DefaultUrl is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
Expand Down Expand Up @@ -1463,6 +1463,30 @@ func (c *Client) PutMapping() *IndicesPutMappingService {
// TODO cat shards
// TODO cat segments

// -- Ingest APIs --

// IngestPutPipeline adds pipelines and updates existing pipelines in
// the cluster.
func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService {
return NewIngestPutPipelineService(c).Id(id)
}

// IngestGetPipeline returns pipelines based on ID.
func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService {
return NewIngestGetPipelineService(c).Id(ids...)
}

// IngestDeletePipeline deletes a pipeline by ID.
func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService {
return NewIngestDeletePipelineService(c).Id(id)
}

// IngestSimulatePipeline executes a specific pipeline against the set of
// documents provided in the body of the request.
func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService {
return NewIngestSimulatePipelineService(c)
}

// -- Cluster APIs --

// ClusterHealth retrieves the health of the cluster.
Expand Down
124 changes: 124 additions & 0 deletions ingest_delete_pipeline.go
@@ -0,0 +1,124 @@
// Copyright 2012-2016 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"encoding/json"
"fmt"
"net/url"

"golang.org/x/net/context"

"gopkg.in/olivere/elastic.v5/uritemplates"
)

// IngestDeletePipelineService deletes pipelines by ID.
// It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/5.0/delete-pipeline-api.html.
type IngestDeletePipelineService struct {
client *Client
pretty bool
id string
masterTimeout string
timeout string
}

// NewIngestDeletePipelineService creates a new IngestDeletePipelineService.
func NewIngestDeletePipelineService(client *Client) *IngestDeletePipelineService {
return &IngestDeletePipelineService{
client: client,
}
}

// Id is documented as: Pipeline ID.
func (s *IngestDeletePipelineService) Id(id string) *IngestDeletePipelineService {
s.id = id
return s
}

// MasterTimeout is documented as: Explicit operation timeout for connection to master node.
func (s *IngestDeletePipelineService) MasterTimeout(masterTimeout string) *IngestDeletePipelineService {
s.masterTimeout = masterTimeout
return s
}

// Timeout is documented as: Explicit operation timeout.
func (s *IngestDeletePipelineService) Timeout(timeout string) *IngestDeletePipelineService {
s.timeout = timeout
return s
}

// Pretty indicates that the JSON response be indented and human readable.
func (s *IngestDeletePipelineService) Pretty(pretty bool) *IngestDeletePipelineService {
s.pretty = pretty
return s
}

// buildURL builds the URL for the operation.
func (s *IngestDeletePipelineService) buildURL() (string, url.Values, error) {
// Build URL
path, err := uritemplates.Expand("/_ingest/pipeline/{id}", map[string]string{
"id": s.id,
})
if err != nil {
return "", url.Values{}, err
}

// Add query string parameters
params := url.Values{}
if s.pretty {
params.Set("pretty", "1")
}
if s.masterTimeout != "" {
params.Set("master_timeout", s.masterTimeout)
}
if s.timeout != "" {
params.Set("timeout", s.timeout)
}
return path, params, nil
}

// Validate checks if the operation is valid.
func (s *IngestDeletePipelineService) Validate() error {
var invalid []string
if s.id == "" {
invalid = append(invalid, "Id")
}
if len(invalid) > 0 {
return fmt.Errorf("missing required fields: %v", invalid)
}
return nil
}

// Do executes the operation.
func (s *IngestDeletePipelineService) Do(ctx context.Context) (*IngestDeletePipelineResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "DELETE", path, params, nil)
if err != nil {
return nil, err
}

// Return operation response
ret := new(IngestDeletePipelineResponse)
if err := json.Unmarshal(res.Body, ret); err != nil {
return nil, err
}
return ret, nil
}

// IngestDeletePipelineResponse is the response of IngestDeletePipelineService.Do.
type IngestDeletePipelineResponse struct {
Acknowledged bool `json:"acknowledged"`
}
31 changes: 31 additions & 0 deletions ingest_delete_pipeline_test.go
@@ -0,0 +1,31 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import "testing"

func TestIngestDeletePipelineURL(t *testing.T) {
client := setupTestClientAndCreateIndex(t)

tests := []struct {
Id string
Expected string
}{
{
"my-pipeline-id",
"/_ingest/pipeline/my-pipeline-id",
},
}

for _, test := range tests {
path, _, err := client.IngestDeletePipeline(test.Id).buildURL()
if err != nil {
t.Fatal(err)
}
if path != test.Expected {
t.Errorf("expected %q; got: %q", test.Expected, path)
}
}
}
118 changes: 118 additions & 0 deletions ingest_get_pipeline.go
@@ -0,0 +1,118 @@
// Copyright 2012-2016 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"encoding/json"
"net/url"
"strings"

"golang.org/x/net/context"

"gopkg.in/olivere/elastic.v5/uritemplates"
)

// IngestGetPipelineService returns pipelines based on ID.
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.0/get-pipeline-api.html
// for documentation.
type IngestGetPipelineService struct {
client *Client
pretty bool
id []string
masterTimeout string
}

// NewIngestGetPipelineService creates a new IngestGetPipelineService.
func NewIngestGetPipelineService(client *Client) *IngestGetPipelineService {
return &IngestGetPipelineService{
client: client,
}
}

// Id is a list of pipeline ids. Wildcards supported.
func (s *IngestGetPipelineService) Id(id ...string) *IngestGetPipelineService {
s.id = append(s.id, id...)
return s
}

// MasterTimeout is an explicit operation timeout for connection to master node.
func (s *IngestGetPipelineService) MasterTimeout(masterTimeout string) *IngestGetPipelineService {
s.masterTimeout = masterTimeout
return s
}

// Pretty indicates that the JSON response be indented and human readable.
func (s *IngestGetPipelineService) Pretty(pretty bool) *IngestGetPipelineService {
s.pretty = pretty
return s
}

// buildURL builds the URL for the operation.
func (s *IngestGetPipelineService) buildURL() (string, url.Values, error) {
var err error
var path string

// Build URL
if len(s.id) > 0 {
path, err = uritemplates.Expand("/_ingest/pipeline/{id}", map[string]string{
"id": strings.Join(s.id, ","),
})
} else {
path = "/_ingest/pipeline"
}
if err != nil {
return "", url.Values{}, err
}

// Add query string parameters
params := url.Values{}
if s.pretty {
params.Set("pretty", "1")
}
if s.masterTimeout != "" {
params.Set("master_timeout", s.masterTimeout)
}
return path, params, nil
}

// Validate checks if the operation is valid.
func (s *IngestGetPipelineService) Validate() error {
return nil
}

// Do executes the operation.
func (s *IngestGetPipelineService) Do(ctx context.Context) (IngestGetPipelineResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}

// Return operation response
var ret IngestGetPipelineResponse
if err := json.Unmarshal(res.Body, &ret); err != nil {
return nil, err
}
return ret, nil
}

// IngestGetPipelineResponse is the response of IngestGetPipelineService.Do.
type IngestGetPipelineResponse map[string]*IngestGetPipeline

type IngestGetPipeline struct {
ID string `json:"id"`
Config map[string]interface{} `json:"config"`
}

0 comments on commit ee3ebce

Please sign in to comment.