Skip to content

Commit

Permalink
Add monitoring information about functions to Functionbeat (elastic#1…
Browse files Browse the repository at this point in the history
…4876)

This PR adds a new counter which returns the number of triggered functions of Functionbeat.

Example snippet of the telemetry message containing info about function:

```json
{
  "state": {
    "functionbeat": {
        "functions": {
             "count": 1,
        }
    }
}
```

Follow-up work is required on Kibana side to process the reported information correctly to get telemetry data.
(cherry picked from commit 9ce1c26)
  • Loading branch information
kvch committed Jan 10, 2020
1 parent 0ed1121 commit d80fca4
Show file tree
Hide file tree
Showing 14 changed files with 117 additions and 37 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Functionbeat*

- New options to configure roles and VPC. {pull}11779[11779]
- Export automation templates used to create functions. {pull}11923[11923]
- Configurable Amazon endpoint. {pull}12369[12369]
- Add timeout option to reference configuration. {pull}13351[13351]
- Configurable tags for Lambda functions. {pull}13352[13352]
- Add input for Cloudwatch logs through Kinesis. {pull}13317[13317]
- Enable Logstash output. {pull}13345[13345]
- Make `bulk_max_size` configurable in outputs. {pull}13493[13493]
- Add `index` option to all functions to directly set a per-function index value. {issue}15064[15064] {pull}15101[15101]
- Add monitoring info about triggered functions. {pull}14876[14876]

*Winlogbeat*

Expand Down
28 changes: 17 additions & 11 deletions x-pack/functionbeat/function/beater/functionbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/x-pack/functionbeat/config"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/provider"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

Expand All @@ -42,11 +44,12 @@ var (
// - Run on a read only filesystem
// - More execution constraints based on speed and memory usage.
type Functionbeat struct {
ctx context.Context
log *logp.Logger
cancel context.CancelFunc
Provider provider.Provider
Config *config.Config
ctx context.Context
log *logp.Logger
cancel context.CancelFunc
telemetry telemetry.T
Provider provider.Provider
Config *config.Config
}

// New creates an instance of functionbeat.
Expand All @@ -62,12 +65,15 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
}
ctx, cancel := context.WithCancel(context.Background())

telemetryReg := monitoring.GetNamespace("state").GetRegistry().NewRegistry("functionbeat")

bt := &Functionbeat{
ctx: ctx,
cancel: cancel,
log: logp.NewLogger("functionbeat"),
Provider: provider,
Config: c,
ctx: ctx,
log: logp.NewLogger("functionbeat"),
cancel: cancel,
telemetry: telemetry.New(telemetryReg),
Provider: provider,
Config: c,
}
return bt, nil
}
Expand Down Expand Up @@ -120,7 +126,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error {
// When an error reach the coordinator we assume that we cannot recover from it and we initiate
// a shutdown and return an aggregated errors.
coordinator := core.NewCoordinator(logp.NewLogger("coordinator"), functions...)
err = coordinator.Run(bt.ctx)
err = coordinator.Run(bt.ctx, bt.telemetry)
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions x-pack/functionbeat/function/core/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
)

// Runner is the interface that the coordinator will follow to manage a function goroutine.
type Runner interface {
fmt.Stringer
Run(context.Context) error
Run(context.Context, telemetry.T) error
}

