Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ test-docker-component: ## Run integration tests in docker
for container in `docker ps -aqf "name=^nginx-agent_"`; do echo && docker ps -f "id=$$container" --format "{{.Image}}" && docker exec $$container ./tmp/component.test -test.v; done

test-component-run: ## Run component tests
GOWORK=off CGO_ENABLED=0 go test -v ./test/component
GOWORK=off CGO_ENABLED=0 go test -v ./test/component/...

# Performance tests
performance-test: ## Run performance tests
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/fsnotify/fsnotify v1.5.4
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
github.com/klauspost/cpuid/v2 v2.1.0
Expand All @@ -32,6 +33,7 @@ require (
google.golang.org/grpc v1.48.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
google.golang.org/protobuf v1.28.1
gopkg.in/mcuadros/go-syslog.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -43,7 +45,6 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mcuadros/go-syslog.v2 v2.3.0 h1:kcsiS+WsTKyIEPABJBJtoG0KkOS6yzvJ+/eZlhD79kk=
gopkg.in/mcuadros/go-syslog.v2 v2.3.0/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/mcuadros/go-syslog.v2 v2.3.0/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ func loadPlugins(commander client.Commander, binary *core.NginxBinaryType, env *
}
}

if loadedConfig.NAPMonitoring != (config.NAPMonitoring{}) {
nm, err := plugins.NewNAPMonitoring(env, loadedConfig)
if err != nil {
log.Errorf("Unable to load the Nginx App Protect Monitoring plugin due to the following error: %v", err)
} else {
corePlugins = append(corePlugins, nm)
}
}

return corePlugins
}

Expand Down
1 change: 1 addition & 0 deletions sdk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Mgoogle/protobuf/field_mask.proto=github.com/gogo/protobuf/types:\
done

protoc \
-I ./proto/events \
-I ./proto \
-I /usr/local/include \
-I ./vendor/github.com/gogo/protobuf/gogoproto \
Expand Down
1 change: 1 addition & 0 deletions sdk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MsgClassification int
const (
MsgClassificationCommand MsgClassification = iota
MsgClassificationMetric
MsgClassificationEvent
)

var (
Expand Down
11 changes: 11 additions & 0 deletions sdk/client/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"github.com/nginx/agent/sdk/v2/proto"
models "github.com/nginx/agent/sdk/v2/proto/events"
)

func MessageFromCommand(cmd *proto.Command) Message {
Expand All @@ -18,10 +19,18 @@ func MessageFromMetrics(metric *proto.MetricsReport) Message {
}
}

func MessageFromEvents(event *models.EventReport) Message {
return &msg{
msgType: MsgClassificationEvent,
event: event,
}
}

type msg struct {
msgType MsgClassification
cmd *proto.Command
metric *proto.MetricsReport
event *models.EventReport
}

func (m *msg) Meta() *proto.Metadata {
Expand Down Expand Up @@ -67,6 +76,8 @@ func (m *msg) Raw() interface{} {
return m.cmd
case MsgClassificationMetric:
return m.metric
case MsgClassificationEvent:
return m.event
}

return nil
Expand Down
63 changes: 45 additions & 18 deletions sdk/client/metric_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"io"
"sync"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

"github.com/nginx/agent/sdk/v2"
sdkGRPC "github.com/nginx/agent/sdk/v2/grpc"
"github.com/nginx/agent/sdk/v2/interceptors"
"github.com/nginx/agent/sdk/v2/proto"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
events "github.com/nginx/agent/sdk/v2/proto/events"
)

func NewMetricReporterClient() MetricReporter {
Expand All @@ -26,6 +28,7 @@ type metricReporter struct {
*connector
client proto.MetricsServiceClient
channel proto.MetricsService_StreamClient
eventsChannel proto.MetricsService_StreamEventsClient
ctx context.Context
mu sync.Mutex
backoffSettings BackoffSettings
Expand Down Expand Up @@ -90,7 +93,16 @@ func (r *metricReporter) createClient() error {
log.Infof("Metric reporter retrying to connect to %s", r.grpc.Target())
return err
}

eventsChannel, err := r.client.StreamEvents(r.ctx)
if err != nil {
log.Warnf("Unable to create events channel: %s", err)
log.Infof("Metric reporter retrying to connect to %s", r.grpc.Target())
return err
}

r.channel = channel
r.eventsChannel = eventsChannel

return nil
}
Expand Down Expand Up @@ -125,29 +137,40 @@ func (r *metricReporter) WithBackoffSettings(backoffSettings BackoffSettings) Cl
}

