Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Jul 25, 2023
1 parent eb85fd0 commit 56905b8
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 307 deletions.
14 changes: 8 additions & 6 deletions server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ func buildTestPipeline(
lintRepo,
)

pollerExecutor := executor.NewPollerExecutor(
tracer,
execTestUpdater,
tracedb.Factory(runRepo),
dsRepo,
pollerExecutor := executor.NewSelectorBasedPoller(
executor.NewPollerExecutor(
tracer,
execTestUpdater,
tracedb.Factory(runRepo),
dsRepo,
eventEmitter,
),
eventEmitter,
)

pollerExecutor = executor.NewSelectorBasedPoller(pollerExecutor, eventEmitter)
tracePoller := executor.NewTracePoller(
pollerExecutor,
execTestUpdater,
Expand Down
192 changes: 103 additions & 89 deletions server/executor/default_poller_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,30 @@ type DefaultPollerExecutor struct {

type InstrumentedPollerExecutor struct {
tracer trace.Tracer
pollerExecutor PollerExecutor
pollerExecutor pollerExecutor
}

func (pe InstrumentedPollerExecutor) ExecuteRequest(ctx context.Context, request *PollingRequest) (bool, string, test.Run, error) {
_, span := pe.tracer.Start(request.Context(), "Fetch trace")
func (pe InstrumentedPollerExecutor) ExecuteRequest(ctx context.Context, job *Job) (PollResult, error) {
_, span := pe.tracer.Start(ctx, "Fetch trace")
defer span.End()

finished, finishReason, run, err := pe.pollerExecutor.ExecuteRequest(ctx, request)
res, err := pe.pollerExecutor.ExecuteRequest(ctx, job)

spanCount := 0
if run.Trace != nil {
spanCount = len(run.Trace.Flat)
if job.Run.Trace != nil {
spanCount = len(job.Run.Trace.Flat)
}

attrs := []attribute.KeyValue{
attribute.String("tracetest.run.trace_poller.trace_id", request.run.TraceID.String()),
attribute.String("tracetest.run.trace_poller.span_id", request.run.SpanID.String()),
attribute.Bool("tracetest.run.trace_poller.succesful", finished),
attribute.String("tracetest.run.trace_poller.test_id", string(request.test.ID)),
attribute.String("tracetest.run.trace_poller.trace_id", job.Run.TraceID.String()),
attribute.String("tracetest.run.trace_poller.span_id", job.Run.SpanID.String()),
attribute.Bool("tracetest.run.trace_poller.succesful", res.Finished()),
attribute.String("tracetest.run.trace_poller.test_id", string(job.Test.ID)),
attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount),
}

if finishReason != "" {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", finishReason))
if res.reason != "" {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", res.reason))
}

if err != nil {
Expand All @@ -58,7 +58,7 @@ func (pe InstrumentedPollerExecutor) ExecuteRequest(ctx context.Context, request
}

span.SetAttributes(attrs...)
return finished, finishReason, run, err
return res, err
}

func NewPollerExecutor(
Expand All @@ -67,18 +67,16 @@ func NewPollerExecutor(
newTraceDBFn traceDBFactoryFn,
dsRepo resourcemanager.Current[datastore.DataStore],
eventEmitter EventEmitter,
) PollerExecutor {

defaultExecutor := &DefaultPollerExecutor{
updater: updater,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
eventEmitter: eventEmitter,
}
) *InstrumentedPollerExecutor {

return &InstrumentedPollerExecutor{
tracer: tracer,
pollerExecutor: defaultExecutor,
tracer: tracer,
pollerExecutor: &DefaultPollerExecutor{
updater: updater,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
eventEmitter: eventEmitter,
},
}
}

Expand All @@ -96,109 +94,125 @@ func (pe DefaultPollerExecutor) traceDB(ctx context.Context) (tracedb.TraceDB, e
return tdb, nil
}

func (pe DefaultPollerExecutor) ExecuteRequest(ctx context.Context, request *PollingRequest) (bool, string, test.Run, error) {
log.Printf("[PollerExecutor] Test %s Run %d: ExecuteRequest", request.test.ID, request.run.ID)
run := request.run
func (pe DefaultPollerExecutor) ExecuteRequest(ctx context.Context, job *Job) (PollResult, error) {
log.Printf("[PollerExecutor] Test %s Run %d: ExecuteRequest", job.Test.ID, job.Run.ID)

traceDB, err := pe.traceDB(ctx)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", request.test.ID, request.run.ID, err.Error())
return false, "", test.Run{}, err
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
return PollResult{}, err
}

if request.IsFirstRequest() {
if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok {
connectionResult := testableTraceDB.TestConnection(ctx)

err = pe.eventEmitter.Emit(ctx, events.TraceDataStoreConnectionInfo(request.test.ID, request.run.ID, connectionResult))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s", request.test.ID, request.run.ID, err.Error())
}
}

endpoints := traceDB.GetEndpoints()
ds, err := pe.dsRepo.Current(ctx)
if isFirstRequest(job) {
err := pe.testConnection(ctx, traceDB, job)
if err != nil {
return false, "", test.Run{}, fmt.Errorf("could not get current datastore: %w", err)
}

err = pe.eventEmitter.Emit(ctx, events.TracePollingStart(request.test.ID, request.run.ID, string(ds.Type), endpoints))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingStart event: error: %s", request.test.ID, request.run.ID, err.Error())
return PollResult{}, err
}
}

traceID := run.TraceID.String()
traceID := job.Run.TraceID.String()
trace, err := traceDB.GetTraceByID(ctx, traceID)
if err != nil {
anotherErr := pe.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, 0, request.count, false, err.Error()))
if anotherErr != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s", request.test.ID, request.run.ID, anotherErr.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s", request.test.ID, request.run.ID, traceID, err.Error())
return false, "", test.Run{}, err
pe.emit(ctx, job, events.TracePollingIterationInfo(job.Test.ID, job.Run.ID, 0, job.EnqueueCount(), false, err.Error()))
log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s", job.Test.ID, job.Run.ID, traceID, err.Error())
return PollResult{}, err
}

trace.ID = run.TraceID
done, reason := pe.donePollingTraces(request, traceDB, trace)
trace.ID = job.Run.TraceID
done, reason := pe.donePollingTraces(job, traceDB, trace)
// we need both values to be different to check for done, but after we want to have an updated job
job.Run.Trace = &trace

if !done {
err := pe.eventEmitter.Emit(ctx, events.TracePollingIterationInfo(request.test.ID, request.run.ID, len(trace.Flat), request.count, false, reason))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s", request.test.ID, request.run.ID, err.Error())
}
pe.emit(ctx, job, events.TracePollingIterationInfo(job.Test.ID, job.Run.ID, len(job.Run.Trace.Flat), job.EnqueueCount(), false, reason))
log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)", job.Test.ID, job.Run.ID, reason)

log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)", request.test.ID, request.run.ID, reason)
run.Trace = &trace
request.run = run
return false, "", run, nil
return PollResult{
finished: false,
reason: reason,
run: job.Run,
}, nil
}
log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)", job.Test.ID, job.Run.ID, reason)

