Skip to content

Commit

Permalink
Merge pull request #21 from k-yomo/USS-1292-wait-until-replica-update…
Browse files Browse the repository at this point in the history
…-finishes

Wait until replica update finishes
  • Loading branch information
k-yomo committed Mar 27, 2023
2 parents 9facee9 + 6a40c47 commit 8c46ae2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
58 changes: 41 additions & 17 deletions pkg/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"
"strconv"
"time"

"github.com/elastic/cloud-sdk-go/pkg/util/slice"
esv8 "github.com/elastic/go-elasticsearch/v8"
Expand Down Expand Up @@ -146,20 +147,20 @@ func (c *clientImpl) GetIndexSettings(ctx context.Context, indexName string) (*I
}

type IndexHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber int `json:"active_shards_percent_as_number"`
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
}

func (i *IndexHealth) IsHealthy() bool {
Expand Down Expand Up @@ -207,7 +208,30 @@ func (c *clientImpl) UpdateIndexReplicaNum(ctx context.Context, indexName string
return fmt.Errorf("update number_of_replica: %w", err)
}

// TODO: wait until shard relocation finishes

return nil
return c.waitUntilIndexBecomeHealthy(ctx, indexName)
}

func (c *clientImpl) waitUntilIndexBecomeHealthy(ctx context.Context, indexName string) error {
ticker := time.NewTicker(1 * time.Minute)
consecutiveErrCount := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
indexHealth, err := c.GetIndexHealth(ctx, indexName)
if err != nil {
consecutiveErrCount += 1
if consecutiveErrCount == 3 {
return fmt.Errorf("get index health: %w", err)
}
continue
}
consecutiveErrCount = 0

if indexHealth.IsHealthy() {
return nil
}
}
}
}
24 changes: 21 additions & 3 deletions pkg/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func Test_clientImpl_GetIndexHealth(t *testing.T) {
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100
"active_shards_percent_as_number": 100.0
}
`),
want: &IndexHealth{
Expand All @@ -498,7 +498,7 @@ func Test_clientImpl_GetIndexHealth(t *testing.T) {
NumberOfDataNodes: 1,
ActivePrimaryShards: 1,
ActiveShards: 1,
ActiveShardsPercentAsNumber: 100,
ActiveShardsPercentAsNumber: 100.0,
},
},
{
Expand Down Expand Up @@ -549,7 +549,25 @@ func Test_clientImpl_UpdateIndexReplicaNum(t *testing.T) {
indexName: "test",
replicaNum: 2,
},
esClient: newESClientWithMockResponse(200, `{"acknowledged": true}`),
esClient: newESClientWithMockResponse(200, `
{
"cluster_name": "dummy_cluster_name",
"status": "green",
"timed_out": false,
"number_of_nodes": 1,
"number_of_data_nodes": 1,
"active_primary_shards": 1,
"active_shards": 1,
"relocating_shards": 0,
"initializing_shards": 0,
"unassigned_shards": 0,
"delayed_unassigned_shards": 0,
"number_of_pending_tasks": 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 100.0
}
`),
},
{
name: "error response",
Expand Down

0 comments on commit 8c46ae2

Please sign in to comment.