Skip to content

Commit

Permalink
[WIP] Etcd integration for configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
titilambert committed Feb 20, 2016
1 parent 9edc259 commit 7849b11
Show file tree
Hide file tree
Showing 17 changed files with 662 additions and 68 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ github.com/aws/aws-sdk-go 87b1e60a50b09e4812dee560b33a238f67305804
github.com/beorn7/perks b965b613227fddccbfffe13eae360ed3fa822f8d
github.com/cenkalti/backoff 4dc77674aceaabba2c7e3da25d4c823edfb73f99
github.com/dancannon/gorethink 6f088135ff288deb9d5546f4c71919207f891a70
github.com/coreos/etcd bdee27b19e8601ffd7bd4f0481abe9bbae04bd09
github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d
github.com/eapache/go-resiliency b86b1ec0dd4209a588dc1285cdd471e73525c0b3
github.com/eapache/queue ded5959c0d4e360646dc9e9908cff48666781367
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ endif
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Run docker containers necessary for CircleCI unit tests
docker-run-circle:
Expand All @@ -85,11 +86,12 @@ docker-run-circle:
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name snmp -p "31161:31161/udp" -d titilambert/snmpsim
docker run --name etcd -p "2379:2379" -d quay.io/coreos/etcd -name etcd0 -advertise-client-urls http://localhost:2379 -listen-client-urls http://0.0.0.0:2379

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp
-docker kill nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd
-docker rm nsq aerospike redis opentsdb rabbitmq postgres memcached mysql kafka mqtt riemann snmp etcd

# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run
Expand Down
204 changes: 138 additions & 66 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/internal/config"
"github.com/influxdata/telegraf/internal/etcd"

