Skip to content

Commit

Permalink
Databind boy-scout (#247)
Browse files Browse the repository at this point in the history
* refactor: boyscoutt

* refactor: bound DataSources global fn to YAMLConfig method

* refactor: bound selectDiscoverer() global fn to YAMLConfig method & simplify conditionals

* refactor: bound selectGatherer global fn to varEntry method

* refactor: boyscoutt

* refactor: unify LoadConfig API

* refactor: reorg LoadConfig

* test: restore commented out tests
  • Loading branch information
varas committed Nov 24, 2020
1 parent 34da997 commit 9d10258
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 186 deletions.
50 changes: 28 additions & 22 deletions cmd/newrelic-infra/newrelic-infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,55 +120,61 @@ func main() {

timedLog.Debug("Loading configuration.")

parsedConfig, err := config.LoadConfigWithVerbose(configFile, verbose)
cfg, err := config.LoadConfig(configFile)
if err != nil {
alog.WithError(err).Error("can't load configuration file")
os.Exit(1)
}
if parsedConfig.Verbose == config.SmartVerboseLogging {
wlog.EnableSmartVerboseMode(parsedConfig.SmartVerboseModeEntryLimit)

// override YAML with CLI flags
if verbose > config.NonVerboseLogging {
cfg.Verbose = verbose
}
if cpuprofile != "" {
cfg.CPUProfile = cpuprofile
}
if memprofile != "" {
cfg.MemProfile = memprofile
}

if cfg.Verbose == config.SmartVerboseLogging {
wlog.EnableSmartVerboseMode(cfg.SmartVerboseModeEntryLimit)
}

if debug || parsedConfig.WebProfile {
if debug || cfg.WebProfile {
alog.Info("starting pprof server at http://localhost:6060")
go recover.FuncWithPanicHandler(recover.LogAndContinue, func() {
alog.WithError(http.ListenAndServe("localhost:6060", nil)).Warn("trying to open a connection in :6060")
})
}

if cpuprofile != "" {
parsedConfig.CPUProfile = cpuprofile
}
if memprofile != "" {
parsedConfig.MemProfile = memprofile
}
configureLogFormat(cfg.LogFormat)

// Set the log format.
configureLogFormat(parsedConfig)
// Send logging where it's supposed to go.
agentLogsToFile := configureLogRedirection(parsedConfig, memLog)
trace.EnableOn(parsedConfig.FeatureTraces)
agentLogsToFile := configureLogRedirection(cfg, memLog)

trace.EnableOn(cfg.FeatureTraces)

// Runtime config setup.
troubleCfg := config.NewTroubleshootCfg(parsedConfig.IsTroubleshootMode(), agentLogsToFile, parsedConfig.GetLogFile())
logFwCfg := config.NewLogForward(parsedConfig, troubleCfg)
troubleCfg := config.NewTroubleshootCfg(cfg.IsTroubleshootMode(), agentLogsToFile, cfg.GetLogFile())
logFwCfg := config.NewLogForward(cfg, troubleCfg)

// If parsedConfig.MaxProcs < 1, leave GOMAXPROCS to its previous value,
// which, if not set by the environment, is the number of processors that
// have been detected by the system.
// Note that if the `max_procs` option is unset, default value for
// parsedConfig.MaxProcs is 1.
runtime.GOMAXPROCS(parsedConfig.MaxProcs)
runtime.GOMAXPROCS(cfg.MaxProcs)

logConfig(parsedConfig)
logConfig(cfg)

err = initialize.OsProcess(parsedConfig)
err = initialize.OsProcess(cfg)
if err != nil {
alog.WithError(err).Error("Performing OS-specific process initialization...")
os.Exit(1)
}

err = initializeAgentAndRun(parsedConfig, logFwCfg)
err = initializeAgentAndRun(cfg, logFwCfg)
if err != nil {
timedLog.WithError(err).Error("Agent run returned an error.")
os.Exit(1)
Expand Down Expand Up @@ -393,8 +399,8 @@ func newInstancesLookup(cfg v4.Configuration) integration.InstancesLookup {
}

// configureLogFormat checks the config and sets the log format accordingly.
func configureLogFormat(cfg *config.Config) {
if cfg.LogFormat == config.LogFormatJSON {
func configureLogFormat(logFormat string) {
if logFormat == config.LogFormatJSON {
jsonFormatter := &logrus.JSONFormatter{
DataKey: "context",

Expand Down
4 changes: 2 additions & 2 deletions internal/integrations/v4/runner/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// - catches errors and logs them
// - manages the cancellation of tasks, as this should-be hot-reloaded
type Group struct {
discovery *databind.Sources
dSources *databind.Sources
integrations []integration.Definition
emitter emitter.Emitter
// for testing purposes, allows defining which action to take when an execution
Expand Down Expand Up @@ -55,7 +55,7 @@ func NewGroup(
// provided context
func (g *Group) Run(ctx context.Context) (hasStartedAnyOHI bool) {
for _, integr := range g.integrations {
go NewRunner(integr, g.emitter, g.discovery, g.handleErrorsProvide, g.cmdReqHandle).Run(ctx, nil)
go NewRunner(integr, g.emitter, g.dSources, g.handleErrorsProvide, g.cmdReqHandle).Run(ctx, nil)
hasStartedAnyOHI = true
}

Expand Down
5 changes: 2 additions & 3 deletions internal/integrations/v4/runner/group_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package runner

import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/databind/pkg/databind"
"github.com/newrelic/infrastructure-agent/pkg/integrations/cmdrequest"
config2 "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config"
)
Expand All @@ -18,13 +17,13 @@ type LoadFn func(dr integration.InstancesLookup, passthroughEnv []string, cfgPat
// disabled integrations.
func NewLoadFn(cfg config2.YAML, agentAndCCFeatures *Features) LoadFn {
return func(il integration.InstancesLookup, passthroughEnv []string, cfgPath string, cmdReqHandle cmdrequest.HandleFn) (g Group, c FeaturesCache, err error) {
discovery, err := databind.DataSources(&cfg.Databind)
dSources, err := cfg.Databind.DataSources()
if err != nil {
return
}

g = Group{
discovery: discovery,
dSources: dSources,
cmdReqHandle: cmdReqHandle,
}
c = make(FeaturesCache)
Expand Down
2 changes: 1 addition & 1 deletion internal/integrations/v4/runner/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ discovery:
te := &testemit.RecordEmitter{}
group := Group{
emitter: te,
discovery: discovery,
dSources: discovery,
integrations: []integration.Definition{integr},
handleErrorsProvide: func() runnerErrorHandler {
return func(_ context.Context, _ <-chan error) {}
Expand Down
14 changes: 7 additions & 7 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type logParser func(line string) (fields logFields)
type runner struct {
emitter emitter.Emitter
handleCmdReq cmdrequest.HandleFn
discovery *databind.Sources
dSources *databind.Sources
log log.Entry
definition integration.Definition
handleErrors func(context.Context, <-chan error) // by default, runner.logErrors. Replaceable for testing purposes
Expand All @@ -59,14 +59,14 @@ type runner struct {
func NewRunner(
intDef integration.Definition,
emitter emitter.Emitter,
discoverySources *databind.Sources,
dSources *databind.Sources,
handleErrorsProvide func() runnerErrorHandler,
cmdReqHandle cmdrequest.HandleFn,
) *runner {
r := &runner{
emitter: emitter,
handleCmdReq: cmdReqHandle,
discovery: discoverySources,
dSources: dSources,
definition: intDef,
heartBeatFunc: func() {},
stderrParser: parseLogrusFields,
Expand Down Expand Up @@ -115,13 +115,13 @@ func LogFields(def integration.Definition) logrus.Fields {
return fields
}

// applies discovery and returns the discovered values, if any.
// applies dSources and returns the discovered values, if any.
func (r *runner) applyDiscovery() (*databind.Values, error) {
if r.discovery == nil {
if r.dSources == nil {
// nothing is discovered, but the integration can run (with the default configuration)
return nil, nil
}
if v, err := databind.Fetch(r.discovery); err != nil {
if v, err := databind.Fetch(r.dSources); err != nil {
return nil, err
} else {
return &v, nil
Expand All @@ -144,7 +144,7 @@ func (r *runner) heartBeat() {
r.heartBeatFunc()
}

// execute the integration and wait for all the possible instances (resulting of multiple discovery matches)
// execute the integration and wait for all the possible instances (resulting of multiple dSources matches)
// to finish
// For long-time running integrations, avoids starting the next
// discover-execute cycle until all the parallel processes have ended
Expand Down
9 changes: 0 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,10 +1194,6 @@ func (c *Config) toLogInfo() (map[string]string, error) {
}

func LoadConfig(configFile string) (*Config, error) {
return LoadConfigWithVerbose(configFile, 0)
}

func LoadConfigWithVerbose(configFile string, verbose int) (*Config, error) {
var filesToCheck []string
if configFile != "" {
filesToCheck = append(filesToCheck, configFile)
Expand All @@ -1215,11 +1211,6 @@ func LoadConfigWithVerbose(configFile string, verbose int) (*Config, error) {

cfg.RunMode, cfg.AgentUser, cfg.ExecutablePath = runtimeValues()

// override verbose when enabled from CLI flag
if verbose > NonVerboseLogging {
cfg.Verbose = verbose
}

// Move any other post processing steps that clean up or announce settings to be
// after both config file and env variable processing is complete. Need to review each of the items
// above and place each one at the bottom of this ordering
Expand Down
2 changes: 1 addition & 1 deletion pkg/databind/pkg/databind/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewValues(vars data.Map, discoveries ...discovery.Discovery) Values {
}
}

// NewDiscovery returns an instance of discovery.Discovery which is needed for populating Values
// NewDiscovery returns an instance of discovery.Discovery aimed to be used for testing as prod should come from unmarshalling.
func NewDiscovery(variables data.Map, metricAnnotations data.InterfaceMap, entityRewrites []data.EntityRewrite) discovery.Discovery {
return discovery.Discovery{
Variables: variables,
Expand Down

0 comments on commit 9d10258

Please sign in to comment.