Skip to content

Commit

Permalink
extension: Init ECS service discovery in internal package
Browse files Browse the repository at this point in the history
This is the first (divided) implementation PR for #1395.
In order to reduce the size of the pr, the feature is incomplete.

- mock api client is used, no actual aws api calls
- Only docker label based filter/matcher is included
- Extension impl under `ecsobserver` is excluded

Those excluded logic are tested in other branches, so it works with real
API end to end (eventually).

The package is located under internal/aws/ecssd because

- it can be used by other aws specific packages
- we may extract it out to a standalone repo in the future
  as it can run as a binary and support other prometheus
  compatiable collectors as a sidecar
  • Loading branch information
at15 committed Mar 18, 2021
1 parent f8c550f commit cf211b3
Show file tree
Hide file tree
Showing 21 changed files with 1,837 additions and 3 deletions.
91 changes: 91 additions & 0 deletions internal/aws/ecssd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecssd

import (
"time"

"gopkg.in/yaml.v2"
)

const (
DefaultRefreshInterval = 30 * time.Second
DefaultJobLabelName = "prometheus_job"
AWSRegionEnvKey = "AWS_REGION"
)

type Config struct {
// ClusterName is the target ECS cluster name for service discovery.
ClusterName string `mapstructure:"cluster_name" yaml:"cluster_name"`
// ClusterRegion is the target ECS cluster's AWS region.
ClusterRegion string `mapstructure:"cluster_region" yaml:"cluster_region"`
// RefreshInterval determines how frequency at which the observer
// needs to poll for collecting information about new processes.
RefreshInterval time.Duration `mapstructure:"refresh_interval" yaml:"refresh_interval"`
// ResultFile is the output path of the discovered targets YAML file (optional).
// This is mainly used in conjunction with the Prometheus receiver.
ResultFile string `mapstructure:"result_file" yaml:"result_file"`
// JobLabelName is the override for prometheus job label, using `job` literal will cause error
// in otel prometheus receiver. See https://github.com/open-telemetry/opentelemetry-collector/issues/575
JobLabelName string `mapstructure:"job_label_name" yaml:"job_label_name"`
// Services is a list of service name patterns for filtering tasks.
Services []ServiceConfig `mapstructure:"services" yaml:"services"`
// TaskDefinitions is a list of task definition arn patterns for filtering tasks.
TaskDefinitions []TaskDefinitionConfig `mapstructure:"task_definitions" yaml:"task_definitions"`
// DockerLabels is a list of docker labels for filtering containers within tasks.
DockerLabels []DockerLabelConfig `mapstructure:"docker_labels" yaml:"docker_labels"`
}

// LoadConfig use yaml.v2 to decode the struct.
// It returns the yaml decode error directly.
func LoadConfig(b []byte) (Config, error) {
var c Config
if err := yaml.Unmarshal(b, &c); err != nil {
return Config{}, err
}
return c, nil
}

// ExampleConfig returns an example instance that matches testdata/config_example.yaml.
// It can be used to validate if the struct tags like mapstructure, yaml are working properly.
func ExampleConfig() Config {
return Config{
ClusterName: "ecs-sd-test-1",
ClusterRegion: "us-west-2",
ResultFile: "/etc/ecs_sd_targets.yaml",
RefreshInterval: 15 * time.Second,
JobLabelName: DefaultJobLabelName,
Services: []ServiceConfig{
{
NamePattern: "^retail-.*$",
},
},
TaskDefinitions: []TaskDefinitionConfig{
{
CommonExporterConfig: CommonExporterConfig{
JobName: "task_def_1",
MetricsPath: "/not/metrics",
MetricsPorts: []int{9113, 9090},
},
ArnPattern: ".*:task-definition/nginx:[0-9]+",
},
},
DockerLabels: []DockerLabelConfig{
{
PortLabel: "ECS_PROMETHEUS_EXPORTER_PORT",
},
},
}
}
29 changes: 29 additions & 0 deletions internal/aws/ecssd/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecssd

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLoadConfig(t *testing.T) {
b := mustReadFile(t, "testdata/config_example.yaml")
c, err := LoadConfig(b)
require.Nil(t, err)
assert.Equal(t, ExampleConfig(), c)
}
116 changes: 116 additions & 0 deletions internal/aws/ecssd/docker_label.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecssd

import (
"fmt"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"go.uber.org/zap"
)

// DockerLabelConfig matches all tasks based on their docker label.
//
// NOTE: it's possible to make DockerLabelConfig part of CommonExporterConfig
// and use it both ServiceConfig and TaskDefinitionConfig.
// However, based on existing users, few people mix different types of filters.
// If that usecase arises in the future, we can rewrite the top level docker lable filter
// using a task definition filter with arn_pattern:*.
type DockerLabelConfig struct {
CommonExporterConfig `mapstructure:",squash" yaml:",inline"`

// PortLabel is mandetory, empty string means docker label based match is skipped.
PortLabel string `mapstructure:"port_label" yaml:"port_label"`
JobNameLabel string `mapstructure:"job_name_label" yaml:"job_name_label"`
MetricsPathLabel string `mapstructure:"metrics_path_label" yaml:"metrics_path_label"`
}

