Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): adding event emitter #2269

Merged
merged 8 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion local-config/tracetest.provision.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
dataStore:
---
type: DataStore
spec:
name: OpenTelemetry Collector
type: otlp
isdefault: true
73 changes: 4 additions & 69 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -142,16 +140,16 @@ func (app *App) Start(opts ...appOption) error {
testdb.WithDB(db),
)

if err != nil {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
app.subscribeToConfigChanges(subscriptionManager)

configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager))
configFromDB := configRepo.Current(ctx)

if err != nil {
log.Fatal(err)
}

tracer, err := tracing.NewTracer(ctx, app.cfg)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -201,7 +199,6 @@ func (app *App) Start(opts ...appOption) error {
// worker count. should be configurable
rf.tracePoller.Start(5)
rf.runner.Start(5)
rf.runner.Start(5)
rf.transactionRunner.Start(5)
rf.assertionRunner.Start(5)

Expand Down Expand Up @@ -321,68 +318,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout
provisioner.AddResourceProvisioner(manager)
}

type facadeConfig interface {
PoolingRetryDelay() time.Duration
PoolingMaxWaitTimeForTraceDuration() time.Duration
}

func newRunnerFacades(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved to ./server/app/facade.go to keep all concerns about the façade there.

ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}

func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry {
triggerReg := trigger.NewRegsitry(tracer, appTracer)
triggerReg.Add(trigger.HTTP())
Expand Down
68 changes: 68 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ import (
"context"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
Expand All @@ -25,3 +30,66 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction,
func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) {
rf.assertionRunner.RunAssertions(ctx, request)
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {
eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
eventEmitter,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(
runner,
testDB,
subscriptionManager,
)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}
1 change: 1 addition & 0 deletions server/executor/assertion_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewAssertionRunner(
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
eventEmitter EventEmitter,
) AssertionRunner {
return &defaultAssertionRunner{
outputsProcessor: op,
Expand Down
38 changes: 38 additions & 0 deletions server/executor/eventemitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package executor

import (
"context"

"github.com/kubeshop/tracetest/server/model"
)

type EventEmitter interface {
Emit(ctx context.Context, event model.TestRunEvent) error
}

type publisher interface {
Publish(eventID string, message any)
}

type internalEventEmitter struct {
repository model.TestRunEventRepository
publisher publisher
}

func NewEventEmitter(repository model.TestRunEventRepository, publisher publisher) EventEmitter {
return &internalEventEmitter{
repository: repository,
publisher: publisher,
}
}

func (em *internalEventEmitter) Emit(ctx context.Context, event model.TestRunEvent) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the next PRs we can evolve this method to also manage/start trace spans with each phase that we designed.

err := em.repository.CreateTestRunEvent(ctx, event)
if err != nil {
return err
}

em.publisher.Publish(event.ResourceID(), event)

return nil
}