log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)", request.test.ID, request.run.ID, reason)
log.Printf("[PollerExecutor] Test %s Run %d: Start Sorting", job.Test.ID, job.Run.ID)
sorted := job.Run.Trace.Sort()
job.Run.Trace = &sorted
log.Printf("[PollerExecutor] Test %s Run %d: Sorting complete", job.Test.ID, job.Run.ID)

log.Printf("[PollerExecutor] Test %s Run %d: Start Sorting", request.test.ID, request.run.ID)
trace = trace.Sort()
log.Printf("[PollerExecutor] Test %s Run %d: Sorting complete", request.test.ID, request.run.ID)
run.Trace = &trace
request.run = run

if !trace.HasRootSpan() {
newRoot := test.NewTracetestRootSpan(run)
run.Trace = run.Trace.InsertRootSpan(newRoot)
if !job.Run.Trace.HasRootSpan() {
newRoot := test.NewTracetestRootSpan(job.Run)
job.Run.Trace = job.Run.Trace.InsertRootSpan(newRoot)
} else {
run.Trace.RootSpan = model.AugmentRootSpan(run.Trace.RootSpan, run.TriggerResult)
job.Run.Trace.RootSpan = model.AugmentRootSpan(job.Run.Trace.RootSpan, job.Run.TriggerResult)
}
run = run.SuccessfullyPolledTraces(run.Trace)
job.Run = job.Run.SuccessfullyPolledTraces(job.Run.Trace)

log.Printf("[PollerExecutor] Completed polling process for Test Run %d after %d iterations, number of spans collected: %d ", job.Run.ID, job.EnqueueCount()+1, len(job.Run.Trace.Flat))

log.Printf("[PollerExecutor] Completed polling process for Test Run %d after %d iterations, number of spans collected: %d ", run.ID, request.count+1, len(run.Trace.Flat))
log.Printf("[PollerExecutor] Test %s Run %d: Start updating", job.Test.ID, job.Run.ID)
err = pe.updater.Update(ctx, job.Run)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: Update error: %s", job.Test.ID, job.Run.ID, err.Error())
return PollResult{}, err
}