// Coordinator takes care of managing the function goroutine, it receives the list of functions that
Expand All @@ -41,7 +42,7 @@ func NewCoordinator(log *logp.Logger,

// Run starts each functions into an independent goroutine and wait until all the goroutine are
// stopped to exit.
func (r *Coordinator) Run(ctx context.Context) error {
func (r *Coordinator) Run(ctx context.Context, t telemetry.T) error {
r.log.Debug("Coordinator is starting")
defer r.log.Debug("Coordinator is stopped")

Expand All @@ -55,14 +56,14 @@ func (r *Coordinator) Run(ctx context.Context) error {

r.log.Debugf("The coordinator is starting %d functions", len(r.runners))
for _, rfn := range r.runners {
go func(ctx context.Context, rfn Runner) {
go func(ctx context.Context, t telemetry.T, rfn Runner) {
var err error
defer func() { results <- err }()
err = r.runFunc(ctx, rfn)
err = r.runFunc(ctx, t, rfn)
if err != nil {
cancel()
}
}(ctx, rfn)
}(ctx, t, rfn)
}

// Wait for goroutine to complete and aggregate any errors from the goroutine and
Expand All @@ -79,12 +80,13 @@ func (r *Coordinator) Run(ctx context.Context) error {

func (r *Coordinator) runFunc(
ctx context.Context,
t telemetry.T,
rfn Runner,
) error {
r.log.Infof("The function '%s' is starting", rfn.String())
defer r.log.Infof("The function '%s' is stopped", rfn.String())

err := rfn.Run(ctx)
err := rfn.Run(ctx, t)
if err != nil {
r.log.Errorf(
"Nonrecoverable error when executing the function: '%s', error: '%+v', terminating all running functions",
Expand Down
12 changes: 7 additions & 5 deletions x-pack/functionbeat/function/core/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,23 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
)

var errUnhappy = errors.New("unhappy :(")

type happyRunner struct{}

func (hr *happyRunner) Run(ctx context.Context) error {
func (hr *happyRunner) Run(ctx context.Context, _ telemetry.T) error {
<-ctx.Done()
return nil
}
func (hr *happyRunner) String() string { return "happyRunner" }

type unhappyRunner struct{}

func (uhr *unhappyRunner) Run(ctx context.Context) error {
func (uhr *unhappyRunner) Run(ctx context.Context, _ telemetry.T) error {
return errUnhappy
}

Expand All @@ -36,21 +38,21 @@ func TestStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var err error
go func() {
err = coordinator.Run(ctx)
err = coordinator.Run(ctx, telemetry.Ignored())
assert.NoError(t, err)
}()
cancel()
})

t.Run("on error shutdown all the runner", func(t *testing.T) {
coordinator := NewCoordinator(nil, &happyRunner{}, &unhappyRunner{})
err := coordinator.Run(context.Background())
err := coordinator.Run(context.Background(), telemetry.Ignored())
assert.Error(t, err)
})

t.Run("aggregate all errors", func(t *testing.T) {
coordinator := NewCoordinator(nil, &unhappyRunner{}, &unhappyRunner{})
err := coordinator.Run(context.Background())
err := coordinator.Run(context.Background(), telemetry.Ignored())
assert.Error(t, err)
})
}
7 changes: 4 additions & 3 deletions x-pack/functionbeat/function/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"github.com/elastic/beats/libbeat/feature"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
)

// Create a new pipeline client based on the function configuration.
type clientFactory func(*common.Config) (core.Client, error)

// Function is temporary
type Function interface {
Run(context.Context, core.Client) error
Run(context.Context, core.Client, telemetry.T) error
Name() string
}

Expand All @@ -46,13 +47,13 @@ type Runnable struct {

// Run call the the function's Run method, the method is a specific goroutine, it will block until
// beats shutdown or an error happen.
func (r *Runnable) Run(ctx context.Context) error {
func (r *Runnable) Run(ctx context.Context, t telemetry.T) error {
client, err := r.makeClient(r.config)
if err != nil {
return errors.Wrap(err, "could not create a client for the function")
}
defer client.Close()
return r.function.Run(ctx, client)
return r.function.Run(ctx, client, t)
}

func (r *Runnable) String() string {
Expand Down
9 changes: 5 additions & 4 deletions x-pack/functionbeat/function/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
)

type simpleFunction struct {
err error
}

func (s *simpleFunction) Run(ctx context.Context, client core.Client) error {
func (s *simpleFunction) Run(ctx context.Context, client core.Client, _ telemetry.T) error {
return s.err
}

Expand All @@ -45,7 +46,7 @@ func TestRunnable(t *testing.T) {
function: &simpleFunction{err: nil},
}

errReceived := runnable.Run(context.Background())
errReceived := runnable.Run(context.Background(), telemetry.Ignored())
assert.Equal(t, err, e.Cause(errReceived))
})

Expand All @@ -57,7 +58,7 @@ func TestRunnable(t *testing.T) {
function: &simpleFunction{err: err},
}

errReceived := runnable.Run(context.Background())
errReceived := runnable.Run(context.Background(), telemetry.Ignored())
assert.Equal(t, err, e.Cause(errReceived))
})

Expand All @@ -68,7 +69,7 @@ func TestRunnable(t *testing.T) {
function: &simpleFunction{err: nil},
}

errReceived := runnable.Run(context.Background())
errReceived := runnable.Run(context.Background(), telemetry.Ignored())
assert.NoError(t, errReceived)
})
}
5 changes: 3 additions & 2 deletions x-pack/functionbeat/function/provider/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/libbeat/feature"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
)

