Skip to content

Commit

Permalink
refactor(experiment): make report goroutine safe
Browse files Browse the repository at this point in the history
This diff refactors *engine.experiment to make the report field
goroutine safe. It also moves at the bottom of experiment.go code
that was intermixed with *engine.experiment methods.

Part of ooni/probe#2607
  • Loading branch information
bassosimone committed Jun 5, 2024
1 parent d47043f commit 83a2f4a
Showing 1 changed file with 79 additions and 37 deletions.
116 changes: 79 additions & 37 deletions internal/engine/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"net/http"
"runtime"
"sync"
"time"

"github.com/ooni/probe-cli/v3/internal/bytecounter"
Expand All @@ -18,71 +19,81 @@ import (
"github.com/ooni/probe-cli/v3/internal/version"
)

// experiment implements Experiment.
// experimentMutableReport is the mutable experiment.report field.
//
// We isolate this into a separate data structure to ease code management. By using this
// pattern, we don't need to be concerned with locking mutexes multiple times and it's just
// a matter of using public methods exported by this struct, which are goroutine safe.
type experimentMutableReport struct {
mu sync.Mutex
report probeservices.ReportChannel
}

// Set atomically sets the report possibly overriding a previously set report.
//
// This method is goroutine safe.
func (emr *experimentMutableReport) Set(report probeservices.ReportChannel) {
emr.mu.Lock()
emr.report = report
emr.mu.Unlock()
}

// Get atomically gets the report possibly returning nil.
func (emr *experimentMutableReport) Get() (report probeservices.ReportChannel) {
emr.mu.Lock()
report = emr.report
emr.mu.Unlock()
return
}

// experiment implements [model.Experiment].
type experiment struct {
byteCounter *bytecounter.Counter
callbacks model.ExperimentCallbacks
measurer model.ExperimentMeasurer
report probeservices.ReportChannel
mrep *experimentMutableReport
session *Session
testName string
testStartTime string
testVersion string
}

// newExperiment creates a new experiment given a measurer.
// newExperiment creates a new [*experiment] given a [model.ExperimentMeasurer].
func newExperiment(sess *Session, measurer model.ExperimentMeasurer) *experiment {
return &experiment{
byteCounter: bytecounter.New(),
callbacks: model.NewPrinterCallbacks(sess.Logger()),
measurer: measurer,
mrep: &experimentMutableReport{},
session: sess,
testName: measurer.ExperimentName(),
testStartTime: model.MeasurementFormatTimeNowUTC(),
testVersion: measurer.ExperimentVersion(),
}
}

// KibiBytesReceived implements Experiment.KibiBytesReceived.
// KibiBytesReceived implements [model.Experiment].
func (e *experiment) KibiBytesReceived() float64 {
return e.byteCounter.KibiBytesReceived()
}

// KibiBytesSent implements Experiment.KibiBytesSent.
// KibiBytesSent implements [model.Experiment].
func (e *experiment) KibiBytesSent() float64 {
return e.byteCounter.KibiBytesSent()
}

// Name implements Experiment.Name.
// Name implements [model.Experiment].
func (e *experiment) Name() string {
return e.testName
}

// ExperimentMeasurementSummaryKeysNotImplemented is the [model.MeasurementSummary] we use when
// the experiment TestKeys do not provide an implementation of [model.MeasurementSummary].
type ExperimentMeasurementSummaryKeysNotImplemented struct{}

var _ model.MeasurementSummaryKeys = &ExperimentMeasurementSummaryKeysNotImplemented{}

// IsAnomaly implements MeasurementSummary.
func (*ExperimentMeasurementSummaryKeysNotImplemented) Anomaly() bool {
return false
}

// MeasurementSummaryKeys returns the [model.MeasurementSummaryKeys] associated with a given measurement.
func MeasurementSummaryKeys(m *model.Measurement) model.MeasurementSummaryKeys {
if tk, ok := m.TestKeys.(model.MeasurementSummaryKeysProvider); ok {
return tk.MeasurementSummaryKeys()
}
return &ExperimentMeasurementSummaryKeysNotImplemented{}
}

