Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add list import tasks api #16605

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
939 changes: 773 additions & 166 deletions internal/core/src/pb/milvus.pb.cc

Large diffs are not rendered by default.

385 changes: 375 additions & 10 deletions internal/core/src/pb/milvus.pb.h

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ func (m *mockRootCoordService) GetImportState(ctx context.Context, req *milvuspb
panic("not implemented") // TODO: Implement
}

// Returns id array of all import tasks
func (m *mockRootCoordService) ListImportTasks(ctx context.Context, in *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
panic("not implemented") // TODO: Implement
}

// Report impot task state to rootcoord
func (m *mockRootCoordService) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
Expand Down
9 changes: 6 additions & 3 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil"
)

Expand Down Expand Up @@ -878,9 +879,11 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
)
return fmt.Errorf("syncSegmentID Failed: invalid shard number %d", shardNum)
}
log.Info("import task flush segment",
zap.Any("channel names", req.ImportTask.ChannelNames),
zap.Int("shard num", shardNum))

tr := timerecord.NewTimeRecorder("import callback function")
defer tr.Elapse("finished")

log.Info("import task flush segment", zap.Any("ChannelNames", req.ImportTask.ChannelNames), zap.Int("shardNum", shardNum))
segReqs := []*datapb.SegmentIDRequest{
{
ChannelName: req.ImportTask.ChannelNames[shardNum],
Expand Down
4 changes: 4 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,10 @@ func (s *Server) GetImportState(ctx context.Context, req *milvuspb.GetImportStat
return s.proxy.GetImportState(ctx, req)
}

func (s *Server) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
return s.proxy.ListImportTasks(ctx, req)
}

func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return s.proxy.GetReplicas(ctx, req)
}
Expand Down
8 changes: 8 additions & 0 deletions internal/distributed/proxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (m *MockRootCoord) GetImportState(ctx context.Context, req *milvuspb.GetImp
return nil, nil
}

func (m *MockRootCoord) ListImportTasks(ctx context.Context, in *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
return nil, nil
}

func (m *MockRootCoord) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
return nil, nil
}
Expand Down Expand Up @@ -736,6 +740,10 @@ func (m *MockProxy) GetImportState(ctx context.Context, req *milvuspb.GetImportS
return nil, nil
}

func (m *MockProxy) ListImportTasks(ctx context.Context, in *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
return nil, nil
}

func (m *MockProxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
return nil, nil
}
Expand Down
14 changes: 14 additions & 0 deletions internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ func (c *Client) GetImportState(ctx context.Context, req *milvuspb.GetImportStat
return ret.(*milvuspb.GetImportStateResponse), err
}

// List id array of all import tasks
func (c *Client) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(rootcoordpb.RootCoordClient).ListImportTasks(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.ListImportTasksResponse), err
}

// Report impot task state to rootcoord
func (c *Client) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
Expand Down
3 changes: 3 additions & 0 deletions internal/distributed/rootcoord/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ func Test_NewClient(t *testing.T) {
r34Timeout, err := client.ListCredUsers(shortCtx, nil)
retCheck(r34Timeout, err)

r35Timeout, err := client.ListImportTasks(shortCtx, nil)
retCheck(r35Timeout, err)

// clean up
err = client.Stop()
assert.Nil(t, err)
Expand Down
5 changes: 5 additions & 0 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,11 @@ func (s *Server) GetImportState(ctx context.Context, in *milvuspb.GetImportState
return s.rootCoord.GetImportState(ctx, in)
}

// Returns id array of all import tasks
func (s *Server) ListImportTasks(ctx context.Context, in *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
return s.rootCoord.ListImportTasks(ctx, in)
}

// Report impot task state to datacoord
func (s *Server) ReportImport(ctx context.Context, in *rootcoordpb.ImportResult) (*commonpb.Status, error) {
return s.rootCoord.ReportImport(ctx, in)
Expand Down
9 changes: 9 additions & 0 deletions internal/proto/milvus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ service MilvusService {
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(ImportRequest) returns (ImportResponse) {}
rpc GetImportState(GetImportStateRequest) returns (GetImportStateResponse) {}
rpc ListImportTasks(ListImportTasksRequest) returns (ListImportTasksResponse) {}

// https://wiki.lfaidata.foundation/display/MIL/MEP+27+--+Support+Basic+Authentication
rpc CreateCredential(CreateCredentialRequest) returns (common.Status) {}
Expand Down Expand Up @@ -832,6 +833,14 @@ message GetImportStateResponse {
repeated common.KeyValuePair infos = 5; // more informations about the task, progress percent, file path, failed reason, etc.
}

message ListImportTasksRequest {
}

message ListImportTasksResponse {
common.Status status = 1;
repeated int64 tasks = 2; // id list of all import tasks
}

message GetReplicasRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
Expand Down
Loading