forked from rancher/rancher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstatsaggregator.go
130 lines (111 loc) · 3.84 KB
/
statsaggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package clusterstats
import (
"reflect"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/rancher/types/config"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
)
type StatsAggregator struct {
NodesLister v3.NodeLister
Clusters v3.ClusterInterface
}
type ClusterNodeData struct {
Capacity v1.ResourceList
Allocatable v1.ResourceList
Requested v1.ResourceList
Limits v1.ResourceList
ConditionNoDiskPressureStatus v1.ConditionStatus
ConditionNoMemoryPressureStatus v1.ConditionStatus
}
func Register(management *config.ManagementContext) {
clustersClient := management.Management.Clusters("")
machinesClient := management.Management.Nodes("")
s := &StatsAggregator{
NodesLister: machinesClient.Controller().Lister(),
Clusters: clustersClient,
}
clustersClient.AddHandler("cluster-stats", s.sync)
machinesClient.AddHandler("cluster-stats", s.machineChanged)
}
func (s *StatsAggregator) sync(key string, cluster *v3.Cluster) error {
if cluster == nil {
return nil
}
return s.aggregate(cluster, cluster.Name)
}
func (s *StatsAggregator) aggregate(cluster *v3.Cluster, clusterName string) error {
machines, err := s.NodesLister.List(cluster.Name, labels.Everything())
if err != nil {
return err
}
origStatus := cluster.Status.DeepCopy()
// capacity keys
pods, mem, cpu := resource.Quantity{}, resource.Quantity{}, resource.Quantity{}
// allocatable keys
apods, amem, acpu := resource.Quantity{}, resource.Quantity{}, resource.Quantity{}
// requested keys
rpods, rmem, rcpu := resource.Quantity{}, resource.Quantity{}, resource.Quantity{}
// limited keys
lpods, lmem, lcpu := resource.Quantity{}, resource.Quantity{}, resource.Quantity{}
condDisk := v1.ConditionTrue
condMem := v1.ConditionTrue
for _, machine := range machines {
capacity := machine.Status.InternalNodeStatus.Capacity
if capacity != nil {
pods.Add(*capacity.Pods())
mem.Add(*capacity.Memory())
cpu.Add(*capacity.Cpu())
}
allocatable := machine.Status.InternalNodeStatus.Allocatable
if allocatable != nil {
apods.Add(*allocatable.Pods())
amem.Add(*allocatable.Memory())
acpu.Add(*allocatable.Cpu())
}
requested := machine.Status.Requested
if requested != nil {
rpods.Add(*requested.Pods())
rmem.Add(*requested.Memory())
rcpu.Add(*requested.Cpu())
}
limits := machine.Status.Limits
if limits != nil {
lpods.Add(*limits.Pods())
lmem.Add(*limits.Memory())
lcpu.Add(*limits.Cpu())
}
if condDisk == v1.ConditionTrue && v3.ClusterConditionNoDiskPressure.IsTrue(machine) {
condDisk = v1.ConditionFalse
}
if condMem == v1.ConditionTrue && v3.ClusterConditionNoMemoryPressure.IsTrue(machine) {
condMem = v1.ConditionFalse
}
}
cluster.Status.Capacity = v1.ResourceList{v1.ResourcePods: pods, v1.ResourceMemory: mem, v1.ResourceCPU: cpu}
cluster.Status.Allocatable = v1.ResourceList{v1.ResourcePods: apods, v1.ResourceMemory: amem, v1.ResourceCPU: acpu}
cluster.Status.Requested = v1.ResourceList{v1.ResourcePods: rpods, v1.ResourceMemory: rmem, v1.ResourceCPU: rcpu}
cluster.Status.Limits = v1.ResourceList{v1.ResourcePods: lpods, v1.ResourceMemory: lmem, v1.ResourceCPU: lcpu}
if condDisk == v1.ConditionTrue {
v3.ClusterConditionNoDiskPressure.True(cluster)
} else {
v3.ClusterConditionNoDiskPressure.False(cluster)
}
if condMem == v1.ConditionTrue {
v3.ClusterConditionNoMemoryPressure.True(cluster)
} else {
v3.ClusterConditionNoMemoryPressure.False(cluster)
}
if !reflect.DeepEqual(origStatus, cluster.Status) {
_, err := s.Clusters.Update(cluster)
return err
}
return nil
}
func (s *StatsAggregator) machineChanged(key string, machine *v3.Node) error {
if machine != nil {
s.Clusters.Controller().Enqueue("", machine.Namespace)
}
return nil
}