_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
Expand All @@ -21,6 +22,11 @@ var fDebug = flag.Bool("debug", false,
var fQuiet = flag.Bool("quiet", false,
"run in quiet mode")
var fTest = flag.Bool("test", false, "gather metrics, print them out, and exit")
var fEtcd = flag.String("etcd", "", "etcd url where configuration is stored")
var fEtcdSendConfigDir = flag.String("etcdwriteconfigdir", "", "store the following config dir to etcd")
var fEtcdSendConfig = flag.String("etcdwriteconfig", "", "store the following config file to etcd")
var fEtcdSendLabel = flag.String("etcdwritelabel", "", "store config file to etcd with this label")
var fEtcdReadLabels = flag.String("etcdreadlabels", "", "read config from etcd using labels (comma-separated)")
var fConfig = flag.String("config", "", "configuration file to load")
var fConfigDirectory = flag.String("config-directory", "",
"directory containing additional *.conf files")
Expand Down Expand Up @@ -54,16 +60,20 @@ Usage:
The flags are:
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
-config <file> configuration file to load
-test gather metrics once, print them to stdout, and exit
-sample-config print out full sample configuration to stdout
-config-directory directory containing additional *.conf files
-etcdwriteconfigdir store the following config dir to etcd
-etcdwriteconfig store the following config file to etcd
-etcdwritelabel store config file to etcd with this label
-etcdreadlabels read config from etcd using labels (comma-separated)
-input-filter filter the input plugins to enable, separator is :
-output-filter filter the output plugins to enable, separator is :
-usage print usage for a plugin, ie, 'telegraf -usage mysql'
-debug print metrics as they're generated to stdout
-quiet run in quiet mode
-version print the version to stdout
Examples:
Expand All @@ -84,63 +94,138 @@ Examples:
`

func main() {
// Read flags
flag.Usage = func() { usageExit(0) }
flag.Parse()
if flag.NFlag() == 0 {
usageExit(0)
}

// Prepare signals handling
reload := make(chan bool, 1)
reload <- true
for <-reload {
reload <- false
flag.Usage = func() { usageExit(0) }
flag.Parse()

if flag.NFlag() == 0 {
usageExit(0)
shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)

// Prepare etcd if needed
var e *etcd.EtcdClient
if *fEtcd != "" {
e = etcd.NewEtcdClient(*fEtcd, "/telegraf")
if *fEtcdSendConfig == "" && *fEtcdSendLabel == "" && *fEtcdSendConfigDir == "" {
go e.LaunchWatcher(shutdown, signals)
}
}

var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
// Handle signals
go func() {
for {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
} else if sig == syscall.SIGHUP {
log.Print("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}
}()

var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
// Prepare inputs
var inputFilters []string
if *fInputFiltersLegacy != "" {
inputFilter := strings.TrimSpace(*fInputFiltersLegacy)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}
if *fInputFilters != "" {
inputFilter := strings.TrimSpace(*fInputFilters)
inputFilters = strings.Split(":"+inputFilter+":", ":")
}

if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}
// Prepare outputs
var outputFilters []string
if *fOutputFiltersLegacy != "" {
outputFilter := strings.TrimSpace(*fOutputFiltersLegacy)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}
if *fOutputFilters != "" {
outputFilter := strings.TrimSpace(*fOutputFilters)
outputFilters = strings.Split(":"+outputFilter+":", ":")
}

if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}
// Print version
if *fVersion {
v := fmt.Sprintf("Telegraf - Version %s", Version)
fmt.Println(v)
return
}

if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
// Print sample config
if *fSampleConfig {
config.PrintSampleConfig(inputFilters, outputFilters)
return
}

// Print usage
if *fUsage != "" {
if err := config.PrintInputConfig(*fUsage); err != nil {
if err2 := config.PrintOutputConfig(*fUsage); err2 != nil {
log.Fatalf("%s and %s", err, err2)
}
return
}
return
}

for <-reload {
// Reset signal handler vars
shutdown = make(chan struct{})
reload <- false

// Prepare config
var (
c *config.Config
err error
)

if *fConfig != "" {
if *fEtcd != "" {
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
if *fEtcdSendConfigDir != "" {
// TODO check config format before write it
// Write config dir to etcd
err = c.LoadDirectory(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
err = e.WriteConfigDir(*fEtcdSendConfigDir)
if err != nil {
log.Fatal(err)
}
return
} else if *fEtcdSendConfig != "" && *fEtcdSendLabel != "" {
// TODO check config format before write it
// Write config to etcd
err = c.LoadConfig(*fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
err = e.WriteLabelConfig(*fEtcdSendLabel, *fEtcdSendConfig)
if err != nil {
log.Fatal(err)
}
return
} else {
// Read config to etcd
log.Printf("Config read from etcd with labels %s\n", *fEtcdReadLabels)
c, err = e.ReadConfig(c, *fEtcdReadLabels)
if err != nil {
log.Fatal(err)
}
}
} else if *fConfig != "" {
// Read config from file
c = config.NewConfig()
c.OutputFilters = outputFilters
c.InputFilters = inputFilters
Expand All @@ -153,19 +238,22 @@ func main() {
os.Exit(1)
}

// Read config dir
if *fConfigDirectoryLegacy != "" {
err = c.LoadDirectory(*fConfigDirectoryLegacy)
if err != nil {
log.Fatal(err)
}
}

// Read config dir
if *fConfigDirectory != "" {
err = c.LoadDirectory(*fConfigDirectory)
if err != nil {
log.Fatal(err)
}
}
// check config
if len(c.Outputs) == 0 {
log.Fatalf("Error: no outputs found, did you provide a valid config file?")
}
Expand Down Expand Up @@ -199,22 +287,6 @@ func main() {
log.Fatal(err)
}

shutdown := make(chan struct{})
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP)
go func() {
sig := <-signals
if sig == os.Interrupt {
close(shutdown)
}
if sig == syscall.SIGHUP {
log.Printf("Reloading Telegraf config\n")
<-reload
reload <- true
close(shutdown)
}
}()

log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(c.OutputNames(), " "))
log.Printf("Loaded inputs: %s", strings.Join(c.InputNames(), " "))
Expand Down
9 changes: 9 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewConfig() *Config {
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
Labels: make([]string, 0),
FlushInterval: internal.Duration{Duration: 10 * time.Second},
FlushJitter: internal.Duration{Duration: 5 * time.Second},
},
Expand Down Expand Up @@ -99,6 +100,9 @@ type AgentConfig struct {
// Quiet is the option for running in quiet mode
Quiet bool
Hostname string

// Etcd labels
Labels []string
}

// Inputs returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -329,7 +333,12 @@ func (c *Config) LoadConfig(path string) error {
if err != nil {
return err
}
err = c.LoadConfigFromTable(tbl)
return err
}

func (c *Config) LoadConfigFromTable(tbl *ast.Table) error {
var err error
for name, val := range tbl.Fields {
subTable, ok := val.(*ast.Table)
if !ok {
Expand Down
Loading

0 comments on commit 7849b11

Please sign in to comment.