Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: integrations v4 naming #112

Merged
merged 1 commit into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,18 @@ func (d *Definition) LabelsAndExtraAnnotations(extraLabels data.Map) (map[string

return labels, extraAnnotations
}
func (d *Definition) Run(ctx context.Context, bind *databind.Values) ([]IntegrationOutput, error) {

func (d *Definition) Run(ctx context.Context, bind *databind.Values) ([]Output, error) {
logger := elog.WithField("integration_name", d.Name)
logger.Debug("Running task.")
// no discovery data: execute a single instance
if bind == nil {
logger.Debug("Running single instance.")
return []IntegrationOutput{{Output: d.runnable.Execute(ctx)}}, nil
return []Output{{Receive: d.runnable.Execute(ctx)}}, nil
}

// apply discovered data to run multiple instances
var tasksOutput []IntegrationOutput
var tasksOutput []Output

// merges both runnable configuration and config template (if any) to avoid having different
// discoverable
Expand All @@ -115,6 +117,7 @@ func (d *Definition) Run(ctx context.Context, bind *databind.Values) ([]Integrat
if err != nil {
return nil, err
}

logger.Debug("Running through all discovery matches.")
for _, ir := range matches {
dc, ok := ir.Variables.(discoveredConfig)
Expand Down Expand Up @@ -162,7 +165,7 @@ func (d *Definition) Run(ctx context.Context, bind *databind.Values) ([]Integrat
if removeFile != nil {
go removeFile(taskOutput.Done)
}
tasksOutput = append(tasksOutput, IntegrationOutput{Output: taskOutput, ExtraLabels: ir.MetricAnnotations, EntityRewrite: ir.EntityRewrites})
tasksOutput = append(tasksOutput, Output{Receive: taskOutput, ExtraLabels: ir.MetricAnnotations, EntityRewrite: ir.EntityRewrites})
}
return tasksOutput, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func TestExec(t *testing.T) {
require.Len(t, outs, 1)

// THEN returns normally, forwarding the Standard Output&error
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Output.Stderr))
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Receive.Stderr))
}

func TestExec_NoDiscovery(t *testing.T) {
Expand Down Expand Up @@ -102,22 +102,22 @@ func TestExec_Discovery(t *testing.T) {
require.Len(t, outs, 3)

// THEN the tasks are executed with the given configuration
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Output.Stderr))
assert.Equal(t, "hello-world", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Receive.Stderr))
assert.Equal(t, "hello-world", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, data.Map{"label.one": "one", "special": "true"}, outs[0].ExtraLabels)

assert.NoError(t, testhelp.ChannelErrClosed(outs[1].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[1].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[1].Output.Stderr))
assert.Equal(t, "bye-people", testhelp.ChannelRead(outs[1].Output.Stdout))
assert.NoError(t, testhelp.ChannelErrClosed(outs[1].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[1].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[1].Receive.Stderr))
assert.Equal(t, "bye-people", testhelp.ChannelRead(outs[1].Receive.Stdout))
assert.Equal(t, data.Map{"label.two": "two", "special": "false"}, outs[1].ExtraLabels)

assert.NoError(t, testhelp.ChannelErrClosed(outs[2].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[2].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[2].Output.Stderr))
assert.Equal(t, "kon-nichiwa", testhelp.ChannelRead(outs[2].Output.Stdout))
assert.NoError(t, testhelp.ChannelErrClosed(outs[2].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[2].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[2].Receive.Stderr))
assert.Equal(t, "kon-nichiwa", testhelp.ChannelRead(outs[2].Receive.Stdout))
assert.Equal(t, data.Map{"label.tree": "three", "other_tag": "true"}, outs[2].ExtraLabels)
}

Expand All @@ -137,10 +137,10 @@ func TestExec_CmdSlice(t *testing.T) {
require.Len(t, outs, 1)

// THEN the tasks are executed with the given configuration
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Output.Stderr))
assert.Equal(t, "-argument", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Receive.Stderr))
assert.Equal(t, "-argument", testhelp.ChannelRead(outs[0].Receive.Stdout))
}

