diff --git a/pkg/metrics/process/harvester_darwin.go b/pkg/metrics/process/harvester_darwin.go new file mode 100644 index 000000000..a29c2dac9 --- /dev/null +++ b/pkg/metrics/process/harvester_darwin.go @@ -0,0 +1,162 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package process provides all the tools and functionality for sampling processes. It is divided in three main +// components: +// - Snapshot: provides OS-level information of a process at a given spot +// - Harvester: manages process Snapshots to create actual Process Samples with the actual metrics. +// - Sampler: uses the harvester to coordinate the creation of the Process Samples dataset, as being reported to NR +package process + +import ( + "github.com/newrelic/infrastructure-agent/internal/agent" + "github.com/newrelic/infrastructure-agent/pkg/config" + "github.com/newrelic/infrastructure-agent/pkg/metrics" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" + "github.com/pkg/errors" + "github.com/shirou/gopsutil/process" + "github.com/sirupsen/logrus" +) + +func newHarvester(ctx agent.AgentContext) *darwinHarvester { + cfg := ctx.Config() + // If not config, assuming root mode as default + privileged := cfg == nil || cfg.RunMode == config.ModeRoot || cfg.RunMode == config.ModePrivileged + disableZeroRSSFilter := cfg != nil && cfg.DisableZeroRSSFilter + stripCommandLine := (cfg != nil && cfg.StripCommandLine) || (cfg == nil && config.DefaultStripCommandLine) + //decouple the process from the harvester + processRetriever := func(pid int32) (Process, error) { + proc, err := process.NewProcess(pid) + if err != nil { + return &ProcessWrapper{}, err + } + return &ProcessWrapper{proc}, nil + } + + return &darwinHarvester{ + privileged: privileged, + disableZeroRSSFilter: disableZeroRSSFilter, + stripCommandLine: stripCommandLine, + serviceForPid: ctx.GetServiceForPid, + processRetriever: processRetriever, + } +} + +// 
darwinHarvester is a Harvester implementation that uses various darwin sources and manages process caches +type darwinHarvester struct { + privileged bool + disableZeroRSSFilter bool + stripCommandLine bool + serviceForPid func(int) (string, bool) + processRetriever func(int32) (Process, error) +} + +var _ Harvester = (*darwinHarvester)(nil) // static interface assertion + +// Pids returns a slice of process IDs that are running now +func (*darwinHarvester) Pids() ([]int32, error) { + return process.Pids() +} + +// Do Returns a sample of a process whose PID is passed as argument. The 'elapsedSeconds' argument represents the +// time since this process was sampled for the last time. If the process has been sampled for the first time, this value +// will be ignored. In darwin implementation not used right now +func (dh *darwinHarvester) Do(pid int32, elapsedSeconds float64) (*types.ProcessSample, error) { + proc, err := dh.processRetriever(pid) + if err != nil { + return nil, errors.Wrap(err, "can't create process") + } + + procSnapshot, err := getDarwinProcess(proc, dh.privileged) + if err != nil { + return nil, errors.Wrap(err, "can't create process") + } + + // We don't need to report processes which are not using memory. This filters out certain kernel processes. 
+ if !dh.disableZeroRSSFilter && procSnapshot.VmRSS() == 0 { + return nil, errors.New("process with zero rss") + } + + // Creates a fresh process sample and populates it with the metrics data + sample := metrics.NewProcessSample(pid) + + if err = dh.populateStaticData(sample, procSnapshot); err != nil { + return nil, errors.Wrap(err, "can't populate static attributes") + } + + if err = dh.populateGauges(sample, procSnapshot); err != nil { + return nil, errors.Wrap(err, "can't fetch gauge data") + } + + // This must happen every time, even if we already had a cached sample for the process, because + // the available process name metadata may have changed underneath us (if we pick up a new + // service/PID association, etc) + sample.ProcessDisplayName = dh.determineProcessDisplayName(sample) + sample.Type("ProcessSample") + + return sample, nil +} + +// populateStaticData populates the sample with the process data won't vary during the process life cycle +func (dh *darwinHarvester) populateStaticData(sample *types.ProcessSample, processSnapshot Snapshot) error { + var err error + + sample.CmdLine, err = processSnapshot.CmdLine(!dh.stripCommandLine) + if err != nil { + return errors.Wrap(err, "acquiring command line") + } + + sample.User, err = processSnapshot.Username() + if err != nil { + mplog.WithError(err).WithField("processID", sample.ProcessID).Debug("Can't get Username for process.") + } + + sample.ProcessID = processSnapshot.Pid() + sample.CommandName = processSnapshot.Command() + sample.ParentProcessID = processSnapshot.Ppid() + + return nil +} + +// populateGauges populates the sample with gauge data that represents the process state at a given point +func (dh *darwinHarvester) populateGauges(sample *types.ProcessSample, process Snapshot) error { + var err error + + cpuTimes, err := process.CPUTimes() + if err != nil { + return err + } + sample.CPUPercent = cpuTimes.Percent + + totalCPU := cpuTimes.User + cpuTimes.System + + if totalCPU > 0 { + 
sample.CPUUserPercent = (cpuTimes.User / totalCPU) * sample.CPUPercent + sample.CPUSystemPercent = (cpuTimes.System / totalCPU) * sample.CPUPercent + } else { + sample.CPUUserPercent = 0 + sample.CPUSystemPercent = 0 + } + + // Extra status data + sample.Status = process.Status() + sample.ThreadCount = process.NumThreads() + sample.MemoryVMSBytes = process.VmSize() + sample.MemoryRSSBytes = process.VmRSS() + + return nil +} + +// determineProcessDisplayName generates a human-friendly name for this process. By default, we use the command name. +// If we know of a service for this pid, that'll be the name. +func (dh *darwinHarvester) determineProcessDisplayName(sample *types.ProcessSample) string { + displayName := sample.CommandName + if serviceName, ok := dh.serviceForPid(int(sample.ProcessID)); ok && len(serviceName) > 0 { + mplog.WithFieldsF(func() logrus.Fields { + return logrus.Fields{"serviceName": serviceName, "displayName": displayName, "ProcessID": sample.ProcessID} + }).Debug("Using service name as display name.") + displayName = serviceName + } + + return displayName +} diff --git a/pkg/metrics/process/harvester_darwin_test.go b/pkg/metrics/process/harvester_darwin_test.go new file mode 100644 index 000000000..824759fa4 --- /dev/null +++ b/pkg/metrics/process/harvester_darwin_test.go @@ -0,0 +1,471 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package process + +import ( + "bytes" + "github.com/newrelic/infrastructure-agent/internal/agent/mocks" + "github.com/newrelic/infrastructure-agent/pkg/config" + "github.com/newrelic/infrastructure-agent/pkg/log" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" + "github.com/pkg/errors" + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/process" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "math" + "testing" +) + +func Test_newHarvester(t *testing.T) { + tests := []struct { + name string + cfg *config.Config + expectedPrivileged bool + expectedDisableZeroRSSFilter bool + expectedStripCommandLine bool + }{ + { + name: "no config", + cfg: nil, + expectedPrivileged: true, + expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: config.DefaultStripCommandLine, + }, + { + name: "root mode", + cfg: &config.Config{RunMode: config.ModeRoot}, + expectedPrivileged: true, + expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: false, + }, + { + name: "privileged mode", + cfg: &config.Config{RunMode: config.ModePrivileged}, + expectedPrivileged: true, + expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: false, + }, + { + name: "unprivileged mode", + cfg: &config.Config{RunMode: config.ModeUnprivileged}, + expectedPrivileged: false, + expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: false, + }, + { + name: "DisableZeroRSSFilter", + cfg: &config.Config{DisableZeroRSSFilter: true}, + expectedPrivileged: false, + expectedDisableZeroRSSFilter: true, + expectedStripCommandLine: false, + }, + { + name: "stripCommandLine", + cfg: &config.Config{StripCommandLine: true}, + expectedPrivileged: false, + expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: true, + }, + { + name: "dont stripCommandLine", + cfg: &config.Config{StripCommandLine: false}, + expectedPrivileged: false, + 
expectedDisableZeroRSSFilter: false, + expectedStripCommandLine: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := new(mocks.AgentContext) + ctx.On("Config").Once().Return(tt.cfg) + h := newHarvester(ctx) + assert.Equal(t, tt.expectedPrivileged, h.privileged) + assert.Equal(t, tt.expectedDisableZeroRSSFilter, h.disableZeroRSSFilter) + assert.Equal(t, tt.expectedStripCommandLine, h.stripCommandLine) + ctx.AssertExpectations(t) + }) + } +} + +func TestDarwinHarvester_populateStaticData_OnErrorOnCmd(t *testing.T) { + ctx := new(mocks.AgentContext) + snapshot := &SnapshotMock{} + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + h := newHarvester(ctx) + errorOnCmd := errors.New("this is some error") + snapshot.ShouldReturnCmdLine(!h.stripCommandLine, "", errorOnCmd) + + sample := &types.ProcessSample{} + err := h.populateStaticData(sample, snapshot) + + assert.Equal(t, errors.Cause(err), errorOnCmd) + assert.Equal(t, sample, &types.ProcessSample{}) + mock.AssertExpectationsForObjects(t, ctx, snapshot) +} + +func TestDarwinHarvester_populateStaticData_LogOnErrorOnUsername(t *testing.T) { + ctx := new(mocks.AgentContext) + snapshot := &SnapshotMock{} + cmdLine := "some cmd line" + command := "some command" + var pid int32 = 3 + var ppid int32 = 3 + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + //Capture the logs + var output bytes.Buffer + log.SetOutput(&output) + log.SetLevel(logrus.DebugLevel) + + h := newHarvester(ctx) + errorOnUsername := errors.New("this is some username error") + snapshot.ShouldReturnCmdLine(!h.stripCommandLine, cmdLine, nil) + snapshot.ShouldReturnUsername("", errorOnUsername) + snapshot.ShouldReturnPid(pid) + snapshot.ShouldReturnPpid(ppid) + snapshot.ShouldReturnCommand(command) + + sample := &types.ProcessSample{} + err := h.populateStaticData(sample, snapshot) + assert.Nil(t, err) + + //get log output + written := 
output.String() + assert.Contains(t, written, "Can't get Username for process.") + + assert.Equal(t, sample, &types.ProcessSample{ + CmdLine: cmdLine, + CommandName: command, + ProcessID: pid, + ParentProcessID: ppid, + }) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx, snapshot) +} + +func TestDarwinHarvester_populateStaticData_NoErrorOnUsername(t *testing.T) { + ctx := new(mocks.AgentContext) + snapshot := &SnapshotMock{} + cmdLine := "some cmd line" + command := "some command" + username := "some username" + var pid int32 = 3 + var ppid int32 = 3 + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + //Capture the logs + var output bytes.Buffer + log.SetOutput(&output) + log.SetLevel(logrus.DebugLevel) + + h := newHarvester(ctx) + snapshot.ShouldReturnCmdLine(!h.stripCommandLine, cmdLine, nil) + snapshot.ShouldReturnUsername(username, nil) + snapshot.ShouldReturnPid(pid) + snapshot.ShouldReturnPpid(ppid) + snapshot.ShouldReturnCommand(command) + + sample := &types.ProcessSample{} + err := h.populateStaticData(sample, snapshot) + assert.Nil(t, err) + + //get log output + written := output.String() + assert.Equal(t, written, "") + + assert.Equal(t, sample, &types.ProcessSample{ + CmdLine: cmdLine, + CommandName: command, + ProcessID: pid, + ParentProcessID: ppid, + User: username, + }) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx, snapshot) +} + +func TestDarwinHarvester_populateGauges(t *testing.T) { + ctx := new(mocks.AgentContext) + snapshot := &SnapshotMock{} + + tests := []struct { + name string + cpuInfo CPUInfo + status string + threadCount int32 + vms int64 + rss int64 + expectedSample *types.ProcessSample + }{ + { + name: "with cpu info", + cpuInfo: CPUInfo{ + Percent: 45.34, + User: 21.10, + System: 24.24, + }, + status: "some status", + threadCount: int32(4), + vms: int64(23), + rss: int64(34), + expectedSample: &types.ProcessSample{ + CPUPercent: 45.34, + 
CPUUserPercent: 21.10, + CPUSystemPercent: 24.24, + Status: "some status", + ThreadCount: int32(4), + MemoryVMSBytes: int64(23), + MemoryRSSBytes: int64(34), + }, + }, + { + name: "no cpu user/system info", + cpuInfo: CPUInfo{ + Percent: 56.34, + }, + status: "some other status", + threadCount: int32(2), + vms: int64(55), + rss: int64(66), + expectedSample: &types.ProcessSample{ + CPUPercent: 56.34, + CPUUserPercent: 0, + CPUSystemPercent: 0, + Status: "some other status", + ThreadCount: int32(2), + MemoryVMSBytes: int64(5), + MemoryRSSBytes: int64(66), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + h := newHarvester(ctx) + + snapshot.ShouldReturnCPUTimes(tt.cpuInfo, nil) + snapshot.ShouldReturnStatus(tt.status) + snapshot.ShouldReturnNumThreads(tt.threadCount) + snapshot.ShouldReturnVmSize(tt.vms) + snapshot.ShouldReturnVmRSS(tt.rss) + + sample := &types.ProcessSample{} + err := h.populateGauges(sample, snapshot) + assert.Nil(t, err) + + assert.Equal(t, tt.cpuInfo.Percent, sample.CPUPercent) + assert.Equal(t, tt.cpuInfo.User, math.Round(sample.CPUUserPercent*100)/100) + assert.Equal(t, tt.cpuInfo.System, math.Round(sample.CPUSystemPercent*100)/100) + assert.Equal(t, tt.status, sample.Status) + assert.Equal(t, tt.threadCount, sample.ThreadCount) + assert.Equal(t, tt.vms, sample.MemoryVMSBytes) + assert.Equal(t, tt.rss, sample.MemoryRSSBytes) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx, snapshot) + }) + } +} + +func TestDarwinHarvester_populateGauges_NoCpuInfo(t *testing.T) { + ctx := new(mocks.AgentContext) + snapshot := &SnapshotMock{} + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + h := newHarvester(ctx) + + cpuInfoErr := errors.New("this is an error") + snapshot.ShouldReturnCPUTimes(CPUInfo{}, cpuInfoErr) + + sample := &types.ProcessSample{} + err := 
h.populateGauges(sample, snapshot) + assert.Equal(t, cpuInfoErr, err) + assert.Equal(t, sample, &types.ProcessSample{}) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx, snapshot) +} + +func TestDarwinHarvester_determineProcessDisplayName_OnProcessIdInfoAvailable(t *testing.T) { + ctx := new(mocks.AgentContext) + commandName := "some command name" + processName := "some process name" + processId := 10 + ok := true + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + ctx.On("GetServiceForPid", processId).Once().Return(processName, ok) + + //Capture the logs + var output bytes.Buffer + log.SetOutput(&output) + log.SetLevel(logrus.DebugLevel) + + h := newHarvester(ctx) + + sample := &types.ProcessSample{CommandName: commandName, ProcessID: int32(processId)} + name := h.determineProcessDisplayName(sample) + + assert.Equal(t, processName, name) + + //get log output + written := output.String() + assert.Contains(t, written, "Using service name as display name.") + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx) +} + +func TestDarwinHarvester_determineProcessDisplayName_OnProcessIdInfoNotAvailable(t *testing.T) { + ctx := new(mocks.AgentContext) + commandName := "some command name" + processName := "" + processId := 10 + ok := false + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + ctx.On("GetServiceForPid", processId).Once().Return(processName, ok) + + h := newHarvester(ctx) + + sample := &types.ProcessSample{CommandName: commandName, ProcessID: int32(processId)} + name := h.determineProcessDisplayName(sample) + + assert.Equal(t, commandName, name) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx) +} + +func TestDarwinHarvester_Do_DontReportIfMemoryZero(t *testing.T) { + ctx := new(mocks.AgentContext) + processId := int32(1) + elapsedSeconds := 12.0 //not used right now + + cfg := &config.Config{RunMode: 
config.ModeRoot, DisableZeroRSSFilter: false} + ctx.On("Config").Once().Return(cfg) + + proc := &ProcessMock{} + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessIdMultipleTimes(processId, 2) + proc.ShouldReturnNumThreads(3, nil) + proc.ShouldReturnStatus("some status", nil) + proc.ShouldReturnMemoryInfo( + &process.MemoryInfoStat{ + RSS: 0, + VMS: 0, + }, nil) + proc.ShouldReturnCPUPercent(34.45, nil) + proc.ShouldReturnTimes(&cpu.TimesStat{User: 34, System: 0.45}, nil) + proc.ShouldReturnUsername("some username", nil) + + h := newHarvester(ctx) + h.processRetriever = func(int32) (Process, error) { + return proc, nil + } + + var expectedSample *types.ProcessSample + sample, err := h.Do(processId, elapsedSeconds) + assert.Error(t, err, "process with zero rss") + assert.Equal(t, expectedSample, sample) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx) +} + +func TestDarwinHarvester_Do_NoError(t *testing.T) { + ctx := new(mocks.AgentContext) + processName := "some process name" + processId := int32(1) + elapsedSeconds := 12.0 //not used right now + ok := false + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + ctx.On("GetServiceForPid", int(processId)).Once().Return(processName, ok) + + proc := &ProcessMock{} + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessIdMultipleTimes(processId, 2) + proc.ShouldReturnNumThreads(3, nil) + proc.ShouldReturnStatus("some status", nil) + proc.ShouldReturnMemoryInfo( + &process.MemoryInfoStat{ + RSS: 45, + VMS: 22, + }, nil) + proc.ShouldReturnCPUPercent(34.45, nil) + proc.ShouldReturnTimes(&cpu.TimesStat{User: 34, System: 0.45}, nil) + proc.ShouldReturnUsername("some username", nil) + + h := newHarvester(ctx) + h.processRetriever = func(int32) (Process, error) { + return proc, nil + } + + sample, err := h.Do(processId, elapsedSeconds) + + assert.Nil(t, err) + assert.Equal(t, "some name", sample.ProcessDisplayName) + 
assert.Equal(t, processId, sample.ProcessID) + assert.Equal(t, "some name", sample.CommandName) + assert.Equal(t, "some username", sample.User) + assert.Equal(t, int64(45), sample.MemoryRSSBytes) + assert.Equal(t, int64(22), sample.MemoryVMSBytes) + assert.Equal(t, 34.45, sample.CPUPercent) + assert.Equal(t, 34.0, math.Round(sample.CPUUserPercent*100)/100) + assert.Equal(t, 0.45, math.Round(sample.CPUSystemPercent*100)/100) + assert.Equal(t, "", sample.CmdLine) + assert.Equal(t, "some status", sample.Status) + assert.Equal(t, int32(0), sample.ParentProcessID) + assert.Equal(t, int32(3), sample.ThreadCount) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx) +} + +func TestDarwinHarvester_Do_ErrorOnProcError(t *testing.T) { + ctx := new(mocks.AgentContext) + processId := int32(1) + elapsedSeconds := 12.0 //not used right now + var expectedSample *types.ProcessSample + + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Once().Return(cfg) + + procErr := errors.New("some error message") + h := newHarvester(ctx) + h.processRetriever = func(int32) (Process, error) { + return nil, procErr + } + + sample, err := h.Do(processId, elapsedSeconds) + assert.Equal(t, errors.Cause(err), procErr) + assert.Equal(t, expectedSample, sample) + + //mocked objects assertions + mock.AssertExpectationsForObjects(t, ctx) +} diff --git a/pkg/metrics/process/harvester_linux.go b/pkg/metrics/process/harvester_linux.go index cb3bd7c27..3358edbc0 100644 --- a/pkg/metrics/process/harvester_linux.go +++ b/pkg/metrics/process/harvester_linux.go @@ -10,7 +10,6 @@ package process import ( "github.com/newrelic/infrastructure-agent/internal/agent" "github.com/newrelic/infrastructure-agent/pkg/config" - "github.com/newrelic/infrastructure-agent/pkg/log" "github.com/newrelic/infrastructure-agent/pkg/metrics" "github.com/newrelic/infrastructure-agent/pkg/metrics/acquire" "github.com/newrelic/infrastructure-agent/pkg/metrics/types" @@ -20,18 +19,6 @@ import ( 
"github.com/sirupsen/logrus" ) -var mplog = log.WithField("component", "Metrics Process") - -// Harvester manages sampling for individual processes. It is used by the Process Sampler to get information about the -// existing processes. -type Harvester interface { - // Pids return the IDs of all the processes that are currently running - Pids() ([]int32, error) - // Do performs the actual harvesting operation, returning a process sample containing all the metrics data - // for the last elapsedSeconds - Do(pid int32, elapsedSeconds float64) (*types.ProcessSample, error) -} - func newHarvester(ctx agent.AgentContext, cache *cache) *linuxHarvester { cfg := ctx.Config() // If not config, assuming root mode as default diff --git a/pkg/metrics/process/harvester_unix.go b/pkg/metrics/process/harvester_unix.go new file mode 100644 index 000000000..374b867c5 --- /dev/null +++ b/pkg/metrics/process/harvester_unix.go @@ -0,0 +1,27 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +// +build linux darwin + +// Package process provides all the tools and functionality for sampling processes. It is divided in three main +// components: +// - Snapshot: provides OS-level information of a process at a given spot +// - Harvester: manages process Snapshots to create actual Process Samples with the actual metrics. +// - Sampler: uses the harvester to coordinate the creation of the Process Samples dataset, as being reported to NR +package process + +import ( + "github.com/newrelic/infrastructure-agent/pkg/log" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" +) + +var mplog = log.WithField("component", "Metrics Process") + +// Harvester manages sampling for individual processes. It is used by the Process Sampler to get information about the +// existing processes. 
+type Harvester interface { + // Pids return the IDs of all the processes that are currently running + Pids() ([]int32, error) + // Do performs the actual harvesting operation, returning a process sample containing all the metrics data + // for the last elapsedSeconds + Do(pid int32, elapsedSeconds float64) (*types.ProcessSample, error) +} diff --git a/pkg/metrics/process/harvester_unix_mock_test.go b/pkg/metrics/process/harvester_unix_mock_test.go new file mode 100644 index 000000000..480cabe6a --- /dev/null +++ b/pkg/metrics/process/harvester_unix_mock_test.go @@ -0,0 +1,40 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +// +build linux darwin + +package process + +import ( + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" + "github.com/stretchr/testify/mock" +) + +type HarvesterMock struct { + mock.Mock +} + +func (h *HarvesterMock) Pids() ([]int32, error) { + args := h.Called() + + return args.Get(0).([]int32), args.Error(1) +} + +func (h *HarvesterMock) ShouldReturnPids(pids []int32, err error) { + h. + On("Pids"). + Once(). + Return(pids, err) +} + +func (h *HarvesterMock) Do(pid int32, elapsedSeconds float64) (*types.ProcessSample, error) { + args := h.Called(pid, elapsedSeconds) + + return args.Get(0).(*types.ProcessSample), args.Error(1) +} + +func (h *HarvesterMock) ShouldDo(pid int32, elapsedSeconds float64, sample *types.ProcessSample, err error) { + h. + On("Do", pid, elapsedSeconds). + Once(). 
+ Return(sample, err) +} diff --git a/pkg/metrics/process/process.go b/pkg/metrics/process/process.go new file mode 100644 index 000000000..538718c28 --- /dev/null +++ b/pkg/metrics/process/process.go @@ -0,0 +1,38 @@ +package process + +import ( + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/process" +) + +// Process it's an interface to abstract gopsutil process so we can mock it and test not coupling to infra +type Process interface { + Username() (string, error) + Name() (string, error) + ProcessId() int32 + Parent() (Process, error) + NumThreads() (int32, error) + Status() (string, error) + MemoryInfo() (*process.MemoryInfoStat, error) + CPUPercent() (float64, error) + Times() (*cpu.TimesStat, error) +} + +// ProcessWrapper is necessary to implement the interface as gopsutil process is not exporting Pid() +type ProcessWrapper struct { + *process.Process +} + +// ProcessId returns the Pid of the process +func (p *ProcessWrapper) ProcessId() int32 { + return p.Process.Pid +} + +// Parent return the process' parent +func (p *ProcessWrapper) Parent() (Process, error) { + par, err := p.Process.Parent() + if err != nil { + return &ProcessWrapper{}, err + } + return &ProcessWrapper{par}, nil +} diff --git a/pkg/metrics/process/process_mock_test.go b/pkg/metrics/process/process_mock_test.go new file mode 100644 index 000000000..a8dedb6d2 --- /dev/null +++ b/pkg/metrics/process/process_mock_test.go @@ -0,0 +1,128 @@ +package process + +import ( + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/process" + "github.com/stretchr/testify/mock" +) + +type ProcessMock struct { + mock.Mock +} + +func (s *ProcessMock) Username() (string, error) { + args := s.Called() + + return args.String(0), args.Error(1) +} + +func (s *ProcessMock) ShouldReturnUsername(username string, err error) { + s. + On("Username"). + Once(). 
+ Return(username, err) +} + +func (s *ProcessMock) Name() (string, error) { + args := s.Called() + + return args.String(0), args.Error(1) +} + +func (s *ProcessMock) ShouldReturnName(name string, err error) { + s. + On("Name"). + Once(). + Return(name, err) +} + +func (s *ProcessMock) ProcessId() int32 { + args := s.Called() + + return args.Get(0).(int32) +} + +func (s *ProcessMock) ShouldReturnProcessId(processId int32) { + s.ShouldReturnProcessIdMultipleTimes(processId, 1) +} + +func (s *ProcessMock) ShouldReturnProcessIdMultipleTimes(processId int32, times int) { + s. + On("ProcessId"). + Times(times). + Return(processId) +} + +func (s *ProcessMock) Parent() (Process, error) { + args := s.Called() + + return args.Get(0).(Process), args.Error(1) +} + +func (s *ProcessMock) ShouldReturnParent(process Process, err error) { + s. + On("Parent"). + Once(). + Return(process, err) +} + +func (s *ProcessMock) NumThreads() (int32, error) { + args := s.Called() + + return args.Get(0).(int32), args.Error(1) +} + +func (s *ProcessMock) ShouldReturnNumThreads(num int32, err error) { + s. + On("NumThreads"). + Once(). + Return(num, err) +} + +func (s *ProcessMock) Status() (string, error) { + args := s.Called() + + return args.String(0), args.Error(1) +} +func (s *ProcessMock) ShouldReturnStatus(status string, err error) { + s. + On("Status"). + Once(). + Return(status, err) +} + +func (s *ProcessMock) MemoryInfo() (*process.MemoryInfoStat, error) { + args := s.Called() + + return args.Get(0).(*process.MemoryInfoStat), args.Error(1) +} +func (s *ProcessMock) ShouldReturnMemoryInfo(memInfo *process.MemoryInfoStat, err error) { + s. + On("MemoryInfo"). + Once(). + Return(memInfo, err) +} + +func (s *ProcessMock) CPUPercent() (float64, error) { + args := s.Called() + + return args.Get(0).(float64), args.Error(1) +} +func (s *ProcessMock) ShouldReturnCPUPercent(percent float64, err error) { + s. + On("CPUPercent"). + Once(). 
+ Return(percent, err) +} + +func (s *ProcessMock) Times() (*cpu.TimesStat, error) { + args := s.Called() + + return args.Get(0).(*cpu.TimesStat), args.Error(1) +} +func (s *ProcessMock) ShouldReturnTimes(times *cpu.TimesStat, err error) { + s. + On("Times"). + Once(). + Return(times, err) +} diff --git a/pkg/metrics/process/sampler_darwin.go b/pkg/metrics/process/sampler_darwin.go new file mode 100644 index 000000000..7b92cb664 --- /dev/null +++ b/pkg/metrics/process/sampler_darwin.go @@ -0,0 +1,156 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package process + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/newrelic/infrastructure-agent/internal/agent" + "github.com/newrelic/infrastructure-agent/pkg/config" + "github.com/newrelic/infrastructure-agent/pkg/metrics" + "github.com/newrelic/infrastructure-agent/pkg/metrics/sampler" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" + "github.com/newrelic/infrastructure-agent/pkg/sample" +) + +// processSampler is an implementation of the metrics_sender.Sampler interface, which returns runtime information about +// the currently running processes +type processSampler struct { + harvest Harvester + containerSampler metrics.ContainerSampler + lastRun time.Time + hasAlreadyRun bool + interval time.Duration +} + +var ( + _ sampler.Sampler = (*processSampler)(nil) // static interface assertion + containerNotRunningErrs = map[string]struct{}{} +) + +// NewProcessSampler creates and returns a new process Sampler, given an agent context. 
+func NewProcessSampler(ctx agent.AgentContext) sampler.Sampler { + hasConfig := ctx != nil && ctx.Config() != nil + + ttlSecs := config.DefaultContainerCacheMetadataLimit + apiVersion := "" + interval := config.FREQ_INTERVAL_FLOOR_PROCESS_METRICS + if hasConfig { + cfg := ctx.Config() + ttlSecs = cfg.ContainerMetadataCacheLimit + apiVersion = cfg.DockerApiVersion + interval = cfg.MetricsProcessSampleRate + } + harvester := newHarvester(ctx) + dockerSampler := metrics.NewDockerSampler(time.Duration(ttlSecs)*time.Second, apiVersion) + + return &processSampler{ + harvest: harvester, + containerSampler: dockerSampler, + interval: time.Second * time.Duration(interval), + } + +} + +func (ps *processSampler) OnStartup() {} + +func (ps *processSampler) Name() string { + return "ProcessSampler" +} + +func (ps *processSampler) Interval() time.Duration { + return ps.interval +} + +func (ps *processSampler) Disabled() bool { + return ps.Interval() <= config.FREQ_DISABLE_SAMPLING +} + +// Sample returns samples for all the running processes, decorated with Docker runtime information, if applies. 
+func (ps *processSampler) Sample() (results sample.EventBatch, err error) { + var elapsedMs int64 + var elapsedSeconds float64 + now := time.Now() + if ps.hasAlreadyRun { + elapsedMs = (now.UnixNano() - ps.lastRun.UnixNano()) / 1000000 + } + elapsedSeconds = float64(elapsedMs) / 1000 + ps.lastRun = now + + pids, err := ps.harvest.Pids() + if err != nil { + return nil, err + } + + var dockerDecorator metrics.ProcessDecorator = nil + if ps.containerSampler.Enabled() { + dockerDecorator, err = ps.containerSampler.NewDecorator() + if err != nil { + if id := containerIDFromNotRunningErr(err); id != "" { + if _, ok := containerNotRunningErrs[id]; !ok { + containerNotRunningErrs[id] = struct{}{} + mplog.WithError(err).Warn("instantiating docker sampler process decorator") + } + } else { + mplog.WithError(err).Warn("instantiating docker sampler process decorator") + if strings.Contains(err.Error(), "client is newer than server") { + mplog.WithError(err).Error("Only docker api version from 1.24 upwards are officially supported. You can still use the docker_api_version configuration to work with older versions. 
// containerIDFromNotRunningErr extracts the container ID from a docker daemon
// "Container <id> is not running" error. It returns an empty string when err is
// nil or its message does not follow that pattern.
//
// The pattern may appear anywhere in the message (e.g. when the daemon error has
// been wrapped with extra context); the ID is the text between the prefix and the
// first occurrence of the suffix that follows it.
func containerIDFromNotRunningErr(err error) string {
	const (
		prefix = "Error response from daemon: Container "
		suffix = " is not running"
	)

	if err == nil {
		return ""
	}
	msg := err.Error()

	start := strings.Index(msg, prefix)
	if start == -1 {
		return ""
	}
	// The ID begins right after the prefix, wherever the prefix was found.
	start += len(prefix)

	// Only look for the suffix after the prefix, so the slice bounds are always valid.
	length := strings.Index(msg[start:], suffix)
	if length == -1 {
		return ""
	}
	return msg[start : start+length]
}
+// SPDX-License-Identifier: Apache-2.0 +package process + +import ( + "errors" + "github.com/newrelic/infrastructure-agent/internal/agent/mocks" + "github.com/newrelic/infrastructure-agent/pkg/config" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "testing" +) + +func TestProcessSampler_Sample(t *testing.T) { + ctx := new(mocks.AgentContext) + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Times(3).Return(cfg) + + harvester := &HarvesterMock{} + sampler := NewProcessSampler(ctx).(*processSampler) + sampler.harvest = harvester + + samples := []*types.ProcessSample{ + { + ProcessDisplayName: "proc 1", + ProcessID: 1, + }, + { + ProcessDisplayName: "proc 2", + ProcessID: 2, + }, + { + ProcessDisplayName: "proc 3", + ProcessID: 3, + }, + } + var pids []int32 + for _, s := range samples { + pids = append(pids, s.ProcessID) + } + + harvester.ShouldReturnPids(pids, nil) + for _, s := range samples { + harvester.ShouldDo(s.ProcessID, 0, s, nil) + } + + eventBatch, err := sampler.Sample() + assert.Nil(t, err) + assert.Len(t, eventBatch, len(samples)) + for i, e := range eventBatch { + assert.Equal(t, samples[i], e) + } + + mock.AssertExpectationsForObjects(t, ctx, harvester) +} + +func TestProcessSampler_Sample_ErrorOnProcessShouldNotStop(t *testing.T) { + ctx := new(mocks.AgentContext) + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Times(3).Return(cfg) + + harvester := &HarvesterMock{} + sampler := NewProcessSampler(ctx).(*processSampler) + sampler.harvest = harvester + + samples := []struct { + pid int32 + name string + err error + }{ + { + name: "proc 1", + pid: 1, + }, + { + name: "proc 2", + pid: 2, + err: errors.New("some error"), + }, + { + name: "proc 3", + pid: 3, + }, + } + var pids []int32 + for _, s := range samples { + pids = append(pids, s.pid) + } + + harvester.ShouldReturnPids(pids, nil) + for _, s := range samples { + 
harvester.ShouldDo(s.pid, 0, &types.ProcessSample{ProcessID: s.pid, ProcessDisplayName: s.name}, s.err) + } + + eventBatch, err := sampler.Sample() + assert.Nil(t, err) + assert.Len(t, eventBatch, 2) + assert.Equal(t, int32(1), eventBatch[0].(*types.ProcessSample).ProcessID) + assert.Equal(t, int32(3), eventBatch[1].(*types.ProcessSample).ProcessID) + + mock.AssertExpectationsForObjects(t, ctx, harvester) +} + +func TestProcessSampler_Sample_DockerDecorator(t *testing.T) { + ctx := new(mocks.AgentContext) + cfg := &config.Config{RunMode: config.ModeRoot} + ctx.On("Config").Times(3).Return(cfg) + + harvester := &HarvesterMock{} + sampler := NewProcessSampler(ctx).(*processSampler) + sampler.harvest = harvester + sampler.containerSampler = &fakeContainerSampler{} + + samples := []*types.ProcessSample{ + { + ProcessDisplayName: "proc 1", + ProcessID: 1, + }, + { + ProcessDisplayName: "proc 2", + ProcessID: 2, + }, + { + ProcessDisplayName: "proc 3", + ProcessID: 3, + }, + } + var pids []int32 + for _, s := range samples { + pids = append(pids, s.ProcessID) + } + + harvester.ShouldReturnPids(pids, nil) + for _, s := range samples { + harvester.ShouldDo(s.ProcessID, 0, s, nil) + } + + eventBatch, err := sampler.Sample() + assert.Nil(t, err) + assert.Len(t, eventBatch, len(samples)) + for i, e := range eventBatch { + flatProcessSample := e.(*types.FlatProcessSample) + assert.Equal(t, float64(samples[i].ProcessID), (*flatProcessSample)["processId"]) + assert.Equal(t, samples[i].ProcessDisplayName, (*flatProcessSample)["processDisplayName"]) + assert.Equal(t, "decorated", (*flatProcessSample)["containerImage"]) + assert.Equal(t, "value1", (*flatProcessSample)["containerLabel_label1"]) + assert.Equal(t, "value2", (*flatProcessSample)["containerLabel_label2"]) + } + + mock.AssertExpectationsForObjects(t, ctx, harvester) +} diff --git a/pkg/metrics/process/sampler_linux_test.go b/pkg/metrics/process/sampler_linux_test.go index 456da0b9b..f83747223 100644 --- 
a/pkg/metrics/process/sampler_linux_test.go +++ b/pkg/metrics/process/sampler_linux_test.go @@ -17,7 +17,6 @@ import ( "github.com/newrelic/infrastructure-agent/pkg/config" "github.com/newrelic/infrastructure-agent/pkg/entity" "github.com/newrelic/infrastructure-agent/pkg/entity/host" - "github.com/newrelic/infrastructure-agent/pkg/metrics" "github.com/newrelic/infrastructure-agent/pkg/metrics/types" "github.com/newrelic/infrastructure-agent/pkg/plugins/ids" "github.com/newrelic/infrastructure-agent/pkg/sample" @@ -82,26 +81,6 @@ func (hm *harvesterMock) Do(pid int32, _ float64) (*types.ProcessSample, error) return hm.samples[pid], nil } -type fakeContainerSampler struct{} - -func (cs *fakeContainerSampler) Enabled() bool { - return true -} - -func (*fakeContainerSampler) NewDecorator() (metrics.ProcessDecorator, error) { - return &fakeDecorator{}, nil -} - -type fakeDecorator struct{} - -func (pd *fakeDecorator) Decorate(process *types.ProcessSample) { - process.ContainerImage = "decorated" - process.ContainerLabels = map[string]string{ - "label1": "value1", - "label2": "value2", - } -} - func BenchmarkProcessSampler(b *testing.B) { pm := NewProcessSampler(&dummyAgentContext{}) diff --git a/pkg/metrics/process/sampler_unix_test.go b/pkg/metrics/process/sampler_unix_test.go new file mode 100644 index 000000000..7247ba561 --- /dev/null +++ b/pkg/metrics/process/sampler_unix_test.go @@ -0,0 +1,26 @@ +package process + +import ( + "github.com/newrelic/infrastructure-agent/pkg/metrics" + "github.com/newrelic/infrastructure-agent/pkg/metrics/types" +) + +type fakeContainerSampler struct{} + +func (cs *fakeContainerSampler) Enabled() bool { + return true +} + +func (*fakeContainerSampler) NewDecorator() (metrics.ProcessDecorator, error) { + return &fakeDecorator{}, nil +} + +type fakeDecorator struct{} + +func (pd *fakeDecorator) Decorate(process *types.ProcessSample) { + process.ContainerImage = "decorated" + process.ContainerLabels = map[string]string{ + "label1": 
"value1", + "label2": "value2", + } +} diff --git a/pkg/metrics/process/snapshot.go b/pkg/metrics/process/snapshot.go new file mode 100644 index 000000000..a6a5a8144 --- /dev/null +++ b/pkg/metrics/process/snapshot.go @@ -0,0 +1,47 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package process + +import ( + "github.com/shirou/gopsutil/process" +) + +// CPUInfo represents CPU usage statistics at a given point +type CPUInfo struct { + // Percent is the total CPU usage percent + Percent float64 + // User is the CPU user time + User float64 + // System is the CPU system time + System float64 +} + +// Snapshot represents the status of a process at a given time. Instances of Snapshot must not be +// reused for different samples +type Snapshot interface { + // Pid returns the Process ID + Pid() int32 + // Ppid returns the Parent Process ID + Ppid() int32 + // Status returns the state of the process: R (running or runnable), D (uninterruptible sleep), S (interruptible + // sleep), Z (defunct/zombie) or T (stopped) + Status() string + // Command returns the process Command name + Command() string + // CmdLine returns the process invoking command line, with or without arguments + CmdLine(withArgs bool) (string, error) + // Username returns the name of the process owner user + Username() (string, error) + // CPUTimes returns the CPU consumption percentages for the process + CPUTimes() (CPUInfo, error) + // IOCounters returns the I/O statistics for the process + IOCounters() (*process.IOCountersStat, error) + // NumThreads returns the number of threads that are being used by the process + NumThreads() int32 + // NumFDs returns the number of File Descriptors that are open by the process + NumFDs() (int32, error) + // VmRSS returns the Resident Set Size (memory in RAM) of the process + VmRSS() int64 + // VmSize returns the total memory of the process (RSS + virtual memory) + VmSize() int64 +} diff --git 
a/pkg/metrics/process/snapshot_darwin.go b/pkg/metrics/process/snapshot_darwin.go new file mode 100644 index 000000000..8113241ca --- /dev/null +++ b/pkg/metrics/process/snapshot_darwin.go @@ -0,0 +1,218 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +package process + +import ( + "github.com/shirou/gopsutil/process" + "os" + "runtime" + "time" +) + +// darwinProcess is an implementation of the process.Snapshot interface for darwin hosts. +type darwinProcess struct { + // if privileged == false, some operations will be avoided: FD and IO count + privileged bool + + stats procStats + process Process + lastCPU CPUInfo + lastTime time.Time + + // data that will be reused between samples of the same process + pid int32 + user string + cmdLine string +} + +// needed to calculate RSS +var pageSize int64 + +func init() { + pageSize = int64(os.Getpagesize()) + if pageSize <= 0 { + pageSize = 4096 // default value + } +} + +var _ Snapshot = (*darwinProcess)(nil) // static interface assertion + +// getDarwinProcess returns a darwin process snapshot, trying to reuse the data from a previous snapshot of the same +// process. +func getDarwinProcess(proc Process, privileged bool) (*darwinProcess, error) { + + stats, err := collectProcStats(proc) + if err != nil { + return nil, err + } + + return &darwinProcess{ + privileged: privileged, + pid: proc.ProcessId(), + process: proc, + stats: stats, + }, nil +} + +func (pw *darwinProcess) Pid() int32 { + return pw.pid +} + +func (pw *darwinProcess) Username() (string, error) { + var err error + if pw.user == "" { // caching user + pw.user, err = pw.process.Username() + if err != nil { + return "", err + } + } + return pw.user, nil +} + +func (pw *darwinProcess) IOCounters() (*process.IOCountersStat, error) { + //Not implemented in darwin yet + return nil, nil +} + +// NumFDs returns the number of file descriptors. 
It returns -1 (and nil error) if the Agent does not have privileges to +// access this information. +func (pw *darwinProcess) NumFDs() (int32, error) { + //Not implemented in darwin yet + return -1, nil +} + +///////////////////////////// +// Data to be derived from /proc//stat in linux systems. In darwin this structure will be populated +// if no error happens retrieving the information from process and will allow to cache some process vallues +// to avoid calling multiple times to same method +///////////////////////////// +type procStats struct { + command string + ppid int32 + numThreads int32 + state string + vmRSS int64 + vmSize int64 + cpu CPUInfo +} + +// collectProcStats will gather information about the process and will return procStats struct with the necessary information +// not to call process methods more than once per iteration. It will return error if any of the expected +// items returns an error. +func collectProcStats(p Process) (procStats, error) { + var s procStats + name, err := p.Name() + if err != nil { + return s, err + } + + var ppid int32 + var parent Process + if p.ProcessId() != 1 { + parent, err = p.Parent() + if err == nil { + ppid = parent.ProcessId() + } + } + numThreads, err := p.NumThreads() + if err != nil { + return s, err + } + status, err := p.Status() + if err != nil { + return s, err + } + memInfo, err := p.MemoryInfo() + if err != nil { + return s, err + } + cpuPercent, err := p.CPUPercent() + if err != nil { + return s, err + } + times, err := p.Times() + if err != nil { + return s, err + } + + // unit64 to int64 conversion so there are options to lose data if rss > 9,223 PetaBytes + rss := int64(memInfo.RSS) + if rss > 0 { + s.vmRSS = rss + } + vms := int64(memInfo.VMS) + if vms > 0 { + s.vmSize = vms + } + + s.command = name + s.ppid = ppid + s.numThreads = numThreads + s.state = status + s.cpu = CPUInfo{ + Percent: cpuPercent, + User: times.User, + System: times.System, + } + + return s, nil +} + +func (pw 
*darwinProcess) CPUTimes() (CPUInfo, error) { + now := time.Now() + + if pw.lastTime.IsZero() { + // invoked first time + pw.lastCPU = pw.stats.cpu + pw.lastTime = now + return pw.stats.cpu, nil + } + + // Calculate CPU percent from user time, system time, and last harvested cpu counters + numcpu := runtime.NumCPU() + delta := (now.Sub(pw.lastTime).Seconds()) * float64(numcpu) + pw.stats.cpu.Percent = calculatePercent(pw.lastCPU, pw.stats.cpu, delta, numcpu) + pw.lastCPU = pw.stats.cpu + pw.lastTime = now + + return pw.stats.cpu, nil +} + +func calculatePercent(t1, t2 CPUInfo, delta float64, numcpu int) float64 { + if delta <= 0 { + return 0 + } + deltaProc := t2.User + t2.System - t1.User - t1.System + overallPercent := ((deltaProc / delta) * 100) * float64(numcpu) + return overallPercent +} + +func (pw *darwinProcess) Ppid() int32 { + return pw.stats.ppid +} + +func (pw *darwinProcess) NumThreads() int32 { + return pw.stats.numThreads +} + +func (pw *darwinProcess) Status() string { + return pw.stats.state +} + +func (pw *darwinProcess) VmRSS() int64 { + return pw.stats.vmRSS +} + +func (pw *darwinProcess) VmSize() int64 { + return pw.stats.vmSize +} + +func (pw *darwinProcess) Command() string { + return pw.stats.command +} + +// CmdLine Data to be derived from /proc//cmdline in linux systems +// not supported in darwin for now +func (pw *darwinProcess) CmdLine(withArgs bool) (string, error) { + return "", nil +} diff --git a/pkg/metrics/process/snapshot_darwin_test.go b/pkg/metrics/process/snapshot_darwin_test.go new file mode 100644 index 000000000..835233139 --- /dev/null +++ b/pkg/metrics/process/snapshot_darwin_test.go @@ -0,0 +1,369 @@ +// Copyright 2020 New Relic Corporation. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package process + +import ( + "errors" + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/process" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "testing" +) + +func Test_collectProcStats_NameError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("", expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_NumThreadsError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessId(1) + proc.ShouldReturnNumThreads(0, expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_StatusError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessId(1) + proc.ShouldReturnNumThreads(4, nil) + proc.ShouldReturnStatus("", expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_MemoryInfoError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessId(1) + proc.ShouldReturnNumThreads(4, nil) + proc.ShouldReturnStatus("some status", nil) + proc.ShouldReturnMemoryInfo(&process.MemoryInfoStat{}, expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + 
assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_CpuPercentError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessId(1) + proc.ShouldReturnNumThreads(4, nil) + proc.ShouldReturnStatus("some status", nil) + proc.ShouldReturnMemoryInfo(&process.MemoryInfoStat{}, nil) + proc.ShouldReturnCPUPercent(0, expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_CpuTimesError(t *testing.T) { + proc := &ProcessMock{} + expectedError := errors.New("some error") + + proc.ShouldReturnName("some name", nil) + proc.ShouldReturnProcessId(1) + proc.ShouldReturnNumThreads(4, nil) + proc.ShouldReturnStatus("some status", nil) + proc.ShouldReturnMemoryInfo(&process.MemoryInfoStat{}, nil) + proc.ShouldReturnCPUPercent(0, nil) + proc.ShouldReturnTimes(&cpu.TimesStat{}, expectedError) + + stats, err := collectProcStats(proc) + + assert.Equal(t, expectedError, err) + assert.Equal(t, procStats{}, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) +} + +func Test_collectProcStats_NoErrorsInitProcess(t *testing.T) { + tests := []struct { + name string + command string + processId int32 + numThreads int32 + status string + memStat *process.MemoryInfoStat + cpuPercent float64 + timesStat *cpu.TimesStat + expected procStats + }{ + { + name: "invalid rss", + command: "some command", + processId: 1, + numThreads: 3, + status: "some status", + memStat: &process.MemoryInfoStat{ + RSS: 0, + VMS: 232, + }, + cpuPercent: 10, + timesStat: &cpu.TimesStat{User: 2, System: 8}, + expected: procStats{ + command: "some command", + ppid: 0, + numThreads: 3, + state: "some status", + vmRSS: 0, + 
vmSize: 232, + cpu: CPUInfo{ + Percent: 10, + User: 2, + System: 8, + }, + }, + }, + { + name: "invalid vmsize", + command: "some command", + processId: 1, + numThreads: 3, + status: "some status", + memStat: &process.MemoryInfoStat{ + RSS: 45, + VMS: 0, + }, + cpuPercent: 10, + timesStat: &cpu.TimesStat{User: 2, System: 8}, + expected: procStats{ + command: "some command", + ppid: 0, + numThreads: 3, + state: "some status", + vmRSS: 45, + vmSize: 0, + cpu: CPUInfo{ + Percent: 10, + User: 2, + System: 8, + }, + }, + }, + { + name: "happy path", + command: "some command", + processId: 1, + numThreads: 3, + status: "some status", + memStat: &process.MemoryInfoStat{ + RSS: 45, + VMS: 22, + }, + cpuPercent: 10, + timesStat: &cpu.TimesStat{User: 2, System: 8}, + expected: procStats{ + command: "some command", + ppid: 0, + numThreads: 3, + state: "some status", + vmRSS: 45, + vmSize: 22, + cpu: CPUInfo{ + Percent: 10, + User: 2, + System: 8, + }, + }, + }, + } + + proc := &ProcessMock{} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + proc.ShouldReturnName(tt.command, nil) + proc.ShouldReturnProcessId(tt.processId) + proc.ShouldReturnNumThreads(tt.numThreads, nil) + proc.ShouldReturnStatus(tt.status, nil) + proc.ShouldReturnMemoryInfo(tt.memStat, nil) + proc.ShouldReturnCPUPercent(tt.cpuPercent, nil) + proc.ShouldReturnTimes(tt.timesStat, nil) + + stats, err := collectProcStats(proc) + + assert.Nil(t, err) + assert.Equal(t, tt.expected, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) + }) + } +} + +func Test_collectProcStats_NoErrorsProcessWithParent(t *testing.T) { + tests := []struct { + name string + command string + processId int32 + parentProcessId int32 + parentProcessErr error + numThreads int32 + status string + memStat *process.MemoryInfoStat + cpuPercent float64 + timesStat *cpu.TimesStat + expected procStats + }{ + { + name: "parent process ok", + command: "some command", + processId: 16, + parentProcessId: 
11, + parentProcessErr: nil, + numThreads: 3, + status: "some status", + memStat: &process.MemoryInfoStat{ + RSS: 0, + VMS: 232, + }, + cpuPercent: 10, + timesStat: &cpu.TimesStat{User: 2, System: 8}, + expected: procStats{ + command: "some command", + ppid: 11, + numThreads: 3, + state: "some status", + vmRSS: 0, + vmSize: 232, + cpu: CPUInfo{ + Percent: 10, + User: 2, + System: 8, + }, + }, + }, + { + name: "error getting parent process", + command: "some command", + processId: 16, + parentProcessId: 11, + parentProcessErr: errors.New("some error"), + numThreads: 3, + status: "some status", + memStat: &process.MemoryInfoStat{ + RSS: 0, + VMS: 232, + }, + cpuPercent: 10, + timesStat: &cpu.TimesStat{User: 2, System: 8}, + expected: procStats{ + command: "some command", + ppid: 0, + numThreads: 3, + state: "some status", + vmRSS: 0, + vmSize: 232, + cpu: CPUInfo{ + Percent: 10, + User: 2, + System: 8, + }, + }, + }, + } + + parentProc := &ProcessMock{} + proc := &ProcessMock{} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + proc.ShouldReturnName(tt.command, nil) + proc.ShouldReturnParent(parentProc, tt.parentProcessErr) + if tt.parentProcessErr == nil { + parentProc.ShouldReturnProcessId(tt.parentProcessId) + } + proc.ShouldReturnProcessId(tt.processId) + proc.ShouldReturnNumThreads(tt.numThreads, nil) + proc.ShouldReturnStatus(tt.status, nil) + proc.ShouldReturnMemoryInfo(tt.memStat, nil) + proc.ShouldReturnCPUPercent(tt.cpuPercent, nil) + proc.ShouldReturnTimes(tt.timesStat, nil) + + stats, err := collectProcStats(proc) + + assert.Nil(t, err) + assert.Equal(t, tt.expected, stats) + //mocked objects assertions + mock.AssertExpectationsForObjects(t, proc) + }) + } +} + +func Test_calculatePercent(t *testing.T) { + tests := []struct { + name string + t1 CPUInfo + t2 CPUInfo + delta float64 + numcpu int + expectedPercent float64 + }{ + { + name: "delta 0", + expectedPercent: 0, + }, + { + name: "bad delta", + delta: -1, + expectedPercent: 0, + 
}, + { + name: "good delta", + delta: 10, + numcpu: 2, + t1: CPUInfo{ + User: 24, + System: 33, + }, + t2: CPUInfo{ + User: 42, + System: 55, + }, + expectedPercent: ((44 / 10) * 100) * 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + percent := calculatePercent(tt.t1, tt.t2, tt.delta, tt.numcpu) + assert.Equal(t, tt.expectedPercent, percent) + }) + } +} diff --git a/pkg/metrics/process/snapshot_linux.go b/pkg/metrics/process/snapshot_linux.go index 38c3695f9..51d1b0294 100644 --- a/pkg/metrics/process/snapshot_linux.go +++ b/pkg/metrics/process/snapshot_linux.go @@ -17,46 +17,6 @@ import ( "github.com/shirou/gopsutil/process" ) -// CPUInfo represents CPU usage statistics at a given point -type CPUInfo struct { - // Percent is the total CPU usage percent - Percent float64 - // User is the CPU user time - User float64 - // System is the CPU system time - System float64 -} - -// Snapshot represents the status of a process at a given time. Instances of Snapshot must not be -// reused for different samples -type Snapshot interface { - // Pid returns the Process ID - Pid() int32 - // Ppid returns the Parent Process ID - Ppid() int32 - // Status returns the state of the process: R (running or runnable), D (uninterruptible sleep), S (interruptible - // sleep), Z (defunct/zombie) or T (stopped) - Status() string - // Command returns the process Command name - Command() string - // CmdLine returns the process invoking command line, with or without arguments - CmdLine(withArgs bool) (string, error) - // Username returns the name of the process owner user - Username() (string, error) - // CPUTimes returns the CPU consumption percentages for the process - CPUTimes() (CPUInfo, error) - // IOCounters returns the I/O statistics for the process - IOCounters() (*process.IOCountersStat, error) - // NumThreads returns the number of threads that are being used by the process - NumThreads() int32 - // NumFDs returns the number of File Descriptors that 
are open by the process - NumFDs() (int32, error) - // VmRSS returns the Resident Set Size (memory in RAM) of the process - VmRSS() int64 - // VmSize returns the total memory of the process (RSS + virtual memory) - VmSize() int64 -} - // linuxProcess is an implementation of the process.Snapshot interface for linux hosts. It is designed to be highly // optimized and avoid unnecessary/duplicated system calls type linuxProcess struct { diff --git a/pkg/metrics/process/snapshot_mock_test.go b/pkg/metrics/process/snapshot_mock_test.go new file mode 100644 index 000000000..a8f4356d5 --- /dev/null +++ b/pkg/metrics/process/snapshot_mock_test.go @@ -0,0 +1,166 @@ +package process + +import ( + "github.com/shirou/gopsutil/process" + "github.com/stretchr/testify/mock" +) + +type SnapshotMock struct { + mock.Mock +} + +func (s *SnapshotMock) Pid() int32 { + args := s.Called() + + return args.Get(0).(int32) +} + +func (s *SnapshotMock) ShouldReturnPid(pid int32) { + s. + On("Pid"). + Once(). + Return(pid) +} + +func (s *SnapshotMock) Ppid() int32 { + args := s.Called() + + return args.Get(0).(int32) +} + +func (s *SnapshotMock) ShouldReturnPpid(ppid int32) { + s. + On("Ppid"). + Once(). + Return(ppid) +} + +func (s *SnapshotMock) Status() string { + args := s.Called() + + return args.String(0) +} + +func (s *SnapshotMock) ShouldReturnStatus(status string) { + s. + On("Status"). + Once(). + Return(status) +} + +func (s *SnapshotMock) Command() string { + args := s.Called() + + return args.String(0) +} + +func (s *SnapshotMock) ShouldReturnCommand(command string) { + s. + On("Command"). + Once(). + Return(command) +} + +func (s *SnapshotMock) CmdLine(withArgs bool) (string, error) { + args := s.Called(withArgs) + + return args.String(0), args.Error(1) +} + +func (s *SnapshotMock) ShouldReturnCmdLine(withArgs bool, cmd string, err error) { + s. + On("CmdLine", withArgs). + Once(). 
+ Return(cmd, err) +} + +func (s *SnapshotMock) Username() (string, error) { + args := s.Called() + + return args.String(0), args.Error(1) +} + +func (s *SnapshotMock) ShouldReturnUsername(cmd string, err error) { + s. + On("Username"). + Once(). + Return(cmd, err) +} + +func (s *SnapshotMock) CPUTimes() (CPUInfo, error) { + args := s.Called() + + return args.Get(0).(CPUInfo), args.Error(1) +} + +func (s *SnapshotMock) ShouldReturnCPUTimes(cpu CPUInfo, err error) { + s. + On("CPUTimes"). + Once(). + Return(cpu, err) +} + +func (s *SnapshotMock) IOCounters() (*process.IOCountersStat, error) { + args := s.Called() + + return args.Get(0).(*process.IOCountersStat), args.Error(1) +} + +func (s *SnapshotMock) ShouldReturnIOCounters(io *process.IOCountersStat, err error) { + s. + On("IOCounters"). + Once(). + Return(io, err) +} + +func (s *SnapshotMock) NumThreads() int32 { + args := s.Called() + + return args.Get(0).(int32) +} + +func (s *SnapshotMock) ShouldReturnNumThreads(num int32) { + s. + On("NumThreads"). + Once(). + Return(num) +} + +func (s *SnapshotMock) NumFDs() (int32, error) { + args := s.Called() + + return args.Get(0).(int32), args.Error(1) +} + +func (s *SnapshotMock) ShouldReturnNumFDs(num int32, err error) { + s. + On("NumFDs"). + Once(). + Return(num, err) +} + +func (s *SnapshotMock) VmRSS() int64 { + args := s.Called() + + return args.Get(0).(int64) +} + +func (s *SnapshotMock) ShouldReturnVmRSS(rss int64) { + s. + On("VmRSS"). + Once(). + Return(rss) +} + +func (s *SnapshotMock) VmSize() int64 { + args := s.Called() + + return args.Get(0).(int64) +} + +func (s *SnapshotMock) ShouldReturnVmSize(size int64) { + s. + On("VmSize"). + Once(). 
+ Return(size) +} diff --git a/pkg/plugins/plugins_darwin.go b/pkg/plugins/plugins_darwin.go index 4f1c32e4a..53a83e131 100644 --- a/pkg/plugins/plugins_darwin.go +++ b/pkg/plugins/plugins_darwin.go @@ -7,6 +7,7 @@ import ( "github.com/newrelic/infrastructure-agent/internal/plugins/darwin" "github.com/newrelic/infrastructure-agent/pkg/metrics" "github.com/newrelic/infrastructure-agent/pkg/metrics/network" + "github.com/newrelic/infrastructure-agent/pkg/metrics/process" metricsSender "github.com/newrelic/infrastructure-agent/pkg/metrics/sender" "github.com/newrelic/infrastructure-agent/pkg/metrics/storage" "github.com/newrelic/infrastructure-agent/pkg/plugins/ids" @@ -29,7 +30,7 @@ func RegisterPlugins(a *agent.Agent) error { } sender := metricsSender.NewSender(a.Context) - //procSampler := process.NewProcessSampler(a.Context) + procSampler := process.NewProcessSampler(a.Context) storageSampler := storage.NewSampler(a.Context) //nfsSampler := nfs.NewSampler(a.Context) networkSampler := network.NewNetworkSampler(a.Context) @@ -39,7 +40,7 @@ func RegisterPlugins(a *agent.Agent) error { sender.RegisterSampler(storageSampler) //sender.RegisterSampler(nfsSampler) sender.RegisterSampler(networkSampler) - //sender.RegisterSampler(procSampler) + sender.RegisterSampler(procSampler) a.RegisterMetricsSender(sender)