func (d *DockerLabelConfig) Init() error {
// It's possible to support it in the future, but for now just fail at config,
// so user don't need to wonder which port is used in the exported target.
if len(d.MetricsPorts) != 0 {
return fmt.Errorf("metrics_ports is not supported in docker_labels, got %v", d.MetricsPorts)
}
return nil
}

func (d *DockerLabelConfig) NewMatcher(options MatcherOptions) (Matcher, error) {
return &DockerLabelMatcher{
logger: options.Logger,
cfg: *d,
}, nil
}

func dockerLabelConfigToMatchers(cfgs []DockerLabelConfig) []MatcherConfig {
var matchers []MatcherConfig
for _, cfg := range cfgs {
// NOTE: &cfg points to the temp var, whose value would end up be the last one in the slice.
copied := cfg
matchers = append(matchers, &copied)
}
return matchers
}

type DockerLabelMatcher struct {
logger *zap.Logger
cfg DockerLabelConfig
}

func (d *DockerLabelMatcher) Type() MatcherType {
return MatcherTypeDockerLabel
}

func (d *DockerLabelMatcher) ExporterConfig() CommonExporterConfig {
return d.cfg.CommonExporterConfig
}

func (d *DockerLabelMatcher) MatchTargets(t *Task, c *ecs.ContainerDefinition) ([]MatchedTarget, error) {
portLabel := d.cfg.PortLabel
// Skip if portLabel is not set
if portLabel == "" {
return nil, errNotMatched
}

// Only check port label
ps, ok := c.DockerLabels[portLabel]
if !ok {
return nil, errNotMatched
}

// Convert port
s := aws.StringValue(ps)
port, err := strconv.Atoi(s)
if err != nil {
return nil, fmt.Errorf("invalid port_label value, container=%s labelKey=%s labelValue=%s: %w",
aws.StringValue(c.Name), d.cfg.PortLabel, s, err)
}
// Export only one target based on docker port label.
target := MatchedTarget{
Port: port,
}
if v, ok := c.DockerLabels[d.cfg.MetricsPathLabel]; ok {
target.MetricsPath = aws.StringValue(v)
}
if v, ok := c.DockerLabels[d.cfg.JobNameLabel]; ok {
target.Job = aws.StringValue(v)
}
// NOTE: we only override job name but keep port and metrics from docker label instead of using common export config.
if d.cfg.JobName != "" {
target.Job = d.cfg.JobName
}
return []MatchedTarget{target}, nil
}
112 changes: 112 additions & 0 deletions internal/aws/ecssd/docker_label_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecssd

import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/stretchr/testify/assert"
)

func TestDockerLabelMatcher_Match(t *testing.T) {
t.Run("skip empty config", func(t *testing.T) {
res := newMatcherAndMatch(t, &DockerLabelConfig{}, nil)
assert.Len(t, res.Tasks, 0)
})

t.Run("metrics_ports not supported", func(t *testing.T) {
cfg := DockerLabelConfig{
CommonExporterConfig: CommonExporterConfig{
MetricsPorts: []int{404}, // they should be ignored
},
}
assert.Error(t, cfg.Init())
})

portLabel := "MY_PROMETHEUS_PORT"
jobLabel := "MY_PROMETHEUS_JOB"
tasks := []*Task{
{
Definition: &ecs.TaskDefinition{
ContainerDefinitions: []*ecs.ContainerDefinition{
{
DockerLabels: map[string]*string{
portLabel: aws.String("2112"),
jobLabel: aws.String("PROM_JOB_1"),
},
},
{
DockerLabels: map[string]*string{
"not" + portLabel: aws.String("bar"),
},
},
},
},
},
}

t.Run("port label", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabel,
JobNameLabel: jobLabel,
}
res := newMatcherAndMatch(t, &cfg, tasks)
assert.Equal(t, &MatchResult{
Tasks: []int{0},
Containers: []MatchedContainer{
{
TaskIndex: 0,
ContainerIndex: 0,
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel,
Port: 2112,
Job: "PROM_JOB_1",
},
},
},
},
}, res)
})

t.Run("override job label", func(t *testing.T) {
cfg := DockerLabelConfig{
PortLabel: portLabel,
JobNameLabel: jobLabel,
CommonExporterConfig: CommonExporterConfig{
JobName: "override docker label",
},
}
res := newMatcherAndMatch(t, &cfg, tasks)
assert.Equal(t, &MatchResult{
Tasks: []int{0},
Containers: []MatchedContainer{
{
TaskIndex: 0,
ContainerIndex: 0,
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel,
Port: 2112,
Job: "override docker label",
},
},
},
},
}, res)
})
}

0 comments on commit cf211b3

Please sign in to comment.