Skip to content

Commit

Permalink
fix(server): check if trace belongs to running test before saving it (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Aug 8, 2023
1 parent 5a25f4c commit 3a52fad
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions server/otlp/ingester.go
Expand Up @@ -65,12 +65,23 @@ func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequ
// each request can have different traces so we need to go over each individual trace
for ix, modelTrace := range receivedTraces {
i.log("processing trace %d/%d traceID %s", ix+1, len(receivedTraces), modelTrace.ID.String())
// if at some point we want to save all traces, not just the ones related to a running test
// we can just remove this check
run, err := i.getOngoinTestRunForTrace(ctx, modelTrace)
if errors.Is(err, errNoTestRun) {
i.log("trace %s is not part of any ongoing test run", modelTrace.ID.String())
continue
}
if err != nil {
return nil, fmt.Errorf("failed to get test run: %w", err)
}

err = i.tracePersister.UpdateTraceSpans(ctx, &modelTrace)
if err != nil {
return nil, fmt.Errorf("failed to save trace: %w", err)
}

err = i.notify(ctx, modelTrace, requestType)
err = i.notify(ctx, run, modelTrace, requestType)
if err != nil {
return nil, fmt.Errorf("failed to notify: %w", err)
}
Expand Down Expand Up @@ -110,29 +121,34 @@ func (i ingester) traces(input []*v1.ResourceSpans) []traces.Trace {
return modelTraces
}

func (i ingester) notify(ctx context.Context, trace traces.Trace, requestType string) error {
var errNoTestRun = errors.New("no test run")

func (i ingester) getOngoinTestRunForTrace(ctx context.Context, trace traces.Trace) (test.Run, error) {
run, err := i.runGetter.GetRunByTraceID(ctx, trace.ID)
if errors.Is(err, sql.ErrNoRows) {
// trace is not part of any known test run, no need to notify
return nil
return test.Run{}, errNoTestRun
}
if err != nil {
// there was an actual error accessing the DB
return fmt.Errorf("error getting run by traceID: %w", err)
return test.Run{}, fmt.Errorf("error getting run by traceID: %w", err)
}

if run.State != test.RunStateAwaitingTrace {
// run is not awaiting trace, no need to notify
return nil
return test.Run{}, errNoTestRun
}

return run, nil
}

func (i ingester) notify(ctx context.Context, run test.Run, trace traces.Trace, requestType string) error {
evt := events.TraceOtlpServerReceivedSpans(
run.TestID,
run.ID,
len(trace.Flat),
requestType,
)
err = i.eventEmitter.Emit(ctx, evt)
err := i.eventEmitter.Emit(ctx, evt)
if err != nil {
// there was an actual error accessing the DB
return fmt.Errorf("error getting run by traceID: %w", err)
Expand Down

0 comments on commit 3a52fad

Please sign in to comment.