Skip to content

Commit

Permalink
feat(server): adding event emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbdias committed Mar 29, 2023
1 parent a7ad995 commit 96ad075
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 69 deletions.
72 changes: 4 additions & 68 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -142,16 +140,16 @@ func (app *App) Start(opts ...appOption) error {
testdb.WithDB(db),
)

if err != nil {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
app.subscribeToConfigChanges(subscriptionManager)

configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager))
configFromDB := configRepo.Current(ctx)

if err != nil {
log.Fatal(err)
}

tracer, err := tracing.NewTracer(ctx, app.cfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -321,68 +319,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout
provisioner.AddResourceProvisioner(manager)
}

type facadeConfig interface {
PoolingRetryDelay() time.Duration
PoolingMaxWaitTimeForTraceDuration() time.Duration
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}

func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry {
triggerReg := trigger.NewRegsitry(tracer, appTracer)
triggerReg.Add(trigger.HTTP())
Expand Down
72 changes: 72 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import (
"context"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
Expand All @@ -25,3 +31,69 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction,
func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) {
rf.assertionRunner.RunAssertions(ctx, request)
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {
eventEmitter := event.NewEmitter(testDB, subscriptionManager)

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
eventEmitter,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
eventEmitter,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
eventEmitter,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
eventEmitter,
)

transactionRunner := executor.NewTransactionRunner(
runner,
testDB,
subscriptionManager,
)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}
2 changes: 2 additions & 0 deletions server/executor/assertion_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"time"

"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/expression"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
Expand Down Expand Up @@ -46,6 +47,7 @@ func NewAssertionRunner(
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
eventEmitter event.Emitter,
) AssertionRunner {
return &defaultAssertionRunner{
outputsProcessor: op,
Expand Down
40 changes: 40 additions & 0 deletions server/executor/event/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package event

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/server/model"
)

type Emitter interface {
Emit(ctx context.Context, event model.TestRunEvent) error
}

type publisher interface {
Publish(eventID string, message any)
}

type internalEmitter struct {
repository model.TestRunEventRepository
publisher publisher
}

func NewEmitter(repository model.TestRunEventRepository, publisher publisher) Emitter {
return &internalEmitter{
repository: repository,
publisher: publisher,
}
}

func (em *internalEmitter) Emit(ctx context.Context, event model.TestRunEvent) error {
err := em.repository.CreateTestRunEvent(ctx, event)
if err != nil {
return err
}

eventIDAsString := fmt.Sprintf("%d", event.ID)
em.publisher.Publish(eventIDAsString, event)

return nil
}
2 changes: 2 additions & 0 deletions server/executor/poller_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"

"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -59,6 +60,7 @@ func NewPollerExecutor(
updater RunUpdater,
newTraceDBFn traceDBFactoryFn,
dsRepo model.DataStoreRepository,
eventEmitter event.Emitter,
) PollerExecutor {

pollerExecutor := &DefaultPollerExecutor{
Expand Down
17 changes: 17 additions & 0 deletions server/executor/poller_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
Expand Down Expand Up @@ -505,13 +506,15 @@ func getPollerExecutorWithMocks(t *testing.T, retryDelay, maxWaitTimeForTrace ti
tracer := getTracerMock(t)
testDB := getDataStoreRepositoryMock(t)
traceDBFactory := getTraceDBMockFactory(t, tracePerIteration, &traceDBState{})
eventEmitter := getEventEmitterMock(t)

return executor.NewPollerExecutor(
defaultProfileGetter{retryDelay, maxWaitTimeForTrace},
tracer,
updater,
traceDBFactory,
testDB,
eventEmitter,
)
}

Expand Down Expand Up @@ -556,6 +559,20 @@ func getTracerMock(t *testing.T) trace.Tracer {
return tracer
}

// EventEmitter
type eventEmitterMock struct {
}

func (em *eventEmitterMock) Emit(ctx context.Context, event model.TestRunEvent) error {
return nil
}

func getEventEmitterMock(t *testing.T) event.Emitter {
t.Helper()

return &eventEmitterMock{}
}

// TraceDB
type traceDBMock struct {
tracePerIteration []model.Trace
Expand Down
2 changes: 2 additions & 0 deletions server/executor/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/expression"
"github.com/kubeshop/tracetest/server/model"
Expand Down Expand Up @@ -37,6 +38,7 @@ func NewPersistentRunner(
subscriptionManager *subscription.Manager,
newTraceDBFn traceDBFactoryFn,
dsRepo model.DataStoreRepository,
eventEmitter event.Emitter,
) PersistentRunner {
return persistentRunner{
triggers: triggers,
Expand Down
4 changes: 3 additions & 1 deletion server/executor/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ func runnerSetup(t *testing.T) runnerFixture {

testDB.Mock.On("DefaultDataStore", mock.Anything).Return(model.DataStore{Type: model.DataStoreTypeOTLP}, nil)

eventEmitter := getEventEmitterMock(t)

mtp.Test(t)
return runnerFixture{
runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB),
runner: executor.NewPersistentRunner(reg, db, executor.NewDBUpdater(db), mtp, tracer, subscription.NewManager(), tracedb.Factory(&testDB), &testDB, eventEmitter),
mockExecutor: me,
mockDB: db,
mockTracePoller: mtp,
Expand Down
2 changes: 2 additions & 0 deletions server/executor/trace_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"time"

"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
Expand Down Expand Up @@ -41,6 +42,7 @@ func NewTracePoller(
updater RunUpdater,
assertionRunner AssertionRunner,
subscriptionManager *subscription.Manager,
eventEmitter event.Emitter,
) PersistentTracePoller {
return tracePoller{
updater: updater,
Expand Down

0 comments on commit 96ad075

Please sign in to comment.