/
drift_detection.go
48 lines (40 loc) · 1.78 KB
/
drift_detection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package fullnode
import (
"context"
"time"
"github.com/samber/lo"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
"github.com/strangelove-ventures/cosmos-operator/internal/cosmos"
"github.com/strangelove-ventures/cosmos-operator/internal/kube"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// DriftDetection detects pods that are lagging behind the latest block height.
//
// Its collaborators are stored as fields (presumably so tests can substitute
// them); NewDriftDetection wires in kube.AvailablePods and kube.ComputeRollout.
type DriftDetection struct {
	// available returns the subset of pods considered available, given a
	// minimum ready duration and the current time. NewDriftDetection sets it
	// to kube.AvailablePods.
	available func(pods []*corev1.Pod, minReady time.Duration, now time.Time) []*corev1.Pod
	// collector is queried by LaggingPods for the status of the node's pods.
	collector StatusCollector
	// computeRollout returns the maximum number of pods LaggingPods may report,
	// given the max-unavailable setting, the desired replica count and the
	// number of ready pods. NewDriftDetection sets it to kube.ComputeRollout.
	computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int
}
// NewDriftDetection builds a DriftDetection that reads pod statuses from
// collector, and relies on kube.AvailablePods to decide availability and on
// kube.ComputeRollout to size how many lagging pods may be reported at once.
func NewDriftDetection(collector StatusCollector) DriftDetection {
	var detector DriftDetection
	detector.collector = collector
	detector.available = kube.AvailablePods
	detector.computeRollout = kube.ComputeRollout
	return detector
}
// LaggingPods returns pods that are lagging behind the latest block height.
//
// It collects pod statuses for crd, keeps the entries returned by Synced(),
// and finds the highest LatestBlockHeight among them. A pod is lagging when it
// is strictly behind that height and the gap is at least
// crd.Spec.SelfHeal.HeightDriftMitigation.Threshold blocks. A pod at the max
// height is never reported, even if the threshold is 0.
//
// The result is capped at the count returned by computeRollout, which is fed
// RolloutStrategy.MaxUnavailable, Spec.Replicas and the number of synced pods
// that d.available reports as available. An empty (nil) slice is returned when
// nothing is synced.
//
// NOTE(review): crd.Spec.SelfHeal.HeightDriftMitigation is dereferenced without
// a nil check; presumably callers only invoke this when height drift
// mitigation is configured — confirm.
func (d DriftDetection) LaggingPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod {
	// minReady is the minimum ready duration passed to d.available.
	const minReady = 5 * time.Second

	synced := d.collector.Collect(ctx, client.ObjectKeyFromObject(crd)).Synced()
	if len(synced) == 0 {
		// Nothing to compare against; avoid reading a height from lo.MaxBy's
		// zero-value result.
		return nil
	}

	maxHeight := lo.MaxBy(synced, func(a cosmos.StatusItem, b cosmos.StatusItem) bool {
		return a.Status.LatestBlockHeight() > b.Status.LatestBlockHeight()
	}).Status.LatestBlockHeight()

	thresh := uint64(crd.Spec.SelfHeal.HeightDriftMitigation.Threshold)
	lagging := lo.FilterMap(synced, func(item cosmos.StatusItem, _ int) (*corev1.Pod, bool) {
		// maxHeight is the maximum over synced, so this unsigned subtraction
		// cannot underflow.
		behind := maxHeight - item.Status.LatestBlockHeight()
		// behind > 0 keeps pods at the max height out of the result when the
		// threshold is 0; for any threshold >= 1 it is implied by behind >= thresh.
		return item.GetPod(), behind > 0 && behind >= thresh
	})

	avail := d.available(synced.Pods(), minReady, time.Now())
	rollout := d.computeRollout(crd.Spec.RolloutStrategy.MaxUnavailable, int(crd.Spec.Replicas), len(avail))
	// lo.Slice clamps its bounds, so a rollout larger than len(lagging) (or a
	// non-positive one) is safe.
	return lo.Slice(lagging, 0, rollout)
}