update_status.go

package storageos

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"reflect"
	"strings"
	"time"

	storageosv1 "github.com/storageos/cluster-operator/pkg/apis/storageos/v1"
	storageosapi "github.com/storageos/go-api"
	"github.com/storageos/go-api/types"
	v1 "k8s.io/api/core/v1"
)

var (
	// nodeHealthTimeout specifies how long we should wait for the api to
	// return health results.
	nodeHealthTimeout = time.Second

	// nodeLivenessTimeout specifies how long we should wait for a connection to
	// the node's api port.
	nodeLivenessTimeout = time.Second
)
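
// updateStorageOSStatus updates the StorageOSCluster status subresource when
// it has changed, emitting an event if the node ready count has changed.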
func (s *Deployment) updateStorageOSStatus(status *storageosv1.StorageOSClusterStatus) error {
	if reflect.DeepEqual(s.stos.Status, *status) {
		return nil
	}

	// When there's a difference in node ready count, broadcast the health change event.
	if s.stos.Status.Ready != status.Ready {
		// Ready contains the node count in the format 3/3.
		ready := strings.Split(status.Ready, "/")
		// If the ready and total counts are equal and non-zero, the cluster is
		// healthy. Otherwise it is not ready. 0/0 is an unready cluster. The
		// length check guards against indexing a malformed Ready string.
		if len(ready) == 2 && ready[0] == ready[1] && ready[0] != "0" {
			if s.recorder != nil {
				s.recorder.Event(s.stos, v1.EventTypeNormal, "ChangedStatus", fmt.Sprintf("%s StorageOS nodes are functional. Cluster healthy", status.Ready))
			}
		} else {
			if s.recorder != nil {
				s.recorder.Event(s.stos, v1.EventTypeWarning, "ChangedStatus", fmt.Sprintf("%s StorageOS nodes are functional", status.Ready))
			}
		}
	}

	// Update subresource status.
	s.stos.Status = *status
	return s.client.Status().Update(context.Background(), s.stos)
}
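
// getStorageOSStatus returns the current cluster status, derived from the
// comma-separated node addresses in the cluster's join token.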
func (s *Deployment) getStorageOSStatus() (*storageosv1.StorageOSClusterStatus, error) {
	// Create an empty slice because it's used to create the cluster status. An
	// uninitialized slice results in an error at cluster status validation:
	//   status.nodes in body must be of type array: "null"
	nodeIPs := []string{}

	// Everything is empty if the join token is empty.
	if len(s.stos.Spec.Join) > 0 {
		nodeIPs = strings.Split(s.stos.Spec.Join, ",")
	}

	if s.nodev2 {
		return s.getStorageOSV2Status(nodeIPs)
	}
	return s.getStorageOSV1Status(nodeIPs)
}

// getStorageOSV2Status queries health of all the nodes in the cluster and
// returns the cluster status.
//
// NodeHealthStatus is deprecated and not set for V2.
func (s *Deployment) getStorageOSV2Status(nodeIPs []string) (*storageosv1.StorageOSClusterStatus, error) {
	var readyNodes int
	totalNodes := len(nodeIPs)
	memberStatus := new(storageosv1.MembersStatus)

	for _, ip := range nodeIPs {
		if isListening(ip, storageosapi.DefaultPort, nodeLivenessTimeout) {
			readyNodes++
			memberStatus.Ready = append(memberStatus.Ready, ip)
		} else {
			memberStatus.Unready = append(memberStatus.Unready, ip)
		}
	}

	phase := storageosv1.ClusterPhaseCreating
	if readyNodes == totalNodes {
		phase = storageosv1.ClusterPhaseRunning
	}

	return &storageosv1.StorageOSClusterStatus{
		Phase:            phase,
		Nodes:            nodeIPs,
		NodeHealthStatus: make(map[string]storageosv1.NodeHealth),
		Ready:            fmt.Sprintf("%d/%d", readyNodes, totalNodes),
		Members:          *memberStatus,
	}, nil
}

// getStorageOSV1Status queries health of all the nodes in the join token and
// returns the cluster status.
func (s *Deployment) getStorageOSV1Status(nodeIPs []string) (*storageosv1.StorageOSClusterStatus, error) {
	var readyNodes int
	totalNodes := len(nodeIPs)
	healthStatus := make(map[string]storageosv1.NodeHealth)
	memberStatus := new(storageosv1.MembersStatus)

	for _, node := range nodeIPs {
		if status, err := getNodeHealth(node, nodeHealthTimeout); err == nil {
			healthStatus[node] = *status
			if isHealthy(status) {
				readyNodes++
				memberStatus.Ready = append(memberStatus.Ready, node)
			} else {
				memberStatus.Unready = append(memberStatus.Unready, node)
			}
		} else {
			// Nodes whose health can't be queried are counted as unready.
			memberStatus.Unready = append(memberStatus.Unready, node)
		}
	}

	phase := storageosv1.ClusterPhaseCreating
	if readyNodes == totalNodes {
		phase = storageosv1.ClusterPhaseRunning
	}

	return &storageosv1.StorageOSClusterStatus{
		Phase:            phase,
		Nodes:            nodeIPs,
		NodeHealthStatus: healthStatus,
		Ready:            fmt.Sprintf("%d/%d", readyNodes, totalNodes),
		Members:          *memberStatus,
	}, nil
}
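
// isHealthy returns true only if every submodule in the node health report is
// "alive".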
func isHealthy(health *storageosv1.NodeHealth) bool {
	// All seven submodules must report "alive"; comparing the concatenation
	// against "alive" repeated seven times checks them in a single expression.
	return health.DirectfsInitiator+health.Director+health.KV+health.KVWrite+
		health.Nats+health.Presentation+health.Rdb == strings.Repeat("alive", 7)
}
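
// isListening reports whether a TCP connection can be opened to host:port
// within the given timeout.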
func isListening(host string, port string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), timeout)
	if err != nil {
		return false
	}
	if conn != nil {
		defer conn.Close()
	}
	return true
}
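
// getNodeHealth queries the v1 health endpoint of the node at the given
// address and returns the status of its submodules.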
func getNodeHealth(address string, timeout time.Duration) (*storageosv1.NodeHealth, error) {
	healthEndpointFormat := "http://%s:%s/v1/" + storageosapi.HealthAPIPrefix

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	client := &http.Client{}

	var healthStatus types.HealthStatus
	cpURL := fmt.Sprintf(healthEndpointFormat, address, storageosapi.DefaultPort)
	cpReq, err := http.NewRequest("GET", cpURL, nil)
	if err != nil {
		return nil, err
	}

	cpResp, err := client.Do(cpReq.WithContext(ctx))
	if err != nil {
		return nil, err
	}
	// Close the response body to avoid leaking the connection.
	defer cpResp.Body.Close()

	if err := json.NewDecoder(cpResp.Body).Decode(&healthStatus); err != nil {
		return nil, err
	}

	return &storageosv1.NodeHealth{
		DirectfsInitiator: healthStatus.Submodules.DirectFSClient.Status,
		Director:          healthStatus.Submodules.Director.Status,
		KV:                healthStatus.Submodules.KV.Status,
		KVWrite:           healthStatus.Submodules.KVWrite.Status,
		Nats:              healthStatus.Submodules.NATS.Status,
		Presentation:      healthStatus.Submodules.FS.Status,
		Rdb:               healthStatus.Submodules.FSDriver.Status,
	}, nil
}