Skip to content

Commit

Permalink
feat: integrations run by cmd channel submit event when finished cont…
Browse files Browse the repository at this point in the history
…aining exit code
  • Loading branch information
varas committed Dec 3, 2020
1 parent 79cad20 commit 5bc79f9
Show file tree
Hide file tree
Showing 22 changed files with 274 additions and 178 deletions.
9 changes: 5 additions & 4 deletions cmd/newrelic-infra/newrelic-infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/files"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/v3legacy"
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
"github.com/sirupsen/logrus"

"github.com/newrelic/infrastructure-agent/cmd/newrelic-infra/initialize"
Expand Down Expand Up @@ -293,15 +293,16 @@ func initializeAgentAndRun(c *config.Config, logFwCfg config.LogForward) error {
// queues integration run requests
definitionQ := make(chan integration.Definition, 100)

// track stoppable integrations
tracker := stoppable.NewTracker()

var dmEmitter dm.Emitter
if enabled, exists := ffManager.GetFeatureFlag(fflag.FlagDMRegisterEnable); exists && enabled {
dmEmitter = dm.NewEmitter(agt.GetContext(), dmSender, registerClient)
} else {
dmEmitter = dm.NewNonRegisterEmitter(agt.GetContext(), dmSender)
}

// track stoppable integrations
tracker := track.NewTracker(dmEmitter)

integrationEmitter := emitter.NewIntegrationEmittor(agt, dmEmitter, ffManager)
integrationManager := v4.NewManager(integrationCfg, integrationEmitter, il, definitionQ, tracker)

Expand Down
8 changes: 6 additions & 2 deletions internal/agent/cmdchannel/runintegration/runintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/fwrequest"
ctx2 "github.com/newrelic/infrastructure-agent/pkg/integrations/track/ctx"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/protocol"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/trace"
)

const cmdName = "run_integration"

// Errors
var (
ErrNoIntName = errors.New("missing required \"integration_name\"")
Expand Down Expand Up @@ -56,7 +59,8 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
LogDecorated(logger, cmd, args).WithError(err).Warn("cannot create handler for cmd channel run_integration requests")
return
}
def.CmdChannelHash = args.Hash()
trackCtx := ctx2.NewTrackCtx(cmdName, args.Hash(), args.IntegrationName, args.IntegrationArgs)
def.CmdChanReq = &trackCtx

definitionQ <- def

Expand All @@ -67,7 +71,7 @@ func NewHandler(definitionQ chan<- integration.Definition, il integration.Instan
return
}

return cmdchannel.NewCmdHandler("run_integration", handleF)
return cmdchannel.NewCmdHandler(cmdName, handleF)
}