return PollResult{
finished: true,
reason: reason,
run: job.Run,
}, nil

}

func (pe DefaultPollerExecutor) emit(ctx context.Context, job *Job, event model.TestRunEvent) {
err := pe.eventEmitter.Emit(ctx, event)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingIterationInfo event: error: %s", job.Test.ID, job.Run.ID, err.Error())
}
}

func (pe DefaultPollerExecutor) testConnection(ctx context.Context, traceDB tracedb.TraceDB, job *Job) error {
if testableTraceDB, ok := traceDB.(tracedb.TestableTraceDB); ok {
connectionResult := testableTraceDB.TestConnection(ctx)

err := pe.eventEmitter.Emit(ctx, events.TraceDataStoreConnectionInfo(job.Test.ID, job.Run.ID, connectionResult))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s", job.Test.ID, job.Run.ID, err.Error())
}
}

endpoints := traceDB.GetEndpoints()
ds, err := pe.dsRepo.Current(ctx)
if err != nil {
return fmt.Errorf("could not get current datastore: %w", err)
}

log.Printf("[PollerExecutor] Test %s Run %d: Start updating", request.test.ID, request.run.ID)
err = pe.updater.Update(ctx, run)
err = pe.eventEmitter.Emit(ctx, events.TracePollingStart(job.Test.ID, job.Run.ID, string(ds.Type), endpoints))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: Update error: %s", request.test.ID, request.run.ID, err.Error())
return false, "", test.Run{}, err
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TracePollingStart event: error: %s", job.Test.ID, job.Run.ID, err.Error())
}

return true, reason, run, nil
return nil
}

