Skip to content

Commit

Permalink
ufate
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed May 22, 2024
1 parent 9d089fd commit 3319390
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 46 deletions.
6 changes: 3 additions & 3 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
segments := h.s.meta.SelectSegments(SegmentFilterFunc(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.GetName() && !s.GetIsFake() && s.PartitionID == partitionID
}))
currentPlanID := h.s.meta.partitionStatsMeta.GetCurrentPlanID(channel.GetCollectionID(), partitionID, channel.GetName())
currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())

segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
Expand All @@ -151,7 +151,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion > currentPlanID {
if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion > currentPartitionStatsVersion {
// skip major compaction not fully completed.
continue
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// unindexed is flushed segments as well
indexedIDs.Insert(unIndexedIDs.Collect()...)

partStatsVersionsMap[partitionID] = currentPlanID
partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
}
return &datapb.VchannelInfo{
CollectionID: channel.GetCollectionID(),
Expand Down
32 changes: 16 additions & 16 deletions internal/datacoord/partition_stats_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type partitionStatsMeta struct {
}

type partitionStatsInfo struct {
currentPlanID int64
infos map[int64]*datapb.PartitionStatsInfo
currentVersion int64
infos map[int64]*datapb.PartitionStatsInfo
}

func newPartitionStatsMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*partitionStatsMeta, error) {
Expand Down Expand Up @@ -52,13 +52,13 @@ func (psm *partitionStatsMeta) reloadFromKV() error {
psm.partitionStatsInfos[info.GetVChannel()] = make(map[int64]*partitionStatsInfo)
}
if _, ok := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()]; !ok {
currentPlanID, err := psm.catalog.GetPartitionStatsCurrentPlanID(psm.ctx, info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
currentPartitionStatsVersion, err := psm.catalog.GetCurrentPartitionStatsVersion(psm.ctx, info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
if err != nil {
return err
}
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()] = &partitionStatsInfo{
currentPlanID: currentPlanID,
infos: make(map[int64]*datapb.PartitionStatsInfo),
currentVersion: currentPartitionStatsVersion,
infos: make(map[int64]*datapb.PartitionStatsInfo),
}
}
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info
Expand Down Expand Up @@ -151,32 +151,32 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStat
return nil
}

func (psm *partitionStatsMeta) SaveCurrentPlanID(collectionID, partitionID int64, vChannel string, currentPlanID int64) error {
func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
psm.Lock()
defer psm.Unlock()

log.Info("update current planID", zap.Int64("collectionID", collectionID),
log.Info("update current partition stats version", zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.String("vChannel", vChannel), zap.Int64("currentPlanID", currentPlanID))
zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion))

if _, ok := psm.partitionStatsInfos[vChannel]; !ok {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPlanID",
fmt.Errorf("update current planID failed, there is no partition info exists with collID: %d, partID: %d, vChannel: %s", collectionID, partitionID, vChannel))
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion",
fmt.Errorf("update current partition stats version failed, there is no partition info exists with collID: %d, partID: %d, vChannel: %s", collectionID, partitionID, vChannel))
}
if _, ok := psm.partitionStatsInfos[vChannel][partitionID]; !ok {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPlanID",
fmt.Errorf("update current planID failed, there is no partition info exists with collID: %d, partID: %d, vChannel: %s", collectionID, partitionID, vChannel))
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion",
fmt.Errorf("update current partition stats version failed, there is no partition info exists with collID: %d, partID: %d, vChannel: %s", collectionID, partitionID, vChannel))
}

if err := psm.catalog.SavePartitionStatsCurrentPlanID(psm.ctx, collectionID, partitionID, vChannel, currentPlanID); err != nil {
if err := psm.catalog.SaveCurrentPartitionStatsVersion(psm.ctx, collectionID, partitionID, vChannel, currentPartitionStatsVersion); err != nil {
return err
}

psm.partitionStatsInfos[vChannel][partitionID].currentPlanID = currentPlanID
psm.partitionStatsInfos[vChannel][partitionID].currentVersion = currentPartitionStatsVersion
return nil
}

