Skip to content

Commit

Permalink
feat(server): refactor runner channels into abstract queues (#2971)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Jul 27, 2023
1 parent b49fe7a commit f9e4bc2
Show file tree
Hide file tree
Showing 44 changed files with 1,826 additions and 1,488 deletions.
153 changes: 12 additions & 141 deletions go.work.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions local-config/tracetest.provision.yaml
Expand Up @@ -21,6 +21,7 @@ spec:
periodic:
timeout: 30s
retryDelay: 1s
selectorMatchRetries: 3
---
type: TestRunner
spec:
Expand Down
4 changes: 4 additions & 0 deletions server/Makefile
Expand Up @@ -7,6 +7,7 @@ GO_LDFLAGS := $(shell echo \
-X "'github.com/kubeshop/tracetest/server/analytics.FrontendKey=$(ANALYTICS_FE_KEY)'" \
| sed 's/ / /g')

.PHONY: help
help: Makefile ## show list of commands
@echo "Choose a command run:"
@echo ""
Expand All @@ -18,12 +19,15 @@ init-submodule:
git submodule init
git submodule update

.PHONY: test
test: ## run go tests for this application
go test -timeout 150s -coverprofile=coverage.out ./...

.PHONY: vet
vet: ## run vet tool to analyze the code for suspicious, abnormal, or useless code
go vet -structtag=false ./...

.PHONY: run
run: ## run server locally
go run -ldflags="$(GO_LDFLAGS)" main.go serve

Expand Down
102 changes: 56 additions & 46 deletions server/app/app.go
Expand Up @@ -208,44 +208,33 @@ func (app *App) Start(opts ...appOption) error {
eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)
registerOtlpServer(app, runRepo, eventEmitter, dataStoreRepo)

rf := newRunnerFacades(
testPipeline := buildTestPipeline(
pollingProfileRepo,
dataStoreRepo,
linterRepo,
testRunnerRepo,
testDB,
testRepo,
runRepo,
transactionRunRepository,
applicationTracer,
tracer,
subscriptionManager,
triggerRegistry,
)

// worker count. should be configurable
rf.tracePoller.Start(5)
rf.runner.Start(5)
rf.runner.Start(5)
rf.transactionRunner.Start(5)
rf.assertionRunner.Start(5)
rf.linterRunner.Start(5)

app.registerStopFn(func() {
fmt.Println("stopping tracePoller")
rf.tracePoller.Stop()
})
app.registerStopFn(func() {
fmt.Println("stopping runner")
rf.runner.Stop()
})
testPipeline.Start()
app.registerStopFn(func() {
fmt.Println("stopping transactionRunner")
rf.transactionRunner.Stop()
testPipeline.Stop()
})

transactionPipeline := buildTransactionPipeline(
transactionsRepository,
transactionRunRepository,
testPipeline,
subscriptionManager,
)

transactionPipeline.Start()
app.registerStopFn(func() {
fmt.Println("stopping assertionRunner")
rf.assertionRunner.Stop()
transactionPipeline.Stop()
})

err = analytics.SendEvent("Server Started", "beacon", "", nil)
Expand All @@ -256,14 +245,17 @@ func (app *App) Start(opts ...appOption) error {
provisioner := provisioning.New()

router, mappers := controller(app.cfg,
tracer,

testPipeline,
transactionPipeline,

testDB,
transactionsRepository,
transactionRunRepository,
testRepo,
runRepo,
tracer,
environmentRepo,
rf,
)
registerWSHandler(router, mappers, subscriptionManager)

Expand Down Expand Up @@ -314,7 +306,7 @@ func (app *App) Start(opts ...appOption) error {
var (
matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
matchResourceName = regexp.MustCompile("(\\w)(\\.)(\\w)")
matchResourceName = regexp.MustCompile(`(\w)(\.)(\w)`)
)

func toWords(str string) string {
Expand Down Expand Up @@ -506,27 +498,36 @@ func registerWSHandler(router *mux.Router, mappers mappings.Mappings, subscripti

func controller(
cfg httpServerConfig,
testDB model.Repository,
transactionRepository *transaction.Repository,
transactionRunRepository *transaction.RunRepository,
testRepository test.Repository,
runRepository test.RunRepository,

tracer trace.Tracer,

testRunner *executor.TestPipeline,
transactionRunner *executor.TransactionPipeline,

testRunEvents model.TestRunEventRepository,
transactionRepo *transaction.Repository,
transactionRunRepo *transaction.RunRepository,
testRepo test.Repository,
testRunRepo test.RunRepository,
environmentRepo *environment.Repository,
rf *runnerFacade,
) (*mux.Router, mappings.Mappings) {
mappers := mappings.New(tracesConversionConfig(), comparator.DefaultRegistry())

router := openapi.NewRouter(httpRouter(
cfg,
testDB,
transactionRepository,
transactionRunRepository,
testRepository,
runRepository,

tracer,

testRunner,
transactionRunner,

testRunEvents,
transactionRepo,
transactionRunRepo,
testRepo,
testRunRepo,
environmentRepo,
rf,

mappers,
))

Expand All @@ -535,27 +536,36 @@ func controller(

func httpRouter(
cfg httpServerConfig,
testDB model.Repository,

tracer trace.Tracer,

testRunner *executor.TestPipeline,
transactionRunner *executor.TransactionPipeline,

testRunEvents model.TestRunEventRepository,
transactionRepo *transaction.Repository,
transactionRunRepo *transaction.RunRepository,
testRepo test.Repository,
testRunRepo test.RunRepository,
tracer trace.Tracer,
environmentRepo *environment.Repository,
rf *runnerFacade,

mappers mappings.Mappings,
) openapi.Router {
controller := httpServer.NewController(
testDB,
tracer,

testRunner,
transactionRunner,

testRunEvents,
transactionRepo,
transactionRunRepo,
testRepo,
testRunRepo,
environmentRepo,

tracedb.Factory(testRunRepo),
rf,
mappers,
environmentRepo,
tracer,
Version,
)
apiApiController := openapi.NewApiApiController(controller)
Expand Down
140 changes: 0 additions & 140 deletions server/app/facade.go

This file was deleted.

0 comments on commit f9e4bc2

Please sign in to comment.