Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): check if trace belongs to running test before saving it #3038

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 23 additions & 7 deletions server/otlp/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,23 @@ func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequ
// each request can have different traces so we need to go over each individual trace
for ix, modelTrace := range receivedTraces {
i.log("processing trace %d/%d traceID %s", ix+1, len(receivedTraces), modelTrace.ID.String())
// if at some point we want to save all traces, not just the ones related to a running test
// we can just remove this check
run, err := i.getOngoinTestRunForTrace(ctx, modelTrace)
if errors.Is(err, errNoTestRun) {
i.log("trace %s is not part of any ongoing test run", modelTrace.ID.String())
continue
}
if err != nil {
return nil, fmt.Errorf("failed to get test run: %w", err)
}

err = i.tracePersister.UpdateTraceSpans(ctx, &modelTrace)
if err != nil {
return nil, fmt.Errorf("failed to save trace: %w", err)
}

err = i.notify(ctx, modelTrace, requestType)
err = i.notify(ctx, run, modelTrace, requestType)
if err != nil {
return nil, fmt.Errorf("failed to notify: %w", err)
}
Expand Down Expand Up @@ -110,29 +121,34 @@ func (i ingester) traces(input []*v1.ResourceSpans) []traces.Trace {
return modelTraces
}

func (i ingester) notify(ctx context.Context, trace traces.Trace, requestType string) error {
var errNoTestRun = errors.New("no test run")

func (i ingester) getOngoinTestRunForTrace(ctx context.Context, trace traces.Trace) (test.Run, error) {
run, err := i.runGetter.GetRunByTraceID(ctx, trace.ID)
if errors.Is(err, sql.ErrNoRows) {
// trace is not part of any known test run, no need to notify
return nil
return test.Run{}, errNoTestRun
}
if err != nil {
// there was an actual error accessing the DB
return fmt.Errorf("error getting run by traceID: %w", err)
return test.Run{}, fmt.Errorf("error getting run by traceID: %w", err)
}

if run.State != test.RunStateAwaitingTrace {
// run is not awaiting trace, no need to notify
return nil
return test.Run{}, errNoTestRun
}

return run, nil
}

func (i ingester) notify(ctx context.Context, run test.Run, trace traces.Trace, requestType string) error {
evt := events.TraceOtlpServerReceivedSpans(
run.TestID,
run.ID,
len(trace.Flat),
requestType,
)
err = i.eventEmitter.Emit(ctx, evt)
err := i.eventEmitter.Emit(ctx, evt)
if err != nil {
// there was an actual error accessing the DB
return fmt.Errorf("error getting run by traceID: %w", err)
Expand Down