func (psm *partitionStatsMeta) GetCurrentPlanID(collectionID, partitionID int64, vChannel string) int64 {
func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
psm.RLock()
defer psm.RUnlock()

Expand All @@ -186,5 +186,5 @@ func (psm *partitionStatsMeta) GetCurrentPlanID(collectionID, partitionID int64,
if _, ok := psm.partitionStatsInfos[vChannel][partitionID]; !ok {
return 0
}
return psm.partitionStatsInfos[vChannel][partitionID].currentPlanID
return psm.partitionStatsInfos[vChannel][partitionID].currentVersion
}
2 changes: 1 addition & 1 deletion internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ func TestGetQueryVChanPositions_PartitionStats(t *testing.T) {
svr.meta.partitionStatsMeta.partitionStatsInfos = map[string]map[int64]*partitionStatsInfo{
vchannel: {
partitionID: {
currentPlanID: version,
currentVersion: version,
infos: map[int64]*datapb.PartitionStatsInfo{
version: {Version: version},
},
Expand Down
6 changes: 3 additions & 3 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ type DataCoordCatalog interface {
SavePartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error
DropPartitionStatsInfo(ctx context.Context, info *datapb.PartitionStatsInfo) error

SavePartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string, currentVersion int64) error
GetPartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string) (int64, error)
DropPartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string) error
SaveCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string, currentVersion int64) error
GetCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string) (int64, error)
DropCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string) error
}

type QueryCoordCatalog interface {
Expand Down
12 changes: 6 additions & 6 deletions internal/metastore/kv/datacoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,14 +934,14 @@ func (kc *Catalog) DropPartitionStatsInfo(ctx context.Context, info *datapb.Part
return kc.MetaKv.Remove(key)
}

func (kc *Catalog) SavePartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string, currentVersion int64) error {
key := buildPartitionStatsCurrentVersionPath(collID, partID, vChannel)
func (kc *Catalog) SaveCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string, currentVersion int64) error {
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
value := strconv.FormatInt(currentVersion, 10)
return kc.MetaKv.Save(key, value)
}

func (kc *Catalog) GetPartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string) (int64, error) {
key := buildPartitionStatsCurrentVersionPath(collID, partID, vChannel)
func (kc *Catalog) GetCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string) (int64, error) {
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
valueStr, err := kc.MetaKv.Load(key)
if err != nil {
return 0, err
Expand All @@ -950,7 +950,7 @@ func (kc *Catalog) GetPartitionStatsCurrentPlanID(ctx context.Context, collID, p
return strconv.ParseInt(valueStr, 10, 64)
}

func (kc *Catalog) DropPartitionStatsCurrentPlanID(ctx context.Context, collID, partID int64, vChannel string) error {
key := buildPartitionStatsCurrentVersionPath(collID, partID, vChannel)
func (kc *Catalog) DropCurrentPartitionStatsVersion(ctx context.Context, collID, partID int64, vChannel string) error {
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
return kc.MetaKv.Remove(key)
}
2 changes: 1 addition & 1 deletion internal/metastore/kv/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func buildPartitionStatsInfoPath(info *datapb.PartitionStatsInfo) string {
return fmt.Sprintf("%s/%d/%d/%s/%d", PartitionStatsInfoPrefix, info.CollectionID, info.PartitionID, info.VChannel, info.Version)
}

func buildPartitionStatsCurrentVersionPath(collID, partID int64, channel string) string {
func buildCurrentPartitionStatsVersionPath(collID, partID int64, channel string) string {
return fmt.Sprintf("%s/%d/%d/%s", PartitionStatsCurrentPlanIDPrefix, collID, partID, channel)
}

Expand Down
32 changes: 16 additions & 16 deletions internal/metastore/mocks/mock_datacoord_catalog.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.3 h1:KUSaWVePVlHMIluAXf2qmNffI1CMlGFLLiP+4iy9014=
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.3/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
Expand Down

0 comments on commit 3319390

Please sign in to comment.