Skip to content

Commit

Permalink
feat: add trigger resolve and execution events (#2281)
Browse files Browse the repository at this point in the history
* add trigger resolve and execution events

* fix mock

* Fix event creation
  • Loading branch information
mathnogueira committed Mar 31, 2023
1 parent 174a398 commit bfa19de
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 8 deletions.
1 change: 1 addition & 0 deletions server/app/facade.go
Expand Up @@ -78,6 +78,7 @@ func newRunnerFacades(
subscriptionManager,
tracedb.Factory(testDB),
testDB,
eventEmitter,
)

transactionRunner := executor.NewTransactionRunner(
Expand Down
52 changes: 48 additions & 4 deletions server/executor/runner.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/expression"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -37,6 +38,7 @@ func NewPersistentRunner(
subscriptionManager *subscription.Manager,
newTraceDBFn traceDBFactoryFn,
dsRepo model.DataStoreRepository,
eventEmitter EventEmitter,
) PersistentRunner {
return persistentRunner{
triggers: triggers,
Expand All @@ -47,6 +49,7 @@ func NewPersistentRunner(
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
subscriptionManager: subscriptionManager,
eventEmitter: eventEmitter,
executeQueue: make(chan execReq, 5),
exit: make(chan bool, 1),
}
Expand All @@ -61,6 +64,7 @@ type persistentRunner struct {
subscriptionManager *subscription.Manager
newTraceDBFn traceDBFactoryFn
dsRepo model.DataStoreRepository
eventEmitter EventEmitter

executeQueue chan execReq
exit chan bool
Expand All @@ -81,6 +85,12 @@ func (r persistentRunner) handleDBError(run model.Run, err error) {
}
}

func (r persistentRunner) handleError(run model.Run, err error) {
if err != nil {
fmt.Printf("test %s run #%d trigger DB error: %s\n", run.TestID, run.ID, err.Error())
}
}

func (r persistentRunner) Start(workers int) {
for i := 0; i < workers; i++ {
go func() {
Expand Down Expand Up @@ -160,15 +170,19 @@ func (r persistentRunner) processExecQueue(job execReq) {
run := job.run.Start()
r.handleDBError(run, r.updater.Update(job.ctx, run))

err := r.eventEmitter.Emit(job.ctx, events.TriggerCreatedInfo(job.run.TestID, job.run.ID))
if err != nil {
r.handleError(job.run, err)
}

triggerer, err := r.triggers.Get(job.test.ServiceUnderTest.Type)
if err != nil {
// TODO: actually handle the error
panic(err)
r.handleError(job.run, err)
}

tdb, err := r.traceDB(job.ctx)
if err != nil {
panic(err)
r.handleError(job.run, err)
}

traceID := tdb.GetTraceID()
Expand All @@ -180,9 +194,24 @@ func (r persistentRunner) processExecQueue(job execReq) {
Executor: job.executor,
}

err = r.eventEmitter.Emit(job.ctx, events.TriggerResolveStart(job.run.TestID, job.run.ID))
if err != nil {
r.handleError(job.run, err)
}

resolvedTest, err := triggerer.Resolve(job.ctx, job.test, triggerOptions)
if err != nil {
panic(err)
emitErr := r.eventEmitter.Emit(job.ctx, events.TriggerResolveError(job.run.TestID, job.run.ID, err))
if emitErr != nil {
r.handleError(job.run, emitErr)
}

r.handleError(job.run, err)
}

err = r.eventEmitter.Emit(job.ctx, events.TriggerResolveSuccess(job.run.TestID, job.run.ID))
if err != nil {
r.handleError(job.run, err)
}

if job.test.ServiceUnderTest.Type == model.TriggerTypeTRACEID {
Expand All @@ -192,15 +221,30 @@ func (r persistentRunner) processExecQueue(job execReq) {
}
}

err = r.eventEmitter.Emit(job.ctx, events.TriggerExecutionStart(job.run.TestID, job.run.ID))
if err != nil {
r.handleError(job.run, err)
}

response, err := triggerer.Trigger(job.ctx, resolvedTest, triggerOptions)
run = r.handleExecutionResult(run, response, err)
if err != nil {
emitErr := r.eventEmitter.Emit(job.ctx, events.TriggerExecutionError(job.run.TestID, job.run.ID, err))
if emitErr != nil {
r.handleError(job.run, emitErr)
}

fmt.Printf("test %s run #%d trigger error: %s\n", run.TestID, run.ID, err.Error())
r.subscriptionManager.PublishUpdate(subscription.Message{
ResourceID: run.TransactionStepResourceID(),
Type: "run_update",
Content: RunResult{Run: run, Err: err},
})
} else {
err = r.eventEmitter.Emit(job.ctx, events.TriggerExecutionSuccess(job.run.TestID, job.run.ID))
if err != nil {
r.handleDBError(job.run, err)
}
}

run.SpanID = response.SpanID
Expand Down
6 changes: 4 additions & 2 deletions server/executor/runner_test.go
Expand Up @@ -48,7 +48,6 @@ func TestPersistentRunner(t *testing.T) {
test2 := model.Test{ID: id.ID("test2"), ServiceUnderTest: sampleTrigger}

f := runnerSetup(t)

f.expectSuccessExecLong(test1)
f.expectSuccessExec(test2)

Expand Down Expand Up @@ -145,10 +144,13 @@ func runnerSetup(t *testing.T) runnerFixture {
testDB := testdb.MockRepository{}

testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil)
testDB.Mock.On("CreateTestRunEvent", mock.Anything).Return(noError)

eventEmitter := executor.NewEventEmitter(&testDB, subscription.NewManager())

mtp.Test(t)
return runnerFixture{
runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB),
runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB, eventEmitter),
mockExecutor: me,
mockDB: db,
mockTracePoller: mtp,
Expand Down
19 changes: 17 additions & 2 deletions server/model/events/events.go
Expand Up @@ -23,14 +23,14 @@ func TriggerCreatedInfo(testID id.ID, runID int) model.TestRunEvent {
}
}

func TriggerResolveError(testID id.ID, runID int) model.TestRunEvent {
func TriggerResolveError(testID id.ID, runID int, err error) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrigger,
Type: "RESOLVE_ERROR",
Title: "Resolving trigger details failed",
Description: "Resolving trigger details failed",
Description: fmt.Sprintf("Resolving trigger details failed: %s", err.Error()),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{},
Expand Down Expand Up @@ -98,6 +98,21 @@ func TriggerExecutionSuccess(testID id.ID, runID int) model.TestRunEvent {
}
}

func TriggerExecutionError(testID id.ID, runID int, err error) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrigger,
Type: "EXECUTION_ERROR",
Title: "Failed to trigger execution",
Description: fmt.Sprintf("Failed to trigger execution: %s", err.Error()),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{},
Outputs: []model.OutputInfo{},
}
}

func TriggerHTTPUnreachableHostError(testID id.ID, runID int) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
Expand Down

0 comments on commit bfa19de

Please sign in to comment.