-
Notifications
You must be signed in to change notification settings - Fork 133
/
config.go
240 lines (208 loc) · 7.87 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package pump
import (
"crypto/tls"
"flag"
"fmt"
"net"
"net/url"
"os"
"strings"
"time"
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pkg/version"
"github.com/pingcap/tidb-binlog/pump/storage"
)
// Default values used by NewConfig when registering flags and by Config.Parse
// when a setting is left empty.
const (
// timeout assigned to Config.EtcdDialTimeout
defaultEtcdDialTimeout = 5 * time.Second
// default value of the "pd-urls" flag
defaultEtcdURLs = "http://127.0.0.1:2379"
// fallback used by Config.Parse when ListenAddr is empty
defaultListenAddr = "127.0.0.1:8250"
// default value of the "max-message-size" flag (1024*1024*1024).
// NOTE(review): the identifier is misspelled ("defaut"); it is kept as is
// because NewConfig refers to it by this name.
defautMaxKafkaSize = 1024 * 1024 * 1024
// default value of the "heartbeat-interval" flag, in seconds
defaultHeartbeatInterval = 2
// default value of the "gc" flag, in days
defaultGC = 7
// fallback used by Config.Parse when DataDir is empty
defaultDataDir = "data.pump"
// default interval between generated fake binlogs, in seconds
defaultGenFakeBinlogInterval = 3
)
// globalConfig is the process-wide part of the pump configuration, meant to be
// readable from anywhere in the package.
// NOTE(review): NewConfig binds the "enable-debug" and "max-message-size" flags
// to fields of a package-level GlobalConfig value declared outside this view;
// presumably it is of this type — confirm.
type globalConfig struct {
// enable online debug log output
enableDebug bool
// max binlog message size limit
maxMsgSize int
}
// Config holds the configuration of pump.
//
// NewConfig binds most fields to command-line flags, and Config.Parse can
// additionally decode a TOML configuration file into the same struct, so the
// `toml` tag of a field is its key in that file.
type Config struct {
// flag set created by NewConfig; the flags registered on it write into the
// fields below
*flag.FlagSet
// log level: debug, info, warn, error, fatal. The flag is named "L" while
// the config-file key is "log-level".
LogLevel string `toml:"log-level" json:"log-level"`
// ID of the pump node
NodeID string `toml:"node-id" json:"node-id"`
// 'host:port' to listen on for client traffic; Parse prepends "http://"
ListenAddr string `toml:"addr" json:"addr"`
// 'host:port' advertised to the public; Parse falls back to ListenAddr when
// empty and prepends "http://"
AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"`
// unix socket addr to listen on for client traffic
Socket string `toml:"socket" json:"socket"`
// comma separated list of the PD endpoints
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
// set to defaultEtcdDialTimeout by NewConfig; no flag is registered for it
EtcdDialTimeout time.Duration
// path to store binlog data; Parse falls back to defaultDataDir when empty
DataDir string `toml:"data-dir" json:"data-dir"`
// number of seconds between heartbeat ticks
HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"`
// pump only stores binlog events whose ts >= current time - GC(day)
GC int `toml:"gc" json:"gc"`
// log file path
LogFile string `toml:"log-file" json:"log-file"`
// log file rotate type, hour/day
LogRotate string `toml:"log-rotate" json:"log-rotate"`
// security settings; Parse converts them into the tls field
Security security.Config `toml:"security" json:"security"`
// interval, in seconds, between generated fake binlogs.
// NOTE(review): the flag is named "fake-binlog-interval" but the config-file
// key is "gen-binlog-interval" — confirm the mismatch is intended.
GenFakeBinlogInterval int `toml:"gen-binlog-interval" json:"gen-binlog-interval"`
// prometheus pushgateway address; leaving it empty disables the push
MetricsAddr string
// prometheus client push interval in seconds; "0" disables the push
MetricsInterval int
// path to the pump configuration file ("config" flag)
configFile string
// print version information and exit ("V" flag)
printVersion bool
// TLS configuration built by Parse from Security
tls *tls.Config
// storage settings, decoded from the "storage" section of the config file
Storage storage.Config `toml:"storage" json:"storage"`
}
// NewConfig returns a Config whose embedded flag set has every pump
// command-line flag registered and bound to the corresponding field.
// Nothing is parsed here; call Parse on the result.
func NewConfig() *Config {
	flagSet := flag.NewFlagSet("pump", flag.ContinueOnError)
	cfg := &Config{
		FlagSet:         flagSet,
		EtcdDialTimeout: defaultEtcdDialTimeout,
	}

	flagSet.Usage = func() {
		fmt.Fprintln(os.Stderr, "Usage of pump:")
		flagSet.PrintDefaults()
	}

	// node identity and network endpoints
	flagSet.StringVar(&cfg.NodeID, "node-id", "", "the ID of pump node; if not specify, we will generate one from hostname and the listening port")
	flagSet.StringVar(&cfg.ListenAddr, "addr", util.DefaultListenAddr(8250), "addr(i.e. 'host:port') to listen on for client traffic")
	flagSet.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public")
	flagSet.StringVar(&cfg.Socket, "socket", "", "unix socket addr to listen on for client traffic")
	flagSet.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints")

	// storage and housekeeping
	flagSet.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data")
	flagSet.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks")
	flagSet.IntVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc days")
	flagSet.IntVar(&cfg.GenFakeBinlogInterval, "fake-binlog-interval", defaultGenFakeBinlogInterval, "interval time to generate fake binlog, the unit is second")

	// logging
	flagSet.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
	flagSet.StringVar(&cfg.LogFile, "log-file", "", "log file path")
	flagSet.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")

	// metrics
	flagSet.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
	flagSet.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")

	// config file and version
	flagSet.StringVar(&cfg.configFile, "config", "", "path to the pump configuration file")
	flagSet.BoolVar(&cfg.printVersion, "V", false, "print version information and exit")

	// global config
	flagSet.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log")
	flagSet.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxKafkaSize, "max msg size producer produce into kafka")

	// deprecated flags: still accepted on the command line, values discarded
	flagSet.Int64("binlog-file-size", 0, "DEPRECATED")
	flagSet.Bool("enable-binlog-slice", false, "DEPRECATED")
	flagSet.Int("binlog-slice-size", 0, "DEPRECATED")

	return cfg
}
// Parse fills cfg from the command line, an optional configuration file and
// the environment, then applies defaults and validates the result.
//
// The steps run in this order:
//  1. arguments are parsed once so that the "config" and "V" flags are known;
//  2. if a config file was given, it is decoded into cfg;
//  3. arguments are parsed a second time so that flags given on the command
//     line replace values loaded from the file;
//  4. flags.SetFlagsFromEnv is applied with the "PUMP" prefix;
//  5. the TLS config is built from cfg.Security, empty settings receive their
//     defaults, "http://" is prepended to ListenAddr and AdvertiseAddr, and
//     validate is called.
//
// Parse terminates the process with os.Exit when the first flag parse fails
// (status 2), when help is requested (status 0), or after printing the version
// for the "V" flag (status 0). All other failures are returned as an error.
func (cfg *Config) Parse(arguments []string) error {
	// Parse first to get config file
	perr := cfg.FlagSet.Parse(arguments)
	switch perr {
	case nil:
	case flag.ErrHelp:
		os.Exit(0)
	default:
		os.Exit(2)
	}

	if cfg.printVersion {
		version.PrintVersionInfo()
		os.Exit(0)
	}

	// Load config file if specified
	if cfg.configFile != "" {
		if err := cfg.configFromFile(cfg.configFile); err != nil {
			return errors.Trace(err)
		}
	}

	// Parse again to replace with command line options. The error is checked
	// rather than dropped so that a failure here cannot go unnoticed.
	if err := cfg.FlagSet.Parse(arguments); err != nil {
		return errors.Trace(err)
	}
	if len(cfg.FlagSet.Args()) > 0 {
		return errors.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0))
	}

	// replace with environment vars
	err := flags.SetFlagsFromEnv("PUMP", cfg.FlagSet)
	if err != nil {
		return errors.Trace(err)
	}

	cfg.tls, err = cfg.Security.ToTLSConfig()
	if err != nil {
		return errors.Errorf("tls config %+v error %v", cfg.Security, err)
	}

	adjustString(&cfg.ListenAddr, defaultListenAddr)
	// AdvertiseAddr falls back to the bare 'host:port' ListenAddr, so this must
	// run before the scheme is prepended below.
	adjustString(&cfg.AdvertiseAddr, cfg.ListenAddr)
	cfg.ListenAddr = "http://" + cfg.ListenAddr       // add 'http:' scheme to facilitate parsing
	cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing
	adjustDuration(&cfg.EtcdDialTimeout, defaultEtcdDialTimeout)
	adjustString(&cfg.DataDir, defaultDataDir)
	adjustInt(&cfg.HeartbeatInterval, defaultHeartbeatInterval)

	return cfg.validate()
}
// configFromFile decodes the TOML file at path into cfg and returns any
// decoding error wrapped with errors.Trace.
func (cfg *Config) configFromFile(path string) error {
	if _, err := toml.DecodeFile(path, cfg); err != nil {
		return errors.Trace(err)
	}
	return nil
}
// adjustString replaces *v with defValue when *v is the empty string and
// leaves it unchanged otherwise.
func adjustString(v *string, defValue string) {
	if *v != "" {
		return
	}
	*v = defValue
}
// adjustInt replaces *v with defValue when *v is zero and leaves it unchanged
// otherwise (negative values included).
func adjustInt(v *int, defValue int) {
	if *v != 0 {
		return
	}
	*v = defValue
}
// adjustDuration replaces *v with defValue when *v is the zero duration and
// leaves it unchanged otherwise.
func adjustDuration(v *time.Duration, defValue time.Duration) {
	if *v != 0 {
		return
	}
	*v = defValue
}
// validate checks the configuration after defaults have been applied and
// returns an error describing the first invalid setting, or nil.
//
// It verifies that GC is positive, that ListenAddr and AdvertiseAddr parse as
// URLs with a 'host:port' host part, that the advertised host is neither empty
// nor "0.0.0.0", that Socket (when set) parses as a URL whose path contains a
// "/", and that every entry of EtcdURLs has a 'host:port' host part.
func (cfg *Config) validate() error {
	// check GC
	if cfg.GC <= 0 {
		return errors.Errorf("GC is %d, must bigger than 0", cfg.GC)
	}

	// check ListenAddr
	listenURL, err := url.Parse(cfg.ListenAddr)
	if err != nil {
		return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err)
	}
	if _, _, err := net.SplitHostPort(listenURL.Host); err != nil {
		return errors.Errorf("bad ListenAddr host format: %s, %v", listenURL.Host, err)
	}

	// check AdvertiseAddr
	advertiseURL, err := url.Parse(cfg.AdvertiseAddr)
	if err != nil {
		return errors.Errorf("parse AdvertiseAddr error: %s, %v", cfg.AdvertiseAddr, err)
	}
	advertiseHost, _, err := net.SplitHostPort(advertiseURL.Host)
	if err != nil {
		return errors.Errorf("bad AdvertiseAddr host format: %s, %v", advertiseURL.Host, err)
	}
	if advertiseHost == "" || advertiseHost == "0.0.0.0" {
		return errors.Errorf("invalid advertiseAddr host: %v", advertiseHost)
	}

	// check socketAddr
	if cfg.Socket != "" {
		socketURL, err := url.Parse(cfg.Socket)
		if err != nil {
			return errors.Errorf("parse Socket error: %s, %v", cfg.Socket, err)
		}
		if segments := strings.Split(socketURL.Path, "/"); len(segments) < 2 {
			return errors.Errorf("bad Socket addr format: %s", socketURL.Path)
		}
	}

	// check EtcdEndpoints
	endpoints, err := flags.NewURLsValue(cfg.EtcdURLs)
	if err != nil {
		return errors.Errorf("parse EtcdURLs error: %s, %v", cfg.EtcdURLs, err)
	}
	for _, endpoint := range endpoints.URLSlice() {
		if _, _, err := net.SplitHostPort(endpoint.Host); err != nil {
			return errors.Errorf("bad EtcdURL host format: %s, %v", endpoint.Host, err)
		}
	}

	return nil
}