Skip to content

Commit

Permalink
Support reindex as asynchronous task (#550)
Browse files Browse the repository at this point in the history
* With WaitForCompletion(false), a task id is returned.
* Monitor the progress of an individual task.
  • Loading branch information
Keith Hatton authored and olivere committed Jul 18, 2017
1 parent dec3d22 commit 6aa4cc3
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 0 deletions.
5 changes: 5 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,11 @@ func (c *Client) TasksList() *TasksListService {
return NewTasksListService(c)
}

// TasksGetTask retrieves a task running on the cluster.
func (c *Client) TasksGetTask() *TasksGetTaskService {
return NewTasksGetTaskService(c)
}

// TODO Pending cluster tasks
// TODO Cluster Reroute
// TODO Cluster Update Settings
Expand Down
36 changes: 36 additions & 0 deletions reindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,42 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
return ret, nil
}

func (s *ReindexService) Start(ctx context.Context) (*StartTaskResult, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

if s.waitForCompletion != nil && *s.waitForCompletion {
return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
}

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Setup HTTP request body
body, err := s.getBody()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
if err != nil {
return nil, err
}

// Return operation response
ret := new(StartTaskResult)
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}
return ret, nil
}

// -- Source of Reindex --

// ReindexSource specifies the source of a Reindex process.
Expand Down
86 changes: 86 additions & 0 deletions reindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,89 @@ func TestReindex(t *testing.T) {
t.Fatalf("expected %d documents; got: %d", sourceCount, targetCount)
}
}

func TestReindexWithWaitForCompletionFalse(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
}

sourceCount, err := client.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if sourceCount <= 0 {
t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
}

targetCount, err := client.Count(testIndexName2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if targetCount != 0 {
t.Fatalf("expected %d documents; got: %d", 0, targetCount)
}

// Simple copying
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
res, err := client.Reindex().Source(src).Destination(dst).WaitForCompletion(false).Start(context.TODO())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected result != nil")
}

if len(res.TaskID) == 0 {
t.Errorf("expected a task id, got %v", res)
}

tasksGetTask := client.TasksGetTask()
taskStatus, err := tasksGetTask.TaskId(res.TaskID).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if taskStatus == nil {
t.Fatal("expected task status result != nil")
}
}

func TestReindexWithWaitForCompletionTrueCannotBeStarted(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Reindex API yet", esversion)
}

sourceCount, err := client.Count(testIndexName).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if sourceCount <= 0 {
t.Fatalf("expected more than %d documents; got: %d", 0, sourceCount)
}

targetCount, err := client.Count(testIndexName2).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if targetCount != 0 {
t.Fatalf("expected %d documents; got: %d", 0, targetCount)
}

// Simple copying
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
_, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).Start(context.TODO())
if err == nil {
t.Fatal("error should have been returned")
}
}
117 changes: 117 additions & 0 deletions tasks_get_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package elastic

import (
"context"
"fmt"
"net/url"

"gopkg.in/olivere/elastic.v5/uritemplates"
)

// TasksGetTaskService retrieves the state of a task in the cluster. It is part of the Task Management API
// documented at http://www.elastic.co/guide/en/elasticsearch/reference/5.2/tasks-list.html.
//
// It is supported as of Elasticsearch 2.3.0.
type TasksGetTaskService struct {
client *Client
pretty bool
taskId string
detailed *bool
waitForCompletion *bool
}

// NewTasksGetTaskService creates a new TasksGetTaskService.
func NewTasksGetTaskService(client *Client) *TasksGetTaskService {
return &TasksGetTaskService{
client: client,
}
}

// TaskId indicates to return the task with specified id.
func (s *TasksGetTaskService) TaskId(taskId string) *TasksGetTaskService {
s.taskId = taskId
return s
}

// Detailed indicates whether to return detailed task information (default: false).
func (s *TasksGetTaskService) Detailed(detailed bool) *TasksGetTaskService {
s.detailed = &detailed
return s
}

// WaitForCompletion indicates whether to wait for the matching tasks
// to complete (default: false).
func (s *TasksGetTaskService) WaitForCompletion(waitForCompletion bool) *TasksGetTaskService {
s.waitForCompletion = &waitForCompletion
return s
}

// Pretty indicates that the JSON response be indented and human readable.
func (s *TasksGetTaskService) Pretty(pretty bool) *TasksGetTaskService {
s.pretty = pretty
return s
}

// buildURL builds the URL for the operation.
func (s *TasksGetTaskService) buildURL() (string, url.Values, error) {
// Build URL
var err error
var path string
params := url.Values{}
path, err = uritemplates.Expand("/_tasks/{task_id}", map[string]string{
"task_id": s.taskId,
})

if err != nil {
return "", url.Values{}, err
}

// Add query string parameters
if s.pretty {
params.Set("pretty", "1")
}
if s.detailed != nil {
params.Set("detailed", fmt.Sprintf("%v", *s.detailed))
}
if s.waitForCompletion != nil {
params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
}
return path, params, nil
}

// Validate checks if the operation is valid.
func (s *TasksGetTaskService) Validate() error {
return nil
}

// Do executes the operation.
func (s *TasksGetTaskService) Do(ctx context.Context) (*TasksGetTaskResponse, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// Get URL for request
path, params, err := s.buildURL()
if err != nil {
return nil, err
}

// Get HTTP response
res, err := s.client.PerformRequest(ctx, "GET", path, params, nil)
if err != nil {
return nil, err
}

// Return operation response
ret := new(TasksGetTaskResponse)
if err := s.client.decoder.Decode(res.Body, ret); err != nil {
return nil, err
}
return ret, nil
}

type TasksGetTaskResponse struct {
Completed bool `json:completed`
Task TaskInfo `json:task`
}
4 changes: 4 additions & 0 deletions tasks_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,7 @@ type TaskInfo struct {
RunningTimeInNanos int64 `json:"running_time_in_nanos"`
ParentTaskId string `json:"parent_task_id"` // like "YxJnVYjwSBm_AUbzddTajQ:12356"
}

type StartTaskResult struct {
TaskID string `json:"task"`
}

0 comments on commit 6aa4cc3

Please sign in to comment.