/
leader.go
163 lines (151 loc) · 5.25 KB
/
leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package rqcluster
import (
"context"
"encoding/json"
"fmt"
"github.com/go-logr/logr"
rqclusterv1alpha1 "github.com/jmccormick2001/rq/pkg/apis/rqcluster/v1alpha1"
"io/ioutil"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"net/http"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)
// AutoGenerated was generated from the json response of the status
// API call that rqlite publishes.
//
// This file decodes an rqlite node's /status response into it and reads
// only Store.Raft.State. Keys present in the response but not declared
// here are ignored by encoding/json, and declared fields missing from the
// response keep their zero values.
//
// NOTE(review): the Go field types were taken from a single sample
// response; if an rqlite version emits a different JSON type for a field
// (for example a bool where a string is declared), json.Unmarshal returns
// an *UnmarshalTypeError, which callers in this file treat as fatal.
type AutoGenerated struct {
Build struct {
Branch string `json:"branch"`
BuildTime string `json:"build_time"`
Commit string `json:"commit"`
Version string `json:"version"`
} `json:"build"`
HTTP struct {
Addr string `json:"addr"`
Auth string `json:"auth"`
Redirect string `json:"redirect"`
} `json:"http"`
Mux struct {
Addr string `json:"addr"`
Encrypted string `json:"encrypted"`
Timeout string `json:"timeout"`
} `json:"mux"`
Node struct {
StartTime time.Time `json:"start_time"`
Uptime string `json:"uptime"`
} `json:"node"`
Runtime struct {
GOARCH string `json:"GOARCH"`
GOMAXPROCS int `json:"GOMAXPROCS"`
GOOS string `json:"GOOS"`
NumCPU int `json:"numCPU"`
NumGoroutine int `json:"numGoroutine"`
Version string `json:"version"`
} `json:"runtime"`
Store struct {
Addr string `json:"addr"`
ApplyTimeout string `json:"apply_timeout"`
DbConf struct {
DSN string `json:"DSN"`
Memory bool `json:"Memory"`
} `json:"db_conf"`
Dir string `json:"dir"`
ElectionTimeout string `json:"election_timeout"`
HeartbeatTimeout string `json:"heartbeat_timeout"`
Leader string `json:"leader"`
Meta struct {
// NOTE(review): these three keys are the peer addresses of the cluster
// the sample response was captured from. On any other cluster they will
// not match and stay empty; a map[string]string would capture arbitrary
// peers if this data is ever needed.
APIPeers struct {
One032144002 string `json:"10.32.1.4:4002"`
One032154002 string `json:"10.32.1.5:4002"`
One032184002 string `json:"10.32.1.8:4002"`
} `json:"APIPeers"`
} `json:"meta"`
OpenTimeout string `json:"open_timeout"`
Peers []string `json:"peers"`
// Raft values are all declared as strings; presumably the sample response
// reported them as JSON strings — confirm before changing any to numbers.
// State is the field this file inspects (compared against "Leader").
Raft struct {
AppliedIndex string `json:"applied_index"`
CommitIndex string `json:"commit_index"`
FsmPending string `json:"fsm_pending"`
LastContact string `json:"last_contact"`
LastLogIndex string `json:"last_log_index"`
LastLogTerm string `json:"last_log_term"`
LastSnapshotIndex string `json:"last_snapshot_index"`
LastSnapshotTerm string `json:"last_snapshot_term"`
NumPeers string `json:"num_peers"`
State string `json:"state"`
Term string `json:"term"`
} `json:"raft"`
SnapshotThreshold int `json:"snapshot_threshold"`
Sqlite3 struct {
DNS string `json:"dns"`
FkConstraints string `json:"fk_constraints"`
Path string `json:"path"`
Version string `json:"version"`
} `json:"sqlite3"`
} `json:"store"`
}
// getLeaderPod returns the pod in requestNamespace that carries both the
// labels cluster=<instanceName> and leader=true.
//
// It returns (nil, nil) when no pod, or more than one pod, carries those
// labels, so callers must check for a nil pod as well as a non-nil error.
// A List failure is returned unchanged (not wrapped) so callers can still
// inspect it with the k8s.io/apimachinery/pkg/api/errors helpers; it is not
// logged here so that it is reported exactly once, by the caller.
func getLeaderPod(reqLogger logr.Logger, r *ReconcileRqcluster, requestNamespace, instanceName string) (*corev1.Pod, error) {
	reqLogger.Info("getLeaderPod called")

	podList := &corev1.PodList{}
	leaderSelector := client.MatchingLabels{"cluster": instanceName, "leader": "true"}
	if err := r.client.List(context.TODO(), podList, client.InNamespace(requestNamespace), leaderSelector); err != nil {
		return nil, err
	}

	if matches := len(podList.Items); matches != 1 {
		// More than one match is treated like "no leader": the caller cannot
		// tell which pod is authoritative. NOTE(review): multiple matches are
		// possible if a former leader's label was never cleared — confirm.
		reqLogger.Info("unable to find a unique leader pod that matches this request", "matches", matches)
		return nil, nil
	}

	leader := &podList.Items[0]
	reqLogger.Info("found leader pod that matches this request", "Pod.Name", leader.Name)
	return leader, nil
}
// rqliteStatusTimeout bounds each request to an rqlite node's /status
// endpoint so that a pod that accepts connections but never answers cannot
// block labelNewLeader indefinitely.
const rqliteStatusTimeout = 5 * time.Second

