Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fixed connection anno #488

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/core/v1alpha1/store_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (
// StoreConnectionAnno expose the connection count of the store
StoreConnectionAnno = "matrixorigin.io/connections"

// StoreScoreAnno expose the score of the store
StoreScoreAnno = "matrixorigin.io/score"

// StoreCordonAnno cordons a CN store
StoreCordonAnno = "matrixorigin.io/store-cordon"
)
11 changes: 7 additions & 4 deletions pkg/controllers/cnstore/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (c *withCNSet) OnPreparingStop(ctx *recon.Context[*corev1.Pod]) error {
return recon.ErrReSync("wait for min delay", retryInterval)
}

storeConnection, err := common.GetStoreConnection(pod)
storeConnection, err := common.GetStoreScore(pod)
if err != nil {
return errors.Wrap(err, "error get store connection count")
}
Expand Down Expand Up @@ -372,19 +372,22 @@ func (c *withCNSet) syncStats(ctx *recon.Context[*corev1.Pod]) error {
}
}

sc := &common.StoreConnection{
sc := &common.StoreScore{
SessionCount: count,
PipelineCount: pipelineCount,
}
err = ctx.Patch(pod, func() error {
if err := common.SetStoreConnection(pod, sc); err != nil {
if err := common.SetStoreScore(pod, sc); err != nil {
return err
}
pod.Annotations[common.DeletionCostAnno] = strconv.Itoa(sc.GenDeletionCost())
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels[common.CNUUIDLabelKey] = uid
// NB: store-connections anno is no longer used in mo-operator, but must be kept for external compatibility
// ref: https://github.com/matrixorigin/MO-Cloud/issues/3007
pod.Annotations[v1alpha1.StoreConnectionAnno] = strconv.Itoa(sc.PipelineCount + sc.SessionCount)
return nil
})
if err != nil {
Expand Down Expand Up @@ -498,7 +501,7 @@ func (annotationChangedExcludeStats) Update(e event.UpdateEvent) bool {
newAnnos := e.ObjectNew.GetAnnotations()
for k, v := range newAnnos {
// exclude stats
if k == common.DeletionCostAnno || k == v1alpha1.StoreConnectionAnno {
if k == common.DeletionCostAnno || k == v1alpha1.StoreConnectionAnno || k == v1alpha1.StoreScoreAnno {
continue
}
// only consider newly added annotations or annotation value change, deletion of annotation key
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/cnstore/pooling.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *withCNSet) poolingCNReconcile(ctx *recon.Context[*corev1.Pod]) error {
if time.Since(parsed) < c.cn.Spec.ScalingConfig.GetMinDelayDuration() {
return recon.ErrReSync("wait min delay duration", retryInterval)
}
storeConnection, err := common.GetStoreConnection(pod)
storeConnection, err := common.GetStoreScore(pod)
if err != nil {
return errors.Wrap(err, "error get store connection count")
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/controllers/common/cnstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,26 +145,26 @@ func ToStoreLabels(labels []v1alpha1.CNLabel) map[string]metadata.LabelList {
return lm
}

type StoreConnection struct {
type StoreScore struct {
SessionCount int `json:"sessionCount"`
PipelineCount int `json:"pipelineCount"`
}

func (s *StoreConnection) GenDeletionCost() int {
func (s *StoreScore) GenDeletionCost() int {
return s.SessionCount
}

func (s *StoreConnection) IsSafeToReclaim() bool {
func (s *StoreScore) IsSafeToReclaim() bool {
return s.SessionCount == 0 && s.PipelineCount == 0
}

// GetStoreConnection get the store connection count from Pod anno
func GetStoreConnection(pod *corev1.Pod) (*StoreConnection, error) {
connectionStr, ok := pod.Annotations[v1alpha1.StoreConnectionAnno]
// GetStoreScore get the store connection count from Pod anno
func GetStoreScore(pod *corev1.Pod) (*StoreScore, error) {
connectionStr, ok := pod.Annotations[v1alpha1.StoreScoreAnno]
if !ok {
return nil, errors.Errorf("cannot find connection count for CN pod %s/%s, connection annotation is empty", pod.Namespace, pod.Name)
}
s := &StoreConnection{}
s := &StoreScore{}
if len(connectionStr) == 0 {
return s, nil
}
Expand All @@ -180,16 +180,16 @@ func GetStoreConnection(pod *corev1.Pod) (*StoreConnection, error) {
return s, nil
}

// SetStoreConnection set the store connection info to Pod anno
func SetStoreConnection(pod *corev1.Pod, s *StoreConnection) error {
// SetStoreScore set the store connection info to Pod anno
func SetStoreScore(pod *corev1.Pod, s *StoreScore) error {
b, err := json.Marshal(s)
if err != nil {
return errors.Wrap(err, "error marshal connection info")
}
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[v1alpha1.StoreConnectionAnno] = string(b)
pod.Annotations[v1alpha1.StoreScoreAnno] = string(b)
return nil
}

Expand Down
Loading