/
config.go
255 lines (227 loc) · 12.1 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"encoding/json"
"fmt"
"os"
"time"
"github.com/spf13/pflag"
"vitess.io/vitess/go/vt/log"
)
const (
	// LostInRecoveryDowntimeSeconds is the number of seconds in 365 days
	// (60 * 60 * 24 * 365).
	// NOTE(review): presumably the downtime applied to an instance that was
	// lost during a recovery — confirm against the callers.
	LostInRecoveryDowntimeSeconds int = 60 * 60 * 24 * 365
)
// configurationLoaded is an unbuffered channel used to announce that the
// configuration has been loaded: MarkConfigurationLoaded sends on it and
// WaitForConfigurationToBeLoaded receives from it.
var configurationLoaded = make(chan bool)
// Fixed settings. These are compile-time constants: RegisterFlags does not
// bind them to any flag and they are not fields of Configuration, so they
// cannot be overridden through flags or a configuration file.
const (
	HealthPollSeconds = 1
	ActiveNodeExpireSeconds = 5
	MaintenanceOwner = "vtorc"
	AuditPageSize = 20
	MaintenancePurgeDays = 7
	MaintenanceExpireMinutes = 10
	DebugMetricsIntervalSeconds = 10
	StaleInstanceCoordinatesExpireSeconds = 60
	DiscoveryMaxConcurrency = 300 // Number of goroutines doing hosts discovery
	DiscoveryQueueCapacity = 100000
	DiscoveryQueueMaxStatisticsSize = 120
	DiscoveryCollectionRetentionSeconds = 120
	HostnameResolveMethod = "default"
	UnseenInstanceForgetHours = 240 // Number of hours after which an unseen instance is forgotten
	ExpiryHostnameResolvesMinutes = 60 // Number of minutes after which to expire hostname-resolves
	CandidateInstanceExpireMinutes = 60 // Minutes after which a suggestion to use an instance as a candidate replica (to be preferably promoted on primary failover) is expired.
	FailureDetectionPeriodBlockMinutes = 60 // The time for which an instance's failure discovery is kept "active", so as to avoid concurrent "discoveries" of the instance's failure; this precedes any recovery process, if any.
)
// Flag-backed settings and their defaults. RegisterFlags binds each of these
// variables to a command-line flag, and UpdateConfigValuesFromFlags copies
// their values into Config.
var (
	sqliteDataFile = "file::memory:?mode=memory&cache=shared"
	instancePollTime = 5 * time.Second
	snapshotTopologyInterval = 0 * time.Hour
	reasonableReplicationLag = 10 * time.Second
	auditFileLocation = ""
	auditToBackend = false
	auditToSyslog = false
	auditPurgeDuration = 7 * 24 * time.Hour // Equivalent of 7 days
	recoveryPeriodBlockDuration = 30 * time.Second
	preventCrossCellFailover = false
	lockShardTimeout = 30 * time.Second
	waitReplicasTimeout = 30 * time.Second
	topoInformationRefreshDuration = 15 * time.Second
	recoveryPollDuration = 1 * time.Second
)
// RegisterFlags binds every VTOrc command-line flag on fs to its
// package-level variable. The default of each flag is the value the variable
// holds at the time of registration.
func RegisterFlags(fs *pflag.FlagSet) {
	// Backend database and polling.
	fs.StringVar(&sqliteDataFile, "sqlite-data-file", sqliteDataFile,
		"SQLite Datafile to use as VTOrc's database")
	fs.DurationVar(&instancePollTime, "instance-poll-time", instancePollTime,
		"Timer duration on which VTOrc refreshes MySQL information")
	fs.DurationVar(&snapshotTopologyInterval, "snapshot-topology-interval", snapshotTopologyInterval,
		"Timer duration on which VTOrc takes a snapshot of the current MySQL information it has in the database. Should be in multiple of hours")
	fs.DurationVar(&reasonableReplicationLag, "reasonable-replication-lag", reasonableReplicationLag,
		"Maximum replication lag on replicas which is deemed to be acceptable")

	// Auditing.
	fs.StringVar(&auditFileLocation, "audit-file-location", auditFileLocation,
		"File location where the audit logs are to be stored")
	fs.BoolVar(&auditToBackend, "audit-to-backend", auditToBackend,
		"Whether to store the audit log in the VTOrc database")
	fs.BoolVar(&auditToSyslog, "audit-to-syslog", auditToSyslog,
		"Whether to store the audit log in the syslog")
	fs.DurationVar(&auditPurgeDuration, "audit-purge-duration", auditPurgeDuration,
		"Duration for which audit logs are held before being purged. Should be in multiples of days")

	// Recovery behaviour and timeouts.
	fs.DurationVar(&recoveryPeriodBlockDuration, "recovery-period-block-duration", recoveryPeriodBlockDuration,
		"Duration for which a new recovery is blocked on an instance after running a recovery")
	fs.BoolVar(&preventCrossCellFailover, "prevent-cross-cell-failover", preventCrossCellFailover,
		"Prevent VTOrc from promoting a primary in a different cell than the current primary in case of a failover")
	fs.DurationVar(&lockShardTimeout, "lock-shard-timeout", lockShardTimeout,
		"Duration for which a shard lock is held when running a recovery")
	fs.DurationVar(&waitReplicasTimeout, "wait-replicas-timeout", waitReplicasTimeout,
		"Duration for which to wait for replica's to respond when issuing RPCs")
	fs.DurationVar(&topoInformationRefreshDuration, "topo-information-refresh-duration", topoInformationRefreshDuration,
		"Timer duration on which VTOrc refreshes the keyspace and vttablet records from the topology server")
	fs.DurationVar(&recoveryPollDuration, "recovery-poll-duration", recoveryPollDuration,
		"Timer duration on which VTOrc polls its database to run a recovery")
}
// Configuration makes for vtorc configuration input, which can be provided by user via JSON formatted file.
// Some of the parameters have reasonable default values (see newConfiguration), and some
// (like database credentials) are strictly expected from user.
// Field names double as the JSON keys, since no struct tags are declared.
// TODO(sougou): change this to yaml parsing, and possible merge with tabletenv.
type Configuration struct {
	SQLite3DataFile string // full path to sqlite3 datafile
	InstancePollSeconds uint // Number of seconds between instance reads
	SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
	ReasonableReplicationLagSeconds int // Above this value is considered a problem
	AuditLogFile string // Name of log file for audit operations. Disabled when empty.
	AuditToSyslog bool // If true, audit messages are written to syslog
	AuditToBackendDB bool // If true, audit messages are written to the backend DB's `audit` table (default: false)
	AuditPurgeDays uint // Days after which audit entries are purged from the database
	RecoveryPeriodBlockSeconds int // (overrides `RecoveryPeriodBlockMinutes`) The time for which an instance's recovery is kept "active", so as to avoid concurrent recoveries on same instance as well as flapping. NOTE(review): `RecoveryPeriodBlockMinutes` is not a field of this struct — the reference may be stale.
	PreventCrossDataCenterPrimaryFailover bool // When true (default: false), cross-DC primary failover are not allowed, vtorc will do all it can to only fail over within same DC, or else not fail over at all.
	LockShardTimeoutSeconds int // Timeout on context used to lock shard. Should be a small value because we should fail-fast
	WaitReplicasTimeoutSeconds int // Timeout on amount of time to wait for the replicas in case of ERS. Should be a small value because we should fail-fast. Should not be larger than LockShardTimeoutSeconds since that is the total time we use for an ERS.
	TopoInformationRefreshSeconds int // Timer duration on which VTOrc refreshes the keyspace and vttablet records from the topo-server.
	RecoveryPollSeconds int // Timer duration on which VTOrc recovery analysis runs
}
// ToJSONString renders this configuration as a compact JSON document.
// A marshalling error is deliberately discarded; in that case the encoded
// bytes are nil and the empty string is returned.
func (config *Configuration) ToJSONString() string {
	encoded, _ := json.Marshal(config)
	return string(encoded)
}
// Config is *the* configuration instance, used globally to get configuration data.
// It is initialized with the defaults returned by newConfiguration.
var Config = newConfiguration()

