Skip to content

Commit

Permalink
Adding PollingEvents (#2291)
Browse files Browse the repository at this point in the history
* Adding PollingEvents draft

* Change mocking mechanisms for trace poller

* Updating code with PR suggestions and integration env fixes
  • Loading branch information
danielbdias committed Apr 5, 2023
1 parent 5a82464 commit 61fdf00
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 183 deletions.
73 changes: 36 additions & 37 deletions TEST_RUN_EVENTS.csv
@@ -1,37 +1,36 @@
Test Run Events,,,
,,,
EVENT_SUFFIX,Description,,
_INFO,Adding extra context for an specific task,,
_SUCCESS,The task has finalized with no errors,,
_ERROR,An execution problem was found,,
_START,A new task section has begun,,
,,,
,,,
,,,
,,,
,,,
Stage ,Event Type,Description,Definition
Trigger,CREATED_INFO,Trigger Run has been created,
Trigger,RESOLVE_ERROR,Resolving trigger details failed,
Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details,
Trigger,RESOLVE_START,Resolving trigger details based on environment variables,
Trigger,EXECUTION_START,Initial trigger execution,
Trigger,EXECUTION_SUCCESS,Successful trigger execution,
Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds",
Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
Trace,FETCHING_START,Starting the trace fetching process,
Trace,QUEUED_INFO,Trace Run has been queued to start the fetching process,
Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information
Trace,POLLING_START,Starting the trace polling process,
Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration
Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store,
Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace,
Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend,
Trace,FETCHING_ERROR,The trace was not able to be fetched,
Trace,STOPPED_INFO,The test run was stopped during its execution,
Test,OUTPUT_GENERATION_WARNING,The value for output <output_name> could not be generated,
Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed,
Test,TEST_SPECS_RUN_ERROR,Test specs execution error,
Test,TEST_SPECS_RUN_START,Test specs execution start,
Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed
Test Run Events,,,
,,,
EVENT_SUFFIX,Description,,
_INFO,Adding extra context for an specific task,,
_SUCCESS,The task has finalized with no errors,,
_ERROR,An execution problem was found,,
_START,A new task section has begun,,
,,,
,,,
,,,
,,,
,,,
Stage ,Event Type,Description,Definition
Trigger,CREATED_INFO,Trigger Run has been created,
Trigger,RESOLVE_ERROR,Resolving trigger details failed,
Trigger,RESOLVE_SUCCESS,Successful resolving of trigger details,
Trigger,RESOLVE_START,Resolving trigger details based on environment variables,
Trigger,EXECUTION_START,Initial trigger execution,
Trigger,EXECUTION_SUCCESS,Successful trigger execution,
Trigger,HTTP_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
Trigger,DOCKER_COMPOSE_HOST_MISMATCH_ERROR,"We identified Tracetest is running inside a docker compose container, so if you are trying to access your local host machine please use the host.docker.internal hostname. For more information, see https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds",
Trigger,GRPC_UNREACHABLE_HOST_ERROR,Tracetest could not reach the defined host in the trigger,
Trace,DATA_STORE_CONNECTION_INFO,A Data store connection request has been executed,test connection result information
Trace,POLLING_START,Starting the trace polling process,
Trace,POLLING_ITERATION_INFO,A polling iteration has been executed,# of spans - iteration # - reason of next iteration
Trace,POLLING_SUCCESS,The polling strategy has succeeded in fetching the trace from the Data Store,
Trace,POLLING_ERROR,The polling strategy has failed to fetch the trace,
Trace,FETCHING_START,Starting the trace fetching process,
Trace,FETCHING_SUCCESS,The trace was successfully processed by the backend,
Trace,FETCHING_ERROR,The trace was not able to be fetched,
Trace,STOPPED_INFO,The test run was stopped during its execution,
Test,OUTPUT_GENERATION_WARNING,The value for output <output_name> could not be generated,
Test,TEST_SPECS_RUN_SUCCESS,Test Specs were successfully executed,
Test,TEST_SPECS_RUN_ERROR,Test specs execution error,
Test,TEST_SPECS_RUN_START,Test specs execution start,
Test,TEST_SPECS_ASSERTION_ERROR,An assertion in the test spec failed
2 changes: 0 additions & 2 deletions api/testEvents.yaml
Expand Up @@ -39,8 +39,6 @@ components:
type: string
enum:
- periodic
reasonNextIteration:
type: string
isComplete:
type: boolean
periodic:
Expand Down
42 changes: 3 additions & 39 deletions cli/openapi/model_polling_info.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions server/app/facade.go
Expand Up @@ -59,6 +59,7 @@ func newRunnerFacades(
execTestUpdater,
tracedb.Factory(testDB),
testDB,
eventEmitter,
)

tracePoller := executor.NewTracePoller(
Expand All @@ -67,6 +68,7 @@ func newRunnerFacades(
execTestUpdater,
assertionRunner,
subscriptionManager,
eventEmitter,
)

runner := executor.NewPersistentRunner(
Expand Down
59 changes: 50 additions & 9 deletions server/executor/poller_executor.go
Expand Up @@ -2,11 +2,14 @@ package executor

import (
"context"
"errors"
"fmt"
"log"

"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -18,18 +21,19 @@ type DefaultPollerExecutor struct {
updater RunUpdater
newTraceDBFn traceDBFactoryFn
dsRepo model.DataStoreRepository
eventEmitter EventEmitter
}

type InstrumentedPollerExecutor struct {
tracer trace.Tracer
pollerExecutor PollerExecutor
}

func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) {
func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) {
_, span := pe.tracer.Start(request.ctx, "Fetch trace")
defer span.End()

finished, run, err := pe.pollerExecutor.ExecuteRequest(request)
finished, finishReason, run, err := pe.pollerExecutor.ExecuteRequest(request)

spanCount := 0
if run.Trace != nil {
Expand All @@ -44,13 +48,17 @@ func (pe InstrumentedPollerExecutor) ExecuteRequest(request *PollingRequest) (bo
attribute.Int("tracetest.run.trace_poller.amount_retrieved_spans", spanCount),
}

if finishReason != "" {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.finish_reason", finishReason))
}

if err != nil {
attrs = append(attrs, attribute.String("tracetest.run.trace_poller.error", err.Error()))
span.RecordError(err)
}

span.SetAttributes(attrs...)
return finished, run, err
return finished, finishReason, run, err
}

func NewPollerExecutor(
Expand All @@ -59,13 +67,15 @@ func NewPollerExecutor(
updater RunUpdater,
newTraceDBFn traceDBFactoryFn,
dsRepo model.DataStoreRepository,
eventEmitter EventEmitter,
) PollerExecutor {

pollerExecutor := &DefaultPollerExecutor{
ppGetter: ppGetter,
updater: updater,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
eventEmitter: eventEmitter,
}

return &InstrumentedPollerExecutor{
Expand All @@ -88,21 +98,52 @@ func (pe DefaultPollerExecutor) traceDB(ctx context.Context) (tracedb.TraceDB, e
return tdb, nil
}

func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, model.Run, error) {
func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, string, model.Run, error) {
log.Printf("[PollerExecutor] Test %s Run %d: ExecuteRequest\n", request.test.ID, request.run.ID)
run := request.run

traceDB, err := pe.traceDB(request.ctx)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: GetDataStore error: %s\n", request.test.ID, request.run.ID, err.Error())
return false, model.Run{}, err
return false, "", model.Run{}, err
}

if request.IsFirstRequest() {
connectionResult := traceDB.TestConnection(request.ctx)

err = pe.eventEmitter.Emit(request.ctx, events.TraceDataStoreConnectionInfo(request.test.ID, request.run.ID, connectionResult))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceDataStoreConnectionInfo event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}
}

err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingStart(request.test.ID, request.run.ID))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingStart event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}

traceID := run.TraceID.String()
trace, err := traceDB.GetTraceByID(request.ctx, traceID)
if err != nil {
connectionResult := model.ConnectionResult{}

if !errors.Is(err, connection.ErrTraceNotFound) {
// run test connection to give a diagnostic when an unknown error happens
connectionResult = traceDB.TestConnection(request.ctx)
}

anotherErr := pe.eventEmitter.Emit(request.ctx, events.TraceFetchingError(request.test.ID, request.run.ID, connectionResult, err))
if anotherErr != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingError event: error: %s\n", request.test.ID, request.run.ID, anotherErr.Error())
}

log.Printf("[PollerExecutor] Test %s Run %d: GetTraceByID (traceID %s) error: %s\n", request.test.ID, request.run.ID, traceID, err.Error())
return false, model.Run{}, err
return false, "", model.Run{}, err
}

err = pe.eventEmitter.Emit(request.ctx, events.TraceFetchingSuccess(request.test.ID, request.run.ID))
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: failed to emit TraceFetchingSuccess event: error: %s\n", request.test.ID, request.run.ID, err.Error())
}

