Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
weiliu1031 committed Mar 5, 2024
1 parent e3cce11 commit f77f442
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
9 changes: 9 additions & 0 deletions internal/querynodev2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
Expand Down Expand Up @@ -85,6 +86,7 @@ var _ types.QueryNodeComponent = (*QueryNode)(nil)
// `rootCoord` is a grpc client of root coordinator.
// `indexCoord` is a grpc client of index coordinator.
// `stateCode` is current statement of this query node, indicating whether it's healthy.

type QueryNode struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -130,6 +132,12 @@ type QueryNode struct {

// parameter turning hook
queryHook optimizers.QueryHook

injectionMode bool
// inject to request, before execute the logic in GetDataDistribution
preGetDataDistributionFunc func(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionRequest, error)
// inject to response, after execute the logic in GetDataDistribution
postGetDataDistributionFunc func(ctx context.Context, resp *querypb.GetDataDistributionResponse) (*querypb.GetDataDistributionResponse, error)
}

// NewQueryNode will return a QueryNode with abnormal state.
Expand All @@ -142,6 +150,7 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode {
lifetime: lifetime.NewLifetime(commonpb.StateCode_Abnormal),
}

node.injectionMode = true
node.tSafeManager = tsafe.NewTSafeReplica()
expr.Register("querynode", node)
return node
Expand Down
25 changes: 17 additions & 8 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,16 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
}

func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
resp := &querypb.GetDataDistributionResponse{}
var err error
req, err = node.preGetDataDistribution(ctx, req)
if err != nil {
return nil, err
}
defer func() {
resp, err = node.postGetDataDistribution(ctx, resp)
}()

log := log.Ctx(ctx).With(
zap.Int64("msgID", req.GetBase().GetMsgID()),
zap.Int64("nodeID", node.GetNodeID()),
Expand All @@ -1326,17 +1336,15 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
log.Warn("QueryNode.GetDataDistribution failed",
zap.Error(err))

return &querypb.GetDataDistributionResponse{
Status: merr.Status(err),
}, nil
resp.Status = merr.Status(err)
return resp, nil
}
defer node.lifetime.Done()

// check target matches
if err := merr.CheckTargetID(node.GetNodeID(), req.GetBase()); err != nil {
return &querypb.GetDataDistributionResponse{
Status: merr.Status(err),
}, nil
resp.Status = merr.Status(err)
return resp, nil
}

sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed))
Expand Down Expand Up @@ -1403,13 +1411,14 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
return true
})

return &querypb.GetDataDistributionResponse{
resp = &querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
}, nil
}
return resp, nil
}

func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
Expand Down
39 changes: 39 additions & 0 deletions internal/querynodev2/services_injection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package querynodev2

import (
"context"

"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
)

func (node *QueryNode) preGetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionRequest, error) {
if node.injectionMode && node.preGetDataDistributionFunc != nil {
log.Info("inject info to request")
return node.preGetDataDistributionFunc(ctx, req)
}
return req, nil
}

func (node *QueryNode) postGetDataDistribution(ctx context.Context, resp *querypb.GetDataDistributionResponse) (*querypb.GetDataDistributionResponse, error) {
if node.injectionMode && node.postGetDataDistributionFunc != nil {
log.Info("inject info to response")
return node.postGetDataDistributionFunc(ctx, resp)
}
return resp, nil
}

0 comments on commit f77f442

Please sign in to comment.