func TestExec_CancelPropagation(t *testing.T) {
Expand All @@ -166,8 +166,8 @@ func TestExec_CancelPropagation(t *testing.T) {

// WHEN the tasks are running
for _, out := range outs {
assert.Equal(t, "starting", testhelp.ChannelRead(out.Output.Stdout))
assert.Error(t, testhelp.ChannelErrClosedTimeout(out.Output.Errors, 100*time.Millisecond))
assert.Equal(t, "starting", testhelp.ChannelRead(out.Receive.Stdout))
assert.Error(t, testhelp.ChannelErrClosedTimeout(out.Receive.Errors, 100*time.Millisecond))
}

// AND they are cancelled
Expand All @@ -176,18 +176,18 @@ func TestExec_CancelPropagation(t *testing.T) {
// THEN all the subtasks have reported errors
var openCh bool
for _, out := range outs {
err := testhelp.ChannelErrClosed(out.Output.Errors)
err := testhelp.ChannelErrClosed(out.Receive.Errors)
assert.Error(t, err)
assert.NotEqual(t, err, testhelp.ErrChannelTimeout)

// AND channels are closed
_, openCh = <-out.Output.Stdout
_, openCh = <-out.Receive.Stdout
assert.False(t, openCh)
_, openCh = <-out.Output.Stderr
_, openCh = <-out.Receive.Stderr
assert.False(t, openCh)

// AND ctx has been canceled
_, openCh = <-out.Output.Done
_, openCh = <-out.Receive.Done
assert.False(t, openCh)
}
}
Expand All @@ -214,18 +214,18 @@ func TestExec_CancelPropagationWithoutReads(t *testing.T) {
// THEN all the subtasks have reported errors
var openCh bool
for _, out := range outs {
err := testhelp.ChannelErrClosed(out.Output.Errors)
err := testhelp.ChannelErrClosed(out.Receive.Errors)
assert.Error(t, err)
assert.NotEqual(t, err, testhelp.ErrChannelTimeout)

// AND channels are closed
_, openCh = <-out.Output.Stdout
_, openCh = <-out.Receive.Stdout
assert.False(t, openCh)
_, openCh = <-out.Output.Stderr
_, openCh = <-out.Receive.Stderr
assert.False(t, openCh)

// AND ctx has been canceled
_, openCh = <-out.Output.Done
_, openCh = <-out.Receive.Done
assert.False(t, openCh)
}
}
Expand All @@ -251,16 +251,16 @@ func TestExec_Cancel_Partial(t *testing.T) {
require.Len(t, outs, 2)

// WHEN the tasks are running
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.Equal(t, "starting", testhelp.ChannelRead(outs[1].Output.Stdout))
assert.Error(t, testhelp.ChannelErrClosedTimeout(outs[1].Output.Errors, 100*time.Millisecond))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, "starting", testhelp.ChannelRead(outs[1].Receive.Stdout))
assert.Error(t, testhelp.ChannelErrClosedTimeout(outs[1].Receive.Errors, 100*time.Millisecond))

// AND they are cancelled
cancel()

// THEN only the non-finished tasks have been cancelled
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Output.Errors))
assert.Error(t, testhelp.ChannelErrClosedTimeout(outs[1].Output.Errors, 100*time.Millisecond))
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Receive.Errors))
assert.Error(t, testhelp.ChannelErrClosedTimeout(outs[1].Receive.Errors, 100*time.Millisecond))
}

func TestExec_Directory(t *testing.T) {
Expand Down Expand Up @@ -293,9 +293,9 @@ func TestExec_Directory(t *testing.T) {
require.Len(t, outs, 1)

// THEN returns normally, forwarding the Standard Output&error
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Output.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Output.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Output.Stderr))
assert.NoError(t, testhelp.ChannelErrClosed(outs[0].Receive.Errors))
assert.Equal(t, "stdout line", testhelp.ChannelRead(outs[0].Receive.Stdout))
assert.Equal(t, "error line", testhelp.ChannelRead(outs[0].Receive.Stderr))
}

func TestExec_RemoveExternalConfig(t *testing.T) {
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestExec_RemoveExternalConfig(t *testing.T) {
timeout := time.After(10 * time.Second)
for _, out := range outputs {
select {
case <-out.Output.Done:
case <-out.Receive.Done:
case <-timeout:
require.FailNow(t, "timeout waiting for the integrations to finish")
}
Expand Down
4 changes: 2 additions & 2 deletions internal/integrations/v4/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const (

var ilog = log.WithComponent("integrations.Definition")

type IntegrationOutput struct {
Output executor.OutputReceive
type Output struct {
Receive executor.OutputReceive
ExtraLabels data.Map
EntityRewrite []data.EntityRewrite
}
Expand Down
4 changes: 2 additions & 2 deletions internal/integrations/v4/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestConfigTemplate(t *testing.T) {
// AND the integration has correctly accepted the file templates with the discovery matches
expectedIPs := map[string]struct{}{"1.2.3.4": {}, "5.6.7.8": {}}
for _, out := range outputs {
line := testhelp.ChannelRead(out.Output.Stdout)
line := testhelp.ChannelRead(out.Receive.Stdout)
assert.Containsf(t, expectedIPs, line, "unexpected value: %v", line)
delete(expectedIPs, line)
assert.Len(t, out.ExtraLabels, 2)
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestEmbeddedConfig_String(t *testing.T) {
// AND the integration has correctly accepted the file templates with the discovery matches
expectedIPs := map[string]struct{}{"1.2.3.4": {}, "5.6.7.8": {}}
for _, out := range outputs {
line := testhelp.ChannelRead(out.Output.Stdout)
line := testhelp.ChannelRead(out.Receive.Stdout)
assert.Containsf(t, expectedIPs, line, "unexpected value: %q", line)
delete(expectedIPs, line)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/integrations/v4/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ func (r *runner) execute(ctx context.Context, matches *databind.Values) {
instances.Add(len(output))
for _, out := range output {
o := out
go r.handleLines(o.Output.Stdout, o.ExtraLabels, o.EntityRewrite)
go r.handleStderr(o.Output.Stderr)
go r.handleLines(o.Receive.Stdout, o.ExtraLabels, o.EntityRewrite)
go r.handleStderr(o.Receive.Stderr)
go func() {
defer instances.Done()
r.handleErrors(o.Output.Errors)
r.handleErrors(o.Receive.Errors)
}()
}

Expand Down