Skip to content

Commit

Permalink
fix: Make datacoord client retry on index api
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Feb 23, 2024
1 parent f0bff1e commit b988952
Show file tree
Hide file tree
Showing 4 changed files with 425 additions and 243 deletions.
154 changes: 138 additions & 16 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand All @@ -34,7 +35,9 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -559,9 +562,24 @@ func (c *Client) GcConfirm(ctx context.Context, req *datapb.GcConfirmRequest, op

// CreateIndex sends the build index request to IndexCoord.
func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.CreateIndex(ctx, req)
var resp *commonpb.Status
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.CreateIndex(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// AlterIndex sends the alter index request to IndexCoord.
Expand All @@ -573,51 +591,155 @@ func (c *Client) AlterIndex(ctx context.Context, req *indexpb.AlterIndexRequest,

// GetIndexState gets the index states from IndexCoord.
func (c *Client) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStateResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
return client.GetIndexState(ctx, req)
var resp *indexpb.GetIndexStateResponse
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStateResponse, error) {
return client.GetIndexState(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// GetSegmentIndexState gets the index states from IndexCoord.
func (c *Client) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest, opts ...grpc.CallOption) (*indexpb.GetSegmentIndexStateResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
return client.GetSegmentIndexState(ctx, req)
var resp *indexpb.GetSegmentIndexStateResponse
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetSegmentIndexStateResponse, error) {
return client.GetSegmentIndexState(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// GetIndexInfos gets the index file paths from IndexCoord.
func (c *Client) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest, opts ...grpc.CallOption) (*indexpb.GetIndexInfoResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
return client.GetIndexInfos(ctx, req)
var resp *indexpb.GetIndexInfoResponse
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexInfoResponse, error) {
return client.GetIndexInfos(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// DescribeIndex describe the index info of the collection.
func (c *Client) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest, opts ...grpc.CallOption) (*indexpb.DescribeIndexResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
return client.DescribeIndex(ctx, req)
var resp *indexpb.DescribeIndexResponse
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.DescribeIndexResponse, error) {
return client.DescribeIndex(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// GetIndexStatistics get the statistics of the index.
func (c *Client) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexStatisticsRequest, opts ...grpc.CallOption) (*indexpb.GetIndexStatisticsResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
return client.GetIndexStatistics(ctx, req)
var resp *indexpb.GetIndexStatisticsResponse
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexStatisticsResponse, error) {
return client.GetIndexStatistics(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// GetIndexBuildProgress describe the progress of the index.
func (c *Client) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest, opts ...grpc.CallOption) (*indexpb.GetIndexBuildProgressResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
return client.GetIndexBuildProgress(ctx, req)
var resp *indexpb.GetIndexBuildProgressResponse
var err error
err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*indexpb.GetIndexBuildProgressResponse, error) {
return client.GetIndexBuildProgress(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

// DropIndex sends the drop index request to IndexCoord.
func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.DropIndex(ctx, req)
var resp *commonpb.Status
var err error

err = retry.Do(ctx, func() error {
var retryErr error
resp, retryErr = wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*commonpb.Status, error) {
return client.DropIndex(ctx, req)
})

// retry on un implemented, to be compatible with 2.2.x
if errors.Is(retryErr, merr.ErrServiceUnimplemented) {
return retryErr
}
err = retryErr
return nil
})

return resp, err
}

func (c *Client) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
Expand Down
Loading

0 comments on commit b988952

Please sign in to comment.