From b710b2116ead71e354ba986582dcdcdd3cb762ac Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 4 Nov 2022 11:24:23 +0000 Subject: [PATCH 1/3] fix race conditions --- src/core/nginx.go | 2 ++ src/plugins/dataplane_status.go | 7 +++++++ src/plugins/file_watcher.go | 1 - src/plugins/metrics.go | 7 +++++++ .../vendor/github.com/nginx/agent/v2/src/core/nginx.go | 2 ++ .../nginx/agent/v2/src/plugins/dataplane_status.go | 7 +++++++ .../github.com/nginx/agent/v2/src/plugins/file_watcher.go | 1 - .../github.com/nginx/agent/v2/src/plugins/metrics.go | 7 +++++++ 8 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/core/nginx.go b/src/core/nginx.go index 984847555..d474d4769 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -114,6 +114,8 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Proce } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { + n.mx.Lock() + defer n.mx.Unlock() return n.nginxWorkersMap } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 8a8eda177..da5eece60 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -3,6 +3,7 @@ package plugins import ( "context" "fmt" + "sync" "time" "github.com/google/go-cmp/cmp" @@ -32,6 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus + mu sync.Mutex } const ( @@ -57,6 +59,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core configDirs: config.ConfigDirs, statusUrls: make(map[string]string), reportInterval: config.Dataplane.Status.ReportInterval, + mu: sync.Mutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -197,6 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) + dps.mu.Lock() + defer dps.mu.Unlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, @@ -330,7 +335,9 @@ func (dps *DataPlaneStatus) syncNAPDetails(msg *core.Message) { switch commandData := msg.Data().(type) { case *proto.DataplaneSoftwareDetails_AppProtectWafDetails: log.Debugf("DataPlaneStatus is syncing with NAP details - %+v", commandData.AppProtectWafDetails) + dps.mu.Lock() dps.napDetails = commandData + dps.mu.Unlock() default: log.Errorf("Expected the type %T but got %T", &proto.DataplaneSoftwareDetails_AppProtectWafDetails{}, commandData) } diff --git a/src/plugins/file_watcher.go b/src/plugins/file_watcher.go index d443aae63..3e70d6c4c 100644 --- a/src/plugins/file_watcher.go +++ b/src/plugins/file_watcher.go @@ -94,7 +94,6 @@ func (fw *FileWatcher) Info() *core.Info { func (fw *FileWatcher) Close() { log.Info("File Watcher is wrapping up") fw.watcher.Close() - fw.watching = &sync.Map{} } func (fw *FileWatcher) Process(message *core.Message) { diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index 0a9fa0a14..efd7af987 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -32,6 +32,7 @@ type Metrics struct { wg sync.WaitGroup mu sync.Mutex mu2 sync.Mutex + mu3 sync.RWMutex env core.Environment conf *config.Config binary core.NginxBinary @@ -52,6 +53,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi wg: sync.WaitGroup{}, mu: sync.Mutex{}, mu2: sync.Mutex{}, + mu3: sync.RWMutex{}, env: env, conf: config, binary: binary, @@ -121,6 +123,8 @@ func (m *Metrics) Process(msg *core.Message) { m.mu2.Unlock() stoppedCollectorIndex := -1 + + m.mu3.RLock() for index, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { for _, nginxId := range collectorsToStop { @@ -133,6 +137,7 @@ func (m *Metrics) Process(msg *core.Message) { } } } + m.mu3.RUnlock() if stoppedCollectorIndex >= 0 { m.collectors = append(m.collectors[:stoppedCollectorIndex], m.collectors[stoppedCollectorIndex+1:]...) @@ -265,7 +270,9 @@ func (m *Metrics) registerStatsSources() { m.mu.Lock() defer m.mu.Unlock() + m.mu3.Lock() m.collectors = tempCollectors + m.mu3.Unlock() } func (m *Metrics) syncAgentConfigChange() { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index 984847555..d474d4769 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -114,6 +114,8 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Proce } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { + n.mx.Lock() + defer n.mx.Unlock() return n.nginxWorkersMap } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 8a8eda177..805984651 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "time" + "sync" "github.com/google/go-cmp/cmp" "github.com/google/uuid" @@ -32,6 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus + mu sync.Mutex } const ( @@ -57,6 +59,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core configDirs: config.ConfigDirs, statusUrls: make(map[string]string), reportInterval: config.Dataplane.Status.ReportInterval, + mu: sync.Mutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -197,6 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) + dps.mu.Lock() + defer dps.mu.Unlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, @@ -330,7 +335,9 @@ func (dps *DataPlaneStatus) syncNAPDetails(msg *core.Message) { switch commandData := msg.Data().(type) { case *proto.DataplaneSoftwareDetails_AppProtectWafDetails: log.Debugf("DataPlaneStatus is syncing with NAP details - %+v", commandData.AppProtectWafDetails) + dps.mu.Lock() dps.napDetails = commandData + dps.mu.Unlock() default: log.Errorf("Expected the type %T but got %T", &proto.DataplaneSoftwareDetails_AppProtectWafDetails{}, commandData) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/file_watcher.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/file_watcher.go index d443aae63..3e70d6c4c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/file_watcher.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/file_watcher.go @@ -94,7 +94,6 @@ func (fw *FileWatcher) Info() *core.Info { func (fw *FileWatcher) Close() { log.Info("File Watcher is wrapping up") fw.watcher.Close() - fw.watching = &sync.Map{} } func (fw *FileWatcher) Process(message *core.Message) { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 0a9fa0a14..00f9370be 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -32,6 +32,7 @@ type Metrics struct { wg sync.WaitGroup mu sync.Mutex mu2 sync.Mutex + mu3 sync.RWMutex env core.Environment conf *config.Config binary core.NginxBinary @@ -52,6 +53,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi wg: sync.WaitGroup{}, mu: sync.Mutex{}, mu2: sync.Mutex{}, + mu3: sync.RWMutex{}, env: env, conf: config, binary: binary, @@ -121,6 +123,8 @@ func (m *Metrics) Process(msg *core.Message) { m.mu2.Unlock() stoppedCollectorIndex := -1 + + m.mu3.RLock() for index, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { for _, nginxId := range collectorsToStop { @@ -133,6 +137,7 @@ func (m *Metrics) Process(msg *core.Message) { } } } + m.mu3.RUnlock() if stoppedCollectorIndex >= 0 { m.collectors = append(m.collectors[:stoppedCollectorIndex], m.collectors[stoppedCollectorIndex+1:]...) @@ -265,7 +270,9 @@ func (m *Metrics) registerStatsSources() { m.mu.Lock() defer m.mu.Unlock() + m.mu3.Lock() m.collectors = tempCollectors + m.mu3.Unlock() } func (m *Metrics) syncAgentConfigChange() { From cf6238839e6c9577b37ae219b1a94fda42f67b9c Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 7 Nov 2022 11:14:37 +0000 Subject: [PATCH 2/3] fix race conditions --- src/core/payloads/register_software_details.go | 2 +- src/plugins/dataplane_status.go | 8 ++++---- .../v2/src/core/payloads/register_software_details.go | 2 +- .../nginx/agent/v2/src/plugins/dataplane_status.go | 10 +++++----- .../github.com/nginx/agent/v2/src/plugins/metrics.go | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/core/payloads/register_software_details.go b/src/core/payloads/register_software_details.go index 71706dd4c..13ad39446 100644 --- a/src/core/payloads/register_software_details.go +++ b/src/core/payloads/register_software_details.go @@ -25,6 +25,6 @@ func NewRegisterWithDataplaneSoftwareDetailsPayload(details map[string]*proto.Da // the dataplane software details map object that has been sent as part of the payload. func (p *RegisterWithDataplaneSoftwareDetailsPayload) AddDataplaneSoftwareDetails(pluginName string, details *proto.DataplaneSoftwareDetails) { p.mutex.Lock() - defer p.mutex.Unlock() p.dataplaneSoftwareDetails[pluginName] = details + p.mutex.Unlock() } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index da5eece60..64f9ab3ae 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -33,7 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus - mu sync.Mutex + mu sync.RWMutex } const ( @@ -59,7 +59,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core configDirs: config.ConfigDirs, statusUrls: make(map[string]string), reportInterval: config.Dataplane.Status.ReportInterval, - mu: sync.Mutex{}, + mu: sync.RWMutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -200,8 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) - dps.mu.Lock() - defer dps.mu.Unlock() + dps.mu.RLock() + defer dps.mu.RUnlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go index 71706dd4c..13ad39446 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go @@ -25,6 +25,6 @@ func NewRegisterWithDataplaneSoftwareDetailsPayload(details map[string]*proto.Da // the dataplane software details map object that has been sent as part of the payload. func (p *RegisterWithDataplaneSoftwareDetailsPayload) AddDataplaneSoftwareDetails(pluginName string, details *proto.DataplaneSoftwareDetails) { p.mutex.Lock() - defer p.mutex.Unlock() p.dataplaneSoftwareDetails[pluginName] = details + p.mutex.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 805984651..64f9ab3ae 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -3,8 +3,8 @@ package plugins import ( "context" "fmt" - "time" "sync" + "time" "github.com/google/go-cmp/cmp" "github.com/google/uuid" @@ -33,7 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus - mu sync.Mutex + mu sync.RWMutex } const ( @@ -59,7 +59,7 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core configDirs: config.ConfigDirs, statusUrls: make(map[string]string), reportInterval: config.Dataplane.Status.ReportInterval, - mu: sync.Mutex{}, + mu: sync.RWMutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -200,8 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) - dps.mu.Lock() - defer dps.mu.Unlock() + dps.mu.RLock() + defer dps.mu.RUnlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index 00f9370be..efd7af987 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -32,7 +32,7 @@ type Metrics struct { wg sync.WaitGroup mu sync.Mutex mu2 sync.Mutex - mu3 sync.RWMutex + mu3 sync.RWMutex env core.Environment conf *config.Config binary core.NginxBinary @@ -53,7 +53,7 @@ func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBi wg: sync.WaitGroup{}, mu: sync.Mutex{}, mu2: sync.Mutex{}, - mu3: sync.RWMutex{}, + mu3: sync.RWMutex{}, env: env, conf: config, binary: binary, From 24da22bc8d8b261f860967ae93231784dae5d5a0 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 7 Nov 2022 15:08:12 +0000 Subject: [PATCH 3/3] change naming of mutex --- src/core/nginx.go | 22 +++-- .../payloads/register_software_details.go | 8 +- src/plugins/dataplane_status.go | 34 +++---- src/plugins/metrics.go | 90 +++++++++---------- .../nginx/agent/v2/src/core/nginx.go | 22 +++-- .../payloads/register_software_details.go | 8 +- .../agent/v2/src/plugins/dataplane_status.go | 34 +++---- .../nginx/agent/v2/src/plugins/metrics.go | 90 +++++++++---------- 8 files changed, 154 insertions(+), 154 deletions(-) diff --git a/src/core/nginx.go b/src/core/nginx.go index d474d4769..2b11d26a9 100644 --- a/src/core/nginx.go +++ b/src/core/nginx.go @@ -53,7 +53,8 @@ type NginxBinary interface { } type NginxBinaryType struct { - mx sync.Mutex + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex env Environment config *config.Config nginxDetailsMap map[string]*proto.NginxDetails @@ -92,15 +93,18 @@ func NewNginxBinary(env Environment, config *config.Config) *NginxBinaryType { } func (n *NginxBinaryType) GetNginxDetailsMapFromProcesses(nginxProcesses []Process) map[string]*proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() return n.nginxDetailsMap } func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Process) { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() n.nginxDetailsMap = map[string]*proto.NginxDetails{} + + n.workersMapMutex.Lock() + defer n.workersMapMutex.Unlock() n.nginxWorkersMap = map[string][]*proto.NginxDetails{} for _, process := range nginxProcesses { @@ -114,8 +118,8 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Proce } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.workersMapMutex.Lock() + defer n.workersMapMutex.Unlock() return n.nginxWorkersMap } @@ -143,8 +147,8 @@ func (n *NginxBinaryType) getNginxIDFromProcessInfo(nginxProcess Process, info * } func (n *NginxBinaryType) GetNginxDetailsByID(nginxID string) *proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() return n.nginxDetailsMap[nginxID] } diff --git a/src/core/payloads/register_software_details.go b/src/core/payloads/register_software_details.go index 13ad39446..ae434fb81 100644 --- a/src/core/payloads/register_software_details.go +++ b/src/core/payloads/register_software_details.go @@ -9,8 +9,8 @@ import ( // RegisterWithDataplaneSoftwareDetailsPayload is an internal payload meant to be used as // part of registration when there are plugins reporting software details. type RegisterWithDataplaneSoftwareDetailsPayload struct { - dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails - mutex sync.Mutex + dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails + dataplaneSoftwareDetailsMutex sync.Mutex } // NewRegisterWithDataplaneSoftwareDetailsPayload returns a pointer to an instance of a @@ -24,7 +24,7 @@ func NewRegisterWithDataplaneSoftwareDetailsPayload(details map[string]*proto.Da // AddDataplaneSoftwareDetails adds the dataplane software details passed into the function to // the dataplane software details map object that has been sent as part of the payload. func (p *RegisterWithDataplaneSoftwareDetailsPayload) AddDataplaneSoftwareDetails(pluginName string, details *proto.DataplaneSoftwareDetails) { - p.mutex.Lock() + p.dataplaneSoftwareDetailsMutex.Lock() p.dataplaneSoftwareDetails[pluginName] = details - p.mutex.Unlock() + p.dataplaneSoftwareDetailsMutex.Unlock() } diff --git a/src/plugins/dataplane_status.go b/src/plugins/dataplane_status.go index 64f9ab3ae..92a5f4b41 100644 --- a/src/plugins/dataplane_status.go +++ b/src/plugins/dataplane_status.go @@ -33,7 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus - mu sync.RWMutex + napDetailsMutex sync.RWMutex } const ( @@ -48,18 +48,18 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, config.Dataplane.Status.PollInterval) } return &DataPlaneStatus{ - sendStatus: make(chan bool), - healthTicker: time.NewTicker(pollInt), - interval: pollInt, - meta: meta, - binary: binary, - env: env, - version: version, - tags: &config.Tags, - configDirs: config.ConfigDirs, - statusUrls: make(map[string]string), - reportInterval: config.Dataplane.Status.ReportInterval, - mu: sync.RWMutex{}, + sendStatus: make(chan bool), + healthTicker: time.NewTicker(pollInt), + interval: pollInt, + meta: meta, + binary: binary, + env: env, + version: version, + tags: &config.Tags, + configDirs: config.ConfigDirs, + statusUrls: make(map[string]string), + reportInterval: config.Dataplane.Status.ReportInterval, + napDetailsMutex: sync.RWMutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -200,8 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) - dps.mu.RLock() - defer dps.mu.RUnlock() + dps.napDetailsMutex.RLock() + defer dps.napDetailsMutex.RUnlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, @@ -335,9 +335,9 @@ func (dps *DataPlaneStatus) syncNAPDetails(msg *core.Message) { switch commandData := msg.Data().(type) { case *proto.DataplaneSoftwareDetails_AppProtectWafDetails: log.Debugf("DataPlaneStatus is syncing with NAP details - %+v", commandData.AppProtectWafDetails) - dps.mu.Lock() + dps.napDetailsMutex.Lock() dps.napDetails = commandData - dps.mu.Unlock() + dps.napDetailsMutex.Unlock() default: log.Errorf("Expected the type %T but got %T", &proto.DataplaneSoftwareDetails_AppProtectWafDetails{}, commandData) } diff --git a/src/plugins/metrics.go b/src/plugins/metrics.go index efd7af987..250e0cbd9 100644 --- a/src/plugins/metrics.go +++ b/src/plugins/metrics.go @@ -18,45 +18,43 @@ import ( ) type Metrics struct { - pipeline core.MessagePipeInterface - registrationComplete *atomic.Bool - collectorsReady *atomic.Bool - collectorsUpdate *atomic.Bool - ticker *time.Ticker - interval time.Duration - collectors []metrics.Collector - buf chan *proto.StatsEntity - errors chan error - collectorConfigsMap map[string]*metrics.NginxCollectorConfig - ctx context.Context - wg sync.WaitGroup - mu sync.Mutex - mu2 sync.Mutex - mu3 sync.RWMutex - env core.Environment - conf *config.Config - binary core.NginxBinary + pipeline core.MessagePipeInterface + registrationComplete *atomic.Bool + collectorsReady *atomic.Bool + collectorsUpdate *atomic.Bool + ticker *time.Ticker + interval time.Duration + collectors []metrics.Collector + buf chan *proto.StatsEntity + errors chan error + collectorConfigsMap map[string]*metrics.NginxCollectorConfig + ctx context.Context + wg sync.WaitGroup + collectorsMutex sync.RWMutex + collectorConfigsMapMutex sync.Mutex + env core.Environment + conf *config.Config + binary core.NginxBinary } func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ - registrationComplete: atomic.NewBool(false), - collectorsReady: atomic.NewBool(false), - collectorsUpdate: atomic.NewBool(false), - ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), - interval: config.AgentMetrics.CollectionInterval, - buf: make(chan *proto.StatsEntity, 4096), - errors: make(chan error), - collectorConfigsMap: collectorConfigsMap, - wg: sync.WaitGroup{}, - mu: sync.Mutex{}, - mu2: sync.Mutex{}, - mu3: sync.RWMutex{}, - env: env, - conf: config, - binary: binary, + registrationComplete: atomic.NewBool(false), + collectorsReady: atomic.NewBool(false), + collectorsUpdate: atomic.NewBool(false), + ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), + interval: config.AgentMetrics.CollectionInterval, + buf: make(chan *proto.StatsEntity, 4096), + errors: make(chan error), + collectorConfigsMap: collectorConfigsMap, + wg: sync.WaitGroup{}, + collectorsMutex: sync.RWMutex{}, + collectorConfigsMapMutex: sync.Mutex{}, + env: env, + conf: config, + binary: binary, } } @@ -82,9 +80,9 @@ func (m *Metrics) Process(msg *core.Message) { // If the agent config on disk changed or the NGINX statusAPI was updated // Then update Metrics with relevant config info collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() m.syncAgentConfigChange() m.updateCollectorsConfig() @@ -118,13 +116,13 @@ func (m *Metrics) Process(msg *core.Message) { } } - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() stoppedCollectorIndex := -1 - m.mu3.RLock() + m.collectorsMutex.RLock() for index, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { for _, nginxId := range collectorsToStop { @@ -137,7 +135,7 @@ func (m *Metrics) Process(msg *core.Message) { } } } - m.mu3.RUnlock() + m.collectorsMutex.RUnlock() if stoppedCollectorIndex >= 0 { m.collectors = append(m.collectors[:stoppedCollectorIndex], m.collectors[stoppedCollectorIndex+1:]...) @@ -205,8 +203,8 @@ func (m *Metrics) collectStats() (stats []*proto.StatsEntity) { // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. // maybe we can fine tune the lock later, but the collection has been very quick so far. - m.mu.Lock() - defer m.mu.Unlock() + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() wg := &sync.WaitGroup{} start := time.Now() for _, s := range m.collectors { @@ -250,14 +248,14 @@ func (m *Metrics) registerStatsSources() { } hasNginxCollector := false - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() for key := range m.collectorConfigsMap { tempCollectors = append(tempCollectors, collectors.NewNginxCollector(m.conf, m.env, m.collectorConfigsMap[key], m.binary), ) hasNginxCollector = true } - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() // if NGINX is not running/detected, still run the static collector to output nginx.status = 0. if !hasNginxCollector { @@ -268,11 +266,9 @@ func (m *Metrics) registerStatsSources() { ) } - m.mu.Lock() - defer m.mu.Unlock() - m.mu3.Lock() + m.collectorsMutex.Lock() m.collectors = tempCollectors - m.mu3.Unlock() + m.collectorsMutex.Unlock() } func (m *Metrics) syncAgentConfigChange() { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go index d474d4769..2b11d26a9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/nginx.go @@ -53,7 +53,8 @@ type NginxBinary interface { } type NginxBinaryType struct { - mx sync.Mutex + detailsMapMutex sync.Mutex + workersMapMutex sync.Mutex env Environment config *config.Config nginxDetailsMap map[string]*proto.NginxDetails @@ -92,15 +93,18 @@ func NewNginxBinary(env Environment, config *config.Config) *NginxBinaryType { } func (n *NginxBinaryType) GetNginxDetailsMapFromProcesses(nginxProcesses []Process) map[string]*proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() return n.nginxDetailsMap } func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Process) { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() n.nginxDetailsMap = map[string]*proto.NginxDetails{} + + n.workersMapMutex.Lock() + defer n.workersMapMutex.Unlock() n.nginxWorkersMap = map[string][]*proto.NginxDetails{} for _, process := range nginxProcesses { @@ -114,8 +118,8 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Proce } func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.workersMapMutex.Lock() + defer n.workersMapMutex.Unlock() return n.nginxWorkersMap } @@ -143,8 +147,8 @@ func (n *NginxBinaryType) getNginxIDFromProcessInfo(nginxProcess Process, info * } func (n *NginxBinaryType) GetNginxDetailsByID(nginxID string) *proto.NginxDetails { - n.mx.Lock() - defer n.mx.Unlock() + n.detailsMapMutex.Lock() + defer n.detailsMapMutex.Unlock() return n.nginxDetailsMap[nginxID] } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go index 13ad39446..ae434fb81 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/payloads/register_software_details.go @@ -9,8 +9,8 @@ import ( // RegisterWithDataplaneSoftwareDetailsPayload is an internal payload meant to be used as // part of registration when there are plugins reporting software details. type RegisterWithDataplaneSoftwareDetailsPayload struct { - dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails - mutex sync.Mutex + dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails + dataplaneSoftwareDetailsMutex sync.Mutex } // NewRegisterWithDataplaneSoftwareDetailsPayload returns a pointer to an instance of a @@ -24,7 +24,7 @@ func NewRegisterWithDataplaneSoftwareDetailsPayload(details map[string]*proto.Da // AddDataplaneSoftwareDetails adds the dataplane software details passed into the function to // the dataplane software details map object that has been sent as part of the payload. func (p *RegisterWithDataplaneSoftwareDetailsPayload) AddDataplaneSoftwareDetails(pluginName string, details *proto.DataplaneSoftwareDetails) { - p.mutex.Lock() + p.dataplaneSoftwareDetailsMutex.Lock() p.dataplaneSoftwareDetails[pluginName] = details - p.mutex.Unlock() + p.dataplaneSoftwareDetailsMutex.Unlock() } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go index 64f9ab3ae..92a5f4b41 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/dataplane_status.go @@ -33,7 +33,7 @@ type DataPlaneStatus struct { reportInterval time.Duration napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails agentActivityStatuses []*proto.AgentActivityStatus - mu sync.RWMutex + napDetailsMutex sync.RWMutex } const ( @@ -48,18 +48,18 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, config.Dataplane.Status.PollInterval) } return &DataPlaneStatus{ - sendStatus: make(chan bool), - healthTicker: time.NewTicker(pollInt), - interval: pollInt, - meta: meta, - binary: binary, - env: env, - version: version, - tags: &config.Tags, - configDirs: config.ConfigDirs, - statusUrls: make(map[string]string), - reportInterval: config.Dataplane.Status.ReportInterval, - mu: sync.RWMutex{}, + sendStatus: make(chan bool), + healthTicker: time.NewTicker(pollInt), + interval: pollInt, + meta: meta, + binary: binary, + env: env, + version: version, + tags: &config.Tags, + configDirs: config.ConfigDirs, + statusUrls: make(map[string]string), + reportInterval: config.Dataplane.Status.ReportInterval, + napDetailsMutex: sync.RWMutex{}, // Intentionally empty as it will be set later napDetails: nil, } @@ -200,8 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails { allDetails := make([]*proto.DataplaneSoftwareDetails, 0) - dps.mu.RLock() - defer dps.mu.RUnlock() + dps.napDetailsMutex.RLock() + defer dps.napDetailsMutex.RUnlock() if dps.napDetails != nil { napDetails := &proto.DataplaneSoftwareDetails{ Data: dps.napDetails, @@ -335,9 +335,9 @@ func (dps *DataPlaneStatus) syncNAPDetails(msg *core.Message) { switch commandData := msg.Data().(type) { case *proto.DataplaneSoftwareDetails_AppProtectWafDetails: log.Debugf("DataPlaneStatus is syncing with NAP details - %+v", commandData.AppProtectWafDetails) - dps.mu.Lock() + dps.napDetailsMutex.Lock() dps.napDetails = commandData - dps.mu.Unlock() + dps.napDetailsMutex.Unlock() default: log.Errorf("Expected the type %T but got %T", &proto.DataplaneSoftwareDetails_AppProtectWafDetails{}, commandData) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go index efd7af987..250e0cbd9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics.go @@ -18,45 +18,43 @@ import ( ) type Metrics struct { - pipeline core.MessagePipeInterface - registrationComplete *atomic.Bool - collectorsReady *atomic.Bool - collectorsUpdate *atomic.Bool - ticker *time.Ticker - interval time.Duration - collectors []metrics.Collector - buf chan *proto.StatsEntity - errors chan error - collectorConfigsMap map[string]*metrics.NginxCollectorConfig - ctx context.Context - wg sync.WaitGroup - mu sync.Mutex - mu2 sync.Mutex - mu3 sync.RWMutex - env core.Environment - conf *config.Config - binary core.NginxBinary + pipeline core.MessagePipeInterface + registrationComplete *atomic.Bool + collectorsReady *atomic.Bool + collectorsUpdate *atomic.Bool + ticker *time.Ticker + interval time.Duration + collectors []metrics.Collector + buf chan *proto.StatsEntity + errors chan error + collectorConfigsMap map[string]*metrics.NginxCollectorConfig + ctx context.Context + wg sync.WaitGroup + collectorsMutex sync.RWMutex + collectorConfigsMapMutex sync.Mutex + env core.Environment + conf *config.Config + binary core.NginxBinary } func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics { collectorConfigsMap := createCollectorConfigsMap(config, env, binary) return &Metrics{ - registrationComplete: atomic.NewBool(false), - collectorsReady: atomic.NewBool(false), - collectorsUpdate: atomic.NewBool(false), - ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), - interval: config.AgentMetrics.CollectionInterval, - buf: make(chan *proto.StatsEntity, 4096), - errors: make(chan error), - collectorConfigsMap: collectorConfigsMap, - wg: sync.WaitGroup{}, - mu: sync.Mutex{}, - mu2: sync.Mutex{}, - mu3: sync.RWMutex{}, - env: env, - conf: config, - binary: binary, + registrationComplete: atomic.NewBool(false), + collectorsReady: atomic.NewBool(false), + collectorsUpdate: atomic.NewBool(false), + ticker: time.NewTicker(config.AgentMetrics.CollectionInterval), + interval: config.AgentMetrics.CollectionInterval, + buf: make(chan *proto.StatsEntity, 4096), + errors: make(chan error), + collectorConfigsMap: collectorConfigsMap, + wg: sync.WaitGroup{}, + collectorsMutex: sync.RWMutex{}, + collectorConfigsMapMutex: sync.Mutex{}, + env: env, + conf: config, + binary: binary, } } @@ -82,9 +80,9 @@ func (m *Metrics) Process(msg *core.Message) { // If the agent config on disk changed or the NGINX statusAPI was updated // Then update Metrics with relevant config info collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary) - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() m.syncAgentConfigChange() m.updateCollectorsConfig() @@ -118,13 +116,13 @@ func (m *Metrics) Process(msg *core.Message) { } } - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() m.collectorConfigsMap = collectorConfigsMap - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() stoppedCollectorIndex := -1 - m.mu3.RLock() + m.collectorsMutex.RLock() for index, collector := range m.collectors { if nginxCollector, ok := collector.(*collectors.NginxCollector); ok { for _, nginxId := range collectorsToStop { @@ -137,7 +135,7 @@ func (m *Metrics) Process(msg *core.Message) { } } } - m.mu3.RUnlock() + m.collectorsMutex.RUnlock() if stoppedCollectorIndex >= 0 { m.collectors = append(m.collectors[:stoppedCollectorIndex], m.collectors[stoppedCollectorIndex+1:]...) @@ -205,8 +203,8 @@ func (m *Metrics) collectStats() (stats []*proto.StatsEntity) { // locks the m.collectors to make sure it doesn't get deleted in the middle // of collection, as we will delete the old one if config changes. // maybe we can fine tune the lock later, but the collection has been very quick so far. - m.mu.Lock() - defer m.mu.Unlock() + m.collectorsMutex.Lock() + defer m.collectorsMutex.Unlock() wg := &sync.WaitGroup{} start := time.Now() for _, s := range m.collectors { @@ -250,14 +248,14 @@ func (m *Metrics) registerStatsSources() { } hasNginxCollector := false - m.mu2.Lock() + m.collectorConfigsMapMutex.Lock() for key := range m.collectorConfigsMap { tempCollectors = append(tempCollectors, collectors.NewNginxCollector(m.conf, m.env, m.collectorConfigsMap[key], m.binary), ) hasNginxCollector = true } - m.mu2.Unlock() + m.collectorConfigsMapMutex.Unlock() // if NGINX is not running/detected, still run the static collector to output nginx.status = 0. if !hasNginxCollector { @@ -268,11 +266,9 @@ func (m *Metrics) registerStatsSources() { ) } - m.mu.Lock() - defer m.mu.Unlock() - m.mu3.Lock() + m.collectorsMutex.Lock() m.collectors = tempCollectors - m.mu3.Unlock() + m.collectorsMutex.Unlock() } func (m *Metrics) syncAgentConfigChange() {