From a6bdeb792c732ef24c380e42c81f39c404d5b49e Mon Sep 17 00:00:00 2001 From: Simone Basso Date: Fri, 7 Jun 2024 08:45:10 +0200 Subject: [PATCH] x --- pkg/oonimkall/taskrunner.go | 116 +++++------- pkg/oonimkall/taskrunner_test.go | 297 ++++++++----------------------- 2 files changed, 118 insertions(+), 295 deletions(-) diff --git a/pkg/oonimkall/taskrunner.go b/pkg/oonimkall/taskrunner.go index 000532e25..c53b6458d 100644 --- a/pkg/oonimkall/taskrunner.go +++ b/pkg/oonimkall/taskrunner.go @@ -11,7 +11,6 @@ import ( "github.com/ooni/probe-cli/v3/internal/kvstore" "github.com/ooni/probe-cli/v3/internal/model" "github.com/ooni/probe-cli/v3/internal/runtimex" - "github.com/ooni/probe-cli/v3/internal/targetloading" ) // runnerForTask runs a specific task @@ -180,47 +179,23 @@ func (r *runnerForTask) Run(rootCtx context.Context) { builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter}) - // TODO(bassosimone): replace the following code with an - // invocation of the targetloading.Loader. Since I am making these - // changes before a release and I've already changed the - // code a lot, I'd rather avoid changing it even more, - // for the following reason: - // - // If we add and call targetloading.Loader here, this code will - // magically invoke check-in for InputOrQueryBackend, - // which we need to make sure the app can handle. This is - // the main reason why now I don't fill like properly - // fixing this code and use targetloading.Loader: too much work - // in too little time, so mistakes more likely. - // - // In fact, our current app assumes that it's its - // responsibility to load the inputs, not oonimkall's. - switch builder.InputPolicy() { - case model.InputOrQueryBackend, model.InputStrictlyRequired: - if len(r.settings.Inputs) <= 0 { - r.emitter.EmitFailureStartup("no input provided") - return - } - case model.InputOrStaticDefault: - if len(r.settings.Inputs) <= 0 { - inputs, err := targetloading.StaticBareInputForExperiment(r.settings.Name) - if err != nil { - r.emitter.EmitFailureStartup("no default static input for this experiment") - return - } - r.settings.Inputs = inputs - } - case model.InputOptional: - if len(r.settings.Inputs) <= 0 { - r.settings.Inputs = append(r.settings.Inputs, "") - } - default: // treat this case as engine.InputNone. - if len(r.settings.Inputs) > 0 { - r.emitter.EmitFailureStartup("experiment does not accept input") - return - } - r.settings.Inputs = append(r.settings.Inputs, "") + // Load targets. Note that, for Web Connectivity, the mobile app has + // already loaded inputs and provides them as r.settings.Inputs. + loader := builder.NewTargetLoader(&model.ExperimentTargetLoaderConfig{ + CheckInConfig: &model.OOAPICheckInConfig{ + // Not needed since the app already provides the + // inputs to use for Web Connectivity. + }, + Session: sess, + StaticInputs: r.settings.Inputs, + SourceFiles: []string{}, + }) + targets, err := loader.Load(rootCtx) + if err != nil { + r.emitter.EmitFailureStartup(err.Error()) + return } + experiment := builder.NewExperiment() defer func() { endEvent.DownloadedKB = experiment.KibiBytesReceived() @@ -241,48 +216,43 @@ func (r *runnerForTask) Run(rootCtx context.Context) { defer measCancel() submitCtx, submitCancel := context.WithCancel(rootCtx) defer submitCancel() + // This deviates a little bit from measurement-kit, for which // a zero timeout is actually valid. Since it does not make much // sense, here we're changing the behaviour. 
// // See https://github.com/measurement-kit/measurement-kit/issues/1922 - if r.settings.Options.MaxRuntime > 0 { - // We want to honour max_runtime only when we're running an - // experiment that clearly wants specific input. We could refine - // this policy in the future, but for now this covers in a - // reasonable way web connectivity, so we should be ok. - switch builder.InputPolicy() { - case model.InputOrQueryBackend, model.InputStrictlyRequired: - var ( - cancelMeas context.CancelFunc - cancelSubmit context.CancelFunc - ) - // We give the context used for submitting extra time so that - // it's possible to submit the last measurement. - // - // See https://github.com/ooni/probe/issues/2037 for more info. - maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second - measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime) - defer cancelMeas() - maxRuntime += 30 * time.Second - submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime) - defer cancelSubmit() - } + if r.settings.Options.MaxRuntime > 0 && len(targets) > 1 { + var ( + cancelMeas context.CancelFunc + cancelSubmit context.CancelFunc + ) + // We give the context used for submitting extra time so that + // it's possible to submit the last measurement. + // + // See https://github.com/ooni/probe/issues/2037 for more info. + maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second + measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime) + defer cancelMeas() + maxRuntime += 30 * time.Second + submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime) + defer cancelSubmit() } - inputCount := len(r.settings.Inputs) + + inputCount := len(targets) start := time.Now() inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10 eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second) - for idx, input := range r.settings.Inputs { + for idx, target := range targets { if measCtx.Err() != nil { break } logger.Infof("Starting measurement with index %d", idx) r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{ Idx: int64(idx), - Input: input, + Input: target.Input(), }) - if input != "" && inputCount > 0 { + if target.Input() != "" && inputCount > 0 { var percentage float64 if r.settings.Options.MaxRuntime > 0 { now := time.Now() @@ -291,7 +261,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) { percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4 } r.emitter.EmitStatusProgress(percentage, fmt.Sprintf( - "processing %s", input, + "processing %s", target, )) } @@ -305,7 +275,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) { // we can always do, since it only has string accessors. 
m, err := experiment.MeasureWithContext( r.contextForExperiment(measCtx, builder), - model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input), + target, ) if builder.Interruptible() && measCtx.Err() != nil { @@ -317,7 +287,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) { r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{ Failure: err.Error(), Idx: int64(idx), - Input: input, + Input: target.Input(), }) // Historical note: here we used to fallthrough but, since we have // implemented async measurements, the case where there is an error @@ -330,7 +300,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) { runtimex.PanicOnError(err, "measurement.MarshalJSON failed") r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{ Idx: int64(idx), - Input: input, + Input: target.Input(), JSONStr: string(data), }) if !r.settings.Options.NoCollector { @@ -339,14 +309,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) { warnOnFailure(logger, "cannot submit measurement", err) r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{ Idx: int64(idx), - Input: input, + Input: target.Input(), JSONStr: string(data), Failure: measurementSubmissionFailure(err), }) } r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{ Idx: int64(idx), - Input: input, + Input: target.Input(), }) } } diff --git a/pkg/oonimkall/taskrunner_test.go b/pkg/oonimkall/taskrunner_test.go index 4d503f6f5..920b7b71a 100644 --- a/pkg/oonimkall/taskrunner_test.go +++ b/pkg/oonimkall/taskrunner_test.go @@ -10,7 +10,6 @@ import ( "github.com/ooni/probe-cli/v3/internal/engine" "github.com/ooni/probe-cli/v3/internal/mocks" "github.com/ooni/probe-cli/v3/internal/model" - "github.com/ooni/probe-cli/v3/internal/targetloading" ) func TestMeasurementSubmissionEventName(t *testing.T) { @@ -173,9 +172,10 @@ func TestTaskRunnerRun(t *testing.T) { // reduceEventsKeysIgnoreLog reduces the list of event keys // counting equal subsequent keys and ignoring log events - reduceEventsKeysIgnoreLog := func(events []*event) (out []eventKeyCount) { + reduceEventsKeysIgnoreLog := func(t *testing.T, events []*event) (out []eventKeyCount) { var current eventKeyCount for _, ev := range events { + t.Log(ev) if ev.Key == eventTypeLog { continue } @@ -195,12 +195,12 @@ func TestTaskRunnerRun(t *testing.T) { return } - // fakeSuccessfulRun returns a new set of dependencies that + // fakeSuccessfulDeps returns a new set of dependencies that // will perform a fully successful, but fake, run. // // You MAY override some functions to provoke specific errors // or generally change the operating conditions. 
- fakeSuccessfulRun := func() *MockableTaskRunnerDependencies { + fakeSuccessfulDeps := func() *MockableTaskRunnerDependencies { deps := &MockableTaskRunnerDependencies{ // Configure the fake experiment @@ -269,6 +269,15 @@ func TestTaskRunnerRun(t *testing.T) { return "GARR" }, }, + + Loader: &mocks.ExperimentTargetLoader{ + MockLoad: func(ctx context.Context) ([]model.ExperimentTarget, error) { + targets := []model.ExperimentTarget{ + model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(""), + } + return targets, nil + }, + }, } // The fake session MUST return the fake experiment builder @@ -297,13 +306,13 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with invalid experiment name", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Session.MockNewExperimentBuilder = func(name string) (model.ExperimentBuilder, error) { return nil, errors.New("invalid experiment name") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -315,13 +324,13 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with error during backends lookup", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Session.MockMaybeLookupBackendsContext = func(ctx context.Context) error { return errors.New("mocked error") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -333,13 +342,13 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with error during location lookup", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Session.MockMaybeLookupLocationContext = func(ctx context.Context) error { return errors.New("mocked error") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -353,81 +362,15 @@ func TestTaskRunnerRun(t *testing.T) { assertReducedEventsLike(t, expect, reduced) }) - t.Run("with missing input and InputOrQueryBackend policy", func(t *testing.T) { - runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputOrQueryBackend - } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeFailureStartup, Count: 1}, - {Key: eventTypeStatusEnd, Count: 1}, - } - assertReducedEventsLike(t, expect, reduced) - }) - - t.Run("with missing input and InputStrictlyRequired policy", func(t *testing.T) { + t.Run("with error during target loading", func(t *testing.T) { runner, emitter := 
newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputStrictlyRequired + fake := fakeSuccessfulDeps() + fake.Loader.MockLoad = func(ctx context.Context) ([]model.ExperimentTarget, error) { + return nil, errors.New("mocked error") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeFailureStartup, Count: 1}, - {Key: eventTypeStatusEnd, Count: 1}, - } - assertReducedEventsLike(t, expect, reduced) - }) - - t.Run("with InputOrStaticDefault policy and experiment with no static input", - func(t *testing.T) { - runner, emitter := newRunnerForTesting() - runner.settings.Name = "Antani" // no input for this experiment - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputOrStaticDefault - } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeFailureStartup, Count: 1}, - {Key: eventTypeStatusEnd, Count: 1}, - } - assertReducedEventsLike(t, expect, reduced) - }) - - t.Run("with InputNone policy and provided input", func(t *testing.T) { - runner, emitter := newRunnerForTesting() - runner.settings.Inputs = append(runner.settings.Inputs, "https://x.org/") - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputNone - } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -442,13 +385,13 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with failure opening report", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Experiment.MockOpenReportContext = func(ctx context.Context) error { return errors.New("mocked error") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -461,15 +404,13 @@ func TestTaskRunnerRun(t *testing.T) { assertReducedEventsLike(t, expect, reduced) }) - t.Run("with success and InputNone policy", func(t *testing.T) { + t.Run("with success and just a single entry", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputNone - } + fake := fakeSuccessfulDeps() + // Note that a single entry is the default we use runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := 
reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -487,18 +428,16 @@ func TestTaskRunnerRun(t *testing.T) { assertReducedEventsLike(t, expect, reduced) }) - t.Run("with measurement failure and InputNone policy", func(t *testing.T) { + t.Run("with failure and just a single entry", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputNone - } + fake := fakeSuccessfulDeps() + // Note that a single entry is the default we use fake.Experiment.MockMeasureWithContext = func(ctx context.Context, target model.ExperimentTarget) (measurement *model.Measurement, err error) { return nil, errors.New("preconditions error") } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -519,10 +458,8 @@ func TestTaskRunnerRun(t *testing.T) { // we are not crashing when the measurement fails and there are annotations, // which is what was happening in the above referenced issue. runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputNone - } + fake := fakeSuccessfulDeps() + // Note that a single entry is the default we use fake.Experiment.MockMeasureWithContext = func(ctx context.Context, target model.ExperimentTarget) (measurement *model.Measurement, err error) { return nil, errors.New("preconditions error") } @@ -531,7 +468,7 @@ func TestTaskRunnerRun(t *testing.T) { "architecture": "arm64", } events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -547,64 +484,23 @@ func TestTaskRunnerRun(t *testing.T) { assertReducedEventsLike(t, expect, reduced) }) - t.Run("with success and InputStrictlyRequired", func(t *testing.T) { + t.Run("with success and explicit input provided", func(t *testing.T) { runner, emitter := newRunnerForTesting() runner.settings.Inputs = []string{"a", "b", "c", "d"} - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Builder.MockInputPolicy = func() model.InputPolicy { return model.InputStrictlyRequired } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeStatusReportCreate, Count: 1}, - // - {Key: eventTypeStatusMeasurementStart, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeMeasurement, Count: 1}, - {Key: eventTypeStatusMeasurementSubmission, Count: 1}, - {Key: eventTypeStatusMeasurementDone, Count: 1}, - // - {Key: eventTypeStatusMeasurementStart, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeMeasurement, Count: 1}, - {Key: eventTypeStatusMeasurementSubmission, Count: 1}, - {Key: eventTypeStatusMeasurementDone, 
Count: 1}, - // - {Key: eventTypeStatusMeasurementStart, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeMeasurement, Count: 1}, - {Key: eventTypeStatusMeasurementSubmission, Count: 1}, - {Key: eventTypeStatusMeasurementDone, Count: 1}, - // - {Key: eventTypeStatusMeasurementStart, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeMeasurement, Count: 1}, - {Key: eventTypeStatusMeasurementSubmission, Count: 1}, - {Key: eventTypeStatusMeasurementDone, Count: 1}, - // - {Key: eventTypeStatusEnd, Count: 1}, - } - assertReducedEventsLike(t, expect, reduced) - }) - - t.Run("with success and InputOptional and input", func(t *testing.T) { - runner, emitter := newRunnerForTesting() - runner.settings.Inputs = []string{"a", "b", "c", "d"} - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputOptional + fake.Loader.MockLoad = func(ctx context.Context) ([]model.ExperimentTarget, error) { + targets := []model.ExperimentTarget{} + for _, input := range runner.settings.Inputs { + targets = append(targets, model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input)) + } + return targets, nil } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -643,85 +539,28 @@ func TestTaskRunnerRun(t *testing.T) { assertReducedEventsLike(t, expect, reduced) }) - t.Run("with success and InputOptional and no input", func(t *testing.T) { - runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputOptional - } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeStatusReportCreate, Count: 1}, - // - {Key: eventTypeStatusMeasurementStart, Count: 1}, - {Key: eventTypeMeasurement, Count: 1}, - {Key: eventTypeStatusMeasurementSubmission, Count: 1}, - {Key: eventTypeStatusMeasurementDone, Count: 1}, - // - {Key: eventTypeStatusEnd, Count: 1}, - } - assertReducedEventsLike(t, expect, reduced) - }) - - t.Run("with success and InputOrStaticDefault", func(t *testing.T) { - experimentName := "DNSCheck" - runner, emitter := newRunnerForTesting() - runner.settings.Name = experimentName - fake := fakeSuccessfulRun() - fake.Builder.MockInputPolicy = func() model.InputPolicy { - return model.InputOrStaticDefault - } - runner.newSession = fake.NewSession - events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) - expect := []eventKeyCount{ - {Key: eventTypeStatusQueued, Count: 1}, - {Key: eventTypeStatusStarted, Count: 1}, - {Key: eventTypeStatusProgress, Count: 3}, - {Key: eventTypeStatusGeoIPLookup, Count: 1}, - {Key: eventTypeStatusResolverLookup, Count: 1}, - {Key: eventTypeStatusProgress, Count: 1}, - {Key: eventTypeStatusReportCreate, Count: 1}, - } - allEntries, err := targetloading.StaticBareInputForExperiment(experimentName) - if err != nil { - t.Fatal(err) - } - // write the correct entries for 
each expected measurement. - for idx := 0; idx < len(allEntries); idx++ { - expect = append(expect, eventKeyCount{Key: eventTypeStatusMeasurementStart, Count: 1}) - expect = append(expect, eventKeyCount{Key: eventTypeStatusProgress, Count: 1}) - expect = append(expect, eventKeyCount{Key: eventTypeMeasurement, Count: 1}) - expect = append(expect, eventKeyCount{Key: eventTypeStatusMeasurementSubmission, Count: 1}) - expect = append(expect, eventKeyCount{Key: eventTypeStatusMeasurementDone, Count: 1}) - } - expect = append(expect, eventKeyCount{Key: eventTypeStatusEnd, Count: 1}) - assertReducedEventsLike(t, expect, reduced) - }) - t.Run("with success and max runtime", func(t *testing.T) { runner, emitter := newRunnerForTesting() runner.settings.Inputs = []string{"a", "b", "c", "d"} runner.settings.Options.MaxRuntime = 2 - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Builder.MockInputPolicy = func() model.InputPolicy { return model.InputStrictlyRequired } + fake.Loader.MockLoad = func(ctx context.Context) ([]model.ExperimentTarget, error) { + targets := []model.ExperimentTarget{} + for _, input := range runner.settings.Inputs { + targets = append(targets, model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input)) + } + return targets, nil + } fake.Experiment.MockMeasureWithContext = func(ctx context.Context, target model.ExperimentTarget) (measurement *model.Measurement, err error) { time.Sleep(1 * time.Second) return &model.Measurement{}, nil } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -752,13 +591,20 @@ func TestTaskRunnerRun(t *testing.T) { runner, emitter := newRunnerForTesting() runner.settings.Inputs = []string{"a", "b", "c", "d"} runner.settings.Options.MaxRuntime = 2 - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Builder.MockInputPolicy = func() model.InputPolicy { return model.InputStrictlyRequired } fake.Builder.MockInterruptible = func() bool { return true } + fake.Loader.MockLoad = func(ctx context.Context) ([]model.ExperimentTarget, error) { + targets := []model.ExperimentTarget{} + for _, input := range runner.settings.Inputs { + targets = append(targets, model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input)) + } + return targets, nil + } ctx, cancel := context.WithCancel(context.Background()) fake.Experiment.MockMeasureWithContext = func(ctx context.Context, target model.ExperimentTarget) (measurement *model.Measurement, err error) { cancel() @@ -766,7 +612,7 @@ func TestTaskRunnerRun(t *testing.T) { } runner.newSession = fake.NewSession events := runAndCollectContext(ctx, runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -787,16 +633,23 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with measurement submission failure", func(t *testing.T) { runner, emitter := newRunnerForTesting() runner.settings.Inputs = []string{"a"} - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() fake.Builder.MockInputPolicy = func() model.InputPolicy { return model.InputStrictlyRequired } fake.Experiment.MockSubmitAndUpdateMeasurementContext = func(ctx context.Context, measurement *model.Measurement) error { return errors.New("cannot 
submit") } + fake.Loader.MockLoad = func(ctx context.Context) ([]model.ExperimentTarget, error) { + targets := []model.ExperimentTarget{} + for _, input := range runner.settings.Inputs { + targets = append(targets, model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input)) + } + return targets, nil + } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1}, @@ -819,7 +672,7 @@ func TestTaskRunnerRun(t *testing.T) { t.Run("with success and progress", func(t *testing.T) { runner, emitter := newRunnerForTesting() - fake := fakeSuccessfulRun() + fake := fakeSuccessfulDeps() var callbacks model.ExperimentCallbacks fake.Builder.MockSetCallbacks = func(cbs model.ExperimentCallbacks) { callbacks = cbs @@ -830,7 +683,7 @@ func TestTaskRunnerRun(t *testing.T) { } runner.newSession = fake.NewSession events := runAndCollect(runner, emitter) - reduced := reduceEventsKeysIgnoreLog(events) + reduced := reduceEventsKeysIgnoreLog(t, events) expect := []eventKeyCount{ {Key: eventTypeStatusQueued, Count: 1}, {Key: eventTypeStatusStarted, Count: 1},