Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
bassosimone committed Jun 7, 2024
1 parent c4f28d2 commit a6bdeb7
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 295 deletions.
116 changes: 43 additions & 73 deletions pkg/oonimkall/taskrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ooni/probe-cli/v3/internal/kvstore"
"github.com/ooni/probe-cli/v3/internal/model"
"github.com/ooni/probe-cli/v3/internal/runtimex"
"github.com/ooni/probe-cli/v3/internal/targetloading"
)

// runnerForTask runs a specific task
Expand Down Expand Up @@ -180,47 +179,23 @@ func (r *runnerForTask) Run(rootCtx context.Context) {

builder.SetCallbacks(&runnerCallbacks{emitter: r.emitter})

// TODO(bassosimone): replace the following code with an
// invocation of the targetloading.Loader. Since I am making these
// changes before a release and I've already changed the
// code a lot, I'd rather avoid changing it even more,
// for the following reason:
//
// If we add and call targetloading.Loader here, this code will
// magically invoke check-in for InputOrQueryBackend,
// which we need to make sure the app can handle. This is
// the main reason why now I don't fill like properly
// fixing this code and use targetloading.Loader: too much work
// in too little time, so mistakes more likely.
//
// In fact, our current app assumes that it's its
// responsibility to load the inputs, not oonimkall's.
switch builder.InputPolicy() {
case model.InputOrQueryBackend, model.InputStrictlyRequired:
if len(r.settings.Inputs) <= 0 {
r.emitter.EmitFailureStartup("no input provided")
return
}
case model.InputOrStaticDefault:
if len(r.settings.Inputs) <= 0 {
inputs, err := targetloading.StaticBareInputForExperiment(r.settings.Name)
if err != nil {
r.emitter.EmitFailureStartup("no default static input for this experiment")
return
}
r.settings.Inputs = inputs
}
case model.InputOptional:
if len(r.settings.Inputs) <= 0 {
r.settings.Inputs = append(r.settings.Inputs, "")
}
default: // treat this case as engine.InputNone.
if len(r.settings.Inputs) > 0 {
r.emitter.EmitFailureStartup("experiment does not accept input")
return
}
r.settings.Inputs = append(r.settings.Inputs, "")
// Load targets. Note that, for Web Connectivity, the mobile app has
// already loaded inputs and provides them as r.settings.Inputs.
loader := builder.NewTargetLoader(&model.ExperimentTargetLoaderConfig{
CheckInConfig: &model.OOAPICheckInConfig{
// Not needed since the app already provides the
// inputs to use for Web Connectivity.
},
Session: sess,
StaticInputs: r.settings.Inputs,
SourceFiles: []string{},
})
targets, err := loader.Load(rootCtx)
if err != nil {
r.emitter.EmitFailureStartup(err.Error())
return
}

experiment := builder.NewExperiment()
defer func() {
endEvent.DownloadedKB = experiment.KibiBytesReceived()
Expand All @@ -241,48 +216,43 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
defer measCancel()
submitCtx, submitCancel := context.WithCancel(rootCtx)
defer submitCancel()

// This deviates a little bit from measurement-kit, for which
// a zero timeout is actually valid. Since it does not make much
// sense, here we're changing the behaviour.
//
// See https://github.com/measurement-kit/measurement-kit/issues/1922
if r.settings.Options.MaxRuntime > 0 {
// We want to honour max_runtime only when we're running an
// experiment that clearly wants specific input. We could refine
// this policy in the future, but for now this covers in a
// reasonable way web connectivity, so we should be ok.
switch builder.InputPolicy() {
case model.InputOrQueryBackend, model.InputStrictlyRequired:
var (
cancelMeas context.CancelFunc
cancelSubmit context.CancelFunc
)
// We give the context used for submitting extra time so that
// it's possible to submit the last measurement.
//
// See https://github.com/ooni/probe/issues/2037 for more info.
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
defer cancelMeas()
maxRuntime += 30 * time.Second
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
defer cancelSubmit()
}
if r.settings.Options.MaxRuntime > 0 && len(targets) > 1 {
var (
cancelMeas context.CancelFunc
cancelSubmit context.CancelFunc
)
// We give the context used for submitting extra time so that
// it's possible to submit the last measurement.
//
// See https://github.com/ooni/probe/issues/2037 for more info.
maxRuntime := time.Duration(r.settings.Options.MaxRuntime) * time.Second
measCtx, cancelMeas = context.WithTimeout(measCtx, maxRuntime)
defer cancelMeas()
maxRuntime += 30 * time.Second
submitCtx, cancelSubmit = context.WithTimeout(submitCtx, maxRuntime)
defer cancelSubmit()
}
inputCount := len(r.settings.Inputs)

inputCount := len(targets)
start := time.Now()
inflatedMaxRuntime := r.settings.Options.MaxRuntime + r.settings.Options.MaxRuntime/10
eta := start.Add(time.Duration(inflatedMaxRuntime) * time.Second)
for idx, input := range r.settings.Inputs {
for idx, target := range targets {
if measCtx.Err() != nil {
break
}
logger.Infof("Starting measurement with index %d", idx)
r.emitter.Emit(eventTypeStatusMeasurementStart, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
Input: target.Input(),
})
if input != "" && inputCount > 0 {
if target.Input() != "" && inputCount > 0 {
var percentage float64
if r.settings.Options.MaxRuntime > 0 {
now := time.Now()
Expand All @@ -291,7 +261,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
percentage = (float64(idx)/float64(inputCount))*0.6 + 0.4
}
r.emitter.EmitStatusProgress(percentage, fmt.Sprintf(
"processing %s", input,
"processing %s", target,
))
}

Expand All @@ -305,7 +275,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
// we can always do, since it only has string accessors.
m, err := experiment.MeasureWithContext(
r.contextForExperiment(measCtx, builder),
model.NewOOAPIURLInfoWithDefaultCategoryAndCountry(input),
target,
)

if builder.Interruptible() && measCtx.Err() != nil {
Expand All @@ -317,7 +287,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
r.emitter.Emit(eventTypeFailureMeasurement, eventMeasurementGeneric{
Failure: err.Error(),
Idx: int64(idx),
Input: input,
Input: target.Input(),
})
// Historical note: here we used to fallthrough but, since we have
// implemented async measurements, the case where there is an error
Expand All @@ -330,7 +300,7 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
runtimex.PanicOnError(err, "measurement.MarshalJSON failed")
r.emitter.Emit(eventTypeMeasurement, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
Input: target.Input(),
JSONStr: string(data),
})
if !r.settings.Options.NoCollector {
Expand All @@ -339,14 +309,14 @@ func (r *runnerForTask) Run(rootCtx context.Context) {
warnOnFailure(logger, "cannot submit measurement", err)
r.emitter.Emit(measurementSubmissionEventName(err), eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
Input: target.Input(),
JSONStr: string(data),
Failure: measurementSubmissionFailure(err),
})
}
r.emitter.Emit(eventTypeStatusMeasurementDone, eventMeasurementGeneric{
Idx: int64(idx),
Input: input,
Input: target.Input(),
})
}
}
Expand Down
Loading

0 comments on commit a6bdeb7

Please sign in to comment.