diff --git a/sdk/config_helpers_test.go b/sdk/config_helpers_test.go index 4cae45d43..086a49243 100644 --- a/sdk/config_helpers_test.go +++ b/sdk/config_helpers_test.go @@ -2104,7 +2104,7 @@ func TestSslDirectives(t *testing.T) { }` // preparing test cases as well as expected results - var tests = []struct { + tests := []struct { algoName string config string expected struct { @@ -2251,6 +2251,5 @@ func TestSslDirectives(t *testing.T) { assert.Equal(t, certMeta.PublicKeyAlgorithm, cert.PublicKeyAlgorithm) } }) - } } diff --git a/src/core/environment.go b/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/src/core/environment.go +++ b/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/src/core/fake_environment_test.go b/src/core/fake_environment_test.go index 02b4da669..4de09b7a2 100644 --- a/src/core/fake_environment_test.go +++ b/src/core/fake_environment_test.go @@ -33,6 +33,31 @@ type FakeEnvironment struct { result1 []string result2 error } + DiskUsageStub func(string) (*DiskUsage, error) + diskUsageMutex sync.RWMutex + diskUsageArgsForCall []struct { + arg1 string + } + diskUsageReturns struct { + result1 *DiskUsage + result2 error + } + diskUsageReturnsOnCall map[int]struct { + result1 *DiskUsage + result2 error + } + DisksStub func() ([]*proto.DiskPartition, error) + disksMutex sync.RWMutex + disksArgsForCall []struct { + } + disksReturns struct { + result1 []*proto.DiskPartition + result2 error + } + disksReturnsOnCall map[int]struct { + result1 []*proto.DiskPartition + result2 error + } FileStatStub func(string) (fs.FileInfo, error) fileStatMutex sync.RWMutex fileStatArgsForCall []struct { @@ -287,6 +312,126 @@ func (fake *FakeEnvironment) DiskDevicesReturnsOnCall(i int, result1 []string, r }{result1, result2} } +func (fake *FakeEnvironment) DiskUsage(arg1 string) (*DiskUsage, error) { + fake.diskUsageMutex.Lock() + ret, specificReturn := fake.diskUsageReturnsOnCall[len(fake.diskUsageArgsForCall)] + fake.diskUsageArgsForCall = append(fake.diskUsageArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.DiskUsageStub + fakeReturns := fake.diskUsageReturns + fake.recordInvocation("DiskUsage", []interface{}{arg1}) + fake.diskUsageMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeEnvironment) DiskUsageCallCount() int { + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + return len(fake.diskUsageArgsForCall) +} + +func (fake *FakeEnvironment) DiskUsageCalls(stub func(string) (*DiskUsage, error)) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = stub +} + +func (fake *FakeEnvironment) DiskUsageArgsForCall(i int) string { + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + argsForCall := fake.diskUsageArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeEnvironment) DiskUsageReturns(result1 *DiskUsage, result2 error) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = nil + fake.diskUsageReturns = struct { + result1 *DiskUsage + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) DiskUsageReturnsOnCall(i int, result1 *DiskUsage, result2 error) { + fake.diskUsageMutex.Lock() + defer fake.diskUsageMutex.Unlock() + fake.DiskUsageStub = nil + if fake.diskUsageReturnsOnCall == nil { + fake.diskUsageReturnsOnCall = make(map[int]struct { + result1 *DiskUsage + result2 error + }) + } + fake.diskUsageReturnsOnCall[i] = struct { + result1 *DiskUsage + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) Disks() ([]*proto.DiskPartition, error) { + fake.disksMutex.Lock() + ret, specificReturn := fake.disksReturnsOnCall[len(fake.disksArgsForCall)] + fake.disksArgsForCall = append(fake.disksArgsForCall, struct { + }{}) + stub := fake.DisksStub + fakeReturns := fake.disksReturns + fake.recordInvocation("Disks", []interface{}{}) + fake.disksMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeEnvironment) DisksCallCount() int { + fake.disksMutex.RLock() + defer fake.disksMutex.RUnlock() + return len(fake.disksArgsForCall) +} + +func (fake *FakeEnvironment) DisksCalls(stub func() ([]*proto.DiskPartition, error)) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = stub +} + +func (fake *FakeEnvironment) DisksReturns(result1 []*proto.DiskPartition, result2 error) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = nil + fake.disksReturns = struct { + result1 []*proto.DiskPartition + result2 error + }{result1, result2} +} + +func (fake *FakeEnvironment) DisksReturnsOnCall(i int, result1 []*proto.DiskPartition, result2 error) { + fake.disksMutex.Lock() + defer fake.disksMutex.Unlock() + fake.DisksStub = nil + if fake.disksReturnsOnCall == nil { + fake.disksReturnsOnCall = make(map[int]struct { + result1 []*proto.DiskPartition + result2 error + }) + } + fake.disksReturnsOnCall[i] = struct { + result1 []*proto.DiskPartition + result2 error + }{result1, result2} +} + func (fake *FakeEnvironment) FileStat(arg1 string) (fs.FileInfo, error) { fake.fileStatMutex.Lock() ret, specificReturn := fake.fileStatReturnsOnCall[len(fake.fileStatArgsForCall)] @@ -943,6 +1088,10 @@ func (fake *FakeEnvironment) Invocations() map[string][][]interface{} { defer fake.deleteFileMutex.RUnlock() fake.diskDevicesMutex.RLock() defer fake.diskDevicesMutex.RUnlock() + fake.diskUsageMutex.RLock() + defer fake.diskUsageMutex.RUnlock() + fake.disksMutex.RLock() + defer fake.disksMutex.RUnlock() fake.fileStatMutex.RLock() defer fake.fileStatMutex.RUnlock() fake.getContainerIDMutex.RLock() diff --git a/src/core/metrics/collectors/system.go b/src/core/metrics/collectors/system.go index 0aa1060b3..a92c4a7b8 100644 --- a/src/core/metrics/collectors/system.go +++ b/src/core/metrics/collectors/system.go @@ -40,7 +40,7 @@ func NewSystemCollector(env core.Environment, conf *config.Config) *SystemCollec systemSources = []metrics.Source{ sources.NewVirtualMemorySource(sources.SystemNamespace, env), sources.NewCPUTimesSource(sources.SystemNamespace, env), - sources.NewDiskSource(sources.SystemNamespace), + sources.NewDiskSource(sources.SystemNamespace, env), sources.NewDiskIOSource(sources.SystemNamespace, env), sources.NewNetIOSource(sources.SystemNamespace, env), sources.NewLoadSource(sources.SystemNamespace), diff --git a/src/core/metrics/sources/disk.go b/src/core/metrics/sources/disk.go index 29b788953..472133097 100644 --- a/src/core/metrics/sources/disk.go +++ b/src/core/metrics/sources/disk.go @@ -13,8 +13,8 @@ import ( "sync" "github.com/nginx/agent/sdk/v2/proto" + "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - "github.com/shirou/gopsutil/v3/disk" ) const MOUNT_POINT = "mount_point" @@ -22,25 +22,24 @@ const MOUNT_POINT = "mount_point" type Disk struct { logger *MetricSourceLogger *namedMetric - disks []disk.PartitionStat + disks []*proto.DiskPartition + env core.Environment } -func NewDiskSource(namespace string) *Disk { - ctx := context.Background() - defer ctx.Done() - disks, _ := disk.PartitionsWithContext(ctx, false) - return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks} +func NewDiskSource(namespace string, env core.Environment) *Disk { + disks, _ := env.Disks() + return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks, env} } func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { defer wg.Done() for _, part := range c.disks { - if part.Device == "" || part.Fstype == "" { + if part.Device == "" || part.FsType == "" { continue } - usage, err := disk.UsageWithContext(ctx, part.Mountpoint) + usage, err := c.env.DiskUsage(part.MountPoint) if err != nil { - c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.Mountpoint, err)) + c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.MountPoint, err)) continue } @@ -48,14 +47,14 @@ func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metric "total": float64(usage.Total), "used": float64(usage.Used), "free": float64(usage.Free), - "in_use": float64(usage.UsedPercent), + "in_use": float64(usage.UsedPercentage), }) select { case <-ctx.Done(): return // mount point is not a common dim - case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.Mountpoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): + case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.MountPoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): } } } diff --git a/src/core/metrics/sources/disk_test.go b/src/core/metrics/sources/disk_test.go index 44a54bfc6..9e0047c03 100644 --- a/src/core/metrics/sources/disk_test.go +++ b/src/core/metrics/sources/disk_test.go @@ -14,22 +14,25 @@ import ( "testing" "github.com/nginx/agent/v2/src/core/metrics" + tutils "github.com/nginx/agent/v2/test/utils" "github.com/stretchr/testify/assert" ) func TestNewDiskSource(t *testing.T) { namespace := "test" - actual := NewDiskSource(namespace) + env := tutils.GetMockEnv() + actual := NewDiskSource(namespace, env) assert.Equal(t, "disk", actual.group) assert.Equal(t, namespace, actual.namespace) - assert.Greater(t, len(actual.disks), 1) + assert.Equal(t, len(actual.disks), 2) } func TestDiskCollect(t *testing.T) { namespace := "test" - disk := NewDiskSource(namespace) + env := tutils.GetMockEnv() + disk := NewDiskSource(namespace, env) ctx := context.TODO() wg := &sync.WaitGroup{} diff --git a/src/core/nginx.go b/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index d1aef6ae7..d9f14f549 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -210,7 +210,7 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, true) + hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go +++ b/test/integration/vendor/github.com/nginx/agent/v2/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go b/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/test/integration/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/test/integration/vendor/modules.txt b/test/integration/vendor/modules.txt index 90eb66298..4d683e6a4 100644 --- a/test/integration/vendor/modules.txt +++ b/test/integration/vendor/modules.txt @@ -978,6 +978,7 @@ golang.org/x/oauth2/internal ## explicit; go 1.17 golang.org/x/sync/errgroup golang.org/x/sync/semaphore +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go index 8cfab0119..b80a0112d 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/environment.go @@ -30,6 +30,7 @@ import ( "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/process" log "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" "golang.org/x/sys/unix" "github.com/nginx/agent/sdk/v2/files" @@ -52,7 +53,9 @@ type Environment interface { DeleteFile(backup ConfigApplyMarker, fileName string) error Processes() (result []*Process) FileStat(path string) (os.FileInfo, error) + Disks() ([]*proto.DiskPartition, error) DiskDevices() ([]string, error) + DiskUsage(mountPoint string) (*DiskUsage, error) GetContainerID() (string, error) GetNetOverflow() (float64, error) IsContainer() bool @@ -80,6 +83,13 @@ type Process struct { Command string } +type DiskUsage struct { + Total float64 + Used float64 + Free float64 + UsedPercentage float64 +} + const ( lengthOfContainerId = 64 versionId = "VERSION_ID" @@ -91,16 +101,19 @@ const ( KernProc = 14 // struct: process entries KernProcPathname = 12 // path to executable SYS_SYSCTL = 202 + IsContainerKey = "isContainer" + GetContainerIDKey = "GetContainerID" + GetSystemUUIDKey = "GetSystemUUIDKey" ) var ( - virtualizationFunc = host.VirtualizationWithContext - _ Environment = &EnvironmentType{} - basePattern = regexp.MustCompile("/([a-f0-9]{64})$") - colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") - scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) - containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") - containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") + virtualizationFunc = host.VirtualizationWithContext + singleflightGroup = &singleflight.Group{} + basePattern = regexp.MustCompile("/([a-f0-9]{64})$") + colonPattern = regexp.MustCompile(":([a-f0-9]{64})$") + scopePattern = regexp.MustCompile(`/.+-(.+?).scope$`) + containersPattern = regexp.MustCompile("containers/([a-f0-9]{64})") + containerdPattern = regexp.MustCompile("sandboxes/([a-f0-9]{64})") ) func (env *EnvironmentType) NewHostInfo(agentVersion string, tags *[]string, configDirs string, clearCache bool) *proto.HostInfo { @@ -123,6 +136,12 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer tags = &[]string{} } + disks, err := env.Disks() + if err != nil { + log.Warnf("Unable to get disks information from the host: %v", err) + disks = nil + } + hostInfoFacacde := &proto.HostInfo{ Agent: agentVersion, Boot: hostInformation.BootTime, @@ -131,7 +150,7 @@ func (env *EnvironmentType) NewHostInfoWithContext(ctx context.Context, agentVer OsType: hostInformation.OS, Uuid: env.GetSystemUUID(), Uname: getUnixName(), - Partitons: diskPartitions(), + Partitons: disks, Network: env.networks(), Processor: processors(hostInformation.KernelArch), Release: releaseInfo("/etc/os-release"), @@ -194,24 +213,32 @@ func (env *EnvironmentType) GetHostname() string { } func (env *EnvironmentType) GetSystemUUID() string { - ctx := context.Background() - defer ctx.Done() + res, err, _ := singleflightGroup.Do(GetSystemUUIDKey, func() (interface{}, error) { + var err error + ctx := context.Background() + defer ctx.Done() - if env.IsContainer() { - containerID, err := env.GetContainerID() - if err != nil { - log.Errorf("Unable to read docker container ID: %v", err) - return "" + if env.IsContainer() { + containerID, err := env.GetContainerID() + if err != nil { + return "", fmt.Errorf("unable to read docker container ID: %v", err) + } + return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String(), err } - return uuid.NewMD5(uuid.NameSpaceDNS, []byte(containerID)).String() - } - hostInfo, err := host.InfoWithContext(ctx) + hostID, err := host.HostIDWithContext(ctx) + if err != nil { + log.Warnf("Unable to read host id from dataplane, defaulting value. Error: %v", err) + return "", err + } + return uuid.NewMD5(uuid.Nil, []byte(hostID)).String(), err + }) if err != nil { - log.Infof("Unable to read host id from dataplane, defaulting value. Error: %v", err) + log.Warnf("Unable to set hostname due to %v", err) return "" } - return uuid.NewMD5(uuid.Nil, []byte(hostInfo.HostID)).String() + + return res.(string) } func (env *EnvironmentType) ReadDirectory(dir string, ext string) ([]string, error) { @@ -307,18 +334,25 @@ func (env *EnvironmentType) IsContainer() bool { k8sServiceAcct = "/var/run/secrets/kubernetes.io/serviceaccount" ) - for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { - if _, err := os.Stat(filename); err == nil { - log.Debugf("is a container because (%s) exists", filename) - return true + res, err, _ := singleflightGroup.Do(IsContainerKey, func() (interface{}, error) { + for _, filename := range []string{dockerEnv, containerEnv, k8sServiceAcct} { + if _, err := os.Stat(filename); err == nil { + log.Debugf("Is a container because (%s) exists", filename) + return true, nil + } } - } - // v1 check - if result, err := cGroupV1Check(selfCgroup); err == nil && result { - return result + // v1 check + if result, err := cGroupV1Check(selfCgroup); err == nil && result { + return result, err + } + return false, nil + }) + + if err != nil { + log.Warnf("Unable to retrieve values from cache (%v)", err) } - return false + return res.(bool) } // cGroupV1Check returns if running cgroup v1 @@ -345,20 +379,24 @@ func cGroupV1Check(cgroupFile string) (bool, error) { // GetContainerID returns the ID of the current environment if running in a container func (env *EnvironmentType) GetContainerID() (string, error) { - const mountInfo = "/proc/self/mountinfo" + res, err, _ := singleflightGroup.Do(GetContainerIDKey, func() (interface{}, error) { + const mountInfo = "/proc/self/mountinfo" - if !env.IsContainer() { - return "", errors.New("not in docker") - } + if !env.IsContainer() { + return "", errors.New("not in docker") + } - containerID, err := getContainerID(mountInfo) - if err != nil { - return "", fmt.Errorf("could not get container ID: %v", err) - } + containerID, err := getContainerID(mountInfo) + if err != nil { + return "", fmt.Errorf("could not get container ID: %v", err) + } + + log.Debugf("Container ID: %s", containerID) - log.Debugf("Container ID: %s", containerID) + return containerID, err + }) - return containerID, err + return res.(string), err } // getContainerID returns the container ID of the current running environment. @@ -428,6 +466,42 @@ func (env *EnvironmentType) DiskDevices() ([]string, error) { } } +func (env *EnvironmentType) Disks() (partitions []*proto.DiskPartition, err error) { + ctx := context.Background() + defer ctx.Done() + parts, err := disk.PartitionsWithContext(ctx, false) + if err != nil { + // return an array of 0 + log.Errorf("Could not read disk partitions for host: %v", err) + return []*proto.DiskPartition{}, err + } + for _, part := range parts { + pm := proto.DiskPartition{ + MountPoint: part.Mountpoint, + Device: part.Device, + FsType: part.Fstype, + } + partitions = append(partitions, &pm) + } + return partitions, nil +} + +func (env *EnvironmentType) DiskUsage(mountPoint string) (*DiskUsage, error) { + ctx := context.Background() + defer ctx.Done() + usage, err := disk.UsageWithContext(ctx, mountPoint) + if err != nil { + return nil, errors.New("unable to obtain disk usage stats") + } + + return &DiskUsage{ + Total: float64(usage.Total), + Used: float64(usage.Used), + Free: float64(usage.Free), + UsedPercentage: float64(usage.UsedPercent), + }, nil +} + func (env *EnvironmentType) GetNetOverflow() (float64, error) { return network.GetNetOverflow() } @@ -795,26 +869,6 @@ func virtualization() (string, string) { return virtualizationSystem, virtualizationRole } -func diskPartitions() (partitions []*proto.DiskPartition) { - ctx := context.Background() - defer ctx.Done() - parts, err := disk.PartitionsWithContext(ctx, false) - if err != nil { - // return an array of 0 - log.Errorf("Could not read disk partitions for host: %v", err) - return []*proto.DiskPartition{} - } - for _, part := range parts { - pm := proto.DiskPartition{ - MountPoint: part.Mountpoint, - Device: part.Device, - FsType: part.Fstype, - } - partitions = append(partitions, &pm) - } - return partitions -} - func releaseInfo(osReleaseFile string) (release *proto.ReleaseInfo) { hostReleaseInfo := getHostReleaseInfo() osRelease, err := getOsRelease(osReleaseFile) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go index 0aa1060b3..a92c4a7b8 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/collectors/system.go @@ -40,7 +40,7 @@ func NewSystemCollector(env core.Environment, conf *config.Config) *SystemCollec systemSources = []metrics.Source{ sources.NewVirtualMemorySource(sources.SystemNamespace, env), sources.NewCPUTimesSource(sources.SystemNamespace, env), - sources.NewDiskSource(sources.SystemNamespace), + sources.NewDiskSource(sources.SystemNamespace, env), sources.NewDiskIOSource(sources.SystemNamespace, env), sources.NewNetIOSource(sources.SystemNamespace, env), sources.NewLoadSource(sources.SystemNamespace), diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go index 29b788953..472133097 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/metrics/sources/disk.go @@ -13,8 +13,8 @@ import ( "sync" "github.com/nginx/agent/sdk/v2/proto" + "github.com/nginx/agent/v2/src/core" "github.com/nginx/agent/v2/src/core/metrics" - "github.com/shirou/gopsutil/v3/disk" ) const MOUNT_POINT = "mount_point" @@ -22,25 +22,24 @@ const MOUNT_POINT = "mount_point" type Disk struct { logger *MetricSourceLogger *namedMetric - disks []disk.PartitionStat + disks []*proto.DiskPartition + env core.Environment } -func NewDiskSource(namespace string) *Disk { - ctx := context.Background() - defer ctx.Done() - disks, _ := disk.PartitionsWithContext(ctx, false) - return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks} +func NewDiskSource(namespace string, env core.Environment) *Disk { + disks, _ := env.Disks() + return &Disk{NewMetricSourceLogger(), &namedMetric{namespace, "disk"}, disks, env} } func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metrics.StatsEntityWrapper) { defer wg.Done() for _, part := range c.disks { - if part.Device == "" || part.Fstype == "" { + if part.Device == "" || part.FsType == "" { continue } - usage, err := disk.UsageWithContext(ctx, part.Mountpoint) + usage, err := c.env.DiskUsage(part.MountPoint) if err != nil { - c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.Mountpoint, err)) + c.logger.Log(fmt.Sprintf("Failed to get disk metrics for mount point %s, %v", part.MountPoint, err)) continue } @@ -48,14 +47,14 @@ func (c *Disk) Collect(ctx context.Context, wg *sync.WaitGroup, m chan<- *metric "total": float64(usage.Total), "used": float64(usage.Used), "free": float64(usage.Free), - "in_use": float64(usage.UsedPercent), + "in_use": float64(usage.UsedPercentage), }) select { case <-ctx.Done(): return // mount point is not a common dim - case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.Mountpoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): + case m <- metrics.NewStatsEntityWrapper([]*proto.Dimension{{Name: MOUNT_POINT, Value: part.MountPoint}}, simpleMetrics, proto.MetricsReport_SYSTEM): } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 04dcc944e..f29f410ee 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -131,7 +131,6 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []*Proc n.nginxWorkersMap[nginxDetails.GetNginxId()] = append(n.nginxWorkersMap[nginxDetails.GetNginxId()], nginxDetails) } } - } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index d1aef6ae7..d9f14f549 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -210,7 +210,7 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) hostInfo(send bool) (info *proto.HostInfo) { // this sets send if we are forcing details, or it has been 24 hours since the last send - hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, true) + hostInfo := dps.env.NewHostInfo(dps.version, dps.tags, dps.configDirs, send) if !send && cmp.Equal(dps.envHostInfo, hostInfo) { return nil } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go b/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/test/performance/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/test/performance/vendor/modules.txt b/test/performance/vendor/modules.txt index db5d0da4a..f13622221 100644 --- a/test/performance/vendor/modules.txt +++ b/test/performance/vendor/modules.txt @@ -338,6 +338,7 @@ golang.org/x/net/trace # golang.org/x/sync v0.3.0 ## explicit; go 1.17 golang.org/x/sync/errgroup +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu diff --git a/test/utils/environment.go b/test/utils/environment.go index da6787a24..2039a2ce2 100644 --- a/test/utils/environment.go +++ b/test/utils/environment.go @@ -12,6 +12,31 @@ type MockEnvironment struct { mock.Mock } +// Disks implements core.Environment. +func (*MockEnvironment) Disks() ([]*proto.DiskPartition, error) { + return []*proto.DiskPartition{ + { + Device: "sd01", + MountPoint: "/sd01", + FsType: "ext4", + }, + { + Device: "sd02", + MountPoint: "/sd02", + FsType: "ext4", + }, + }, nil +} + +func (*MockEnvironment) DiskUsage(mountpoint string) (*core.DiskUsage, error) { + return &core.DiskUsage{ + Total: 20, + Used: 10, + Free: 10, + UsedPercentage: 100, + }, nil +} + func GetProcesses() []*core.Process { return []*core.Process{ {Pid: 1, Name: "12345", IsMaster: true}, diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 000000000..8473fb792 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,205 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f73f4696f..02d4349e8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1635,6 +1635,7 @@ golang.org/x/net/trace ## explicit; go 1.17 golang.org/x/sync/errgroup golang.org/x/sync/semaphore +golang.org/x/sync/singleflight # golang.org/x/sys v0.11.0 ## explicit; go 1.17 golang.org/x/sys/cpu