Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics registry #2071

Merged
merged 3 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
)
Expand Down Expand Up @@ -81,7 +82,9 @@ func TestWithEngine(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down
5 changes: 4 additions & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
)
Expand All @@ -51,7 +52,9 @@ func TestGetGroups(t *testing.T) {

execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

t.Run("list", func(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/stats"
Expand All @@ -45,7 +46,9 @@ func TestGetMetrics(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
Expand Down Expand Up @@ -88,7 +91,9 @@ func TestGetMetric(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
Expand Down
8 changes: 7 additions & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.k6.io/k6/core/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/loader"
Expand Down Expand Up @@ -135,14 +136,19 @@ func TestSetupData(t *testing.T) {
}
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
runner, err := js.New(
logger,
&loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: testCase.script},
nil,
lib.RuntimeOptions{},
builtinMetrics,
registry,
)
require.NoError(t, err)
runner.SetOptions(lib.Options{
Expand All @@ -155,7 +161,7 @@ func TestSetupData(t *testing.T) {
})
execScheduler, err := local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
9 changes: 7 additions & 2 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
)
Expand All @@ -47,7 +48,9 @@ func TestGetStatus(t *testing.T) {
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down Expand Up @@ -96,12 +99,14 @@ func TestPatchStatus(t *testing.T) {
"vus": 0, "maxVUs": 10, "duration": "1s"}}`), &scenarios)
require.NoError(t, err)
options := lib.Options{Scenarios: scenarios}
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)

for name, indata := range testdata {
t.Run(name, func(t *testing.T) {
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down
5 changes: 4 additions & 1 deletion cmd/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/loader"
)

Expand Down Expand Up @@ -66,7 +67,9 @@ An archive is a fully self-contained test run, and can be executed identically e
return err
}

r, err := newRunner(logger, src, runType, filesystems, runtimeOptions)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
r, err := newRunner(logger, src, runType, filesystems, runtimeOptions, builtinMetrics, registry)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/loader"
"go.k6.io/k6/ui/pb"
)
Expand Down Expand Up @@ -108,7 +109,9 @@ This will execute the test on the k6 cloud service. Use "k6 login cloud" to auth
}

modifyAndPrintBar(progressBar, pb.WithConstProgress(0, "Getting script options"))
r, err := newRunner(logger, src, runType, filesystems, runtimeOptions)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
r, err := newRunner(logger, src, runType, filesystems, runtimeOptions, builtinMetrics, registry)
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/loader"
)

Expand Down Expand Up @@ -61,6 +62,8 @@ func getInspectCmd(logger logrus.FieldLogger) *cobra.Command {
if err != nil {
return err
}
registry := metrics.NewRegistry()
_ = metrics.RegisterBuiltinMetrics(registry)

var (
opts lib.Options
Expand All @@ -73,13 +76,13 @@ func getInspectCmd(logger logrus.FieldLogger) *cobra.Command {
if err != nil {
return err
}
b, err = js.NewBundleFromArchive(logger, arc, runtimeOptions)
b, err = js.NewBundleFromArchive(logger, arc, runtimeOptions, registry)
if err != nil {
return err
}
opts = b.Options
case typeJS:
b, err = js.NewBundle(logger, src, filesystems, runtimeOptions)
b, err = js.NewBundle(logger, src, filesystems, runtimeOptions, registry)
if err != nil {
return err
}
Expand Down
14 changes: 9 additions & 5 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/loader"
"go.k6.io/k6/ui/pb"
)
Expand Down Expand Up @@ -115,7 +116,9 @@ a commandline interface for interacting with it.`,
return err
}

initRunner, err := newRunner(logger, src, runType, filesystems, runtimeOptions)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
initRunner, err := newRunner(logger, src, runType, filesystems, runtimeOptions, builtinMetrics, registry)
if err != nil {
return err
}
Expand Down Expand Up @@ -192,7 +195,7 @@ a commandline interface for interacting with it.`,

// Create the engine.
initBar.Modify(pb.WithConstProgress(0, "Init engine"))
engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, outputs, logger)
engine, err := core.NewEngine(execScheduler, conf.Options, runtimeOptions, outputs, logger, builtinMetrics)
if err != nil {
return err
}
Expand Down Expand Up @@ -386,12 +389,13 @@ func runCmdFlagSet() *pflag.FlagSet {
// Creates a new runner.
func newRunner(
logger *logrus.Logger, src *loader.SourceData, typ string, filesystems map[string]afero.Fs, rtOpts lib.RuntimeOptions,
builtinMetrics *metrics.BuiltinMetrics, registry *metrics.Registry,
Comment on lines 391 to +392
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting a bit crowded 😅 Not saying we should change it now, just making a note. IIRC we wanted to refactor and rename it anyway, so it's not a big deal, we should just do that sooner.

) (runner lib.Runner, err error) {
switch typ {
case "":
runner, err = newRunner(logger, src, detectType(src.Data), filesystems, rtOpts)
runner, err = newRunner(logger, src, detectType(src.Data), filesystems, rtOpts, builtinMetrics, registry)
case typeJS:
runner, err = js.New(logger, src, filesystems, rtOpts)
runner, err = js.New(logger, src, filesystems, rtOpts, builtinMetrics, registry)
case typeArchive:
var arc *lib.Archive
arc, err = lib.ReadArchive(bytes.NewReader(src.Data))
Expand All @@ -400,7 +404,7 @@ func newRunner(
}
switch arc.Type {
case typeJS:
runner, err = js.NewFromArchive(logger, arc, rtOpts)
runner, err = js.NewFromArchive(logger, arc, rtOpts, builtinMetrics, registry)
default:
return nil, fmt.Errorf("archive requests unsupported runner: %s", arc.Type)
}
Expand Down
7 changes: 7 additions & 0 deletions cmd/runtime_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/loader"
)
Expand Down Expand Up @@ -319,12 +320,16 @@ func testRuntimeOptionsCase(t *testing.T, tc runtimeOptionsTestCase) {

fs := afero.NewMemMapFs()
require.NoError(t, afero.WriteFile(fs, "/script.js", jsCode.Bytes(), 0o644))
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
runner, err := newRunner(
testutils.NewLogger(t),
&loader.SourceData{Data: jsCode.Bytes(), URL: &url.URL{Path: "/script.js", Scheme: "file"}},
typeJS,
map[string]afero.Fs{"file": fs},
rtOpts,
builtinMetrics,
registry,
)
require.NoError(t, err)

Expand All @@ -342,6 +347,8 @@ func testRuntimeOptionsCase(t *testing.T, tc runtimeOptionsTestCase) {
typeArchive,
nil,
rtOpts,
builtinMetrics,
registry,
)
}

Expand Down
5 changes: 4 additions & 1 deletion converter/har/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/loader"
)
Expand Down Expand Up @@ -58,10 +59,12 @@ func TestBuildK6RequestObject(t *testing.T) {
}
v, err := buildK6RequestObject(req)
assert.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
_, err = js.New(testutils.NewLogger(t), &loader.SourceData{
URL: &url.URL{Path: "/script.js"},
Data: []byte(fmt.Sprintf("export default function() { res = http.batch([%v]); }", v)),
}, nil, lib.RuntimeOptions{})
}, nil, lib.RuntimeOptions{}, builtinMetrics, registry)
assert.NoError(t, err)
}

Expand Down
15 changes: 11 additions & 4 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Engine struct {
Metrics map[string]*stats.Metric
MetricsLock sync.Mutex

Samples chan stats.SampleContainer
builtinMetrics *metrics.BuiltinMetrics
Samples chan stats.SampleContainer

// Assigned to metrics upon first received sample.
thresholds map[string]stats.Thresholds
Expand All @@ -80,6 +81,7 @@ type Engine struct {
// NewEngine instantiates a new Engine, without doing any heavy initialization.
func NewEngine(
ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, outputs []output.Output, logger *logrus.Logger,
builtinMetrics *metrics.BuiltinMetrics,
) (*Engine, error) {
if ex == nil {
return nil, errors.New("missing ExecutionScheduler instance")
Expand All @@ -96,6 +98,7 @@ func NewEngine(
Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64),
stopChan: make(chan struct{}),
logger: logger.WithField("component", "engine"),
builtinMetrics: builtinMetrics,
}

e.thresholds = opts.Thresholds
Expand Down Expand Up @@ -147,6 +150,10 @@ func (e *Engine) StartOutputs() error {
})
}

if builtinMetricOut, ok := out.(output.WithBuiltinMetrics); ok {
builtinMetricOut.SetBuiltinMetrics(e.builtinMetrics)
}

if err := out.Start(); err != nil {
e.stopOutputs(i)
return err
Expand Down Expand Up @@ -198,7 +205,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
processMetricsAfterRun := make(chan struct{})
runFn := func() error {
e.logger.Debug("Execution scheduler starting...")
err := e.ExecutionScheduler.Run(globalCtx, runSubCtx, e.Samples)
err := e.ExecutionScheduler.Run(globalCtx, runSubCtx, e.Samples, e.builtinMetrics)
e.logger.WithError(err).Debug("Execution scheduler terminated")

select {
Expand Down Expand Up @@ -416,12 +423,12 @@ func (e *Engine) emitMetrics() {
Samples: []stats.Sample{
{
Time: t,
Metric: metrics.VUs,
Metric: e.builtinMetrics.VUs,
Value: float64(executionState.GetCurrentlyActiveVUsCount()),
Tags: e.Options.RunTags,
}, {
Time: t,
Metric: metrics.VUsMax,
Metric: e.builtinMetrics.VUsMax,
Value: float64(executionState.GetInitializedVUsCount()),
Tags: e.Options.RunTags,
},
Expand Down