Skip to content

Commit

Permalink
Add Tasks Get API; add async reindexing
Browse files Browse the repository at this point in the history
This commit adds to PR #550 and changes a few things.

First, the Reindex.Start method is renamed to DoAsync to reflect that
Do starts the request. Reindex.DoAsync starts reindexing in the
background and returns a task id, which can be watched via the Task Get
API, which was also added in #550.

Furthermore, it adds some tests and missing fields in the response
types, and fixes a typo or two in the JSON structs.

See #550
  • Loading branch information
olivere committed Jul 18, 2017
1 parent 6aa4cc3 commit 3116eca
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 30 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ John Goodall [@jgoodall](https://github.com/jgoodall)
John Stanford [@jxstanford](https://github.com/jxstanford)
jun [@coseyo](https://github.com/coseyo)
Junpei Tsuji [@jun06t](https://github.com/jun06t)
Keith Hatton [@khatton-ft](https://github.com/khatton-ft)
Kenta SUZUKI [@suzuken](https://github.com/suzuken)
Kevin Mulvey [@kmulvey](https://github.com/kmulvey)
Kyle Brandt [@kylebrandt](https://github.com/kylebrandt)
Expand Down
8 changes: 7 additions & 1 deletion reindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,21 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
return ret, nil
}

func (s *ReindexService) Start(ctx context.Context) (*StartTaskResult, error) {
// DoAsync executes the reindexing operation asynchronously by starting a new task.
// Callers need to use the Task Management API to watch the outcome of the reindexing
// operation.
func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
// Check pre-conditions
if err := s.Validate(); err != nil {
return nil, err
}

// DoAsync only makes sense with WaitForCompletion set to true
if s.waitForCompletion != nil && *s.waitForCompletion {
return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
}
f := false
s.waitForCompletion = &f

// Get URL for request
path, params, err := s.buildURL()
Expand Down
17 changes: 8 additions & 9 deletions reindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ func TestReindex(t *testing.T) {
}
}

func TestReindexWithWaitForCompletionFalse(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
func TestReindexAsync(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0)))
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -344,20 +344,19 @@ func TestReindexWithWaitForCompletionFalse(t *testing.T) {
// Simple copying
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
res, err := client.Reindex().Source(src).Destination(dst).WaitForCompletion(false).Start(context.TODO())
res, err := client.Reindex().Source(src).Destination(dst).DoAsync(context.TODO())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("expected result != nil")
}

if len(res.TaskID) == 0 {
t.Errorf("expected a task id, got %v", res)
if res.TaskId == "" {
t.Errorf("expected a task id, got %+v", res)
}

tasksGetTask := client.TasksGetTask()
taskStatus, err := tasksGetTask.TaskId(res.TaskID).Do(context.TODO())
taskStatus, err := tasksGetTask.TaskId(res.TaskId).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -392,10 +391,10 @@ func TestReindexWithWaitForCompletionTrueCannotBeStarted(t *testing.T) {
t.Fatalf("expected %d documents; got: %d", 0, targetCount)
}

// Simple copying
// DoAsync should fail when WaitForCompletion is true
src := NewReindexSource().Index(testIndexName)
dst := NewReindexDestination().Index(testIndexName2)
_, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).Start(context.TODO())
_, err = client.Reindex().Source(src).Destination(dst).WaitForCompletion(true).DoAsync(context.TODO())
if err == nil {
t.Fatal("error should have been returned")
}
Expand Down
21 changes: 4 additions & 17 deletions tasks_get_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type TasksGetTaskService struct {
client *Client
pretty bool
taskId string
detailed *bool
waitForCompletion *bool
}

Expand All @@ -33,12 +32,6 @@ func (s *TasksGetTaskService) TaskId(taskId string) *TasksGetTaskService {
return s
}

// Detailed indicates whether to return detailed task information (default: false).
func (s *TasksGetTaskService) Detailed(detailed bool) *TasksGetTaskService {
s.detailed = &detailed
return s
}

// WaitForCompletion indicates whether to wait for the matching tasks
// to complete (default: false).
func (s *TasksGetTaskService) WaitForCompletion(waitForCompletion bool) *TasksGetTaskService {
Expand All @@ -55,24 +48,18 @@ func (s *TasksGetTaskService) Pretty(pretty bool) *TasksGetTaskService {
// buildURL builds the URL for the operation.
func (s *TasksGetTaskService) buildURL() (string, url.Values, error) {
// Build URL
var err error
var path string
params := url.Values{}
path, err = uritemplates.Expand("/_tasks/{task_id}", map[string]string{
path, err := uritemplates.Expand("/_tasks/{task_id}", map[string]string{
"task_id": s.taskId,
})

if err != nil {
return "", url.Values{}, err
}

// Add query string parameters
params := url.Values{}
if s.pretty {
params.Set("pretty", "1")
}
if s.detailed != nil {
params.Set("detailed", fmt.Sprintf("%v", *s.detailed))
}
if s.waitForCompletion != nil {
params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
}
Expand Down Expand Up @@ -112,6 +99,6 @@ func (s *TasksGetTaskService) Do(ctx context.Context) (*TasksGetTaskResponse, er
}

type TasksGetTaskResponse struct {
Completed bool `json:completed`
Task TaskInfo `json:task`
Completed bool `json:"completed"`
Task *TaskInfo `json:"task,omitempty"`
}
43 changes: 43 additions & 0 deletions tasks_get_task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"testing"
)

func TestTasksGetTaskBuildURL(t *testing.T) {
client := setupTestClient(t)

// Get specific task
got, _, err := client.TasksGetTask().TaskId("123").buildURL()
if err != nil {
t.Fatal(err)
}
want := "/_tasks/123"
if got != want {
t.Errorf("want %q; got %q", want, got)
}
}

/*
func TestTasksGetTask(t *testing.T) {
client := setupTestClientAndCreateIndexAndAddDocs(t)
esversion, err := client.ElasticsearchVersion(DefaultURL)
if err != nil {
t.Fatal(err)
}
if esversion < "2.3.0" {
t.Skipf("Elasticsearch %v does not support Tasks Management API yet", esversion)
}
res, err := client.TasksGetTask().TaskId("123").Do(context.TODO())
if err != nil {
t.Fatal(err)
}
if res == nil {
t.Fatal("response is nil")
}
}
*/
9 changes: 6 additions & 3 deletions tasks_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,18 @@ type TaskInfo struct {
Id int64 `json:"id"` // the task id
Type string `json:"type"`
Action string `json:"action"`
Status interface{} `json:"status"`
Description interface{} `json:"description"`
Status interface{} `json:"status"` // has separate implementations of Task.Status in Java for reindexing, replication, and "RawTaskStatus"
Description interface{} `json:"description"` // same as Status
StartTime string `json:"start_time"`
StartTimeInMillis int64 `json:"start_time_in_millis"`
RunningTime string `json:"running_time"`
RunningTimeInNanos int64 `json:"running_time_in_nanos"`
Cancellable bool `json:"cancellable"`
ParentTaskId string `json:"parent_task_id"` // like "YxJnVYjwSBm_AUbzddTajQ:12356"
}

// StartTaskResult is used in cases where a task gets started asynchronously and
// the operation simply returnes a TaskID to watch for via the Task Management API.
type StartTaskResult struct {
TaskID string `json:"task"`
TaskId string `json:"task"`
}

0 comments on commit 3116eca

Please sign in to comment.