// readFileNames holds the file names passed to the most recent Read or
// ForceRead call; Reload re-reads them.
var readFileNames []string
// UpdateConfigValuesFromFlags copies the flag-backed package variables into
// the global Config.
// This is done before we read any configuration files from the user, so the
// config files take precedence.
//
// Durations are converted to the whole-unit integers Config stores (seconds,
// hours or days); any remainder smaller than the unit is truncated by the
// integer division.
func UpdateConfigValuesFromFlags() {
	Config.SQLite3DataFile = sqliteDataFile
	Config.InstancePollSeconds = uint(instancePollTime / time.Second)
	Config.SnapshotTopologiesIntervalHours = uint(snapshotTopologyInterval / time.Hour)
	Config.ReasonableReplicationLagSeconds = int(reasonableReplicationLag / time.Second)
	Config.AuditLogFile = auditFileLocation
	Config.AuditToBackendDB = auditToBackend
	Config.AuditToSyslog = auditToSyslog
	Config.AuditPurgeDays = uint(auditPurgeDuration / (time.Hour * 24))
	Config.RecoveryPeriodBlockSeconds = int(recoveryPeriodBlockDuration / time.Second)
	Config.PreventCrossDataCenterPrimaryFailover = preventCrossCellFailover
	Config.LockShardTimeoutSeconds = int(lockShardTimeout / time.Second)
	Config.WaitReplicasTimeoutSeconds = int(waitReplicasTimeout / time.Second)
	Config.TopoInformationRefreshSeconds = int(topoInformationRefreshDuration / time.Second)
	Config.RecoveryPollSeconds = int(recoveryPollDuration / time.Second)
}
// LogConfigValues writes the global Config to the info log as tab-indented
// JSON. A marshalling error is discarded, in which case the logged value is
// empty.
func LogConfigValues() {
	pretty, _ := json.MarshalIndent(Config, "", "\t")
	log.Infof("Running with Configuration - %v", string(pretty))
}
// newConfiguration returns a freshly allocated Configuration populated with
// the built-in defaults. Fields whose default is the zero value are still
// assigned explicitly so the full set of defaults can be read in one place.
func newConfiguration() *Configuration {
	defaults := new(Configuration)

	defaults.SQLite3DataFile = "file::memory:?mode=memory&cache=shared"
	defaults.InstancePollSeconds = 5
	defaults.SnapshotTopologiesIntervalHours = 0
	defaults.ReasonableReplicationLagSeconds = 10

	defaults.AuditLogFile = ""
	defaults.AuditToSyslog = false
	defaults.AuditToBackendDB = false
	defaults.AuditPurgeDays = 7

	defaults.RecoveryPeriodBlockSeconds = 30
	defaults.PreventCrossDataCenterPrimaryFailover = false
	defaults.LockShardTimeoutSeconds = 30
	defaults.WaitReplicasTimeoutSeconds = 30
	defaults.TopoInformationRefreshSeconds = 15
	defaults.RecoveryPollSeconds = 1

	return defaults
}
// postReadAdjustments validates the configuration after it has been decoded
// from a file. It returns an error when the SQLite backend is in use but no
// datafile has been configured, and nil otherwise.
func (config *Configuration) postReadAdjustments() error {
	if !config.IsSQLite() {
		return nil
	}
	if config.SQLite3DataFile != "" {
		return nil
	}
	return fmt.Errorf("SQLite3DataFile must be set")
}
// IsSQLite reports whether the backend database is SQLite. It always
// returns true.
// TODO: Simplify the callers and delete this function
func (config *Configuration) IsSQLite() bool {
	return true
}
// IsMySQL reports whether the backend database is MySQL. It always
// returns false.
// TODO: Simplify the callers and delete this function
func (config *Configuration) IsMySQL() bool {
	return false
}
// read decodes the JSON file fileName into the global Config, overwriting
// only the fields present in the file, and then validates the result.
//
// The global Config is returned in every case. An empty file name, or a file
// that cannot be opened (for example because it does not exist), is reported
// through the returned error and leaves Config untouched. A file that opens
// but cannot be decoded, or that fails postReadAdjustments, is reported
// through log.Fatal (the process is expected to exit there).
func read(fileName string) (*Configuration, error) {
	if fileName == "" {
		return Config, fmt.Errorf("Empty file name")
	}
	file, err := os.Open(fileName)
	if err != nil {
		return Config, err
	}
	// Release the descriptor once decoding is done; without this every
	// Read/Reload of a config file would leak an open file.
	defer file.Close()

	if err := json.NewDecoder(file).Decode(Config); err != nil {
		log.Fatal("Cannot read config file:", fileName, err)
	}
	log.Infof("Read config: %s", fileName)

	if err := Config.postReadAdjustments(); err != nil {
		log.Fatal(err)
	}
	return Config, nil
}
// Read loads configuration from each of the given files, in the order given,
// into the global Config and returns it. Every file is decoded into the same
// Config, so a later file overrides values set by an earlier one. Errors
// returned by read (such as a file that cannot be opened) are discarded. The
// list of names is remembered for Reload.
func Read(fileNames ...string) *Configuration {
	for i := range fileNames {
		// Best effort: the returned error is intentionally dropped.
		_, _ = read(fileNames[i])
	}
	readFileNames = fileNames
	return Config
}
// ForceRead loads configuration from the single given file into the global
// Config and returns it. Unlike Read, any error returned by read is reported
// through log.Fatal. The file name replaces the list remembered for Reload.
func ForceRead(fileName string) *Configuration {
	if _, err := read(fileName); err != nil {
		log.Fatal("Cannot read config file:", fileName, err)
	}
	readFileNames = []string{fileName}
	return Config
}
// Reload re-reads the files remembered from the last Read or ForceRead, then
// any extra files given, all into the global Config, and returns it. Errors
// returned by read are discarded, and the remembered list is left unchanged.
func Reload(extraFileNames ...string) *Configuration {
	// Remembered files first, extras second, so extras override them.
	for _, names := range [][]string{readFileNames, extraFileNames} {
		for _, name := range names {
			_, _ = read(name)
		}
	}
	return Config
}
// MarkConfigurationLoaded is called once configuration has first been loaded.
// It starts a goroutine that sends on the configurationLoaded channel in an
// endless loop, so every receive on that channel — the one at the end of this
// function and each one made by WaitForConfigurationToBeLoaded — gets served.
// The function returns after its own receive completes. The goroutine has no
// exit path, and each call to this function starts another one.
func MarkConfigurationLoaded() {
	go func() {
		for {
			// configurationLoaded is unbuffered, so each send blocks until a
			// receiver is waiting.
			configurationLoaded <- true
		}
	}()
	// wait for it
	<-configurationLoaded
}
// WaitForConfigurationToBeLoaded blocks until a value can be received from
// the configurationLoaded channel. The only sender in this file is the
// goroutine started by MarkConfigurationLoaded, so this returns only once
// MarkConfigurationLoaded has been called.
func WaitForConfigurationToBeLoaded() {
	<-configurationLoaded
}