Skip to content

Commit

Permalink
feat: add shutdown method for podmanreceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
rogercoll committed May 10, 2024
1 parent cde2b0a commit 1b6095d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 50 deletions.
8 changes: 7 additions & 1 deletion receiver/podmanreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,11 @@ func createMetricsReceiver(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
podmanConfig := config.(*Config)
return newMetricsReceiver(ctx, params, podmanConfig, consumer, nil)

recv := newMetricsReceiver(params, podmanConfig, nil)
scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start), scraperhelper.WithShutdown(recv.shutdown))
if err != nil {
return nil, err
}
return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, params, consumer, scraperhelper.AddScraper(scrp))
}
14 changes: 0 additions & 14 deletions receiver/podmanreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,3 @@ func TestCreateReceiver(t *testing.T) {
assert.NoError(t, err, "Metric receiver creation failed")
assert.NotNil(t, metricReceiver, "Receiver creation failed")
}

func TestCreateInvalidEndpoint(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig()
receiverCfg := config.(*Config)

receiverCfg.Endpoint = ""

params := receivertest.NewNopCreateSettings()
recv, err := factory.CreateMetricsReceiver(context.Background(), params, receiverCfg, consumertest.NewNop())
assert.Nil(t, recv)
assert.Error(t, err)
assert.Equal(t, "config.Endpoint must be specified", err.Error())
}
39 changes: 20 additions & 19 deletions receiver/podmanreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver/internal/metadata"
Expand All @@ -29,40 +27,32 @@ type metricsReceiver struct {
clientFactory clientFactory
scraper *ContainerScraper
mb *metadata.MetricsBuilder
cancel context.CancelFunc
}

func newMetricsReceiver(
_ context.Context,
set receiver.CreateSettings,
config *Config,
nextConsumer consumer.Metrics,
clientFactory clientFactory,
) (receiver.Metrics, error) {
err := config.Validate()
if err != nil {
return nil, err
}

) *metricsReceiver {
if clientFactory == nil {
clientFactory = newLibpodClient
}

recv := &metricsReceiver{
return &metricsReceiver{
config: config,
clientFactory: clientFactory,
set: set,
mb: metadata.NewMetricsBuilder(config.MetricsBuilderConfig, set),
}
}

scrp, err := scraperhelper.NewScraper(metadata.Type.String(), recv.scrape, scraperhelper.WithStart(recv.start))
func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
err := r.config.Validate()
if err != nil {
return nil, err
return err
}
return scraperhelper.NewScraperControllerReceiver(&recv.config.ControllerConfig, set, nextConsumer, scraperhelper.AddScraper(scrp))
}

func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
var err error
podmanClient, err := r.clientFactory(r.set.Logger, r.config)
if err != nil {
return err
Expand All @@ -72,7 +62,19 @@ func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
if err = r.scraper.loadContainerList(ctx); err != nil {
return err
}
go r.scraper.containerEventLoop(ctx)

cctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

go r.scraper.containerEventLoop(cctx)

return nil
}

func (r *metricsReceiver) shutdown(context.Context) error {
if r.cancel != nil {
r.cancel()
}
return nil
}

Expand Down Expand Up @@ -136,7 +138,6 @@ func (r *metricsReceiver) recordCPUMetrics(now pcommon.Timestamp, stats *contain
for i, cpu := range stats.PerCPU {
r.mb.RecordContainerCPUUsagePercpuDataPoint(now, int64(toSecondsWithNanosecondPrecision(cpu)), fmt.Sprintf("cpu%d", i))
}

}

func (r *metricsReceiver) recordNetworkMetrics(now pcommon.Timestamp, stats *containerStats) {
Expand Down
29 changes: 13 additions & 16 deletions receiver/podmanreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -31,21 +30,20 @@ func TestNewReceiver(t *testing.T) {
InitialDelay: time.Second,
},
}
nextConsumer := consumertest.NewNop()
mr, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), config, nextConsumer, nil)

mr := newMetricsReceiver(receivertest.NewNopCreateSettings(), config, nil)
assert.NotNil(t, mr)
assert.NoError(t, err)
}

func TestNewReceiverErrors(t *testing.T) {
r, err := newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{}, consumertest.NewNop(), nil)
assert.Nil(t, r)
func TestErrorsInStart(t *testing.T) {
recv := newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{}, nil)
assert.NotNil(t, recv)
err := recv.start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Equal(t, "config.Endpoint must be specified", err.Error())

r, err = newMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, consumertest.NewNop(), nil)
assert.Nil(t, r)
recv = newMetricsReceiver(receivertest.NewNopCreateSettings(), &Config{Endpoint: "someEndpoint"}, nil)
assert.NotNil(t, recv)
err = recv.start(context.Background(), componenttest.NewNopHost())
require.Error(t, err)
assert.Equal(t, "config.CollectionInterval must be specified", err.Error())
}
Expand All @@ -55,13 +53,11 @@ func TestScraperLoop(t *testing.T) {
cfg.CollectionInterval = 100 * time.Millisecond

client := make(mockClient)
consumer := make(mockConsumer)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

r, err := newMetricsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, consumer, client.factory)
require.NoError(t, err)
r := newMetricsReceiver(receivertest.NewNopCreateSettings(), cfg, client.factory)
assert.NotNil(t, r)

go func() {
Expand All @@ -74,14 +70,15 @@ func TestScraperLoop(t *testing.T) {
}
}()

assert.NoError(t, r.Start(ctx, componenttest.NewNopHost()))
assert.NoError(t, r.start(ctx, componenttest.NewNopHost()))

md := <-consumer
md, err := r.scrape(ctx)
assert.NoError(t, err)
assert.Equal(t, 1, md.ResourceMetrics().Len())

assertStatsEqualToMetrics(t, genContainerStats(), md)

assert.NoError(t, r.Shutdown(ctx))
assert.NoError(t, r.shutdown(ctx))
}

type mockClient chan containerStatsReport
Expand Down

0 comments on commit 1b6095d

Please sign in to comment.