func (pe DefaultPollerExecutor) donePollingTraces(job *PollingRequest, traceDB tracedb.TraceDB, trace model.Trace) (bool, string) {
func (pe DefaultPollerExecutor) donePollingTraces(job *Job, traceDB tracedb.TraceDB, trace model.Trace) (bool, string) {
if !traceDB.ShouldRetry() {
return true, "TraceDB is not retryable"
}

maxTracePollRetry := job.pollingProfile.Periodic.MaxTracePollRetry()
maxTracePollRetry := job.PollingProfile.Periodic.MaxTracePollRetry()
// we're done if we have the same amount of spans after polling or `maxTracePollRetry` times
log.Printf("[PollerExecutor] Test %s Run %d: Job count %d, max retries: %d", job.test.ID, job.run.ID, job.count, maxTracePollRetry)
if job.count == maxTracePollRetry {
log.Printf("[PollerExecutor] Test %s Run %d: Job count %d, max retries: %d", job.Test.ID, job.Run.ID, job.EnqueueCount(), maxTracePollRetry)
if job.EnqueueCount() == maxTracePollRetry {
return true, fmt.Sprintf("Hit MaxRetry of %d", maxTracePollRetry)
}

if job.run.Trace == nil {
if job.Run.Trace == nil {
return false, "First iteration"
}

haveNotCollectedSpansSinceLastPoll := len(trace.Flat) == len(job.run.Trace.Flat)
haveNotCollectedSpansSinceLastPoll := len(trace.Flat) == len(job.Run.Trace.Flat)
haveCollectedSpansInTestRun := len(trace.Flat) > 0
haveCollectedOnlyRootNode := len(trace.Flat) == 1 && trace.HasRootSpan()

Expand All @@ -211,5 +225,5 @@ func (pe DefaultPollerExecutor) donePollingTraces(job *PollingRequest, traceDB t
return true, fmt.Sprintf("Trace has no new spans. Spans found: %d", len(trace.Flat))
}

return false, fmt.Sprintf("New spans found. Before: %d After: %d", len(job.run.Trace.Flat), len(trace.Flat))
return false, fmt.Sprintf("New spans found. Before: %d After: %d", len(job.Run.Trace.Flat), len(trace.Flat))
}
47 changes: 30 additions & 17 deletions server/executor/poller_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ type iterationExpectedValues struct {
expectRootSpan bool
}

func executeAndValidatePollingRequests(t *testing.T, retryDelay, maxWaitTimeForTrace time.Duration, pollerExecutor executor.PollerExecutor, expectedValues []iterationExpectedValues) {
func executeAndValidatePollingRequests(t *testing.T, retryDelay, maxWaitTimeForTrace time.Duration, pollerExecutor *executor.InstrumentedPollerExecutor, expectedValues []iterationExpectedValues) {
ctx := context.Background()
run := test.NewRun()

Expand All @@ -457,30 +457,40 @@ func executeAndValidatePollingRequests(t *testing.T, retryDelay, maxWaitTimeForT
},
}

for i, expected := range expectedValues {
pp := pollingprofile.DefaultPollingProfile
pp.Periodic.RetryDelay = retryDelay.String()
pp.Periodic.Timeout = maxWaitTimeForTrace.String()
request := executor.NewPollingRequest(ctx, test, run, i, pollingprofile.DefaultPollingProfile)
// using `pollingprofile.DefaultPollingProfile` and changing the periodic configs directly
// causes a race condition because `DefaultPollingProfile.Periodic` is a reference, so it's shared by the copies.
// The easiest solution is to create a new polling profile for each test.
pp := pollingprofile.PollingProfile{
Strategy: pollingprofile.Periodic,
Periodic: &pollingprofile.PeriodicPollingConfig{
RetryDelay: retryDelay.String(),
Timeout: maxWaitTimeForTrace.String(),
},
}

finished, finishReason, nextRun, err := pollerExecutor.ExecuteRequest(ctx, request)
run = nextRun // should store a run to use in another iteration
job := executor.NewJob()
job.Test = test
job.Run = run
job.PollingProfile = pp

require.NotNilf(t, run, "The test run should not be nil on iteration %d", i)
for i, expected := range expectedValues {
res, err := pollerExecutor.ExecuteRequest(ctx, &job)
run = res.Run() // should store a run to use in another iteration

if expected.finished {
require.Truef(t, finished, "The poller should have finished on iteration %d", i)
require.NotEmptyf(t, finishReason, "The poller should not have finish reason on iteration %d", i)
} else {
require.Falsef(t, finished, "The poller should have not finished on iteration %d", i)
require.Emptyf(t, finishReason, "The poller should have finish reason on iteration %d", i)
}
require.NotNilf(t, run, "The test run should not be nil on iteration %d", i)

if expected.expectNoTraceError {
require.Errorf(t, err, "An error should have happened on iteration %d", i)
require.ErrorIsf(t, err, connection.ErrTraceNotFound, "An connection error should have happened on iteration %d", i)
} else {
require.NoErrorf(t, err, "An error should not have happened on iteration %d", i)
require.NotEmptyf(t, res.Reason(), "The poller should have a reason on iteration %d", i)

if expected.finished {
require.Truef(t, res.Finished(), "The poller should have finished on iteration %d", i)
} else {
require.Falsef(t, res.Finished(), "The poller should have not finished on iteration %d", i)
}

// only validate root span if we have a root span
if expected.expectRootSpan {
Expand All @@ -489,10 +499,13 @@ func executeAndValidatePollingRequests(t *testing.T, retryDelay, maxWaitTimeForT
require.Falsef(t, run.Trace.HasRootSpan(), "The trace associated with the run on iteration %d should not have a root span", i)
}
}

job.IncreaseEnqueueCount()
job.Headers.SetBool("requeued", true)
}
}

func getPollerExecutorWithMocks(t *testing.T, tracePerIteration []model.Trace) executor.PollerExecutor {
func getPollerExecutorWithMocks(t *testing.T, tracePerIteration []model.Trace) *executor.InstrumentedPollerExecutor {
updater := getRunUpdaterMock(t)
tracer := getTracerMock(t)
testDB := getRunRepositoryMock(t)
Expand Down

0 comments on commit 56905b8

Please sign in to comment.