trace.ID = run.TraceID
Expand All @@ -113,7 +154,7 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m
log.Printf("[PollerExecutor] Test %s Run %d: Not done polling. (%s)\n", request.test.ID, request.run.ID, reason)
run.Trace = &trace
request.run = run
return false, run, nil
return false, "", run, nil
}

log.Printf("[PollerExecutor] Test %s Run %d: Done polling. (%s)\n", request.test.ID, request.run.ID, reason)
Expand All @@ -138,10 +179,10 @@ func (pe DefaultPollerExecutor) ExecuteRequest(request *PollingRequest) (bool, m
err = pe.updater.Update(request.ctx, run)
if err != nil {
log.Printf("[PollerExecutor] Test %s Run %d: Update error: %s\n", request.test.ID, request.run.ID, err.Error())
return false, model.Run{}, err
return false, "", model.Run{}, err
}

return true, run, nil
return true, reason, run, nil
}

func (pe DefaultPollerExecutor) donePollingTraces(job *PollingRequest, traceDB tracedb.TraceDB, trace model.Trace) (bool, string) {
Expand Down
31 changes: 18 additions & 13 deletions server/executor/poller_executor_test.go
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/testdb"
"github.com/kubeshop/tracetest/server/tracedb"
"github.com/kubeshop/tracetest/server/tracedb/connection"
"github.com/kubeshop/tracetest/server/tracing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -455,15 +457,17 @@ func executeAndValidatePollingRequests(t *testing.T, pollerExecutor executor.Pol
for i, value := range expectedValues {
request := executor.NewPollingRequest(ctx, test, run, i)

finished, anotherRun, err := pollerExecutor.ExecuteRequest(request)
finished, finishReason, anotherRun, err := pollerExecutor.ExecuteRequest(request)
run = anotherRun // should store a run to use in another iteration

require.NotNilf(t, run, "The test run should not be nil on iteration %d", i)

if value.finished {
require.Truef(t, finished, "The poller should have finished on iteration %d", i)
require.NotEmptyf(t, finishReason, "The poller should not have finish reason on iteration %d", i)
} else {
require.Falsef(t, finished, "The poller should have not finished on iteration %d", i)
require.Emptyf(t, finishReason, "The poller should have finish reason on iteration %d", i)
}

if value.expectNoTraceError {
Expand Down Expand Up @@ -505,13 +509,15 @@ func getPollerExecutorWithMocks(t *testing.T, retryDelay, maxWaitTimeForTrace ti
tracer := getTracerMock(t)
testDB := getDataStoreRepositoryMock(t)
traceDBFactory := getTraceDBMockFactory(t, tracePerIteration, &traceDBState{})
eventEmitter := getEventEmitterMock(t, testDB)

return executor.NewPollerExecutor(
defaultProfileGetter{retryDelay, maxWaitTimeForTrace},
tracer,
updater,
traceDBFactory,
testDB,
eventEmitter,
)
}

Expand All @@ -527,23 +533,22 @@ func getRunUpdaterMock(t *testing.T) executor.RunUpdater {
}

// DataStoreRepository
type dataStoreRepositoryMock struct {
testdb.MockRepository
// ...
}
func getDataStoreRepositoryMock(t *testing.T) model.Repository {
t.Helper()

testDB := testdb.MockRepository{}

func (m dataStoreRepositoryMock) DefaultDataStore(_ context.Context) (model.DataStore, error) {
return model.DataStore{}, nil
testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil)
testDB.Mock.On("CreateTestRunEvent", mock.Anything).Return(noError)

return &testDB
}

func getDataStoreRepositoryMock(t *testing.T) model.Repository {
// EventEmitter
func getEventEmitterMock(t *testing.T, db model.Repository) executor.EventEmitter {
t.Helper()

mock := new(dataStoreRepositoryMock)
mock.T = t
mock.Test(t)

return mock
return executor.NewEventEmitter(db, subscription.NewManager())
}

// Tracer
Expand Down

0 comments on commit 61fdf00

Please sign in to comment.