// ReportID implements Experiment.ReportID.
// ReportID implements [model.Experiment].
func (e *experiment) ReportID() string {
if e.report == nil {
report := e.mrep.Get()
if report == nil {
return ""
}
return e.report.ReportID()
return report.ReportID()
}

// experimentAsyncWrapper makes a sync experiment behave like it was async
Expand Down Expand Up @@ -122,7 +133,7 @@ func (eaw *experimentAsyncWrapper) RunAsync(
return out, nil
}

// MeasureAsync implements Experiment.MeasureAsync.
// MeasureAsync implements [model.Experiment].
func (e *experiment) MeasureAsync(
ctx context.Context, input string) (<-chan *model.Measurement, error) {
err := e.session.MaybeLookupLocationContext(ctx) // this already tracks session bytes
Expand Down Expand Up @@ -164,7 +175,7 @@ func (e *experiment) MeasureAsync(
return out, nil
}

// MeasureWithContext implements Experiment.MeasureWithContext.
// MeasureWithContext implements [model.Experiment].
func (e *experiment) MeasureWithContext(
ctx context.Context, input string,
) (measurement *model.Measurement, err error) {
Expand All @@ -183,13 +194,13 @@ func (e *experiment) MeasureWithContext(
return
}

// SubmitAndUpdateMeasurementContext implements Experiment.SubmitAndUpdateMeasurementContext.
func (e *experiment) SubmitAndUpdateMeasurementContext(
ctx context.Context, measurement *model.Measurement) error {
if e.report == nil {
// SubmitAndUpdateMeasurementContext implements [model.Experiment].
func (e *experiment) SubmitAndUpdateMeasurementContext(ctx context.Context, m *model.Measurement) error {
report := e.mrep.Get()
if report == nil {
return errors.New("report is not open")
}
return e.report.SubmitMeasurement(ctx, measurement)
return report.SubmitMeasurement(ctx, m)
}

// newMeasurement creates a new measurement for this experiment with the given input.
Expand Down Expand Up @@ -228,9 +239,12 @@ func (e *experiment) newMeasurement(input string) *model.Measurement {

// OpenReportContext implements Experiment.OpenReportContext.
func (e *experiment) OpenReportContext(ctx context.Context) error {
if e.report != nil {
// handle the case where we already opened the report
report := e.mrep.Get()
if report != nil {
return nil // already open
}

// use custom client to have proper byte accounting
httpClient := &http.Client{
Transport: bytecounter.WrapHTTPTransport(
Expand All @@ -244,12 +258,21 @@ func (e *experiment) OpenReportContext(ctx context.Context) error {
return err
}
client.HTTPClient = httpClient // patch HTTP client to use

// create the report template to open the report
template := e.newReportTemplate()
e.report, err = client.OpenReport(ctx, template)

// attempt to open the report
report, err = client.OpenReport(ctx, template)

// handle the error case
if err != nil {
e.session.logger.Debugf("experiment: probe services error: %s", err.Error())
return err
}

// on success, assign the new report
e.mrep.Set(report)
return nil
}

Expand All @@ -266,3 +289,22 @@ func (e *experiment) newReportTemplate() model.OOAPIReportTemplate {
TestVersion: e.testVersion,
}
}

// ExperimentMeasurementSummaryKeysNotImplemented is the [model.MeasurementSummary] we use when
// the experiment TestKeys do not provide an implementation of [model.MeasurementSummary].
type ExperimentMeasurementSummaryKeysNotImplemented struct{}

var _ model.MeasurementSummaryKeys = &ExperimentMeasurementSummaryKeysNotImplemented{}

// IsAnomaly implements MeasurementSummary.
func (*ExperimentMeasurementSummaryKeysNotImplemented) Anomaly() bool {
return false
}

// MeasurementSummaryKeys returns the [model.MeasurementSummaryKeys] associated with a given measurement.
func MeasurementSummaryKeys(m *model.Measurement) model.MeasurementSummaryKeys {
if tk, ok := m.TestKeys.(model.MeasurementSummaryKeysProvider); ok {
return tk.MeasurementSummaryKeys()
}
return &ExperimentMeasurementSummaryKeysNotImplemented{}
}

0 comments on commit 83a2f4a

Please sign in to comment.