Skip to content

Commit

Permalink
Apply Autodiscovery dynamic fields in autoreloading (elastic#19135)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored and melchiormoulin committed Oct 14, 2020
1 parent f0569a0 commit f3f6980
Show file tree
Hide file tree
Showing 42 changed files with 232 additions and 169 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945]
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *crawler) startInput(
return fmt.Errorf("input with same ID already exists: %v", id)
}

runner, err := c.inputsFactory.Create(pipeline, config, nil)
runner, err := c.inputsFactory.Create(pipeline, config)
if err != nil {
return fmt.Errorf("Error while initializing input: %+v", err)
}
Expand Down
13 changes: 4 additions & 9 deletions filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type onCreateFactory struct {
create onCreateWrapper
}

type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config, *common.MapStrPointer) (cfgfile.Runner, error)
type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config) (cfgfile.Runner, error)

// commonInputConfig defines common input settings
// for the publisher pipeline.
Expand Down Expand Up @@ -63,12 +63,8 @@ func (f *onCreateFactory) CheckConfig(cfg *common.Config) error {
return f.factory.CheckConfig(cfg)
}

func (f *onCreateFactory) Create(
pipeline beat.PipelineConnector,
cfg *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
return f.create(f.factory, pipeline, cfg, meta)
func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) {
return f.create(f.factory, pipeline, cfg)
}

// RunnerFactoryWithCommonInputSettings wraps a runner factory, such that all runners
Expand All @@ -93,14 +89,13 @@ func RunnerFactoryWithCommonInputSettings(info beat.Info, f cfgfile.RunnerFactor
f cfgfile.RunnerFactory,
pipeline beat.PipelineConnector,
cfg *common.Config,
meta *common.MapStrPointer,
) (runner cfgfile.Runner, err error) {
pipeline, err = withClientConfig(info, pipeline, cfg)
if err != nil {
return nil, err
}

return f.Create(pipeline, cfg, meta)
return f.Create(pipeline, cfg)
})
}

Expand Down
6 changes: 3 additions & 3 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewFactory(
}

// Create creates a module based on a config
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
if err != nil {
Expand All @@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo

inputs := make([]cfgfile.Runner, len(pConfigs))
for i, pConfig := range pConfigs {
inputs[i], err = f.inputFactory.Create(p, pConfig, meta)
inputs[i], err = f.inputFactory.Create(p, pConfig)
if err != nil {
logp.Err("Error creating input: %s", err)
return nil, err
Expand All @@ -116,7 +116,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo
}

func (f *Factory) CheckConfig(c *common.Config) error {
_, err := f.Create(pubpipeline.NewNilPipeline(), c, nil)
_, err := f.Create(pubpipeline.NewNilPipeline(), c)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions filebeat/fileset/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac
}

// Create creates a new SetupCfgRunner to setup module configuration.
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) {
func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false)
if err != nil {
return nil, err
Expand All @@ -56,7 +56,7 @@ func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *co
}

func (sf *SetupFactory) CheckConfig(c *common.Config) error {
_, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil)
_, err := sf.Create(pubpipeline.NewNilPipeline(), c)
return err
}

Expand Down
10 changes: 4 additions & 6 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func New(
connector channel.Connector,
beatDone chan struct{},
states []file.State,
dynFields *common.MapStrPointer,
) (*Runner, error) {
input := &Runner{
config: defaultConfig,
Expand All @@ -82,11 +81,10 @@ func New(
}

context := Context{
States: states,
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: nil,
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
}
var ipt Input
ipt, err = f(conf, connector, context)
Expand Down
3 changes: 0 additions & 3 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ func NewInput(
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
ACKEvents: func(events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ func NewInput(
// The outlet generated here is the underlying outlet, only closed
// once all workers have been shut down.
// For state updates and events, separate sub-outlets will be used.
out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/mqtt/client_mocked.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ type mockedConnector struct {

var _ channel.Connector = new(mockedConnector)

func (m *mockedConnector) Connect(*common.Config) (channel.Outleter, error) {
panic("implement me")
func (m *mockedConnector) Connect(c *common.Config) (channel.Outleter, error) {
return m.ConnectWith(c, beat.ClientConfig{})
}

func (m *mockedConnector) ConnectWith(*common.Config, beat.ClientConfig) (channel.Outleter, error) {
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/mqtt/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ func newInput(
return nil, errors.Wrap(err, "reading mqtt input config")
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions filebeat/input/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -59,11 +58,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con
return nil, err
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ import (
)

type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
DynamicFields *common.MapStrPointer
Meta map[string]string
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
}

// Factory is used to register functions creating new Input instances.
Expand Down
5 changes: 2 additions & 3 deletions filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be
func (r *RunnerFactory) Create(
pipeline beat.PipelineConnector,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
connector := r.outlet(pipeline)
p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta)
p, err := New(c, connector, r.beatDone, r.registrar.GetStates())
if err != nil {
// In case of error with loading state, input is still returned
return p, err
Expand All @@ -59,6 +58,6 @@ func (r *RunnerFactory) Create(
}

func (r *RunnerFactory) CheckConfig(cfg *common.Config) error {
_, err := r.Create(pipeline.NewNilPipeline(), cfg, nil)
_, err := r.Create(pipeline.NewNilPipeline(), cfg)
return err
}
7 changes: 1 addition & 6 deletions filebeat/input/stdin/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/input/log"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand All @@ -49,11 +48,7 @@ type Input struct {
// NewInput creates a new stdin input
// This input contains one harvester which is reading from stdin
func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) {
out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ func NewInput(

log := logp.NewLogger("syslog")

out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ func NewInput(
context input.Context,
) (input.Input, error) {

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ func NewInput(
context input.Context,
) (input.Input, error) {

out, err := outlet.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/unix/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func NewInput(
) (input.Input, error) {
cfgwarn.Beta("Unix socket support is beta.")

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: context.DynamicFields,
},
})
out, err := connector.Connect(cfg)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
factory := monitors.NewFactory(bt.scheduler, true)

for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg, nil)
created, err := factory.Create(b.Publisher, cfg)
if err != nil {
return errors.Wrap(err, "could not create monitor")
}
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta)
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches)
return monitor, err
}

Expand Down
10 changes: 3 additions & 7 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ type Monitor struct {

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
stats registryRecorder
factoryMetadata *common.MapStrPointer
stats registryRecorder
}

// String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe
Expand All @@ -73,7 +72,7 @@ func (m *Monitor) String() string {
}

func checkMonitorConfig(config *common.Config, registrar *pluginsReg, allowWatches bool) error {
m, err := newMonitor(config, registrar, nil, nil, allowWatches, nil)
m, err := newMonitor(config, registrar, nil, nil, allowWatches)
if m != nil {
m.Stop() // Stop the monitor to free up the ID from uniqueness checks
}
Expand Down Expand Up @@ -101,9 +100,8 @@ func newMonitor(
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches, factoryMetadata)
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches)
if m != nil && err != nil {
m.Stop()
}
Expand All @@ -118,7 +116,6 @@ func newMonitorUnsafe(
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand All @@ -145,7 +142,6 @@ func newMonitorUnsafe(
internalsMtx: sync.Mutex{},
config: config,
stats: monitorPlugin.stats,
factoryMetadata: factoryMetadata,
}

if m.id != "" {
Expand Down
Loading

0 comments on commit f3f6980

Please sign in to comment.