func (r *metricReporter) Send(ctx context.Context, message Message) error {
var (
report *proto.MetricsReport
ok bool
)
var err error

switch message.Classification() {
case MsgClassificationMetric:
if report, ok = message.Raw().(*proto.MetricsReport); !ok {
report, ok := message.Raw().(*proto.MetricsReport)
if !ok {
return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data())
}
default:
return fmt.Errorf("MetricReporter expected a metrics report message, but received %T", message.Data())
}

err := sdk.WaitUntil(r.ctx, r.backoffSettings.initialInterval, r.backoffSettings.maxInterval, r.backoffSettings.sendMaxTimeout, func() error {
if err := r.channel.Send(report); err != nil {
return r.handleGrpcError("Metric Reporter Channel Send", err)
err = sdk.WaitUntil(r.ctx, r.backoffSettings.initialInterval, r.backoffSettings.maxInterval, r.backoffSettings.sendMaxTimeout, func() error {
if err := r.channel.Send(report); err != nil {
return r.handleGrpcError("Metric Reporter Channel Send", err)
}

log.Tracef("MetricReporter sent metrics report %v", report)

return nil
})
case MsgClassificationEvent:
report, ok := message.Raw().(*events.EventReport)
if !ok {
return fmt.Errorf("MetricReporter expected an events report message, but received %T", message.Data())
}
err = sdk.WaitUntil(r.ctx, r.backoffSettings.initialInterval, r.backoffSettings.maxInterval, r.backoffSettings.sendMaxTimeout, func() error {
if err := r.eventsChannel.Send(report); err != nil {
return r.handleGrpcError("Metric Reporter Events Channel Send", err)
}

log.Tracef("MetricReporter sent report %v", report)
log.Tracef("MetricReporter sent events report %v", report)

return nil
})
return nil
})
default:
return fmt.Errorf("MetricReporter expected a metrics or events report message, but received %T", message.Data())
}

return err
}
Expand All @@ -157,6 +180,10 @@ func (r *metricReporter) closeConnection() error {
if err != nil {
return err
}
err = r.eventsChannel.CloseSend()
if err != nil {
return err
}
return r.grpc.Close()
}

Expand Down
59 changes: 55 additions & 4 deletions sdk/client/metric_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package client

import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"

"github.com/nginx/agent/sdk/v2/proto"
f5_nginx_agent_sdk_events "github.com/nginx/agent/sdk/v2/proto/events"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -152,9 +153,16 @@ type metricReporterHandler struct {
metricReportStream chan *proto.MetricsReport
}

type eventReporterHandlerFunc func(proto.MetricsService_StreamEventsServer, *sync.WaitGroup)
type eventReporterHandler struct {
streamEventsHandleFunc eventReporterHandlerFunc
eventReportStream chan *f5_nginx_agent_sdk_events.EventReport
}

type mockMetricReporterService struct {
sync.RWMutex
metricReporterHandler *metricReporterHandler
eventReporterHandler *eventReporterHandler
}