type mockProvider struct {
Expand Down Expand Up @@ -48,8 +49,8 @@ type mockFunction struct {
name string
}

func (mf *mockFunction) Run(ctx context.Context, client core.Client) error { return nil }
func (mf *mockFunction) Name() string { return mf.name }
func (mf *mockFunction) Run(ctx context.Context, client core.Client, t telemetry.T) error { return nil }
func (mf *mockFunction) Name() string { return mf.name }

func testProviderLookup(t *testing.T) {
name := "myprovider"
Expand Down
37 changes: 37 additions & 0 deletions x-pack/functionbeat/function/telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package telemetry

import (
"github.com/elastic/beats/libbeat/monitoring"
)

// T is a telemetry instance
type T interface {
AddTriggeredFunction()
}

type telemetry struct {
registry *monitoring.Registry
countFunctions *monitoring.Int
}

// New returns a new telemetry registry.
func New(r *monitoring.Registry) T {
return &telemetry{
registry: r.NewRegistry("functions"),
countFunctions: monitoring.NewInt(r, "count"),
}
}

// Ignored is used when the package is not monitored.
func Ignored() T {
return nil
}

// AddTriggeredFunction adds a triggered function data to the registry.
func (t *telemetry) AddTriggeredFunction() {
t.countFunctions.Inc()
}
6 changes: 5 additions & 1 deletion x-pack/functionbeat/provider/aws/aws/api_gateway_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/provider"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
"github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer"
)

Expand All @@ -44,7 +45,9 @@ func APIGatewayProxyDetails() *feature.Details {
}

// Run starts the lambda function and wait for web triggers.
func (a *APIGatewayProxy) Run(_ context.Context, client core.Client) error {
func (a *APIGatewayProxy) Run(_ context.Context, client core.Client, telemetry telemetry.T) error {
telemetry.AddTriggeredFunction()

lambda.Start(a.createHandler(client))
return nil
}
Expand All @@ -55,6 +58,7 @@ func (a *APIGatewayProxy) createHandler(
return func(request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
a.log.Debugf("The handler receives a new event from the gateway (requestID: %s)", request.RequestContext.RequestID)
event := transformer.APIGatewayProxyRequest(request)

if err := client.Publish(event); err != nil {
a.log.Errorf("could not publish event to the pipeline, error: %+v", err)
return buildResponse(
Expand Down
5 changes: 4 additions & 1 deletion x-pack/functionbeat/provider/aws/aws/cloudwatch_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/provider"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
"github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer"
)

Expand Down Expand Up @@ -69,7 +70,9 @@ func CloudwatchKinesisDetails() *feature.Details {
}

// Run starts the lambda function and wait for web triggers.
func (c *CloudwatchKinesis) Run(_ context.Context, client core.Client) error {
func (c *CloudwatchKinesis) Run(_ context.Context, client core.Client, t telemetry.T) error {
t.AddTriggeredFunction()

lambdarunner.Start(c.createHandler(client))
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/functionbeat/provider/aws/aws/cloudwatch_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/provider"
"github.com/elastic/beats/x-pack/functionbeat/function/telemetry"
"github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer"
)

Expand Down Expand Up @@ -105,7 +106,9 @@ func CloudwatchLogsDetails() *feature.Details {
}

// Run start the AWS lambda handles and will transform any events received to the pipeline.
func (c *CloudwatchLogs) Run(_ context.Context, client core.Client) error {
func (c *CloudwatchLogs) Run(_ context.Context, client core.Client, t telemetry.T) error {
t.AddTriggeredFunction()

lambdarunner.Start(c.createHandler(client))
return nil
}
Expand All @@ -130,6 +133,7 @@ func (c *CloudwatchLogs) createHandler(
)

events := transformer.CloudwatchLogs(parsedEvent)

if err := client.PublishAll(events); err != nil {
c.log.Errorf("Could not publish events to the pipeline, error: %+v", err)
return err
Expand Down
Loading

0 comments on commit d80fca4

Please sign in to comment.