func NotifyPlatform(dmEmitter dm.Emitter, def integration.Definition, ev protocol.EventData) {
Expand Down
6 changes: 3 additions & 3 deletions internal/agent/cmdchannel/stopintegration/stopintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/trace"
Expand All @@ -24,7 +24,7 @@ const (
)

// NewHandler creates a cmd-channel handler for stop-integration requests.
func NewHandler(tracker *stoppable.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler {
func NewHandler(tracker *track.Tracker, il integration.InstancesLookup, dmEmitter dm.Emitter, l log.Entry) *cmdchannel.CmdHandler {
handleF := func(ctx context.Context, cmd commandapi.Command, initialFetch bool) (err error) {
if runtime.GOOS == "windows" {
return cmdchannel.ErrOSNotSupported
Expand Down Expand Up @@ -105,7 +105,7 @@ func notifyPlatform(dmEmitter dm.Emitter, il integration.InstancesLookup, cmd co
return err
}

def.CmdChannelHash = args.Hash()
def.CmdChanReq.CmdChannelCmdHash = args.Hash()
ev := cmd.Event(args.IntegrationName, args.IntegrationArgs)
ev["cmd_stop_hash"] = args.Hash()
ev["cmd_stop_mode"] = stopModeUsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/agent/cmdchannel/runintegration"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/integration"
"github.com/newrelic/infrastructure-agent/pkg/backend/commandapi"
"github.com/newrelic/infrastructure-agent/pkg/integrations/stoppable"
"github.com/newrelic/infrastructure-agent/pkg/integrations/track"
dm "github.com/newrelic/infrastructure-agent/pkg/integrations/v4/dm/testutils"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/shirou/gopsutil/process"
Expand All @@ -30,13 +30,14 @@ func TestHandle_returnsErrorOnMissingName(t *testing.T) {
t.Skip("CC stop-intergation is not supported on Windows")
}

h := NewHandler(stoppable.NewTracker(), integration.ErrLookup, dm.NewNoopEmitter(), l)
h := NewHandler(track.NewTracker(nil), integration.ErrLookup, dm.NewNoopEmitter(), l)

cmdArgsMissingPID := commandapi.Command{
Args: []byte(`{ "integration_args": ["foo"] }`),
}

err := h.Handle(context.Background(), cmdArgsMissingPID, false)
require.Error(t, err)
assert.Equal(t, cmdchannel.NewArgsErr(runintegration.ErrNoIntName).Error(), err.Error())
}

Expand All @@ -46,12 +47,12 @@ func TestHandle_signalStopProcess(t *testing.T) {
}

// Given a handler with an stoppables tracker
tracker := stoppable.NewTracker()
tracker := track.NewTracker(nil)
h := NewHandler(tracker, integration.ErrLookup, dm.NewNoopEmitter(), l)

// When a process context is tracked
ctx := context.Background()
ctx, pidC := tracker.Track(ctx, "foo#")
ctx, pidC := tracker.Track(ctx, "foo#", nil)

proc := exec.CommandContext(ctx, "sleep", "5")

Expand Down
7 changes: 6 additions & 1 deletion internal/integrations/v4/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FromCmdSlice(cmd []string, cfg *Config) Executor {
// The executed process can be cancelled via the provided Context.
// When writable PID channel is provided, generated PID will be written, so process could be signaled by 3rd parties.
// When the process ends, all the channels are closed.
func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceive {
func (r *Executor) Execute(ctx context.Context, pidChan, exitCodeCh chan<- int) OutputReceive {
out, receiver := NewOutput()
commandCtx, cancelCommand := context.WithCancel(ctx)

Expand Down Expand Up @@ -99,6 +99,11 @@ func (r *Executor) Execute(ctx context.Context, pidChan chan<- int) OutputReceiv
<-commandCtx.Done()
if err := cmd.Wait(); err != nil {
out.Errors <- err
if exitCodeCh != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCodeCh <- exitError.ExitCode()
}
}
}
allOutputForwarded.Wait() // waiting again to avoid closing output before the data is received during cancellation
out.Close()
Expand Down
22 changes: 11 additions & 11 deletions internal/integrations/v4/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRunnable_CLI_Execute(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd), execConfig(t))

// WHEN it is executed
to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -51,7 +51,7 @@ func TestRunnable_CLI_Execute_with_spaces(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmdWithSpace), execConfig(t))

// WHEN it is executed
to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -76,7 +76,7 @@ func TestRunnable_Execute_WithUser(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd), cfg)

// WHEN it is executed
to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -95,7 +95,7 @@ func TestRunnable_Execute_WithArgs(t *testing.T) {
cfg := execConfig(t)
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd, "world"), cfg)

to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(to.Stdout))
assert.Equal(t, "-world", testhelp.ChannelRead(to.Stdout))
Expand All @@ -113,7 +113,7 @@ func TestRunnable_Execute_WithArgs_WithEnv(t *testing.T) {
cfg.Environment = map[string]string{"PREFIX": "hello"}
r := FromCmdSlice(testhelp.Command(fixtures.BasicCmd, "world"), cfg)

to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(to.Stdout))
assert.Equal(t, "hello-world", testhelp.ChannelRead(to.Stdout))
Expand All @@ -127,7 +127,7 @@ func TestRunnable_Execute_Error(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.ErrorCmd), execConfig(t))

