diff --git a/filebeat/autodiscover/autodiscover.go b/filebeat/autodiscover/autodiscover.go deleted file mode 100644 index 85fa6cf65bd..00000000000 --- a/filebeat/autodiscover/autodiscover.go +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/bus" -) - -// AutodiscoverAdapter for Filebeat modules & input -type AutodiscoverAdapter struct { - inputFactory cfgfile.RunnerFactory - moduleFactory cfgfile.RunnerFactory -} - -// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & input -func NewAutodiscoverAdapter(inputFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter { - return &AutodiscoverAdapter{ - inputFactory: inputFactory, - moduleFactory: moduleFactory, - } -} - -// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` -func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { - config, ok := e["config"].([]*common.Config) - if !ok { - return nil, errors.New("Got a wrong value in event `config` key") - } - return config, nil -} - -// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work -func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { - var factory cfgfile.RunnerFactory - - if c.HasField("module") { - factory = m.moduleFactory - } else { - factory = m.inputFactory - } - - if checker, ok := factory.(cfgfile.ConfigChecker); ok { - return checker.CheckConfig(c) - } - - return nil -} - -// Create a module or input from the given config -func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - if c.HasField("module") { - return m.moduleFactory.Create(p, c, meta) - } - return m.inputFactory.Create(p, c, meta) -} - -// EventFilter returns the bus filter to retrieve runner start/stop triggering events -func (m *AutodiscoverAdapter) EventFilter() []string { - return []string{"config"} -} diff --git a/filebeat/autodiscover/include.go b/filebeat/autodiscover/include.go deleted file mode 100644 index de15185e716..00000000000 --- a/filebeat/autodiscover/include.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all filebeat specific builders - _ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints" -) diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 99cd952e4e4..ede58e25bcf 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -76,27 +76,32 @@ func (c *crawler) Start( for _, inputConfig := range c.inputConfigs { err := c.startInput(pipeline, inputConfig, r.GetStates()) if err != nil { - return err + return fmt.Errorf("starting input failed: %+v", err) } } if configInputs.Enabled() { c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) if err := c.inputReloader.Check(c.inputsFactory); err != nil { - return err + return fmt.Errorf("creating input reloader failed: %+v", err) } - go func() { - c.inputReloader.Run(c.inputsFactory) - }() } if configModules.Enabled() { c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { - return err + return fmt.Errorf("creating module reloader failed: %+v", err) } + } + + if c.inputReloader != nil { + go func() { + c.inputReloader.Run(c.inputsFactory) + }() + } + if c.modulesReloader != nil { go func() { c.modulesReloader.Run(c.modulesFactory) }() diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index e368cd89615..226646150e4 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -24,7 +24,6 @@ import ( "github.com/pkg/errors" - fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover" "github.com/elastic/beats/v7/filebeat/channel" cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/fileset" @@ -43,9 +42,14 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + _ "github.com/elastic/beats/v7/filebeat/include" + // Add filebeat level processors _ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata" _ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields" + + // include all filebeat specific builders + _ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints" ) const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" + @@ -282,7 +286,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() - return err + return fmt.Errorf("Failed to start crawler: %+v", err) } // If run once, add crawler completion check as alternative to done signal @@ -304,8 +308,16 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { - adapter := fbautodiscover.NewAutodiscoverAdapter(inputLoader, moduleLoader) - adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover) + adiscover, err = autodiscover.NewAutodiscover( + "filebeat", + b.Publisher, + cfgfile.MultiplexedRunnerFactory( + cfgfile.MatchHasField("module", moduleLoader), + cfgfile.MatchDefault(inputLoader), + ), + autodiscover.QueryConfig(), + config.Autodiscover, + ) if err != nil { return err } diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 8f253bb1323..73da881ec32 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -30,7 +30,7 @@ type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error) type pipelineConnector struct { parent *OutletFactory - pipeline beat.Pipeline + pipeline beat.PipelineConnector } // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 3bfeccc0150..b145c4a34f5 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -87,7 +87,7 @@ func NewOutletFactory( // Inputs and all harvesters use the same pipeline client instance. // This guarantees ordering between events as required by the registrar for // file.State updates -func (f *OutletFactory) Create(p beat.Pipeline) Connector { +func (f *OutletFactory) Create(p beat.PipelineConnector) Connector { return &pipelineConnector{parent: f, pipeline: p} } diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 0069cda6f02..9df9ff584b3 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -23,7 +23,7 @@ import ( ) // Factory is used to create a new Outlet instance -type Factory func(beat.Pipeline) Connector +type Factory func(beat.PipelineConnector) Connector // Connector creates an Outlet connecting the event publishing with some internal pipeline. // type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 92e626795c9..46e86d7a33f 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/mitchellh/hashstructure" ) @@ -75,7 +76,7 @@ func NewFactory( } // Create creates a module based on a config -func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { // Start a registry of one module: m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false) if err != nil { @@ -114,6 +115,11 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP }, nil } +func (f *Factory) CheckConfig(c *common.Config) error { + _, err := f.Create(pubpipeline.NewNilPipeline(), c, nil) + return err +} + func (p *inputsRunner) Start() { // Load pipelines if p.pipelineLoaderFactory != nil { @@ -153,6 +159,7 @@ func (p *inputsRunner) Start() { moduleList.Add(m) } } + func (p *inputsRunner) Stop() { if p.pipelineCallbackID != uuid.Nil { elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID) diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index 4c26dfb47e3..027e9506dc0 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) // SetupFactory is for loading module assets when running setup subcommand. @@ -41,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac } // Create creates a new SetupCfgRunner to setup module configuration. -func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) { +func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) { m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false) if err != nil { return nil, err @@ -54,6 +55,11 @@ func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapS }, nil } +func (sf *SetupFactory) CheckConfig(c *common.Config) error { + _, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil) + return err +} + // SetupCfgRunner is for loading assets of modules. type SetupCfgRunner struct { moduleRegistry *ModuleRegistry diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index d41382fe01f..a47e848bc6d 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" ) // RunnerFactory is a factory for registrars @@ -43,7 +44,7 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be // Create creates a input based on a config func (r *RunnerFactory) Create( - pipeline beat.Pipeline, + pipeline beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer, ) (cfgfile.Runner, error) { @@ -56,3 +57,8 @@ func (r *RunnerFactory) Create( return p, nil } + +func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { + _, err := r.Create(pipeline.NewNilPipeline(), cfg, nil) + return err +} diff --git a/heartbeat/autodiscover/include.go b/heartbeat/autodiscover/include.go deleted file mode 100644 index 2ae869729c4..00000000000 --- a/heartbeat/autodiscover/include.go +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all heartbeat specific builders - _ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints" -) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 90eef546788..dbdfe1dd740 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -156,14 +156,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) { // makeAutodiscover creates an autodiscover object ready to be started. func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) { - adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory) - - ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover) - if err != nil { - return nil, err - } - - return ad, nil + return autodiscover.NewAutodiscover("heartbeat", b.Publisher, bt.dynamicFactory, autodiscover.QueryConfig(), bt.config.Autodiscover) } // Stop stops the beat. diff --git a/heartbeat/cmd/root.go b/heartbeat/cmd/root.go index 858cc2c0daf..8e24b28976b 100644 --- a/heartbeat/cmd/root.go +++ b/heartbeat/cmd/root.go @@ -20,9 +20,11 @@ package cmd import ( "fmt" - _ "github.com/elastic/beats/v7/heartbeat/autodiscover" "github.com/elastic/beats/v7/heartbeat/beater" + // include all heartbeat specific autodiscovery builders + _ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints" + // register default heartbeat monitors _ "github.com/elastic/beats/v7/heartbeat/monitors/defaults" cmd "github.com/elastic/beats/v7/libbeat/cmd" diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index b6c39d14d4d..26937020dd9 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -37,7 +37,7 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { } // Create makes a new Runner for a new monitor with the given Config. -func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta) return monitor, err } diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 6ceb46fa794..c4941c4b176 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -37,18 +37,17 @@ const ( retryPeriod = 10 * time.Second ) -// TODO autodiscover providers config reload +// EventConfigurer is used to configure the creation of configuration objects +// from the autodiscover event bus. +type EventConfigurer interface { + // EventFilter returns the bus filter to retrieve runner start/stop triggering + // events. The bus will filter events to the ones, that contain *all* the + // the required top-level keys. + EventFilter() []string -// Adapter must be implemented by the beat in order to provide Autodiscover -type Adapter interface { - // CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` + // CreateConfig creates a list of configurations from a bus.Event. The + // received event will have all keys defined in `EventFilter`. CreateConfig(bus.Event) ([]*common.Config, error) - - // RunnerFactory provides runner creation by feeding valid configs - cfgfile.CheckableRunnerFactory - - // EventFilter returns the bus filter to retrieve runner start/stop triggering events - EventFilter() []string } // Autodiscover process, it takes a beat adapter and user config and runs autodiscover process, spawning @@ -56,7 +55,8 @@ type Adapter interface { type Autodiscover struct { bus bus.Bus defaultPipeline beat.Pipeline - adapter Adapter + factory cfgfile.RunnerFactory + configurer EventConfigurer providers []Provider configs map[string]map[uint64]*reload.ConfigWithMeta runners *cfgfile.RunnerList @@ -66,7 +66,13 @@ type Autodiscover struct { } // NewAutodiscover instantiates and returns a new Autodiscover manager -func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) { +func NewAutodiscover( + name string, + pipeline beat.Pipeline, + factory cfgfile.RunnerFactory, + configurer EventConfigurer, + config *Config, +) (*Autodiscover, error) { logger := logp.NewLogger("autodiscover") // Init Event bus @@ -86,9 +92,10 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi return &Autodiscover{ bus: bus, defaultPipeline: pipeline, - adapter: adapter, + factory: factory, + configurer: configurer, configs: map[string]map[uint64]*reload.ConfigWithMeta{}, - runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline), + runners: cfgfile.NewRunnerList("autodiscover", factory, pipeline), providers: providers, meta: meta.NewMap(), logger: logger, @@ -102,7 +109,7 @@ func (a *Autodiscover) Start() { } a.logger.Info("Starting autodiscover manager") - a.listener = a.bus.Subscribe(a.adapter.EventFilter()...) + a.listener = a.bus.Subscribe(a.configurer.EventFilter()...) // It is important to start the worker first before starting the producer. // In hosts that have large number of workloads, it is easy to have an initial @@ -175,7 +182,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { a.configs[eventID] = map[uint64]*reload.ConfigWithMeta{} } - configs, err := a.adapter.CreateConfig(event) + configs, err := a.configurer.CreateConfig(event) if err != nil { a.logger.Debugf("Could not generate config from event %v: %v", event, err) return false @@ -199,7 +206,7 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.adapter.CheckConfig(config) + err = a.factory.CheckConfig(config) if err != nil { a.logger.Error(errors.Wrap(err, fmt.Sprintf("Auto discover config check failed for config '%s', won't start runner", common.DebugString(config, true)))) continue diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index c466785d4b9..182cd99f3cb 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -92,7 +92,7 @@ func (m *mockAdapter) CheckConfig(c *common.Config) error { return nil } -func (m *mockAdapter) Create(_ beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *mockAdapter) Create(_ beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { runner := &mockRunner{ config: config, meta: meta, @@ -166,7 +166,7 @@ func TestAutodiscover(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } @@ -293,7 +293,7 @@ func TestAutodiscoverHash(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config) if err != nil { t.Fatal(err) } diff --git a/libbeat/autodiscover/eventselect.go b/libbeat/autodiscover/eventselect.go new file mode 100644 index 00000000000..670fc84ed60 --- /dev/null +++ b/libbeat/autodiscover/eventselect.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package autodiscover + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/bus" +) + +type queryConfigFrom string + +var defaultConfigQuery = queryConfigFrom("config") + +// EventConfigQuery creates an EventConfigurer that tries to cast the given event +// field from from the buf event into a []*common.Config. +func EventConfigQuery(field string) EventConfigurer { + if field == "" || field == "config" { + return defaultConfigQuery + } + return queryConfigFrom(field) +} + +// QueryConfig extract an array of *common.Config from bus.Event. +// The configurations are expected to be in the 'config' field. +func QueryConfig() EventConfigurer { return defaultConfigQuery } + +func (q queryConfigFrom) EventFilter() []string { return []string{string(q)} } + +func (q queryConfigFrom) CreateConfig(e bus.Event) ([]*common.Config, error) { + fieldName := string(q) + config, ok := e[fieldName].([]*common.Config) + if !ok { + return nil, fmt.Errorf("Event field '%v' does not contain a valid configuration object", fieldName) + } + return config, nil +} diff --git a/libbeat/autodiscover/factoryadapter.go b/libbeat/autodiscover/factoryadapter.go deleted file mode 100644 index 64dc9f15ca7..00000000000 --- a/libbeat/autodiscover/factoryadapter.go +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/bus" -) - -// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory. -type FactoryAdapter struct { - factory cfgfile.CheckableRunnerFactory -} - -// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory. -func NewFactoryAdapter(factory cfgfile.CheckableRunnerFactory) *FactoryAdapter { - return &FactoryAdapter{ - factory: factory, - } -} - -// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` -func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) { - config, ok := e["config"].([]*common.Config) - if !ok { - return nil, errors.New("Got a wrong value in event `config` key") - } - return config, nil -} - -// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work -func (m *FactoryAdapter) CheckConfig(c *common.Config) error { - return m.factory.CheckConfig(c) -} - -// Create a module or prospector from the given config -func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - return m.factory.Create(p, c, meta) -} - -// EventFilter returns the bus filter to retrieve runner start/stop triggering events -func (m *FactoryAdapter) EventFilter() []string { - return []string{"config"} -} diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go new file mode 100644 index 00000000000..095a7077205 --- /dev/null +++ b/libbeat/cfgfile/factories.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cfgfile + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +type multiplexedFactory []FactoryMatcher + +// FactoryMatcher returns a RunnerFactory that can handle the given +// configuration if it supports it, otherwise it returns nil. +type FactoryMatcher func(cfg *common.Config) RunnerFactory + +var errConfigDoesNotMatch = errors.New("config does not match accepted configurations") + +// MultiplexedRunnerFactory is a RunnerFactory that uses a list of +// FactoryMatcher to choose which RunnerFactory should handle the configuration. +// When presented a Config object, MultiplexedRunnerFactory will query the +// matchers in the order given. The first RunnerFactory returned will be used +// to create the runner. +// Creating a runner or checking a configuration will return an error if no +// matcher was found. Use MatchDefault as last argument to +// MultiplexedRunnerFactory to configure a default RunnerFactory that shall +// always be used if no other factory was matched. +func MultiplexedRunnerFactory(matchers ...FactoryMatcher) RunnerFactory { + return multiplexedFactory(matchers) +} + +// MatchHasField returns a FactoryMatcher that returns the given RunnerFactory +// when the input config contains the given field name. +func MatchHasField(field string, factory RunnerFactory) FactoryMatcher { + return func(cfg *common.Config) RunnerFactory { + if cfg.HasField(field) { + return factory + } + return nil + } +} + +// MatchDefault return a FactoryMatcher that always returns returns the given +// RunnerFactory. +func MatchDefault(factory RunnerFactory) FactoryMatcher { + return func(cfg *common.Config) RunnerFactory { + return factory + } +} + +func (f multiplexedFactory) Create( + p beat.PipelineConnector, + config *common.Config, + meta *common.MapStrPointer, +) (Runner, error) { + factory, err := f.findFactory(config) + if err != nil { + return nil, err + } + return factory.Create(p, config, meta) +} + +func (f multiplexedFactory) CheckConfig(c *common.Config) error { + factory, err := f.findFactory(c) + if err == nil { + err = factory.CheckConfig(c) + } + return err +} + +func (f multiplexedFactory) findFactory(c *common.Config) (RunnerFactory, error) { + for _, matcher := range f { + if factory := matcher(c); factory != nil { + return factory, nil + } + } + + return nil, errConfigDoesNotMatch +} diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go index b9ae5687831..9d28187a503 100644 --- a/libbeat/cfgfile/list_test.go +++ b/libbeat/cfgfile/list_test.go @@ -43,7 +43,7 @@ func (r *runner) Stop() { r.stopped = true } type runnerFactory struct{ runners []*runner } -func (r *runnerFactory) Create(x beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (Runner, error) { +func (r *runnerFactory) Create(x beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (Runner, error) { config := struct { ID int64 `config:"id"` }{} diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 6b1be3d16e5..990264c3a6b 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -69,23 +69,18 @@ type Reload struct { Enabled bool `config:"enabled"` } -// RunnerFactory is used for creating of new Runners +// RunnerFactory is used for validating generated configurations and creating +// of new Runners type RunnerFactory interface { - Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error) -} + // Create creates a new Runner based on the given configuration. + Create(p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (Runner, error) -// ConfigChecker is usually combined with a RunnerFactory for implementations that can check a config -// without a pipeline and metadata. -type ConfigChecker interface { + // CheckConfig tests if a confiugation can be used to create an input. If it + // is not possible to create an input using the configuration, an error must + // be returned. CheckConfig(config *common.Config) error } -// CheckableRunnerFactory is the union of RunnerFactory and ConfigChecker. -type CheckableRunnerFactory interface { - RunnerFactory - ConfigChecker -} - // Runner is a simple interface providing a simple way to // Start and Stop Reloader type Runner interface { @@ -145,7 +140,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { // Load all config objects configs, err := rl.loadConfigs(files) if err != nil { - return err + return errors.Wrap(err, "loading configs") } debugf("Number of module configs found: %v", len(configs)) @@ -157,13 +152,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { continue } - if checker, ok := runnerFactory.(ConfigChecker); ok { - err = checker.CheckConfig(c.Config) - } else { - _, err = runnerFactory.Create(rl.pipeline, c.Config, c.Meta) - } - - if err != nil { + if err = runnerFactory.CheckConfig(c.Config); err != nil { return err } } diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go new file mode 100644 index 00000000000..f32785a8d22 --- /dev/null +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import "github.com/elastic/beats/v7/libbeat/beat" + +type nilPipeline struct{} + +type nilClient struct { + eventer beat.ClientEventer + ackCount func(int) + ackEvents func([]interface{}) + ackLastEvent func(interface{}) +} + +var _nilPipeline = (*nilPipeline)(nil) + +// NewNilPipeline returns a new pipeline that is compatible with +// beats.PipelineConnector. The pipeline will discard all events that have been +// published. Client ACK handlers will still be executed, but the callbacks +// will be executed immediately when the event is published. +func NewNilPipeline() beat.PipelineConnector { return _nilPipeline } + +func (p *nilPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return &nilClient{ + eventer: cfg.Events, + ackCount: cfg.ACKCount, + ackEvents: cfg.ACKEvents, + ackLastEvent: cfg.ACKLastEvent, + }, nil +} + +func (c *nilClient) Publish(event beat.Event) { + c.PublishAll([]beat.Event{event}) +} + +func (c *nilClient) PublishAll(events []beat.Event) { + L := len(events) + if L == 0 { + return + } + + if c.ackLastEvent != nil { + c.ackLastEvent(events[L-1].Private) + } + if c.ackEvents != nil { + tmp := make([]interface{}, L) + for i := range events { + tmp[i] = events[i].Private + } + c.ackEvents(tmp) + } + if c.ackCount != nil { + c.ackCount(L) + } +} + +func (c *nilClient) Close() error { + if c.eventer != nil { + c.eventer.Closing() + c.eventer.Closed() + } + return nil +} diff --git a/metricbeat/autodiscover/include.go b/metricbeat/autodiscover/include.go deleted file mode 100644 index 0843fe6c156..00000000000 --- a/metricbeat/autodiscover/include.go +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package autodiscover - -import ( - // include all metricbeat specific builders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" - - // include all metricbeat specific appenders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" -) diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index b396ac19c4b..050e7f265c1 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -33,8 +33,11 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/module" - // Add autodiscover builders / appenders - _ "github.com/elastic/beats/v7/metricbeat/autodiscover" + // include all metricbeat specific builders + _ "github.com/elastic/beats/v7/metricbeat/autodiscover/builder/hints" + + // include all metricbeat specific appenders + _ "github.com/elastic/beats/v7/metricbeat/autodiscover/appender/kubernetes/token" // Add metricbeat default processors _ "github.com/elastic/beats/v7/metricbeat/processor/add_kubernetes_metadata" @@ -177,8 +180,8 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error - adapter := autodiscover.NewFactoryAdapter(factory) - metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) + metricbeat.autodiscover, err = autodiscover.NewAutodiscover( + "metricbeat", b.Publisher, factory, autodiscover.QueryConfig(), config.Autodiscover) if err != nil { return nil, err } diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 5d9c75a573c..ea2292bd74c 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -30,7 +30,7 @@ import ( // Connector configures and establishes a beat.Client for publishing events // to the publisher pipeline. type Connector struct { - pipeline beat.Pipeline + pipeline beat.PipelineConnector processors *processors.Processors eventMeta common.EventMetadata dynamicFields *common.MapStrPointer @@ -54,7 +54,7 @@ type metricSetRegister interface { } func NewConnector( - beatInfo beat.Info, pipeline beat.Pipeline, + beatInfo beat.Info, pipeline beat.PipelineConnector, c *common.Config, dynFields *common.MapStrPointer, ) (*Connector, error) { config := connectorConfig{} diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 5392701ccb0..9256fc5b5b1 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -40,7 +40,7 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { } // Create creates a new metricbeat module runner reporting events to the passed pipeline. -func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (r *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { module, metricSets, err := mb.NewModule(c, mb.Registry) if err != nil { return nil, err