Skip to content

Commit

Permalink
Center more functionality around RunnerFactory (elastic#16715)
Browse files Browse the repository at this point in the history
* Center more functionality around RunnerFactory

Remove CheckableRunnerFactory and require RunnerFactory to implement
CheckConfig. CheckableRunnerFactory more and more superseded
RunnerFactory. As we want more config validation support in the future
as well I combined the two into RunnerFactory

Remove autodiscover.Adapter. The adapter did inherit from
CheckableRunnerFactory, giving us some inheritance chain between
RunnerFactory, CheckableRunnerFactory, and Adapter for autodiscovery.
By removig the Adapter and CheckableRunnerFactory we have one common
type (RunnerFactory) to integrate with config file reloading, static
input/module setup, and autodiscovery.

Add selectors for autodiscovery event selection that are used as
additional parameters when creating a new Autodiscover instance. This
gives us some more composability, yet I wonder if we can even remove those,
as every instance of NewAutodiscover did look for events with a 'config'
field of type []*common.Config.
  • Loading branch information
Steffen Siering committed Mar 26, 2020
1 parent 778156c commit 257fdfc
Show file tree
Hide file tree
Showing 26 changed files with 336 additions and 290 deletions.
80 changes: 0 additions & 80 deletions filebeat/autodiscover/autodiscover.go

This file was deleted.

23 changes: 0 additions & 23 deletions filebeat/autodiscover/include.go

This file was deleted.

17 changes: 11 additions & 6 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,32 @@ func (c *crawler) Start(
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig, r.GetStates())
if err != nil {
return err
return fmt.Errorf("starting input failed: %+v", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return err
return fmt.Errorf("creating input reloader failed: %+v", err)
}

go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return err
return fmt.Errorf("creating module reloader failed: %+v", err)
}

}

if c.inputReloader != nil {
go func() {
c.inputReloader.Run(c.inputsFactory)
}()
}
if c.modulesReloader != nil {
go func() {
c.modulesReloader.Run(c.modulesFactory)
}()
Expand Down
20 changes: 16 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pkg/errors"

fbautodiscover "github.com/elastic/beats/v7/filebeat/autodiscover"
"github.com/elastic/beats/v7/filebeat/channel"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/fileset"
Expand All @@ -43,9 +42,14 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

_ "github.com/elastic/beats/v7/filebeat/include"

// Add filebeat level processors
_ "github.com/elastic/beats/v7/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"

// include all filebeat specific builders
_ "github.com/elastic/beats/v7/filebeat/autodiscover/builder/hints"
)

const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand Down Expand Up @@ -282,7 +286,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
return err
return fmt.Errorf("Failed to start crawler: %+v", err)
}

// If run once, add crawler completion check as alternative to done signal
Expand All @@ -304,8 +308,16 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := fbautodiscover.NewAutodiscoverAdapter(inputLoader, moduleLoader)
adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover)
adiscover, err = autodiscover.NewAutodiscover(
"filebeat",
b.Publisher,
cfgfile.MultiplexedRunnerFactory(
cfgfile.MatchHasField("module", moduleLoader),
cfgfile.MatchDefault(inputLoader),
),
autodiscover.QueryConfig(),
config.Autodiscover,
)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error)

type pipelineConnector struct {
parent *OutletFactory
pipeline beat.Pipeline
pipeline beat.PipelineConnector
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(p beat.Pipeline) Connector {
func (f *OutletFactory) Create(p beat.PipelineConnector) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// Factory is used to create a new Outlet instance
type Factory func(beat.Pipeline) Connector
type Factory func(beat.PipelineConnector) Connector

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
Expand Down
9 changes: 8 additions & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"

"github.com/mitchellh/hashstructure"
)
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewFactory(
}

// Create creates a module based on a config
func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
Expand Down Expand Up @@ -114,6 +115,11 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}, nil
}

func (f *Factory) CheckConfig(c *common.Config) error {
_, err := f.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
Expand Down Expand Up @@ -153,6 +159,7 @@ func (p *inputsRunner) Start() {
moduleList.Add(m)
}
}

func (p *inputsRunner) Stop() {
if p.pipelineCallbackID != uuid.Nil {
elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID)
Expand Down
8 changes: 7 additions & 1 deletion filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// SetupFactory is for loading module assets when running setup subcommand.
Expand All @@ -41,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false)
if err != nil {
return nil, err
Expand All @@ -54,6 +55,11 @@ func (sf *SetupFactory) Create(_ beat.Pipeline, c *common.Config, _ *common.MapS
}, nil
}

func (sf *SetupFactory) CheckConfig(c *common.Config) error {
_, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil)
return err
}

// SetupCfgRunner is for loading assets of modules.
type SetupCfgRunner struct {
moduleRegistry *ModuleRegistry
Expand Down
8 changes: 7 additions & 1 deletion filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// RunnerFactory is a factory for registrars
Expand All @@ -43,7 +44,7 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be

// Create creates a input based on a config
func (r *RunnerFactory) Create(
pipeline beat.Pipeline,
pipeline beat.PipelineConnector,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
Expand All @@ -56,3 +57,8 @@ func (r *RunnerFactory) Create(

return p, nil
}

func (r *RunnerFactory) CheckConfig(cfg *common.Config) error {
_, err := r.Create(pipeline.NewNilPipeline(), cfg, nil)
return err
}
23 changes: 0 additions & 23 deletions heartbeat/autodiscover/include.go

This file was deleted.

9 changes: 1 addition & 8 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,7 @@ func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {

// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)

ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}

return ad, nil
return autodiscover.NewAutodiscover("heartbeat", b.Publisher, bt.dynamicFactory, autodiscover.QueryConfig(), bt.config.Autodiscover)
}

// Stop stops the beat.
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package cmd
import (
"fmt"

_ "github.com/elastic/beats/v7/heartbeat/autodiscover"
"github.com/elastic/beats/v7/heartbeat/beater"

// include all heartbeat specific autodiscovery builders
_ "github.com/elastic/beats/v7/heartbeat/autodiscover/builder/hints"

// register default heartbeat monitors
_ "github.com/elastic/beats/v7/heartbeat/monitors/defaults"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta)
return monitor, err
}
Expand Down

0 comments on commit 257fdfc

Please sign in to comment.