diff --git a/src/core/config/config.go b/src/core/config/config.go index 2babeb75f..9ed0b11ae 100644 --- a/src/core/config/config.go +++ b/src/core/config/config.go @@ -11,7 +11,7 @@ import ( "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" + "github.com/spf13/cobra" "github.com/spf13/viper" "gopkg.in/yaml.v3" @@ -73,16 +73,6 @@ func SetDefaults() { Viper.SetDefault(NginxClientVersion, Defaults.Nginx.NginxClientVersion) } -func SetAdvancedMetricsDefaults() { - Viper.SetDefault(AdvancedMetricsSocketPath, Defaults.AdvancedMetrics.SocketPath) - Viper.SetDefault(AdvancedMetricsAggregationPeriod, Defaults.AdvancedMetrics.AggregationPeriod) - Viper.SetDefault(AdvancedMetricsPublishPeriod, Defaults.AdvancedMetrics.PublishingPeriod) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTMS, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTT, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTMS, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTT, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) -} - func SetNginxAppProtectDefaults() { Viper.SetDefault(NginxAppProtectReportInterval, Defaults.NginxAppProtect.ReportInterval) } @@ -139,7 +129,6 @@ func RegisterFlags() { if err := Viper.BindPFlag(strings.ReplaceAll(flag.Name, "-", "_"), fs.Lookup(flag.Name)); err != nil { return } - // Viper.BindPFlag(flag.Name, ROOT_COMMAND.Flags().Lookup(flag.Name)) err := Viper.BindEnv(flag.Name) if err != nil { log.Warnf("error occurred binding env %s: %v", flag.Name, err) @@ -184,7 +173,6 @@ func GetConfig(clientId string) (*Config, error) { AllowedDirectoriesMap: map[string]struct{}{}, DisplayName: Viper.GetString(DisplayNameKey), InstanceGroup: Viper.GetString(InstanceGroupKey), - AdvancedMetrics: getAdvancedMetrics(), NginxAppProtect: getNginxAppProtect(), NAPMonitoring: getNAPMonitoring(), } @@ -293,20 +281,6 @@ func getDataplane() Dataplane { } } -func getAdvancedMetrics() AdvancedMetrics { - return AdvancedMetrics{ - SocketPath: Viper.GetString(AdvancedMetricsSocketPath), - AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), - PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), - StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), - PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), - PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), - }, - } -} - func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), diff --git a/src/core/config/defaults.go b/src/core/config/defaults.go index 28f8fc4ca..2cdfc9140 100644 --- a/src/core/config/defaults.go +++ b/src/core/config/defaults.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" log "github.com/sirupsen/logrus" ) @@ -69,17 +68,6 @@ var ( CollectionInterval: 15 * time.Second, Mode: "aggregation", }, - AdvancedMetrics: AdvancedMetrics{ - SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", - AggregationPeriod: time.Second * 10, - PublishingPeriod: time.Second * 30, - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableThreshold: 1000, - StagingTableMaxSize: 1000, - PriorityTableThreshold: 1000, - PriorityTableMaxSize: 1000, - }, - }, Features: agent_config.GetDefaultFeatures(), NAPMonitoring: NAPMonitoring{ ProcessorBufferSize: 50000, @@ -281,41 +269,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/src/core/network/network.go b/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/src/core/network/network.go +++ b/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/src/plugins/advanced_metrics.go b/src/plugins/advanced_metrics.go index e6076965f..9839b4ea1 100644 --- a/src/plugins/advanced_metrics.go +++ b/src/plugins/advanced_metrics.go @@ -88,6 +88,18 @@ var totalOnlyMetrics = map[string]struct{}{ connectionDurationMetric: {}, } +var advancedMetricsDefaults = &config.AdvancedMetrics{ + SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", + AggregationPeriod: time.Second * 10, + PublishingPeriod: time.Second * 30, + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableThreshold: 1000, + StagingTableMaxSize: 1000, + PriorityTableThreshold: 1000, + PriorityTableMaxSize: 1000, + }, +} + const httpMetricPrefix = "http.request" const streamMetricPrefix = "stream" @@ -156,7 +168,7 @@ func NewAdvancedMetrics(env core.Environment, conf *config.Config) *AdvancedMetr TableSizesLimits: conf.AdvancedMetrics.TableSizesLimits, } - checkAdvancedMetricsDefaults(&cfg) + CheckAdvancedMetricsDefaults(&cfg) schema, err := builder.Build() if err != nil { @@ -311,12 +323,12 @@ func (m *AdvancedMetrics) Subscriptions() []string { return []string{} } -func checkAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { - config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) - config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) - config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) +func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { + config.CheckAndSetDefault(&cfg.Address, advancedMetricsDefaults.SocketPath) + config.CheckAndSetDefault(&cfg.AggregationPeriod, advancedMetricsDefaults.AggregationPeriod) + config.CheckAndSetDefault(&cfg.PublishingPeriod, advancedMetricsDefaults.PublishingPeriod) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, advancedMetricsDefaults.TableSizesLimits.StagingTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, advancedMetricsDefaults.TableSizesLimits.StagingTableThreshold) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, advancedMetricsDefaults.TableSizesLimits.PriorityTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, advancedMetricsDefaults.TableSizesLimits.PriorityTableThreshold) } diff --git a/src/plugins/advanced_metrics_test.go b/src/plugins/advanced_metrics_test.go index ff9216b4b..87bcbd3b4 100644 --- a/src/plugins/advanced_metrics_test.go +++ b/src/plugins/advanced_metrics_test.go @@ -188,7 +188,7 @@ func TestAppCentricMetric_toMetricReport(t *testing.T) { func TestAppCentricMetricClose(t *testing.T) { env := tutils.GetMockEnv() - pluginUnderTest := NewAdvancedMetrics(env, &config.Config{}) + pluginUnderTest := NewAdvancedMetrics(env, &config.Config{AdvancedMetrics: config.AdvancedMetrics{}}) ctx, cancelCTX := context.WithCancel(context.Background()) defer cancelCTX() @@ -202,6 +202,6 @@ func TestAppCentricMetricClose(t *testing.T) { } func TestAppCentricMetricSubscriptions(t *testing.T) { - pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{}) + pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{AdvancedMetrics: config.AdvancedMetrics{}}) assert.Equal(t, []string{}, pluginUnderTest.Subscriptions()) } diff --git a/src/plugins/comms.go b/src/plugins/comms.go index b0e112079..f76de6613 100644 --- a/src/plugins/comms.go +++ b/src/plugins/comms.go @@ -3,7 +3,6 @@ package plugins import ( "context" "strings" - "sync" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -14,30 +13,19 @@ import ( "github.com/nginx/agent/v2/src/core" ) -const ( - DefaultMetricsChanLength = 4 * 1024 - DefaultEventsChanLength = 4 * 1024 -) - type Comms struct { - reporter client.MetricReporter - pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport - reportEventsChan chan *models.EventReport - ctx context.Context - started *atomic.Bool - readyToSend *atomic.Bool - wait sync.WaitGroup + reporter client.MetricReporter + pipeline core.MessagePipeInterface + ctx context.Context + started *atomic.Bool + readyToSend *atomic.Bool } func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ - reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), - reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), - started: atomic.NewBool(false), - readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, + reporter: reporter, + started: atomic.NewBool(false), + readyToSend: atomic.NewBool(false), } } @@ -49,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -75,30 +62,31 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) - } - return - case r.reportChan <- report: - // report queued - log.Debug("metrics report queued") + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + + if err != nil { + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } case *models.EventReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) + err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) + if err != nil { + l := len(report.Events) + var sb strings.Builder + for i := 0; i < l-1; i++ { + sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) + sb.WriteString(", ") } - return - case r.reportEventsChan <- report: - // report queued - log.Debug("events report queued") + sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) + log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) } } } @@ -108,43 +96,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - case report := <-r.reportEventsChan: - err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) - if err != nil { - l := len(report.Events) - var sb strings.Builder - for i := 0; i < l-1; i++ { - sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) - sb.WriteString(", ") - } - sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) - log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) - } else { - log.Tracef("EventReport sent, %v", report) - } - } - } -} diff --git a/src/plugins/comms_test.go b/src/plugins/comms_test.go index ddc804194..67843e4af 100644 --- a/src/plugins/comms_test.go +++ b/src/plugins/comms_test.go @@ -49,10 +49,13 @@ func TestCommsSendMetrics(t *testing.T) { assert.True(t, pluginUnderTest.readyToSend.Load()) + metricData := make([]*proto.StatsEntity, 0, 1) + metricData = append(metricData, &proto.StatsEntity{Simplemetrics: []*proto.SimpleMetric{{Name: "Metric A", Value: 5}}}) + pluginUnderTest.Process(core.NewMessage(core.CommMetrics, []core.Payload{&proto.MetricsReport{ Meta: &proto.Metadata{Timestamp: types.TimestampNow()}, Type: proto.MetricsReport_INSTANCE, - Data: make([]*proto.StatsEntity, 0, 1), + Data: metricData, }})) time.Sleep(1 * time.Second) // for the above call being asynchronous diff --git a/src/plugins/extensions.go b/src/plugins/extensions.go index 542df28ac..5f04e2877 100644 --- a/src/plugins/extensions.go +++ b/src/plugins/extensions.go @@ -41,7 +41,6 @@ func (e *Extensions) Process(msg *core.Message) { case core.EnableExtension: if data == config.AdvancedMetricsKey { if !e.isPluginAlreadyRegistered(advancedMetricsPluginName) { - config.SetAdvancedMetricsDefaults() conf, err := config.GetConfig(e.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) diff --git a/src/plugins/metrics_throlling.go b/src/plugins/metrics_throlling.go index 1a6a90843..52a520a41 100644 --- a/src/plugins/metrics_throlling.go +++ b/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -133,15 +131,11 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W r.messagePipeline.Process( core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false - } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker.Stop() + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } diff --git a/src/plugins/nginx.go b/src/plugins/nginx.go index e22d871ce..afe81eb32 100644 --- a/src/plugins/nginx.go +++ b/src/plugins/nginx.go @@ -334,7 +334,7 @@ func (n *Nginx) applyConfig(cmd *proto.Command, cfg *proto.Command_NginxConfig) go n.validateConfig(nginx, cmd.Meta.MessageId, config, configApply) // If the NGINX config can be validated with the validationTimeout the result will be returned straight away. - // This is timeout is temporary to ensure we support backwards compatibility. In a future release this timeout + // This is timeout is temporary to ensure we support backwards compatibility. In a future release this timeout // will be removed. select { case result := <-n.configApplyStatusChannel: diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go index 2babeb75f..9ed0b11ae 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/config.go @@ -11,7 +11,7 @@ import ( "time" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" + "github.com/spf13/cobra" "github.com/spf13/viper" "gopkg.in/yaml.v3" @@ -73,16 +73,6 @@ func SetDefaults() { Viper.SetDefault(NginxClientVersion, Defaults.Nginx.NginxClientVersion) } -func SetAdvancedMetricsDefaults() { - Viper.SetDefault(AdvancedMetricsSocketPath, Defaults.AdvancedMetrics.SocketPath) - Viper.SetDefault(AdvancedMetricsAggregationPeriod, Defaults.AdvancedMetrics.AggregationPeriod) - Viper.SetDefault(AdvancedMetricsPublishPeriod, Defaults.AdvancedMetrics.PublishingPeriod) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTMS, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTT, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTMS, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTT, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) -} - func SetNginxAppProtectDefaults() { Viper.SetDefault(NginxAppProtectReportInterval, Defaults.NginxAppProtect.ReportInterval) } @@ -139,7 +129,6 @@ func RegisterFlags() { if err := Viper.BindPFlag(strings.ReplaceAll(flag.Name, "-", "_"), fs.Lookup(flag.Name)); err != nil { return } - // Viper.BindPFlag(flag.Name, ROOT_COMMAND.Flags().Lookup(flag.Name)) err := Viper.BindEnv(flag.Name) if err != nil { log.Warnf("error occurred binding env %s: %v", flag.Name, err) @@ -184,7 +173,6 @@ func GetConfig(clientId string) (*Config, error) { AllowedDirectoriesMap: map[string]struct{}{}, DisplayName: Viper.GetString(DisplayNameKey), InstanceGroup: Viper.GetString(InstanceGroupKey), - AdvancedMetrics: getAdvancedMetrics(), NginxAppProtect: getNginxAppProtect(), NAPMonitoring: getNAPMonitoring(), } @@ -293,20 +281,6 @@ func getDataplane() Dataplane { } } -func getAdvancedMetrics() AdvancedMetrics { - return AdvancedMetrics{ - SocketPath: Viper.GetString(AdvancedMetricsSocketPath), - AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod), - PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod), - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS), - StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT), - PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS), - PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT), - }, - } -} - func getNginxAppProtect() NginxAppProtect { return NginxAppProtect{ ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval), diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go index 28f8fc4ca..2cdfc9140 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/config/defaults.go @@ -7,7 +7,6 @@ import ( "github.com/google/uuid" agent_config "github.com/nginx/agent/sdk/v2/agent/config" - advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics" log "github.com/sirupsen/logrus" ) @@ -69,17 +68,6 @@ var ( CollectionInterval: 15 * time.Second, Mode: "aggregation", }, - AdvancedMetrics: AdvancedMetrics{ - SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", - AggregationPeriod: time.Second * 10, - PublishingPeriod: time.Second * 30, - TableSizesLimits: advanced_metrics.TableSizesLimits{ - StagingTableThreshold: 1000, - StagingTableMaxSize: 1000, - PriorityTableThreshold: 1000, - PriorityTableMaxSize: 1000, - }, - }, Features: agent_config.GetDefaultFeatures(), NAPMonitoring: NAPMonitoring{ ProcessorBufferSize: 50000, @@ -281,41 +269,34 @@ var ( }, // Advanced Metrics &StringFlag{ - Name: AdvancedMetricsSocketPath, - Usage: "The advanced metrics socket location.", - DefaultValue: Defaults.AdvancedMetrics.SocketPath, + Name: AdvancedMetricsSocketPath, + Usage: "The advanced metrics socket location.", }, // change to advanced metrics collection interval &DurationFlag{ - Name: AdvancedMetricsAggregationPeriod, - Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", - DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod, + Name: AdvancedMetricsAggregationPeriod, + Usage: "Sets the interval, in seconds, at which advanced metrics are collected.", }, // change to advanced metrics report interval &DurationFlag{ - Name: AdvancedMetricsPublishPeriod, - Usage: "The polling period specified for a single set of advanced metrics being collected.", - DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod, + Name: AdvancedMetricsPublishPeriod, + Usage: "The polling period specified for a single set of advanced metrics being collected.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTMS, - Usage: "Default Maximum Size of the Priority Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsPTMS, + Usage: "Default Maximum Size of the Priority Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsPTT, - Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold, + Name: AdvancedMetricsTableSizesLimitsPTT, + Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTMS, - Usage: "Default Maximum Size of the Staging Table.", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize, + Name: AdvancedMetricsTableSizesLimitsSTMS, + Usage: "Default Maximum Size of the Staging Table.", }, &IntFlag{ - Name: AdvancedMetricsTableSizesLimitsSTT, - Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", - DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold, + Name: AdvancedMetricsTableSizesLimitsSTT, + Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).", }, // TLS Config &BoolFlag{ diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go index dfcddd06b..af37b330c 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/network/network.go @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) { defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform() if err != nil { - log.Warnf("Error getting default network interface, %v", err) + log.Debugf("Error getting default network interface, %v", err) } if defaultNetworkInterface == "" && len(ifs) > 0 { diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go index e6076965f..9839b4ea1 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/advanced_metrics.go @@ -88,6 +88,18 @@ var totalOnlyMetrics = map[string]struct{}{ connectionDurationMetric: {}, } +var advancedMetricsDefaults = &config.AdvancedMetrics{ + SocketPath: "/var/run/nginx-agent/advanced-metrics.sock", + AggregationPeriod: time.Second * 10, + PublishingPeriod: time.Second * 30, + TableSizesLimits: advanced_metrics.TableSizesLimits{ + StagingTableThreshold: 1000, + StagingTableMaxSize: 1000, + PriorityTableThreshold: 1000, + PriorityTableMaxSize: 1000, + }, +} + const httpMetricPrefix = "http.request" const streamMetricPrefix = "stream" @@ -156,7 +168,7 @@ func NewAdvancedMetrics(env core.Environment, conf *config.Config) *AdvancedMetr TableSizesLimits: conf.AdvancedMetrics.TableSizesLimits, } - checkAdvancedMetricsDefaults(&cfg) + CheckAdvancedMetricsDefaults(&cfg) schema, err := builder.Build() if err != nil { @@ -311,12 +323,12 @@ func (m *AdvancedMetrics) Subscriptions() []string { return []string{} } -func checkAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { - config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath) - config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod) - config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize) - config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold) +func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) { + config.CheckAndSetDefault(&cfg.Address, advancedMetricsDefaults.SocketPath) + config.CheckAndSetDefault(&cfg.AggregationPeriod, advancedMetricsDefaults.AggregationPeriod) + config.CheckAndSetDefault(&cfg.PublishingPeriod, advancedMetricsDefaults.PublishingPeriod) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, advancedMetricsDefaults.TableSizesLimits.StagingTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, advancedMetricsDefaults.TableSizesLimits.StagingTableThreshold) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, advancedMetricsDefaults.TableSizesLimits.PriorityTableMaxSize) + config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, advancedMetricsDefaults.TableSizesLimits.PriorityTableThreshold) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go index b0e112079..f76de6613 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/comms.go @@ -3,7 +3,6 @@ package plugins import ( "context" "strings" - "sync" log "github.com/sirupsen/logrus" "go.uber.org/atomic" @@ -14,30 +13,19 @@ import ( "github.com/nginx/agent/v2/src/core" ) -const ( - DefaultMetricsChanLength = 4 * 1024 - DefaultEventsChanLength = 4 * 1024 -) - type Comms struct { - reporter client.MetricReporter - pipeline core.MessagePipeInterface - reportChan chan *proto.MetricsReport - reportEventsChan chan *models.EventReport - ctx context.Context - started *atomic.Bool - readyToSend *atomic.Bool - wait sync.WaitGroup + reporter client.MetricReporter + pipeline core.MessagePipeInterface + ctx context.Context + started *atomic.Bool + readyToSend *atomic.Bool } func NewComms(reporter client.MetricReporter) *Comms { return &Comms{ - reporter: reporter, - reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength), - reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength), - started: atomic.NewBool(false), - readyToSend: atomic.NewBool(false), - wait: sync.WaitGroup{}, + reporter: reporter, + started: atomic.NewBool(false), + readyToSend: atomic.NewBool(false), } } @@ -49,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) { r.pipeline = pipeline r.ctx = pipeline.Context() log.Info("Comms initializing") - go r.reportLoop() } func (r *Comms) Close() { @@ -75,30 +62,31 @@ func (r *Comms) Process(msg *core.Message) { return } for _, p := range payloads { + if !r.readyToSend.Load() { + continue + } + switch report := p.(type) { case *proto.MetricsReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) - } - return - case r.reportChan <- report: - // report queued - log.Debug("metrics report queued") + message := client.MessageFromMetrics(report) + err := r.reporter.Send(r.ctx, message) + + if err != nil { + log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) + } else { + log.Tracef("MetricsReport sent, %v", report) } case *models.EventReport: - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context Process in comms %v", err) + err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) + if err != nil { + l := len(report.Events) + var sb strings.Builder + for i := 0; i < l-1; i++ { + sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) + sb.WriteString(", ") } - return - case r.reportEventsChan <- report: - // report queued - log.Debug("events report queued") + sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) + log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) } } } @@ -108,43 +96,3 @@ func (r *Comms) Process(msg *core.Message) { func (r *Comms) Subscriptions() []string { return []string{core.CommMetrics, core.RegistrationCompletedTopic} } - -func (r *Comms) reportLoop() { - r.wait.Add(1) - defer r.wait.Done() - for { - if !r.readyToSend.Load() { - continue - } - select { - case <-r.ctx.Done(): - err := r.ctx.Err() - if err != nil { - log.Errorf("error in done context reportLoop %v", err) - } - log.Debug("reporter loop exiting") - return - case report := <-r.reportChan: - err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report)) - if err != nil { - log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report) - } else { - log.Tracef("MetricsReport sent, %v", report) - } - case report := <-r.reportEventsChan: - err := r.reporter.Send(r.ctx, client.MessageFromEvents(report)) - if err != nil { - l := len(report.Events) - var sb strings.Builder - for i := 0; i < l-1; i++ { - sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID) - sb.WriteString(", ") - } - sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID) - log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String()) - } else { - log.Tracef("EventReport sent, %v", report) - } - } - } -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go index 542df28ac..5f04e2877 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/extensions.go @@ -41,7 +41,6 @@ func (e *Extensions) Process(msg *core.Message) { case core.EnableExtension: if data == config.AdvancedMetricsKey { if !e.isPluginAlreadyRegistered(advancedMetricsPluginName) { - config.SetAdvancedMetricsDefaults() conf, err := config.GetConfig(e.conf.ClientID) if err != nil { log.Warnf("Unable to get agent config, %v", err) diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go index 1a6a90843..52a520a41 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/metrics_throlling.go @@ -26,7 +26,6 @@ type MetricsThrottle struct { reportsReady *atomic.Bool collectorsUpdate *atomic.Bool metricsAggregation bool - firstRun bool metricsCollections metrics.Collections ctx context.Context wg sync.WaitGroup @@ -49,7 +48,6 @@ func NewMetricsThrottle(conf *config.Config, env core.Environment) *MetricsThrot reportsReady: atomic.NewBool(false), collectorsUpdate: atomic.NewBool(false), metricsAggregation: conf.AgentMetrics.Mode == "aggregated", - firstRun: true, metricsCollections: metricsCollections, wg: sync.WaitGroup{}, env: env, @@ -133,15 +131,11 @@ func (r *MetricsThrottle) metricsReportGoroutine(ctx context.Context, wg *sync.W r.messagePipeline.Process( core.NewMessage(core.CommMetrics, []core.Payload{aggregatedReport}), ) - if r.firstRun { - // for the first run, we added the staggering time in report cycle, reset it back to regular - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) - r.firstRun = false - } if r.collectorsUpdate.Load() { r.BulkSize = r.conf.AgentMetrics.BulkSize r.metricsAggregation = r.conf.AgentMetrics.Mode == "aggregated" - r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval) + r.ticker.Stop() + r.ticker = time.NewTicker(r.conf.AgentMetrics.ReportInterval + reportStaggeringStartTime) r.messagePipeline.Process(core.NewMessage(core.AgentCollectorsUpdate, "")) r.collectorsUpdate.Store(false) } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go index e22d871ce..afe81eb32 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/plugins/nginx.go @@ -334,7 +334,7 @@ func (n *Nginx) applyConfig(cmd *proto.Command, cfg *proto.Command_NginxConfig) go n.validateConfig(nginx, cmd.Meta.MessageId, config, configApply) // If the NGINX config can be validated with the validationTimeout the result will be returned straight away. - // This is timeout is temporary to ensure we support backwards compatibility. In a future release this timeout + // This is timeout is temporary to ensure we support backwards compatibility. In a future release this timeout // will be removed. select { case result := <-n.configApplyStatusChannel: