Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
danielbdias committed Mar 28, 2023
1 parent c9db177 commit 47af86d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 68 deletions.
77 changes: 9 additions & 68 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
Expand All @@ -16,7 +15,7 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -142,16 +141,16 @@ func (app *App) Start(opts ...appOption) error {
testdb.WithDB(db),
)

if err != nil {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
app.subscribeToConfigChanges(subscriptionManager)

configRepo := configresource.NewRepository(db, configresource.WithPublisher(subscriptionManager))
configFromDB := configRepo.Current(ctx)

if err != nil {
log.Fatal(err)
}

tracer, err := tracing.NewTracer(ctx, app.cfg)
if err != nil {
log.Fatal(err)
Expand All @@ -161,6 +160,9 @@ func (app *App) Start(opts ...appOption) error {
tracing.ShutdownTracer(ctx)
})

eventRepository := event.NewRepository(db)
eventEmitter := event.NewEmitter(eventRepository, subscriptionManager)

serverID, isNewInstall, err := testDB.ServerID()
if err != nil {
return err
Expand Down Expand Up @@ -196,6 +198,7 @@ func (app *App) Start(opts ...appOption) error {
tracer,
subscriptionManager,
triggerRegistry,
eventEmitter,
)

// worker count. should be configurable
Expand Down Expand Up @@ -321,68 +324,6 @@ func registerDemosResource(repository *demoresource.Repository, router *mux.Rout
provisioner.AddResourceProvisioner(manager)
}

type facadeConfig interface {
PoolingRetryDelay() time.Duration
PoolingMaxWaitTimeForTraceDuration() time.Duration
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}

func getTriggerRegistry(tracer, appTracer trace.Tracer) *trigger.Registry {
triggerReg := trigger.NewRegsitry(tracer, appTracer)
triggerReg.Add(trigger.HTTP())
Expand Down
64 changes: 64 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import (
"context"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/event"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
Expand All @@ -25,3 +31,61 @@ func (rf runnerFacade) RunTransaction(ctx context.Context, tr model.Transaction,
func (rf runnerFacade) RunAssertions(ctx context.Context, request executor.AssertionRequest) {
rf.assertionRunner.RunAssertions(ctx, request)
}

func newRunnerFacades(
ppRepo *pollingprofile.Repository,
testDB model.Repository,
appTracer trace.Tracer,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
triggerRegistry *trigger.Registry,
eventEmitter *event.Emitter,
) *runnerFacade {

execTestUpdater := (executor.CompositeUpdater{}).
Add(executor.NewDBUpdater(testDB)).
Add(executor.NewSubscriptionUpdater(subscriptionManager))

assertionRunner := executor.NewAssertionRunner(
execTestUpdater,
executor.NewAssertionExecutor(tracer),
executor.InstrumentedOutputProcessor(tracer),
subscriptionManager,
)

pollerExecutor := executor.NewPollerExecutor(
ppRepo,
tracer,
execTestUpdater,
tracedb.Factory(testDB),
testDB,
)

tracePoller := executor.NewTracePoller(
pollerExecutor,
ppRepo,
execTestUpdater,
assertionRunner,
subscriptionManager,
)

runner := executor.NewPersistentRunner(
triggerRegistry,
testDB,
execTestUpdater,
tracePoller,
tracer,
subscriptionManager,
tracedb.Factory(testDB),
testDB,
)

transactionRunner := executor.NewTransactionRunner(runner, testDB, subscriptionManager)

return &runnerFacade{
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
tracePoller: tracePoller,
}
}
23 changes: 23 additions & 0 deletions server/executor/event/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package event

import "context"

type Emitter struct {
repository *Repository
publisher publisher
}

type publisher interface {
Publish(eventID string, message any)
}

func NewEmitter(repository *Repository, publisher publisher) *Emitter {
return &Emitter{
repository: repository,
publisher: publisher,
}
}

func (em *Emitter) Emit(ctx context.Context, event Event) {

}
5 changes: 5 additions & 0 deletions server/executor/event/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package event

type Event struct {
Name string
}
20 changes: 20 additions & 0 deletions server/executor/event/repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package event

import (
"context"
"database/sql"
)

type Repository struct {
db *sql.DB
}

func NewRepository(db *sql.DB) *Repository {
return &Repository{
db: db,
}
}

func (r *Repository) Insert(ctx context.Context, event Event) (Event, error) {
return event, nil
}

0 comments on commit 47af86d

Please sign in to comment.