func (c *mockMetricReporterService) Stream(stream proto.MetricsService_StreamServer) error {
Expand All @@ -174,6 +182,23 @@ func (c *mockMetricReporterService) Stream(stream proto.MetricsService_StreamSer
return nil
}

func (c *mockMetricReporterService) StreamEvents(stream proto.MetricsService_StreamEventsServer) error {
wg := &sync.WaitGroup{}
h := c.ensureEventReporterHandler()
wg.Add(1)

streamEventsHandleFunc := h.streamEventsHandleFunc
if streamEventsHandleFunc == nil {
streamEventsHandleFunc = h.streamEventsHandle
}

go streamEventsHandleFunc(stream, wg)

wg.Wait()

return nil
}

func (c *mockMetricReporterService) ensureMetricReporterHandler() *metricReporterHandler {
c.RLock()
if c.metricReporterHandler == nil {
Expand All @@ -188,18 +213,44 @@ func (c *mockMetricReporterService) ensureMetricReporterHandler() *metricReporte
return c.metricReporterHandler
}

func (c *mockMetricReporterService) ensureEventReporterHandler() *eventReporterHandler {
c.RLock()
if c.eventReporterHandler == nil {
c.RUnlock()
c.Lock()
defer c.Unlock()
c.eventReporterHandler = &eventReporterHandler{}
c.eventReporterHandler.eventReportStream = make(chan *f5_nginx_agent_sdk_events.EventReport)
return c.eventReporterHandler
}
defer c.RUnlock()
return c.eventReporterHandler
}

func (h *metricReporterHandler) streamHandle(server proto.MetricsService_StreamServer, wg *sync.WaitGroup) {
for {
cmd, err := server.Recv()
fmt.Printf("Recv Metric Report: %v\n", cmd)
log.Debugf("Recv Metric Report: %v\n", cmd)
if err != nil {
fmt.Printf("Recv Metric Report: %v\n", err)
log.Debugf("Recv Metric Report: %v\n", err)
return
}
h.metricReportStream <- cmd
}
}

func (h *eventReporterHandler) streamEventsHandle(server proto.MetricsService_StreamEventsServer, wg *sync.WaitGroup) {
for {
cmd, err := server.Recv()
log.Debugf("Recv Event Report: %v\n", cmd)
if err != nil {
log.Debugf("Recv Event Report: %v\n", err)
return
}
h.eventReportStream <- cmd
}
}

func startMetricReporterMockServer() (*grpc.Server, *mockMetricReporterService, func(context.Context, string) (net.Conn, error)) {
listener := bufconn.Listen(1024 * 1024)
grpcServer := grpc.NewServer(sdkGRPC.DefaultServerDialOptions...)
Expand All @@ -210,7 +261,7 @@ func startMetricReporterMockServer() (*grpc.Server, *mockMetricReporterService,

go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Printf("Error starting mock GRPC server: %v\n", err)
log.Errorf("Error starting mock GRPC server: %v\n", err)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion sdk/config_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
SDKfiles "github.com/nginx/agent/sdk/v2/files"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/sdk/v2/zip"
"github.com/nginxinc/nginx-go-crossplane"
crossplane "github.com/nginxinc/nginx-go-crossplane"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
26 changes: 24 additions & 2 deletions sdk/examples/services/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"sync"

"github.com/nginx/agent/sdk/v2/proto"
f5_nginx_agent_sdk_events "github.com/nginx/agent/sdk/v2/proto/events"
log "github.com/sirupsen/logrus"
)

type MetricsGrpcService struct {
sync.RWMutex
fromClient chan *proto.MetricsReport
reports []*proto.MetricsReport
fromClient chan *proto.MetricsReport
reports []*proto.MetricsReport
eventReports []*f5_nginx_agent_sdk_events.EventReport
}

func NewMetricsService() *MetricsGrpcService {
Expand All @@ -36,6 +38,26 @@ func (grpcService *MetricsGrpcService) Stream(stream proto.MetricsService_Stream
return nil
}

func (grpcService *MetricsGrpcService) StreamEvents(stream proto.MetricsService_StreamEventsServer) error {
log.Trace("Event Report Channel")

for {
report, err := stream.Recv()
if err != nil {
// recommend handling error
log.Debugf("Error in recvHandle %v", err)
break
}
log.Info("Got metrics")
grpcService.eventReports = append(grpcService.eventReports, report)
}
return nil
}

func (grpcService *MetricsGrpcService) GetMetrics() []*proto.MetricsReport {
return grpcService.reports
}

func (grpcService *MetricsGrpcService) GetEventReports() []*f5_nginx_agent_sdk_events.EventReport {
return grpcService.eventReports
}
Loading