-
Notifications
You must be signed in to change notification settings - Fork 0
/
cluster_alias_dao.go
226 lines (211 loc) · 6.17 KB
/
cluster_alias_dao.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
/*
Copyright 2015 Shlomi Noach, courtesy Booking.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inst
import (
"fmt"
"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"
"github.com/openark/orchestrator/go/db"
)
// ReadClusterNameByAlias
func ReadClusterNameByAlias(alias string) (clusterName string, err error) {
query := `
select
cluster_name
from
cluster_alias
where
alias = ?
or cluster_name = ?
`
err = db.QueryOrchestrator(query, sqlutils.Args(alias, alias), func(m sqlutils.RowMap) error {
clusterName = m.GetString("cluster_name")
return nil
})
if err != nil {
return "", err
}
if clusterName == "" {
err = fmt.Errorf("No cluster found for alias %s", alias)
}
return clusterName, err
}
// DeduceClusterName attempts to resolve a cluster name given a name or alias.
// If unsuccessful to match by alias, the function returns the same given string
func DeduceClusterName(nameOrAlias string) (clusterName string, err error) {
if nameOrAlias == "" {
return "", fmt.Errorf("empty cluster name")
}
if name, err := ReadClusterNameByAlias(nameOrAlias); err == nil {
return name, nil
}
return nameOrAlias, nil
}
// ReadAliasByClusterName returns the cluster alias for the given cluster name,
// or the cluster name itself if not explicit alias found
func ReadAliasByClusterName(clusterName string) (alias string, err error) {
alias = clusterName // default return value
query := `
select
alias
from
cluster_alias
where
cluster_name = ?
`
err = db.QueryOrchestrator(query, sqlutils.Args(clusterName), func(m sqlutils.RowMap) error {
alias = m.GetString("alias")
return nil
})
return clusterName, err
}
// WriteClusterAlias will write (and override) a single cluster name mapping
func writeClusterAlias(clusterName string, alias string) error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
replace into
cluster_alias (cluster_name, alias, last_registered)
values
(?, ?, now())
`,
clusterName, alias)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}
// writeClusterAliasManualOverride will write (and override) a single cluster name mapping
func writeClusterAliasManualOverride(clusterName string, alias string) error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
replace into
cluster_alias_override (cluster_name, alias)
values
(?, ?)
`,
clusterName, alias)
return log.Errore(err)
}
return ExecDBWriteFunc(writeFunc)
}
// UpdateClusterAliases writes down the cluster_alias table based on information
// gained from database_instance
func UpdateClusterAliases() error {
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
replace into
cluster_alias (alias, cluster_name, last_registered)
select
suggested_cluster_alias,
cluster_name,
now()
from
database_instance
left join database_instance_downtime using (hostname, port)
where
suggested_cluster_alias!=''
/* exclude newly demoted, downtimed masters */
and ifnull(
database_instance_downtime.downtime_active = 1
and database_instance_downtime.end_timestamp > now()
and database_instance_downtime.reason = ?
, 0) = 0
order by
ifnull(last_checked <= last_seen, 0) asc,
read_only desc,
num_slave_hosts asc
`, DowntimeLostInRecoveryMessage)
return log.Errore(err)
}
if err := ExecDBWriteFunc(writeFunc); err != nil {
return err
}
writeFunc = func() error {
// Handling the case where no cluster alias exists: we write a dummy alias in the form of the real cluster name.
_, err := db.ExecOrchestrator(`
replace into
cluster_alias (alias, cluster_name, last_registered)
select
cluster_name as alias, cluster_name, now()
from
database_instance
group by
cluster_name
having
sum(suggested_cluster_alias = '') = count(*)
`)
return log.Errore(err)
}
if err := ExecDBWriteFunc(writeFunc); err != nil {
return err
}
return nil
}
// ReplaceAliasClusterName replaces alis mapping of one cluster name onto a new cluster name.
// Used in topology failover/recovery
func ReplaceAliasClusterName(oldClusterName string, newClusterName string) (err error) {
{
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
update cluster_alias
set cluster_name = ?
where cluster_name = ?
`,
newClusterName, oldClusterName)
return log.Errore(err)
}
err = ExecDBWriteFunc(writeFunc)
}
{
writeFunc := func() error {
_, err := db.ExecOrchestrator(`
update cluster_alias_override
set cluster_name = ?
where cluster_name = ?
`,
newClusterName, oldClusterName)
return log.Errore(err)
}
if ferr := ExecDBWriteFunc(writeFunc); ferr != nil {
err = ferr
}
}
return err
}
// ReadUnambiguousSuggestedClusterAliases reads potential master hostname:port who have suggested cluster aliases,
// where no one else shares said suggested cluster alias. Such hostname:port are likely true owners
// of the alias.
func ReadUnambiguousSuggestedClusterAliases() (result map[string]InstanceKey, err error) {
result = map[string]InstanceKey{}
query := `
select
suggested_cluster_alias,
min(hostname) as hostname,
min(port) as port
from
database_instance
where
suggested_cluster_alias != ''
and replication_depth=0
group by
suggested_cluster_alias
having
count(*) = 1
`
err = db.QueryOrchestrator(query, sqlutils.Args(), func(m sqlutils.RowMap) error {
key := InstanceKey{Hostname: m.GetString("hostname"), Port: m.GetInt("port")}
suggestedAlias := m.GetString("suggested_cluster_alias")
result[suggestedAlias] = key
return nil
})
return result, err
}