// labelNewLeader queries the /status endpoint (port 4001) of every node named
// in instance.Status.Nodes and sets the label leader=true on each pod whose
// reported Raft state is "Leader".
//
// Nodes that cannot be reached, or that answer with a non-200 HTTP status,
// are logged and skipped: the dead leader pod might still have its service
// and won't respond. An error is returned, and the loop stops, if a status
// body cannot be read or decoded, or if a leader pod cannot be fetched or
// updated. This function only adds the leader label; it never removes it
// from pods that are no longer the leader.
func labelNewLeader(reqLogger logr.Logger, r *ReconcileRqcluster, instance *rqclusterv1alpha1.Rqcluster) error {
	// One client for the whole pass so idle connections can be reused.
	httpClient := &http.Client{Timeout: rqliteStatusTimeout}

	for _, podName := range instance.Status.Nodes {
		reqLogger.Info("labelNewLeader node " + podName)

		state, err := fetchRaftState(reqLogger, httpClient, podName)
		if err != nil {
			return err
		}
		if state != "Leader" {
			continue
		}

		reqLogger.Info("found rqlite leader", "Pod.Name", podName)
		if err := setLeaderLabel(reqLogger, r, instance.Namespace, podName); err != nil {
			return err
		}
	}
	return nil
}

// fetchRaftState returns the Raft state string (for example "Leader")
// reported by the rqlite node reachable at podName:4001. It returns "" with a
// nil error when the node is unreachable or replies with a non-200 status, so
// the caller can skip that node; read and decode failures are returned.
func fetchRaftState(reqLogger logr.Logger, httpClient *http.Client, podName string) (string, error) {
	url := fmt.Sprintf("http://%s:4001/status?pretty", podName)
	resp, err := httpClient.Get(url)
	if err != nil {
		// Best effort: an unreachable node is expected during failover.
		reqLogger.Info(err.Error())
		return "", nil
	}
	// The body must be closed or the underlying connection is leaked.
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	// The full status document is verbose; only emit it at debug verbosity.
	reqLogger.V(1).Info(string(body))

	if resp.StatusCode != http.StatusOK {
		// A non-200 body is not a status document; skip this node rather
		// than failing the whole pass on a JSON decode error.
		reqLogger.Info("unexpected HTTP status from rqlite node", "Pod.Name", podName, "StatusCode", resp.StatusCode)
		return "", nil
	}

	status := AutoGenerated{}
	if err := json.Unmarshal(body, &status); err != nil {
		return "", err
	}
	return status.Store.Raft.State, nil
}

// setLeaderLabel sets the label leader=true on the named pod. Any Get error
// is returned (a missing pod is additionally logged); the pod is only written
// back when the label is not already "true", and a nil label map is created
// first so the assignment cannot panic.
func setLeaderLabel(reqLogger logr.Logger, r *ReconcileRqcluster, namespace, podName string) error {
	pod := &corev1.Pod{}
	key := types.NamespacedName{Name: podName, Namespace: namespace}
	if err := r.client.Get(context.TODO(), key, pod); err != nil {
		if errors.IsNotFound(err) {
			reqLogger.Info("leader Pod was not found which is an error", "Pod.Namespace", namespace, "Pod.Name", podName)
		}
		return err
	}

	if pod.ObjectMeta.Labels["leader"] == "true" {
		return nil
	}
	if pod.ObjectMeta.Labels == nil {
		pod.ObjectMeta.Labels = map[string]string{}
	}
	pod.ObjectMeta.Labels["leader"] = "true"
	if err := r.client.Update(context.TODO(), pod); err != nil {
		return err
	}
	reqLogger.Info("pod updated to leader", "Pod.Name", podName)
	return nil
}