From b4805b79d64439c6000cf302a8b07ec6645d3c0c Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 20 Oct 2022 17:29:56 +0100 Subject: [PATCH 01/15] fix empty metric reports being sent --- nginx-agent.conf | 2 +- src/core/config/defaults.go | 35 +++++------- src/core/network/network.go | 2 +- src/plugins/comms.go | 56 +++++-------------- src/plugins/metrics_throlling.go | 15 ++--- .../agent/v2/src/core/config/defaults.go | 35 +++++------- .../agent/v2/src/core/network/network.go | 2 +- .../nginx/agent/v2/src/plugins/comms.go | 56 +++++-------------- .../agent/v2/src/plugins/metrics_throlling.go | 15 ++--- 9 files changed, 71 insertions(+), 147 deletions(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index 549de3b38..6fa6c4de8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -10,7 +10,7 @@ # specify the server grpc port to connect to server: # host of the control plane - host: 127.0.0.1 + host: 172.16.0.160 grpcPort: 443 # provide servername overrides if using SNI # metrics: "" diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go index 11ec40568..81052399a 100644 --- a/src/core/config/defaults.go +++ b/src/core/config/defaults.go @@ -287,41 +287,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/src/core/network/network.go b/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/src/core/network/network.go +++ b/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/src/plugins/comms.go b/src/plugins/comms.go index e7a086955..74fdbf85a 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -11,14 +11,9 @@ import ( "go.uber.org/atomic" ) -const ( - DefaultMetricsChanLength = 4 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -28,7 +23,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), wait: sync.WaitGroup{}, @@ -43,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -69,18 +62,25 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() + + if len(report.Data) > 0 { + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { - log.Errorf("error in done context Process in comms %v", err) + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } - return - case r.reportChan <- report: - // report queued - log.Debug("report queued") + + } else { + log.Errorf("Got report of length 0 data: %v", report) } } } @@ -90,29 +90,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - } - } -} diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 1a6a90843..48a9efd72 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -130,18 +128,15 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false + if len(aggregatedReport.Data) > 0 { + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index 11ec40568..81052399a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -287,41 +287,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index e7a086955..74fdbf85a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -11,14 +11,9 @@ import ( "go.uber.org/atomic" ) -const ( - DefaultMetricsChanLength = 4 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -28,7 +23,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), wait: sync.WaitGroup{}, @@ -43,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -69,18 +62,25 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() + + if len(report.Data) > 0 { + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { - log.Errorf("error in done context Process in comms %v", err) + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } - return - case r.reportChan <- report: - // report queued - log.Debug("report queued") + + } else { + log.Errorf("Got report of length 0 data: %v", report) } } } @@ -90,29 +90,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - } - } -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 1a6a90843..48a9efd72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -130,18 +128,15 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false + if len(aggregatedReport.Data) > 0 { + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } From faadb6c72afd6d2a509bafd8527de5784c9df016 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 21 Oct 2022 12:18:05 +0100 Subject: [PATCH 02/15] fix failing unit test --- src/plugins/comms_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/plugins/comms_test.go b/src/plugins/comms_test.go index ddc804194..67843e4af 100644 --- a/src/plugins/comms_test.go +++ b/src/plugins/comms_test.go @@ -49,10 +49,13 @@ func TestCommsSendMetrics(t *testing.T) { assert.True(t, pluginUnderTest.readyToSend.Load()) + metricData := make([]*proto.StatsEntity, 0, 1) + metricData = append(metricData, &proto.StatsEntity{Simplemetrics: []*proto.SimpleMetric{{Name: "Metric A", Value: 5}}}) + pluginUnderTest.Process(core.NewMessage(core.CommMetrics, []core.Payload{&proto.MetricsReport{ Meta: &proto.Metadata{Timestamp: types.TimestampNow()}, Type: proto.MetricsReport_INSTANCE, - Data: make([]*proto.StatsEntity, 0, 1), + Data: metricData, }})) time.Sleep(1 * time.Second) // for the above call being asynchronous From d40e953c638b9ffed31cc4dd922c3e886fc143ad Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 20 Oct 2022 17:29:56 +0100 Subject: [PATCH 03/15] fix empty metric reports being sent --- nginx-agent.conf | 2 +- src/core/config/defaults.go | 35 +++++------- src/core/network/network.go | 2 +- src/plugins/comms.go | 56 +++++-------------- src/plugins/metrics_throlling.go | 15 ++--- .../agent/v2/src/core/config/defaults.go | 35 +++++------- .../agent/v2/src/core/network/network.go | 2 +- .../nginx/agent/v2/src/plugins/comms.go | 56 +++++-------------- .../agent/v2/src/plugins/metrics_throlling.go | 15 ++--- 9 files changed, 71 insertions(+), 147 deletions(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index 549de3b38..6fa6c4de8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -10,7 +10,7 @@ # specify the server grpc port to connect to server: # host of the control plane - host: 127.0.0.1 + host: 172.16.0.160 grpcPort: 443 # provide servername overrides if using SNI # metrics: "" diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go index 11ec40568..81052399a 100644 --- a/src/core/config/defaults.go +++ b/src/core/config/defaults.go @@ -287,41 +287,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/src/core/network/network.go b/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/src/core/network/network.go +++ b/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/src/plugins/comms.go b/src/plugins/comms.go index e7a086955..74fdbf85a 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -11,14 +11,9 @@ import ( "go.uber.org/atomic" ) -const ( - DefaultMetricsChanLength = 4 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -28,7 +23,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), wait: sync.WaitGroup{}, @@ -43,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -69,18 +62,25 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() + + if len(report.Data) > 0 { + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { - log.Errorf("error in done context Process in comms %v", err) + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } - return - case r.reportChan <- report: - // report queued - log.Debug("report queued") + + } else { + log.Errorf("Got report of length 0 data: %v", report) } } } @@ -90,29 +90,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - } - } -} diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 1a6a90843..48a9efd72 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -130,18 +128,15 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false + if len(aggregatedReport.Data) > 0 { + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index 11ec40568..81052399a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -287,41 +287,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index e7a086955..74fdbf85a 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -11,14 +11,9 @@ import ( "go.uber.org/atomic" ) -const ( - DefaultMetricsChanLength = 4 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -28,7 +23,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), wait: sync.WaitGroup{}, @@ -43,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -69,18 +62,25 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() + + if len(report.Data) > 0 { + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { - log.Errorf("error in done context Process in comms %v", err) + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } - return - case r.reportChan <- report: - // report queued - log.Debug("report queued") + + } else { + log.Errorf("Got report of length 0 data: %v", report) } } } @@ -90,29 +90,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - } - } -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 1a6a90843..48a9efd72 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -130,18 +128,15 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false + if len(aggregatedReport.Data) > 0 { + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } From 542b397c13189533b81f859971122c2d59f4fd22 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 21 Oct 2022 12:18:05 +0100 Subject: [PATCH 04/15] fix failing unit test --- src/plugins/comms_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/plugins/comms_test.go b/src/plugins/comms_test.go index ddc804194..67843e4af 100644 --- a/src/plugins/comms_test.go +++ b/src/plugins/comms_test.go @@ -49,10 +49,13 @@ func TestCommsSendMetrics(t *testing.T) { assert.True(t, pluginUnderTest.readyToSend.Load()) + metricData := make([]*proto.StatsEntity, 0, 1) + metricData = append(metricData, &proto.StatsEntity{Simplemetrics: []*proto.SimpleMetric{{Name: "Metric A", Value: 5}}}) + pluginUnderTest.Process(core.NewMessage(core.CommMetrics, []core.Payload{&proto.MetricsReport{ Meta: &proto.Metadata{Timestamp: types.TimestampNow()}, Type: proto.MetricsReport_INSTANCE, - Data: make([]*proto.StatsEntity, 0, 1), + Data: metricData, }})) time.Sleep(1 * time.Second) // for the above call being asynchronous From b42b9e77aedfb6c9d3aff454d1e3a385c05ca2c0 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 21 Oct 2022 12:45:44 +0100 Subject: [PATCH 05/15] fix empty metric reports --- nginx-agent.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index 6fa6c4de8..549de3b38 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -10,7 +10,7 @@ # specify the server grpc port to connect to server: # host of the control plane - host: 172.16.0.160 + host: 127.0.0.1 grpcPort: 443 # provide servername overrides if using SNI # metrics: "" From 7f5f6c7ef6c34243999c115de5ef7c8ee11af803 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 21 Oct 2022 14:19:05 +0100 Subject: [PATCH 06/15] fix empty metric reports --- src/plugins/comms.go | 5 +---- .../vendor/github.com/nginx/agent/v2/src/plugins/comms.go | 4 +--- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/plugins/comms.go b/src/plugins/comms.go index 74fdbf85a..162ffcb26 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -2,7 +2,6 @@ package plugins import ( "context" - "sync" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" @@ -17,7 +16,6 @@ type Comms struct { ctx context.Context started *atomic.Bool readyToSend *atomic.Bool - wait sync.WaitGroup } func NewComms(reporter client.MetricReporter) *Comms { @@ -25,7 +23,6 @@ func NewComms(reporter client.MetricReporter) *Comms { reporter: reporter, started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, } } @@ -80,7 +77,7 @@ func (r *Comms) Process(msg *core.Message) { } } else { - log.Errorf("Got report of length 0 data: %v", report) + log.Debugf("Got report of length 0 data: %v", report) } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index 74fdbf85a..1657404e9 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -17,7 +17,6 @@ type Comms struct { ctx context.Context started *atomic.Bool readyToSend *atomic.Bool - wait sync.WaitGroup } func NewComms(reporter client.MetricReporter) *Comms { @@ -25,7 +24,6 @@ func NewComms(reporter client.MetricReporter) *Comms { reporter: reporter, started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, } } @@ -80,7 +78,7 @@ func (r *Comms) Process(msg *core.Message) { } } else { - log.Errorf("Got report of length 0 data: %v", report) + log.Debugf("Got report of length 0 data: %v", report) } } } From 7a3ed440807bf40c764f9da3fd5cda39d1879989 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Fri, 21 Oct 2022 14:25:11 +0100 Subject: [PATCH 07/15] fix empty metric reports --- .../vendor/github.com/nginx/agent/v2/src/plugins/comms.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index 1657404e9..162ffcb26 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -2,7 +2,6 @@ package plugins import ( "context" - "sync" "github.com/nginx/agent/sdk/v2/client" "github.com/nginx/agent/sdk/v2/proto" From 8af4a18425da4b1740a0dd0fdca43c7828c84270 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 24 Oct 2022 12:24:27 +0100 Subject: [PATCH 08/15] remove checks --- nginx-agent.conf | 4 ++-- src/plugins/comms.go | 15 +++++---------- src/plugins/metrics_throlling.go | 8 +++----- .../nginx/agent/v2/src/plugins/comms.go | 15 +++++---------- .../agent/v2/src/plugins/metrics_throlling.go | 8 +++----- 5 files changed, 18 insertions(+), 32 deletions(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index 549de3b38..0196dfc01 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -10,7 +10,7 @@ # specify the server grpc port to connect to server: # host of the control plane - host: 127.0.0.1 + host: 172.16.0.18 grpcPort: 443 # provide servername overrides if using SNI # metrics: "" @@ -33,7 +33,7 @@ tls: skip_verify: true log: # set log level (panic, fatal, error, info, debug, trace; default "info") - level: info + level: trace # set log path. if empty, don't log to file. path: /var/log/nginx-agent/ # data plane status message / 'heartbeat' diff --git a/src/plugins/comms.go b/src/plugins/comms.go index 162ffcb26..743c86176 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -66,18 +66,13 @@ func (r *Comms) Process(msg *core.Message) { switch report := p.(type) { case *proto.MetricsReport: - if len(report.Data) > 0 { - message := client.MessageFromMetrics(report) - err := r.reporter.Send(r.ctx, message) - - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) } else { - log.Debugf("Got report of length 0 data: %v", report) + log.Tracef("MetricsReport sent, %v", report) } } } diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 48a9efd72..8a5548e68 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -128,11 +128,9 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - if len(aggregatedReport.Data) > 0 { - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - } + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index 162ffcb26..743c86176 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -66,18 +66,13 @@ func (r *Comms) Process(msg *core.Message) { switch report := p.(type) { case *proto.MetricsReport: - if len(report.Data) > 0 { - message := client.MessageFromMetrics(report) - err := r.reporter.Send(r.ctx, message) - - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + if err != nil { + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) } else { - log.Debugf("Got report of length 0 data: %v", report) + log.Tracef("MetricsReport sent, %v", report) } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 48a9efd72..8a5548e68 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -128,11 +128,9 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W return case <-r.ticker.C: aggregatedReport := r.getAggregatedReport() - if len(aggregatedReport.Data) > 0 { - r.messagePipeline.Process( - core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), - ) - } + r.messagePipeline.Process( + core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), + ) if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" From f71ca064662d309b7c3b386a528a70bac635a727 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Mon, 24 Oct 2022 14:41:27 +0100 Subject: [PATCH 09/15] fix empty metrics reports --- nginx-agent.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nginx-agent.conf b/nginx-agent.conf index 0196dfc01..549de3b38 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -10,7 +10,7 @@ # specify the server grpc port to connect to server: # host of the control plane - host: 172.16.0.18 + host: 127.0.0.1 grpcPort: 443 # provide servername overrides if using SNI # metrics: "" @@ -33,7 +33,7 @@ tls: skip_verify: true log: # set log level (panic, fatal, error, info, debug, trace; default "info") - level: trace + level: info # set log path. if empty, don't log to file. path: /var/log/nginx-agent/ # data plane status message / 'heartbeat' From 2ca6a0816bad5c8c0a98fb805f6f46690c63f6a4 Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 3 Nov 2022 11:02:07 +0000 Subject: [PATCH 10/15] removing unused code --- src/core/config/config.go | 27 ++++++------ src/core/config/defaults.go | 12 ----- src/plugins/comms.go | 44 ------------------- .../nginx/agent/v2/src/core/config/config.go | 27 ++++++------ .../agent/v2/src/core/config/defaults.go | 12 ----- .../nginx/agent/v2/src/plugins/comms.go | 44 ------------------- 6 files changed, 28 insertions(+), 138 deletions(-) diff --git a/src/core/config/config.go b/src/core/config/config.go index 1d89affea..297fdad9b 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -72,16 +72,6 @@ func SetDefaults() { Viper.SetDefault(NginxClientVersion, Defaults.Nginx.NginxClientVersion) } -func SetAdvancedMetricsDefaults() { - Viper.SetDefault(AdvancedMetricsSocketPath, Defaults.AdvancedMetrics.SocketPath) - Viper.SetDefault(AdvancedMetricsAggregationPeriod, Defaults.AdvancedMetrics.AggregationPeriod) - Viper.SetDefault(AdvancedMetricsPublishPeriod, Defaults.AdvancedMetrics.PublishingPeriod) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTMS, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTT, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTMS, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTT, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) -} - func SetNginxAppProtectDefaults() { Viper.SetDefault(NginxAppProtectReportInterval, Defaults.NginxAppProtect.ReportInterval) } @@ -138,7 +128,6 @@ func RegisterFlags() { if err := Viper.BindPFlag(strings.ReplaceAll(flag.Name, "-", "_"), fs.Lookup(flag.Name)); err != nil { return } - // Viper.BindPFlag(flag.Name, ROOT_COMMAND.Flags().Lookup(flag.Name)) err := Viper.BindEnv(flag.Name) if err != nil { log.Warnf("error occurred binding env %s: %v", flag.Name, err) @@ -183,7 +172,6 @@ func GetConfig(clientId string) (*Config, error) { AllowedDirectoriesMap: map[string]struct{}{}, DisplayName: Viper.GetString(DisplayNameKey), InstanceGroup: Viper.GetString(InstanceGroupKey), - AdvancedMetrics: getAdvancedMetrics(), NginxAppProtect: getNginxAppProtect(), NAPMonitoring: getNAPMonitoring(), } @@ -291,7 +279,6 @@ func getDataplane() Dataplane { }, } } - func getAdvancedMetrics() AdvancedMetrics { return AdvancedMetrics{ SocketPath: Viper.GetString(AdvancedMetricsSocketPath), @@ -306,6 +293,20 @@ func getAdvancedMetrics() AdvancedMetrics { } } +func SetAdvancedMetricsDefaults() AdvancedMetrics { + return AdvancedMetrics{ + SocketPath: Viper.GetString(AdvancedMetricsSocketPath), + AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), + PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), + StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), + PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), + PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), + }, + } +} + func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go index f28baeb77..7da7effcb 100644 --- a/src/core/config/defaults.go +++ b/src/core/config/defaults.go @@ -5,7 +5,6 @@ import ( "time" "github.com/google/uuid" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" log "github.com/sirupsen/logrus" ) @@ -67,17 +66,6 @@ var ( CollectionInterval: 15 * time.Second, Mode: "aggregation", }, - AdvancedMetrics: AdvancedMetrics{ - SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", - AggregationPeriod: time.Second * 10, - PublishingPeriod: time.Second * 30, - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableThreshold: 1000, - StagingTableMaxSize: 1000, - PriorityTableThreshold: 1000, - PriorityTableMaxSize: 1000, - }, - }, NAPMonitoring: NAPMonitoring{ ProcessorBufferSize: 50000, CollectorBufferSize: 50000, diff --git a/src/plugins/comms.go b/src/plugins/comms.go index b851235d8..a32a65ab4 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -2,8 +2,6 @@ package plugins import ( "context" - "strings" - "sync" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -27,7 +25,6 @@ type Comms struct { ctx context.Context started *atomic.Bool readyToSend *atomic.Bool - wait sync.WaitGroup } func NewComms(reporter client.MetricReporter) *Comms { @@ -37,7 +34,6 @@ func NewComms(reporter client.MetricReporter) *Comms { reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, } } @@ -108,43 +104,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - case report := <-r.reportEventsChan: - err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) - if err != nil { - l := len(report.Events) - var sb strings.Builder - for i := 0; i < l-1; i++ { - sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) - sb.WriteString(", ") - } - sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) - log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) - } else { - log.Tracef("EventReport sent, %v", report) - } - } - } -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index 1d89affea..297fdad9b 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -72,16 +72,6 @@ func SetDefaults() { Viper.SetDefault(NginxClientVersion, Defaults.Nginx.NginxClientVersion) } -func SetAdvancedMetricsDefaults() { - Viper.SetDefault(AdvancedMetricsSocketPath, Defaults.AdvancedMetrics.SocketPath) - Viper.SetDefault(AdvancedMetricsAggregationPeriod, Defaults.AdvancedMetrics.AggregationPeriod) - Viper.SetDefault(AdvancedMetricsPublishPeriod, Defaults.AdvancedMetrics.PublishingPeriod) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTMS, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTT, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTMS, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTT, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) -} - func SetNginxAppProtectDefaults() { Viper.SetDefault(NginxAppProtectReportInterval, Defaults.NginxAppProtect.ReportInterval) } @@ -138,7 +128,6 @@ func RegisterFlags() { if err := Viper.BindPFlag(strings.ReplaceAll(flag.Name, "-", "_"), fs.Lookup(flag.Name)); err != nil { return } - // Viper.BindPFlag(flag.Name, ROOT_COMMAND.Flags().Lookup(flag.Name)) err := Viper.BindEnv(flag.Name) if err != nil { log.Warnf("error occurred binding env %s: %v", flag.Name, err) @@ -183,7 +172,6 @@ func GetConfig(clientId string) (*Config, error) { AllowedDirectoriesMap: map[string]struct{}{}, DisplayName: Viper.GetString(DisplayNameKey), InstanceGroup: Viper.GetString(InstanceGroupKey), - AdvancedMetrics: getAdvancedMetrics(), NginxAppProtect: getNginxAppProtect(), NAPMonitoring: getNAPMonitoring(), } @@ -291,7 +279,6 @@ func getDataplane() Dataplane { }, } } - func getAdvancedMetrics() AdvancedMetrics { return AdvancedMetrics{ SocketPath: Viper.GetString(AdvancedMetricsSocketPath), @@ -306,6 +293,20 @@ func getAdvancedMetrics() AdvancedMetrics { } } +func SetAdvancedMetricsDefaults() AdvancedMetrics { + return AdvancedMetrics{ + SocketPath: Viper.GetString(AdvancedMetricsSocketPath), + AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), + PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), + StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), + PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), + PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), + }, + } +} + func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index f28baeb77..7da7effcb 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -5,7 +5,6 @@ import ( "time" "github.com/google/uuid" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" log "github.com/sirupsen/logrus" ) @@ -67,17 +66,6 @@ var ( CollectionInterval: 15 * time.Second, Mode: "aggregation", }, - AdvancedMetrics: AdvancedMetrics{ - SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", - AggregationPeriod: time.Second * 10, - PublishingPeriod: time.Second * 30, - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableThreshold: 1000, - StagingTableMaxSize: 1000, - PriorityTableThreshold: 1000, - PriorityTableMaxSize: 1000, - }, - }, NAPMonitoring: NAPMonitoring{ ProcessorBufferSize: 50000, CollectorBufferSize: 50000, diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index b851235d8..a32a65ab4 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -2,8 +2,6 @@ package plugins import ( "context" - "strings" - "sync" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -27,7 +25,6 @@ type Comms struct { ctx context.Context started *atomic.Bool readyToSend *atomic.Bool - wait sync.WaitGroup } func NewComms(reporter client.MetricReporter) *Comms { @@ -37,7 +34,6 @@ func NewComms(reporter client.MetricReporter) *Comms { reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, } } @@ -108,43 +104,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - case report := <-r.reportEventsChan: - err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) - if err != nil { - l := len(report.Events) - var sb strings.Builder - for i := 0; i < l-1; i++ { - sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) - sb.WriteString(", ") - } - sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) - log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) - } else { - log.Tracef("EventReport sent, %v", report) - } - } - } -} From ef4d211b1afa3bade846f3d35215eaa37b19226d Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 3 Nov 2022 12:01:16 +0000 Subject: [PATCH 11/15] remove issues with the merge. Dropped multiple channels and tidied the code --- src/plugins/comms.go | 28 +++++++------------ .../nginx/agent/v2/src/plugins/comms.go | 28 +++++++------------ 2 files changed, 20 insertions(+), 36 deletions(-) diff --git a/src/plugins/comms.go b/src/plugins/comms.go index a32a65ab4..19c642091 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -2,6 +2,7 @@ package plugins import ( "context" + "strings" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -12,16 +13,9 @@ import ( "github.com/nginx/agent/v2/src/core" ) -const ( - DefaultMetricsChanLength = 4 * 1024 - DefaultEventsChanLength = 4 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport - reportEventsChan chan *models.EventReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -30,8 +24,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), - reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), } @@ -85,16 +77,16 @@ func (r *Comms) Process(msg *core.Message) { log.Tracef("MetricsReport sent, %v", report) } case *models.EventReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) + err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) + if err != nil { + l := len(report.Events) + var sb strings.Builder + for i := 0; i < l-1; i++ { + sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) + sb.WriteString(", ") } - return - case r.reportEventsChan <- report: - // report queued - log.Debug("events report queued") + sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) + log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index 0c4400c98..19c642091 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -2,6 +2,7 @@ package plugins import ( "context" + "strings" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -12,16 +13,9 @@ import ( "github.com/nginx/agent/v2/src/core" ) -const ( - DefaultMetricsChanLength = 10 * 1024 - DefaultEventsChanLength = 10 * 1024 -) - type Comms struct { reporter client.MetricReporter pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport - reportEventsChan chan *models.EventReport ctx context.Context started *atomic.Bool readyToSend *atomic.Bool @@ -30,8 +24,6 @@ type Comms struct { func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), - reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), started: atomic.NewBool(false), readyToSend: atomic.NewBool(false), } @@ -85,16 +77,16 @@ func (r *Comms) Process(msg *core.Message) { log.Tracef("MetricsReport sent, %v", report) } case *models.EventReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) + err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) + if err != nil { + l := len(report.Events) + var sb strings.Builder + for i := 0; i < l-1; i++ { + sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) + sb.WriteString(", ") } - return - case r.reportEventsChan <- report: - // report queued - log.Debug("events report queued") + sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) + log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) } } } From 35ff06baa7ab46aebf7af42efc8e340e417e6edf Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 3 Nov 2022 12:07:44 +0000 Subject: [PATCH 12/15] updated metrics. Defaults already considered --- src/core/config/config.go | 2 +- src/plugins/advanced_metrics.go | 4 ++-- src/plugins/comms.go | 16 ++++++++-------- src/plugins/extensions.go | 1 - .../nginx/agent/v2/src/core/config/config.go | 2 +- .../agent/v2/src/plugins/advanced_metrics.go | 4 ++-- .../nginx/agent/v2/src/plugins/comms.go | 16 ++++++++-------- .../nginx/agent/v2/src/plugins/extensions.go | 1 - 8 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/core/config/config.go b/src/core/config/config.go index bb30b1f4a..47c8e8dbb 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -280,7 +280,7 @@ func getDataplane() Dataplane { } } -func SetAdvancedMetricsDefaults() AdvancedMetrics { +func getAdvancedMetrics() AdvancedMetrics { return AdvancedMetrics{ SocketPath: Viper.GetString(AdvancedMetricsSocketPath), AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), diff --git a/src/plugins/advanced_metrics.go b/src/plugins/advanced_metrics.go index e6076965f..6aab480cc 100644 --- a/src/plugins/advanced_metrics.go +++ b/src/plugins/advanced_metrics.go @@ -156,7 +156,7 @@ func NewAdvancedMetrics(env core.Environment, conf *config.Config) *AdvancedMetr TableSizesLimits: conf.AdvancedMetrics.TableSizesLimits, } - checkAdvancedMetricsDefaults(&cfg) + CheckAdvancedMetricsDefaults(&cfg) schema, err := builder.Build() if err != nil { @@ -311,7 +311,7 @@ func (m *AdvancedMetrics) Subscriptions() []string { return []string{} } -func checkAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { +func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) diff --git a/src/plugins/comms.go b/src/plugins/comms.go index 19c642091..f76de6613 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -14,18 +14,18 @@ import ( ) type Comms struct { - reporter client.MetricReporter - pipeline core.MessagePipeInterface - ctx context.Context - started *atomic.Bool - readyToSend *atomic.Bool + reporter client.MetricReporter + pipeline core.MessagePipeInterface + ctx context.Context + started *atomic.Bool + readyToSend *atomic.Bool } func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ - reporter: reporter, - started: atomic.NewBool(false), - readyToSend: atomic.NewBool(false), + reporter: reporter, + started: atomic.NewBool(false), + readyToSend: atomic.NewBool(false), } } diff --git a/src/plugins/extensions.go b/src/plugins/extensions.go index 542df28ac..5f04e2877 100644 --- a/src/plugins/extensions.go +++ b/src/plugins/extensions.go @@ -41,7 +41,6 @@ func (e *Extensions) Process(msg *core.Message) { case core.EnableExtension: if data == config.AdvancedMetricsKey { if !e.isPluginAlreadyRegistered(advancedMetricsPluginName) { - config.SetAdvancedMetricsDefaults() conf, err := config.GetConfig(e.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index bb30b1f4a..47c8e8dbb 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -280,7 +280,7 @@ func getDataplane() Dataplane { } } -func SetAdvancedMetricsDefaults() AdvancedMetrics { +func getAdvancedMetrics() AdvancedMetrics { return AdvancedMetrics{ SocketPath: Viper.GetString(AdvancedMetricsSocketPath), AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go index e6076965f..6aab480cc 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go @@ -156,7 +156,7 @@ func NewAdvancedMetrics(env core.Environment, conf *config.Config) *AdvancedMetr TableSizesLimits: conf.AdvancedMetrics.TableSizesLimits, } - checkAdvancedMetricsDefaults(&cfg) + CheckAdvancedMetricsDefaults(&cfg) schema, err := builder.Build() if err != nil { @@ -311,7 +311,7 @@ func (m *AdvancedMetrics) Subscriptions() []string { return []string{} } -func checkAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { +func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index 19c642091..f76de6613 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -14,18 +14,18 @@ import ( ) type Comms struct { - reporter client.MetricReporter - pipeline core.MessagePipeInterface - ctx context.Context - started *atomic.Bool - readyToSend *atomic.Bool + reporter client.MetricReporter + pipeline core.MessagePipeInterface + ctx context.Context + started *atomic.Bool + readyToSend *atomic.Bool } func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ - reporter: reporter, - started: atomic.NewBool(false), - readyToSend: atomic.NewBool(false), + reporter: reporter, + started: atomic.NewBool(false), + readyToSend: atomic.NewBool(false), } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go index 542df28ac..5f04e2877 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go @@ -41,7 +41,6 @@ func (e *Extensions) Process(msg *core.Message) { case core.EnableExtension: if data == config.AdvancedMetricsKey { if !e.isPluginAlreadyRegistered(advancedMetricsPluginName) { - config.SetAdvancedMetricsDefaults() conf, err := config.GetConfig(e.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) From 41b32cdef3d1017dac7f3337dda2f4218461db68 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 3 Nov 2022 12:21:00 +0000 Subject: [PATCH 13/15] add stop to ticker --- src/plugins/metrics_throlling.go | 1 + .../github.com/nginx/agent/v2/src/plugins/metrics_throlling.go | 1 + 2 files changed, 2 insertions(+) diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 8a5548e68..52a520a41 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -134,6 +134,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" + r.ticker.Stop() r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 8a5548e68..52a520a41 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -134,6 +134,7 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" + r.ticker.Stop() r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) From 6f17837420ff5c83900f48a0a5f6867a8aea29f9 Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 3 Nov 2022 12:24:31 +0000 Subject: [PATCH 14/15] fixed failng lint and tests --- src/core/config/config.go | 15 --------------- src/plugins/advanced_metrics_test.go | 4 ++-- .../nginx/agent/v2/src/core/config/config.go | 15 --------------- 3 files changed, 2 insertions(+), 32 deletions(-) diff --git a/src/core/config/config.go b/src/core/config/config.go index 47c8e8dbb..7ad883d09 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -10,7 +10,6 @@ import ( "strings" "time" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" "github.com/spf13/cobra" "github.com/spf13/viper" "gopkg.in/yaml.v3" @@ -280,20 +279,6 @@ func getDataplane() Dataplane { } } -func getAdvancedMetrics() AdvancedMetrics { - return AdvancedMetrics{ - SocketPath: Viper.GetString(AdvancedMetricsSocketPath), - AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), - PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), - StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), - PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), - PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), - }, - } -} - func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), diff --git a/src/plugins/advanced_metrics_test.go b/src/plugins/advanced_metrics_test.go index ff9216b4b..87bcbd3b4 100644 --- a/src/plugins/advanced_metrics_test.go +++ b/src/plugins/advanced_metrics_test.go @@ -188,7 +188,7 @@ func TestAppCentricMetric_toMetricReport(t *testing.T) { func TestAppCentricMetricClose(t *testing.T) { env := tutils.GetMockEnv() - pluginUnderTest := NewAdvancedMetrics(env, &config.Config{}) + pluginUnderTest := NewAdvancedMetrics(env, &config.Config{AdvancedMetrics: config.AdvancedMetrics{}}) ctx, cancelCTX := context.WithCancel(context.Background()) defer cancelCTX() @@ -202,6 +202,6 @@ func TestAppCentricMetricClose(t *testing.T) { } func TestAppCentricMetricSubscriptions(t *testing.T) { - pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{}) + pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{AdvancedMetrics: config.AdvancedMetrics{}}) assert.Equal(t, []string{}, pluginUnderTest.Subscriptions()) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index 47c8e8dbb..7ad883d09 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -10,7 +10,6 @@ import ( "strings" "time" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" "github.com/spf13/cobra" "github.com/spf13/viper" "gopkg.in/yaml.v3" @@ -280,20 +279,6 @@ func getDataplane() Dataplane { } } -func getAdvancedMetrics() AdvancedMetrics { - return AdvancedMetrics{ - SocketPath: Viper.GetString(AdvancedMetricsSocketPath), - AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), - PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), - StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), - PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), - PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), - }, - } -} - func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), From b372fbc3c85d98249ef6550b8cbd4041556f945e Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 3 Nov 2022 12:41:10 +0000 Subject: [PATCH 15/15] moved defaults to advanced metrics --- src/plugins/advanced_metrics.go | 26 ++++++++++++++----- .../agent/v2/src/plugins/advanced_metrics.go | 26 ++++++++++++++----- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/plugins/advanced_metrics.go b/src/plugins/advanced_metrics.go index 6aab480cc..da6253767 100644 --- a/src/plugins/advanced_metrics.go +++ b/src/plugins/advanced_metrics.go @@ -88,6 +88,18 @@ var totalOnlyMetrics = map[string]struct{}{ connectionDurationMetric: {}, } +var advancedMetricsDefaults = &config.AdvancedMetrics{ + SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", + AggregationPeriod: time.Second * 10, + PublishingPeriod: time.Second * 30, + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableThreshold: 1000, + StagingTableMaxSize: 1000, + PriorityTableThreshold: 1000, + PriorityTableMaxSize: 1000, + }, +} + const httpMetricPrefix = "http.request" const streamMetricPrefix = "stream" @@ -312,11 +324,11 @@ func (m *AdvancedMetrics) Subscriptions() []string { } func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { - config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) - config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) - config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) + config.CheckAndSetDefault(&cfg.Address, advancedMetricsDefaults.SocketPath) + config.CheckAndSetDefault(&cfg.AggregationPeriod, advancedMetricsDefaults.AggregationPeriod) + config.CheckAndSetDefault(&cfg.PublishingPeriod, advancedMetricsDefaults.PublishingPeriod) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, advancedMetricsDefaults.TableSizesLimits.StagingTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, advancedMetricsDefaults.TableSizesLimits.StagingTableThreshold) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, advancedMetricsDefaults.TableSizesLimits.PriorityTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, advancedMetricsDefaults.TableSizesLimits.PriorityTableThreshold) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go index 6aab480cc..da6253767 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go @@ -88,6 +88,18 @@ var totalOnlyMetrics = map[string]struct{}{ connectionDurationMetric: {}, } +var advancedMetricsDefaults = &config.AdvancedMetrics{ + SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", + AggregationPeriod: time.Second * 10, + PublishingPeriod: time.Second * 30, + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableThreshold: 1000, + StagingTableMaxSize: 1000, + PriorityTableThreshold: 1000, + PriorityTableMaxSize: 1000, + }, +} + const httpMetricPrefix = "http.request" const streamMetricPrefix = "stream" @@ -312,11 +324,11 @@ func (m *AdvancedMetrics) Subscriptions() []string { } func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { - config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) - config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) - config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) + config.CheckAndSetDefault(&cfg.Address, advancedMetricsDefaults.SocketPath) + config.CheckAndSetDefault(&cfg.AggregationPeriod, advancedMetricsDefaults.AggregationPeriod) + config.CheckAndSetDefault(&cfg.PublishingPeriod, advancedMetricsDefaults.PublishingPeriod) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, advancedMetricsDefaults.TableSizesLimits.StagingTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, advancedMetricsDefaults.TableSizesLimits.StagingTableThreshold) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, advancedMetricsDefaults.TableSizesLimits.PriorityTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, advancedMetricsDefaults.TableSizesLimits.PriorityTableThreshold) }