// WHEN it is executed
to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)

// THEN an error is returned
assert.Error(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -149,7 +149,7 @@ func TestRunnable_Execute_Blocked(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.BlockedCmd), cfg)

// THAT is normally working
to := r.Execute(ctx, nil)
to := r.Execute(ctx, nil, nil)
assert.Equal(t, "starting", testhelp.ChannelRead(to.Stdout))
assert.Error(t, testhelp.ChannelErrClosedTimeout(to.Errors, 100*time.Millisecond))

Expand All @@ -176,7 +176,7 @@ func TestNoRaces(t *testing.T) {
for i := 0; i < 100; i++ {
ctx, cancel := context.WithCancel(context.Background())
cmd := FromCmdSlice([]string{"echo", hugeLine}, &Config{})
go cmd.Execute(ctx, nil)
go cmd.Execute(ctx, nil, nil)
cancel()
}
}
Expand All @@ -191,7 +191,7 @@ func TestRunnable_Execute_Verbose(t *testing.T) {
ctx := context.WithValue(context.Background(), constants.EnableVerbose, 1)

// WHEN it is executed
to := r.Execute(ctx, nil)
to := r.Execute(ctx, nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -213,7 +213,7 @@ func TestRunnable_Execute_VerboseFalse(t *testing.T) {
ctx := context.WithValue(context.Background(), constants.EnableVerbose, 0)

// WHEN it is executed
to := r.Execute(ctx, nil)
to := r.Execute(ctx, nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand All @@ -232,7 +232,7 @@ func TestRunnable_Execute_NoVerboseSet(t *testing.T) {
r := FromCmdSlice(testhelp.Command(fixtures.IntegrationVerboseScript), execConfig(t))

// WHEN it is executed
to := r.Execute(context.Background(), nil)
to := r.Execute(context.Background(), nil, nil)

// THEN no errors are returned
assert.NoError(t, testhelp.ChannelErrClosed(to.Errors))
Expand Down
13 changes: 9 additions & 4 deletions internal/integrations/v4/integration/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/executor"
"github.com/newrelic/infrastructure-agent/internal/integrations/v4/when"
"github.com/newrelic/infrastructure-agent/pkg/databind/pkg/databind"
"github.com/newrelic/infrastructure-agent/pkg/integrations/track/ctx"
"github.com/newrelic/infrastructure-agent/pkg/integrations/v4/config"
"github.com/newrelic/infrastructure-agent/pkg/log"
"github.com/newrelic/infrastructure-agent/pkg/plugins/ids"
Expand Down Expand Up @@ -38,7 +39,7 @@ type Definition struct {
ConfigTemplate []byte // external configuration file, if provided
InventorySource ids.PluginID
WhenConditions []when.Condition
CmdChannelHash string // not empty: generated by command-channel "run/stop_integration", contains name+args hash
CmdChanReq *ctx.CmdChannelRequest // not empty: generated by command-channel "run/stop_integration", contains name+args hash
runnable executor.Executor
newTempFile func(template []byte) (string, error)
}
Expand All @@ -47,6 +48,10 @@ func (d *Definition) TimeoutEnabled() bool {
return d.Timeout > 0
}

func (d *Definition) SingleRun() bool {
return d.Interval == 0
}

// PluginID returns inventory plugin ID
func (d *Definition) PluginID(integrationName string) ids.PluginID {
// user specified an inventory source has precedence
Expand All @@ -62,13 +67,13 @@ func (d *Definition) PluginID(integrationName string) ids.PluginID {
return ids.NewDefaultInventoryPluginID(d.Name)
}

func (d *Definition) Run(ctx context.Context, bind *databind.Values, pidC chan<- int) ([]Output, error) {
func (d *Definition) Run(ctx context.Context, bind *databind.Values, pidC, exitCodeC chan<- int) ([]Output, error) {
logger := elog.WithField("integration_name", d.Name)
logger.Debug("Running task.")
// no discovery data: execute a single instance
if bind == nil {
logger.Debug("Running single instance.")
return []Output{{Receive: d.runnable.Execute(ctx, pidC)}}, nil
return []Output{{Receive: d.runnable.Execute(ctx, pidC, exitCodeC)}}, nil
}

// apply discovered data to run multiple instances
Expand Down Expand Up @@ -138,7 +143,7 @@ func (d *Definition) Run(ctx context.Context, bind *databind.Values, pidC chan<-
}

logger.Debug("Executing task.")
taskOutput := dc.Executor.Execute(ctx, nil)
taskOutput := dc.Executor.Execute(ctx, nil, nil)
if removeFile != nil {
go removeFile(taskOutput.Done)
}
Expand Down
18 changes: 9 additions & 9 deletions internal/integrations/v4/integration/definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestRun(t *testing.T) {
require.NoError(t, err)

// WHEN it is executed
outs, err := def.Run(context.Background(), nil, nil)
outs, err := def.Run(context.Background(), nil, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 1)

Expand All @@ -58,7 +58,7 @@ func TestRun_NoDiscovery(t *testing.T) {
require.NoError(t, err)

// WHEN the def is executed with no discovery matches
outs, err := def.Run(context.Background(), &databind.Values{}, nil)
outs, err := def.Run(context.Background(), &databind.Values{}, nil, nil)
require.NoError(t, err)

// THEN no tasks are executed
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestRun_Discovery(t *testing.T) {
databind.NewDiscovery(data.Map{"prefix": "bye", "argument": "people"}, data.InterfaceMap{"special": false, "label.two": "two"}, nil),
databind.NewDiscovery(data.Map{"prefix": "kon", "argument": "nichiwa"}, data.InterfaceMap{"other_tag": "true", "label.tree": "three"}, nil),
)
outs, err := def.Run(context.Background(), &vals, nil)
outs, err := def.Run(context.Background(), &vals, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 3)

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestRun_CmdSlice(t *testing.T) {
require.NoError(t, err)

// WHEN the def is executed
outs, err := def.Run(context.Background(), &databind.Values{}, nil)
outs, err := def.Run(context.Background(), &databind.Values{}, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 1)

Expand Down Expand Up @@ -150,7 +150,7 @@ func TestRun_CancelPropagation(t *testing.T) {
)

parentContext, cancel := context.WithCancel(context.Background())
outs, err := def.Run(parentContext, &vals, nil)
outs, err := def.Run(parentContext, &vals, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 3)

Expand Down Expand Up @@ -194,7 +194,7 @@ func TestRun_CancelPropagationWithoutReads(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

outs, err := def.Run(ctx, nil, nil)
outs, err := def.Run(ctx, nil, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 1)

Expand Down Expand Up @@ -236,7 +236,7 @@ func TestRun_Cancel_Partial(t *testing.T) {
)

parentContext, cancel := context.WithCancel(context.Background())
outs, err := def.Run(parentContext, &vals, nil)
outs, err := def.Run(parentContext, &vals, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 2)

Expand Down Expand Up @@ -278,7 +278,7 @@ func TestRun_Directory(t *testing.T) {
require.NoError(t, err)

// WHEN it is executed
outs, err := def.Run(context.Background(), nil, nil)
outs, err := def.Run(context.Background(), nil, nil, nil)
require.NoError(t, err)
require.Len(t, outs, 1)

Expand Down Expand Up @@ -321,7 +321,7 @@ func TestRun_RemoveExternalConfig(t *testing.T) {
}
return path, err
}
outputs, err := def.Run(ctx, &vals, nil)
outputs, err := def.Run(ctx, &vals, nil, nil)
require.NoError(t, err)
require.Len(t, outputs, 2)
require.Len(t, createdConfigs, 2)
Expand Down
Loading

0 comments on commit 5bc79f9

Please sign in to comment.