Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions src/core/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type NginxBinary interface {
}

type NginxBinaryType struct {
mx sync.Mutex
detailsMapMutex sync.Mutex
workersMapMutex sync.Mutex
env Environment
config *config.Config
nginxDetailsMap map[string]*proto.NginxDetails
Expand Down Expand Up @@ -92,15 +93,18 @@ func NewNginxBinary(env Environment, config *config.Config) *NginxBinaryType {
}

func (n *NginxBinaryType) GetNginxDetailsMapFromProcesses(nginxProcesses []Process) map[string]*proto.NginxDetails {
n.mx.Lock()
defer n.mx.Unlock()
n.detailsMapMutex.Lock()
defer n.detailsMapMutex.Unlock()
return n.nginxDetailsMap
}

func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Process) {
n.mx.Lock()
defer n.mx.Unlock()
n.detailsMapMutex.Lock()
defer n.detailsMapMutex.Unlock()
n.nginxDetailsMap = map[string]*proto.NginxDetails{}

n.workersMapMutex.Lock()
defer n.workersMapMutex.Unlock()
n.nginxWorkersMap = map[string][]*proto.NginxDetails{}

for _, process := range nginxProcesses {
Expand All @@ -114,6 +118,8 @@ func (n *NginxBinaryType) UpdateNginxDetailsFromProcesses(nginxProcesses []Proce
}

func (n *NginxBinaryType) GetChildProcesses() map[string][]*proto.NginxDetails {
n.workersMapMutex.Lock()
defer n.workersMapMutex.Unlock()
return n.nginxWorkersMap
}

Expand Down Expand Up @@ -141,8 +147,8 @@ func (n *NginxBinaryType) getNginxIDFromProcessInfo(nginxProcess Process, info *
}

func (n *NginxBinaryType) GetNginxDetailsByID(nginxID string) *proto.NginxDetails {
n.mx.Lock()
defer n.mx.Unlock()
n.detailsMapMutex.Lock()
defer n.detailsMapMutex.Unlock()
return n.nginxDetailsMap[nginxID]
}

Expand Down
8 changes: 4 additions & 4 deletions src/core/payloads/register_software_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
// RegisterWithDataplaneSoftwareDetailsPayload is an internal payload meant to be used as
// part of registration when there are plugins reporting software details.
type RegisterWithDataplaneSoftwareDetailsPayload struct {
dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails
mutex sync.Mutex
dataplaneSoftwareDetails map[string]*proto.DataplaneSoftwareDetails
dataplaneSoftwareDetailsMutex sync.Mutex
}

// NewRegisterWithDataplaneSoftwareDetailsPayload returns a pointer to an instance of a
Expand All @@ -24,7 +24,7 @@ func NewRegisterWithDataplaneSoftwareDetailsPayload(details map[string]*proto.Da
// AddDataplaneSoftwareDetails adds the dataplane software details passed into the function to
// the dataplane software details map object that has been sent as part of the payload.
func (p *RegisterWithDataplaneSoftwareDetailsPayload) AddDataplaneSoftwareDetails(pluginName string, details *proto.DataplaneSoftwareDetails) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.dataplaneSoftwareDetailsMutex.Lock()
p.dataplaneSoftwareDetails[pluginName] = details
p.dataplaneSoftwareDetailsMutex.Unlock()
}
29 changes: 18 additions & 11 deletions src/plugins/dataplane_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugins
import (
"context"
"fmt"
"sync"
"time"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -32,6 +33,7 @@ type DataPlaneStatus struct {
reportInterval time.Duration
napDetails *proto.DataplaneSoftwareDetails_AppProtectWafDetails
agentActivityStatuses []*proto.AgentActivityStatus
napDetailsMutex sync.RWMutex
}

const (
Expand All @@ -46,17 +48,18 @@ func NewDataPlaneStatus(config *config.Config, meta *proto.Metadata, binary core
log.Warnf("interval set to %s, provided value (%s) less than minimum", pollInt, config.Dataplane.Status.PollInterval)
}
return &DataPlaneStatus{
sendStatus: make(chan bool),
healthTicker: time.NewTicker(pollInt),
interval: pollInt,
meta: meta,
binary: binary,
env: env,
version: version,
tags: &config.Tags,
configDirs: config.ConfigDirs,
statusUrls: make(map[string]string),
reportInterval: config.Dataplane.Status.ReportInterval,
sendStatus: make(chan bool),
healthTicker: time.NewTicker(pollInt),
interval: pollInt,
meta: meta,
binary: binary,
env: env,
version: version,
tags: &config.Tags,
configDirs: config.ConfigDirs,
statusUrls: make(map[string]string),
reportInterval: config.Dataplane.Status.ReportInterval,
napDetailsMutex: sync.RWMutex{},
// Intentionally empty as it will be set later
napDetails: nil,
}
Expand Down Expand Up @@ -197,6 +200,8 @@ func (dps *DataPlaneStatus) dataplaneStatus(forceDetails bool) *proto.DataplaneS
func (dps *DataPlaneStatus) dataplaneSoftwareDetails() []*proto.DataplaneSoftwareDetails {
allDetails := make([]*proto.DataplaneSoftwareDetails, 0)

dps.napDetailsMutex.RLock()
defer dps.napDetailsMutex.RUnlock()
if dps.napDetails != nil {
napDetails := &proto.DataplaneSoftwareDetails{
Data: dps.napDetails,
Expand Down Expand Up @@ -330,7 +335,9 @@ func (dps *DataPlaneStatus) syncNAPDetails(msg *core.Message) {
switch commandData := msg.Data().(type) {
case *proto.DataplaneSoftwareDetails_AppProtectWafDetails:
log.Debugf("DataPlaneStatus is syncing with NAP details - %+v", commandData.AppProtectWafDetails)
dps.napDetailsMutex.Lock()
dps.napDetails = commandData
dps.napDetailsMutex.Unlock()
default:
log.Errorf("Expected the type %T but got %T", &proto.DataplaneSoftwareDetails_AppProtectWafDetails{}, commandData)
}
Expand Down
1 change: 0 additions & 1 deletion src/plugins/file_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (fw *FileWatcher) Info() *core.Info {
func (fw *FileWatcher) Close() {
log.Info("File Watcher is wrapping up")
fw.watcher.Close()
fw.watching = &sync.Map{}
}

func (fw *FileWatcher) Process(message *core.Message) {
Expand Down
85 changes: 44 additions & 41 deletions src/plugins/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,43 @@ import (
)

type Metrics struct {
pipeline core.MessagePipeInterface
registrationComplete *atomic.Bool
collectorsReady *atomic.Bool
collectorsUpdate *atomic.Bool
ticker *time.Ticker
interval time.Duration
collectors []metrics.Collector
buf chan *proto.StatsEntity
errors chan error
collectorConfigsMap map[string]*metrics.NginxCollectorConfig
ctx context.Context
wg sync.WaitGroup
mu sync.Mutex
mu2 sync.Mutex
env core.Environment
conf *config.Config
binary core.NginxBinary
pipeline core.MessagePipeInterface
registrationComplete *atomic.Bool
collectorsReady *atomic.Bool
collectorsUpdate *atomic.Bool
ticker *time.Ticker
interval time.Duration
collectors []metrics.Collector
buf chan *proto.StatsEntity
errors chan error
collectorConfigsMap map[string]*metrics.NginxCollectorConfig
ctx context.Context
wg sync.WaitGroup
collectorsMutex sync.RWMutex
collectorConfigsMapMutex sync.Mutex
env core.Environment
conf *config.Config
binary core.NginxBinary
}

func NewMetrics(config *config.Config, env core.Environment, binary core.NginxBinary) *Metrics {

collectorConfigsMap := createCollectorConfigsMap(config, env, binary)
return &Metrics{
registrationComplete: atomic.NewBool(false),
collectorsReady: atomic.NewBool(false),
collectorsUpdate: atomic.NewBool(false),
ticker: time.NewTicker(config.AgentMetrics.CollectionInterval),
interval: config.AgentMetrics.CollectionInterval,
buf: make(chan *proto.StatsEntity, 4096),
errors: make(chan error),
collectorConfigsMap: collectorConfigsMap,
wg: sync.WaitGroup{},
mu: sync.Mutex{},
mu2: sync.Mutex{},
env: env,
conf: config,
binary: binary,
registrationComplete: atomic.NewBool(false),
collectorsReady: atomic.NewBool(false),
collectorsUpdate: atomic.NewBool(false),
ticker: time.NewTicker(config.AgentMetrics.CollectionInterval),
interval: config.AgentMetrics.CollectionInterval,
buf: make(chan *proto.StatsEntity, 4096),
errors: make(chan error),
collectorConfigsMap: collectorConfigsMap,
wg: sync.WaitGroup{},
collectorsMutex: sync.RWMutex{},
collectorConfigsMapMutex: sync.Mutex{},
env: env,
conf: config,
binary: binary,
}
}

Expand All @@ -80,9 +80,9 @@ func (m *Metrics) Process(msg *core.Message) {
// If the agent config on disk changed or the NGINX statusAPI was updated
// Then update Metrics with relevant config info
collectorConfigsMap := createCollectorConfigsMap(m.conf, m.env, m.binary)
m.mu2.Lock()
m.collectorConfigsMapMutex.Lock()
m.collectorConfigsMap = collectorConfigsMap
m.mu2.Unlock()
m.collectorConfigsMapMutex.Unlock()

m.syncAgentConfigChange()
m.updateCollectorsConfig()
Expand Down Expand Up @@ -116,11 +116,13 @@ func (m *Metrics) Process(msg *core.Message) {
}
}

m.mu2.Lock()
m.collectorConfigsMapMutex.Lock()
m.collectorConfigsMap = collectorConfigsMap
m.mu2.Unlock()
m.collectorConfigsMapMutex.Unlock()

stoppedCollectorIndex := -1

m.collectorsMutex.RLock()
for index, collector := range m.collectors {
if nginxCollector, ok := collector.(*collectors.NginxCollector); ok {
for _, nginxId := range collectorsToStop {
Expand All @@ -133,6 +135,7 @@ func (m *Metrics) Process(msg *core.Message) {
}
}
}
m.collectorsMutex.RUnlock()

if stoppedCollectorIndex >= 0 {
m.collectors = append(m.collectors[:stoppedCollectorIndex], m.collectors[stoppedCollectorIndex+1:]...)
Expand Down Expand Up @@ -200,8 +203,8 @@ func (m *Metrics) collectStats() (stats []*proto.StatsEntity) {
// locks the m.collectors to make sure it doesn't get deleted in the middle
// of collection, as we will delete the old one if config changes.
// maybe we can fine tune the lock later, but the collection has been very quick so far.
m.mu.Lock()
defer m.mu.Unlock()
m.collectorsMutex.Lock()
defer m.collectorsMutex.Unlock()
wg := &sync.WaitGroup{}
start := time.Now()
for _, s := range m.collectors {
Expand Down Expand Up @@ -245,14 +248,14 @@ func (m *Metrics) registerStatsSources() {
}

hasNginxCollector := false
m.mu2.Lock()
m.collectorConfigsMapMutex.Lock()
for key := range m.collectorConfigsMap {
tempCollectors = append(tempCollectors,
collectors.NewNginxCollector(m.conf, m.env, m.collectorConfigsMap[key], m.binary),
)
hasNginxCollector = true
}
m.mu2.Unlock()
m.collectorConfigsMapMutex.Unlock()

// if NGINX is not running/detected, still run the static collector to output nginx.status = 0.
if !hasNginxCollector {
Expand All @@ -263,9 +266,9 @@ func (m *Metrics) registerStatsSources() {
)
}

m.mu.Lock()
defer m.mu.Unlock()
m.collectorsMutex.Lock()
m.collectors = tempCollectors
m.collectorsMutex.Unlock()
}

func (m *Metrics) syncAgentConfigChange() {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading