Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions src/core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -73,16 +73,6 @@ func SetDefaults() {
Viper.SetDefault(NginxClientVersion, Defaults.Nginx.NginxClientVersion)
}

func SetAdvancedMetricsDefaults() {
Viper.SetDefault(AdvancedMetricsSocketPath, Defaults.AdvancedMetrics.SocketPath)
Viper.SetDefault(AdvancedMetricsAggregationPeriod, Defaults.AdvancedMetrics.AggregationPeriod)
Viper.SetDefault(AdvancedMetricsPublishPeriod, Defaults.AdvancedMetrics.PublishingPeriod)
Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTMS, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize)
Viper.SetDefault(AdvancedMetricsTableSizesLimitsSTT, Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold)
Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTMS, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize)
Viper.SetDefault(AdvancedMetricsTableSizesLimitsPTT, Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold)
}

func SetNginxAppProtectDefaults() {
Viper.SetDefault(NginxAppProtectReportInterval, Defaults.NginxAppProtect.ReportInterval)
}
Expand Down Expand Up @@ -139,7 +129,6 @@ func RegisterFlags() {
if err := Viper.BindPFlag(strings.ReplaceAll(flag.Name, "-", "_"), fs.Lookup(flag.Name)); err != nil {
return
}
// Viper.BindPFlag(flag.Name, ROOT_COMMAND.Flags().Lookup(flag.Name))
err := Viper.BindEnv(flag.Name)
if err != nil {
log.Warnf("error occurred binding env %s: %v", flag.Name, err)
Expand Down Expand Up @@ -184,7 +173,6 @@ func GetConfig(clientId string) (*Config, error) {
AllowedDirectoriesMap: map[string]struct{}{},
DisplayName: Viper.GetString(DisplayNameKey),
InstanceGroup: Viper.GetString(InstanceGroupKey),
AdvancedMetrics: getAdvancedMetrics(),
NginxAppProtect: getNginxAppProtect(),
NAPMonitoring: getNAPMonitoring(),
}
Expand Down Expand Up @@ -293,20 +281,6 @@ func getDataplane() Dataplane {
}
}

func getAdvancedMetrics() AdvancedMetrics {
return AdvancedMetrics{
SocketPath: Viper.GetString(AdvancedMetricsSocketPath),
AggregationPeriod: Viper.GetDuration(AdvancedMetricsAggregationPeriod),
PublishingPeriod: Viper.GetDuration(AdvancedMetricsPublishPeriod),
TableSizesLimits: advanced_metrics.TableSizesLimits{
StagingTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTMS),
StagingTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsSTT),
PriorityTableMaxSize: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTMS),
PriorityTableThreshold: Viper.GetInt(AdvancedMetricsTableSizesLimitsPTT),
},
}
}

func getNginxAppProtect() NginxAppProtect {
return NginxAppProtect{
ReportInterval: Viper.GetDuration(NginxAppProtectReportInterval),
Expand Down
47 changes: 14 additions & 33 deletions src/core/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/google/uuid"

agent_config "github.com/nginx/agent/sdk/v2/agent/config"
advanced_metrics "github.com/nginx/agent/v2/src/extensions/advanced-metrics/pkg/advanced-metrics"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -69,17 +68,6 @@ var (
CollectionInterval: 15 * time.Second,
Mode: "aggregation",
},
AdvancedMetrics: AdvancedMetrics{
SocketPath: "/var/run/nginx-agent/advanced-metrics.sock",
AggregationPeriod: time.Second * 10,
PublishingPeriod: time.Second * 30,
TableSizesLimits: advanced_metrics.TableSizesLimits{
StagingTableThreshold: 1000,
StagingTableMaxSize: 1000,
PriorityTableThreshold: 1000,
PriorityTableMaxSize: 1000,
},
},
Features: agent_config.GetDefaultFeatures(),
NAPMonitoring: NAPMonitoring{
ProcessorBufferSize: 50000,
Expand Down Expand Up @@ -281,41 +269,34 @@ var (
},
// Advanced Metrics
&StringFlag{
Name: AdvancedMetricsSocketPath,
Usage: "The advanced metrics socket location.",
DefaultValue: Defaults.AdvancedMetrics.SocketPath,
Name: AdvancedMetricsSocketPath,
Usage: "The advanced metrics socket location.",
},
// change to advanced metrics collection interval
&DurationFlag{
Name: AdvancedMetricsAggregationPeriod,
Usage: "Sets the interval, in seconds, at which advanced metrics are collected.",
DefaultValue: Defaults.AdvancedMetrics.AggregationPeriod,
Name: AdvancedMetricsAggregationPeriod,
Usage: "Sets the interval, in seconds, at which advanced metrics are collected.",
},
// change to advanced metrics report interval
&DurationFlag{
Name: AdvancedMetricsPublishPeriod,
Usage: "The polling period specified for a single set of advanced metrics being collected.",
DefaultValue: Defaults.AdvancedMetrics.PublishingPeriod,
Name: AdvancedMetricsPublishPeriod,
Usage: "The polling period specified for a single set of advanced metrics being collected.",
},
&IntFlag{
Name: AdvancedMetricsTableSizesLimitsPTMS,
Usage: "Default Maximum Size of the Priority Table.",
DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize,
Name: AdvancedMetricsTableSizesLimitsPTMS,
Usage: "Default Maximum Size of the Priority Table.",
},
&IntFlag{
Name: AdvancedMetricsTableSizesLimitsPTT,
Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).",
DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold,
Name: AdvancedMetricsTableSizesLimitsPTT,
Usage: "Default Threshold of the Priority Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Priority Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsPTMS).",
},
&IntFlag{
Name: AdvancedMetricsTableSizesLimitsSTMS,
Usage: "Default Maximum Size of the Staging Table.",
DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize,
Name: AdvancedMetricsTableSizesLimitsSTMS,
Usage: "Default Maximum Size of the Staging Table.",
},
&IntFlag{
Name: AdvancedMetricsTableSizesLimitsSTT,
Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).",
DefaultValue: Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold,
Name: AdvancedMetricsTableSizesLimitsSTT,
Usage: "AdvancedMetricsTableSizesLimitsSTT - Default Threshold of the Staging Table - normally a value which is a percentage of the corresponding Default Maximum Size of the Staging Table (<100%, but its value is not an actual percentage, i.e 88%, rather 88%*AdvancedMetricsTableSizesLimitsSTMS).",
},
// TLS Config
&BoolFlag{
Expand Down
2 changes: 1 addition & 1 deletion src/core/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func GetDataplaneNetworks() (res *proto.Network) {

defaultNetworkInterface, err := getDefaultNetworkInterfaceCrossPlatform()
if err != nil {
log.Warnf("Error getting default network interface, %v", err)
log.Debugf("Error getting default network interface, %v", err)
}

if defaultNetworkInterface == "" && len(ifs) > 0 {
Expand Down
30 changes: 21 additions & 9 deletions src/plugins/advanced_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ var totalOnlyMetrics = map[string]struct{}{
connectionDurationMetric: {},
}

var advancedMetricsDefaults = &config.AdvancedMetrics{
SocketPath: "/var/run/nginx-agent/advanced-metrics.sock",
AggregationPeriod: time.Second * 10,
PublishingPeriod: time.Second * 30,
TableSizesLimits: advanced_metrics.TableSizesLimits{
StagingTableThreshold: 1000,
StagingTableMaxSize: 1000,
PriorityTableThreshold: 1000,
PriorityTableMaxSize: 1000,
},
}

const httpMetricPrefix = "http.request"
const streamMetricPrefix = "stream"

Expand Down Expand Up @@ -156,7 +168,7 @@ func NewAdvancedMetrics(env core.Environment, conf *config.Config) *AdvancedMetr
TableSizesLimits: conf.AdvancedMetrics.TableSizesLimits,
}

checkAdvancedMetricsDefaults(&cfg)
CheckAdvancedMetricsDefaults(&cfg)

schema, err := builder.Build()
if err != nil {
Expand Down Expand Up @@ -311,12 +323,12 @@ func (m *AdvancedMetrics) Subscriptions() []string {
return []string{}
}

func checkAdvancedMetricsDefaults(cfg *advanced_metrics.Config) {
config.CheckAndSetDefault(&cfg.Address, config.Defaults.AdvancedMetrics.SocketPath)
config.CheckAndSetDefault(&cfg.AggregationPeriod, config.Defaults.AdvancedMetrics.AggregationPeriod)
config.CheckAndSetDefault(&cfg.PublishingPeriod, config.Defaults.AdvancedMetrics.PublishingPeriod)
config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableMaxSize)
config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.StagingTableThreshold)
config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableMaxSize)
config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, config.Defaults.AdvancedMetrics.TableSizesLimits.PriorityTableThreshold)
func CheckAdvancedMetricsDefaults(cfg *advanced_metrics.Config) {
config.CheckAndSetDefault(&cfg.Address, advancedMetricsDefaults.SocketPath)
config.CheckAndSetDefault(&cfg.AggregationPeriod, advancedMetricsDefaults.AggregationPeriod)
config.CheckAndSetDefault(&cfg.PublishingPeriod, advancedMetricsDefaults.PublishingPeriod)
config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableMaxSize, advancedMetricsDefaults.TableSizesLimits.StagingTableMaxSize)
config.CheckAndSetDefault(&cfg.TableSizesLimits.StagingTableThreshold, advancedMetricsDefaults.TableSizesLimits.StagingTableThreshold)
config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableMaxSize, advancedMetricsDefaults.TableSizesLimits.PriorityTableMaxSize)
config.CheckAndSetDefault(&cfg.TableSizesLimits.PriorityTableThreshold, advancedMetricsDefaults.TableSizesLimits.PriorityTableThreshold)
}
4 changes: 2 additions & 2 deletions src/plugins/advanced_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestAppCentricMetric_toMetricReport(t *testing.T) {

func TestAppCentricMetricClose(t *testing.T) {
env := tutils.GetMockEnv()
pluginUnderTest := NewAdvancedMetrics(env, &config.Config{})
pluginUnderTest := NewAdvancedMetrics(env, &config.Config{AdvancedMetrics: config.AdvancedMetrics{}})

ctx, cancelCTX := context.WithCancel(context.Background())
defer cancelCTX()
Expand All @@ -202,6 +202,6 @@ func TestAppCentricMetricClose(t *testing.T) {
}

func TestAppCentricMetricSubscriptions(t *testing.T) {
pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{})
pluginUnderTest := NewAdvancedMetrics(tutils.GetMockEnv(), &config.Config{AdvancedMetrics: config.AdvancedMetrics{}})
assert.Equal(t, []string{}, pluginUnderTest.Subscriptions())
}
108 changes: 28 additions & 80 deletions src/plugins/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package plugins
import (
"context"
"strings"
"sync"

log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
Expand All @@ -14,30 +13,19 @@ import (
"github.com/nginx/agent/v2/src/core"
)

const (
DefaultMetricsChanLength = 4 * 1024
DefaultEventsChanLength = 4 * 1024
)

type Comms struct {
reporter client.MetricReporter
pipeline core.MessagePipeInterface
reportChan chan *proto.MetricsReport
reportEventsChan chan *models.EventReport
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
wait sync.WaitGroup
reporter client.MetricReporter
pipeline core.MessagePipeInterface
ctx context.Context
started *atomic.Bool
readyToSend *atomic.Bool
}

func NewComms(reporter client.MetricReporter) *Comms {
return &Comms{
reporter: reporter,
reportChan: make(chan *proto.MetricsReport, DefaultMetricsChanLength),
reportEventsChan: make(chan *models.EventReport, DefaultEventsChanLength),
started: atomic.NewBool(false),
readyToSend: atomic.NewBool(false),
wait: sync.WaitGroup{},
reporter: reporter,
started: atomic.NewBool(false),
readyToSend: atomic.NewBool(false),
}
}

Expand All @@ -49,7 +37,6 @@ func (r *Comms) Init(pipeline core.MessagePipeInterface) {
r.pipeline = pipeline
r.ctx = pipeline.Context()
log.Info("Comms initializing")
go r.reportLoop()
}

func (r *Comms) Close() {
Expand All @@ -75,30 +62,31 @@ func (r *Comms) Process(msg *core.Message) {
return
}
for _, p := range payloads {
if !r.readyToSend.Load() {
continue
}

switch report := p.(type) {
case *proto.MetricsReport:
select {
case <-r.ctx.Done():
err := r.ctx.Err()
if err != nil {
log.Errorf("error in done context Process in comms %v", err)
}
return
case r.reportChan <- report:
// report queued
log.Debug("metrics report queued")
message := client.MessageFromMetrics(report)
err := r.reporter.Send(r.ctx, message)

if err != nil {
log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report)
} else {
log.Tracef("MetricsReport sent, %v", report)
}
case *models.EventReport:
select {
case <-r.ctx.Done():
err := r.ctx.Err()
if err != nil {
log.Errorf("error in done context Process in comms %v", err)
err := r.reporter.Send(r.ctx, client.MessageFromEvents(report))
if err != nil {
l := len(report.Events)
var sb strings.Builder
for i := 0; i < l-1; i++ {
sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID)
sb.WriteString(", ")
}
return
case r.reportEventsChan <- report:
// report queued
log.Debug("events report queued")
sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID)
log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String())
}
}
}
Expand All @@ -108,43 +96,3 @@ func (r *Comms) Process(msg *core.Message) {
func (r *Comms) Subscriptions() []string {
return []string{core.CommMetrics, core.RegistrationCompletedTopic}
}

func (r *Comms) reportLoop() {
r.wait.Add(1)
defer r.wait.Done()
for {
if !r.readyToSend.Load() {
continue
}
select {
case <-r.ctx.Done():
err := r.ctx.Err()
if err != nil {
log.Errorf("error in done context reportLoop %v", err)
}
log.Debug("reporter loop exiting")
return
case report := <-r.reportChan:
err := r.reporter.Send(r.ctx, client.MessageFromMetrics(report))
if err != nil {
log.Errorf("Failed to send MetricsReport: %v, data: %+v", err, report)
} else {
log.Tracef("MetricsReport sent, %v", report)
}
case report := <-r.reportEventsChan:
err := r.reporter.Send(r.ctx, client.MessageFromEvents(report))
if err != nil {
l := len(report.Events)
var sb strings.Builder
for i := 0; i < l-1; i++ {
sb.WriteString(report.Events[i].GetSecurityViolationEvent().SupportID)
sb.WriteString(", ")
}
sb.WriteString(report.Events[l-1].GetSecurityViolationEvent().SupportID)
log.Errorf("Failed to send EventReport with error: %v, supportID list: %s", err, sb.String())
} else {
log.Tracef("EventReport sent, %v", report)
}
}
}
}
5 changes: 4 additions & 1 deletion src/plugins/comms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ func TestCommsSendMetrics(t *testing.T) {

assert.True(t, pluginUnderTest.readyToSend.Load())

metricData := make([]*proto.StatsEntity, 0, 1)
metricData = append(metricData, &proto.StatsEntity{Simplemetrics: []*proto.SimpleMetric{{Name: "Metric A", Value: 5}}})

pluginUnderTest.Process(core.NewMessage(core.CommMetrics, []core.Payload{&proto.MetricsReport{
Meta: &proto.Metadata{Timestamp: types.TimestampNow()},
Type: proto.MetricsReport_INSTANCE,
Data: make([]*proto.StatsEntity, 0, 1),
Data: metricData,
}}))

time.Sleep(1 * time.Second) // for the above call being asynchronous
Expand Down
1 change: 0 additions & 1 deletion src/plugins/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (e *Extensions) Process(msg *core.Message) {
case core.EnableExtension:
if data == config.AdvancedMetricsKey {
if !e.isPluginAlreadyRegistered(advancedMetricsPluginName) {
config.SetAdvancedMetricsDefaults()
conf, err := config.GetConfig(e.conf.ClientID)
if err != nil {
log.Warnf("Unable to get agent config, %v", err)
Expand Down
Loading