Skip to content

Commit

Permalink
feat(server): add nats subscription manager (#3522)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Jan 15, 2024
1 parent 7373145 commit 29ae31c
Show file tree
Hide file tree
Showing 30 changed files with 704 additions and 357 deletions.
3 changes: 0 additions & 3 deletions agent/workers/poller_test.go
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/collector"
Expand Down Expand Up @@ -72,8 +71,6 @@ func TestPollerWorker(t *testing.T) {
}
}

spew.Dump(pollingResponse)

assert.Len(t, spans, 2)
assert.Equal(t, "", spans[0].ParentId)
assert.Equal(t, spans[0].Id, spans[1].ParentId)
Expand Down
23 changes: 12 additions & 11 deletions server/app/app.go
Expand Up @@ -120,12 +120,13 @@ func provision(provisioner *provisioning.Provisioner, file string) {
fmt.Println("[Provisioning]: success")
}

func (app *App) subscribeToConfigChanges(sm *subscription.Manager) {
func (app *App) subscribeToConfigChanges(sm subscription.Manager) {
sm.Subscribe(config.ResourceID, subscription.NewSubscriberFunction(
func(m subscription.Message) error {
configFromDB, ok := m.Content.(config.Config)
if !ok {
return fmt.Errorf("cannot read update to configFromDB. unexpected type %T", m.Content)
configFromDB := config.Config{}
err := m.DecodeContent(&configFromDB)
if err != nil {
return fmt.Errorf("cannot read update to configFromDB: %w", err)
}

return app.initAnalytics(configFromDB)
Expand Down Expand Up @@ -174,7 +175,12 @@ func (app *App) Start(opts ...appOption) error {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
natsConn, err := nats.Connect(app.cfg.NATSEndpoint())
if err != nil {
log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err)
}

subscriptionManager := subscription.NewManager(subscription.WithNats(natsConn))
app.subscribeToConfigChanges(subscriptionManager)

configRepo := config.NewRepository(db, config.WithPublisher(subscriptionManager))
Expand Down Expand Up @@ -241,11 +247,6 @@ func (app *App) Start(opts ...appOption) error {
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer)
}

natsConn, err := nats.Connect(app.cfg.NATSEndpoint())
if err != nil {
log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err)
}

executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn)
testPipeline := buildTestPipeline(
executorDriverFactory,
Expand Down Expand Up @@ -527,7 +528,7 @@ type httpServerConfig interface {
ExperimentalFeatures() []string
}

func registerWSHandler(router *mux.Router, mappers mappings.Mappings, subscriptionManager *subscription.Manager) {
func registerWSHandler(router *mux.Router, mappers mappings.Mappings, subscriptionManager subscription.Manager) {
wsRouter := websocket.NewRouter()
wsRouter.Add("subscribe", websocket.NewSubscribeCommandExecutor(subscriptionManager, mappers))
wsRouter.Add("unsubscribe", websocket.NewUnsubscribeCommandExecutor(subscriptionManager))
Expand Down
2 changes: 1 addition & 1 deletion server/app/test_pipeline.go
Expand Up @@ -30,7 +30,7 @@ func buildTestPipeline(
testRepo test.Repository,
runRepo test.RunRepository,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
triggerRegistry *trigger.Registry,
tracedbFactory tracedb.FactoryFunc,
appConfig *config.AppConfig,
Expand Down
2 changes: 1 addition & 1 deletion server/app/test_suite_pipeline.go
Expand Up @@ -12,7 +12,7 @@ func buildTestSuitePipeline(
tranRepo *testsuite.Repository,
runRepo *testsuite.RunRepository,
testRunner *executor.TestPipeline,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
meter metric.Meter,
) *executor.TestSuitesPipeline {
tranRunner := executor.NewTestSuiteRunner(testRunner, runRepo, subscriptionManager)
Expand Down
4 changes: 2 additions & 2 deletions server/executor/assertion_runner.go
Expand Up @@ -20,15 +20,15 @@ type defaultAssertionRunner struct {
updater RunUpdater
assertionExecutor AssertionExecutor
outputsProcessor OutputsProcessorFn
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
}

func NewAssertionRunner(
updater RunUpdater,
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
) *defaultAssertionRunner {
return &defaultAssertionRunner{
Expand Down
11 changes: 8 additions & 3 deletions server/executor/eventemitter_test.go
Expand Up @@ -3,6 +3,7 @@ package executor_test
import (
"context"
"errors"
"fmt"
"testing"

"github.com/kubeshop/tracetest/server/executor"
Expand Down Expand Up @@ -135,13 +136,17 @@ func (s *testRunEventSubscriber) ID() string {
return "some-id"
}

func (s *testRunEventSubscriber) Notify(message subscription.Message) error {
event := message.Content.(model.TestRunEvent)
func (s *testRunEventSubscriber) Notify(m subscription.Message) error {
event := model.TestRunEvent{}
err := m.DecodeContent(&event)
if err != nil {
panic(fmt.Errorf("cannot read testRunEvent: %w", err))
}
s.events = append(s.events, event)
return nil
}

func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (*subscription.Manager, *testRunEventSubscriber) {
func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (subscription.Manager, *testRunEventSubscriber) {
t.Helper()

subscriptionManager := subscription.NewManager()
Expand Down
4 changes: 2 additions & 2 deletions server/executor/linter_runner.go
Expand Up @@ -20,15 +20,15 @@ type AnalyzerGetter interface {

type defaultLinterRunner struct {
updater RunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
analyzerGetter AnalyzerGetter
outputQueue pipeline.Enqueuer[Job]
}

func NewLinterRunner(
updater RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
analyzerGetter AnalyzerGetter,
) *defaultLinterRunner {
Expand Down
32 changes: 20 additions & 12 deletions server/executor/queue.go
Expand Up @@ -366,9 +366,10 @@ func (q Queue) listenForUserRequests(ctx context.Context, cancelCtx context.Canc

sfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
cancelCtx(nil)
request, ok := m.Content.(UserRequest)
if !ok {
return nil
request := UserRequest{}
err := m.DecodeContent(&request)
if err != nil {
return fmt.Errorf("cannot decode UserRequest message: %w", err)
}

run, err := q.runs.GetRun(ctx, request.TestID, request.RunID)
Expand All @@ -384,9 +385,10 @@ func (q Queue) listenForUserRequests(ctx context.Context, cancelCtx context.Canc
})

spfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
request, ok := m.Content.(UserRequest)
if !ok {
return nil
request := UserRequest{}
err := m.DecodeContent(&request)
if err != nil {
return fmt.Errorf("cannot decode UserRequest message: %w", err)
}

run, err := q.runs.GetRun(ctx, request.TestID, request.RunID)
Expand Down Expand Up @@ -416,7 +418,8 @@ func (q Queue) resolveTestSuite(ctx context.Context, job Job) testsuite.TestSuit
return testsuite.TestSuite{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve TestSuite: %s", err.Error())
return testsuite.TestSuite{}
}

return tran
Expand All @@ -431,7 +434,8 @@ func (q Queue) resolveTestSuiteRun(ctx context.Context, job Job) testsuite.TestS
return testsuite.TestSuiteRun{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve TestSuiteRun: %s", err.Error())
return testsuite.TestSuiteRun{}
}

return tranRun
Expand All @@ -447,7 +451,8 @@ func (q Queue) resolveTest(ctx context.Context, job Job) test.Test {
return test.Test{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve Test: %s", err.Error())
return test.Test{}
}

return t
Expand All @@ -463,7 +468,8 @@ func (q Queue) resolveTestRun(ctx context.Context, job Job) test.Run {
return test.Run{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve test run: %s", err.Error())
return test.Run{}
}

return run
Expand All @@ -479,7 +485,8 @@ func (q Queue) resolvePollingProfile(ctx context.Context, job Job) pollingprofil
return pollingprofile.PollingProfile{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve PollingProfile: %s", err.Error())
return pollingprofile.PollingProfile{}
}

return profile
Expand All @@ -495,7 +502,8 @@ func (q Queue) resolveDataStore(ctx context.Context, job Job) datastore.DataStor
return datastore.DataStore{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve DataStore: %s", err.Error())
return datastore.DataStore{}
}

return ds
Expand Down
4 changes: 2 additions & 2 deletions server/executor/run_updater.go
Expand Up @@ -50,10 +50,10 @@ func (u dbUpdater) Update(ctx context.Context, run test.Run) error {
}

type subscriptionUpdater struct {
manager *subscription.Manager
manager subscription.Manager
}

func NewSubscriptionUpdater(manager *subscription.Manager) RunUpdater {
func NewSubscriptionUpdater(manager subscription.Manager) RunUpdater {
return subscriptionUpdater{manager}
}

Expand Down
4 changes: 2 additions & 2 deletions server/executor/test_suite_run_updater.go
Expand Up @@ -50,10 +50,10 @@ func (u dbTransactionUpdater) Update(ctx context.Context, run testsuite.TestSuit
}

type subscriptionTransactionUpdater struct {
manager *subscription.Manager
manager subscription.Manager
}

func NewSubscriptionTransactionUpdater(manager *subscription.Manager) TestSuiteRunUpdater {
func NewSubscriptionTransactionUpdater(manager subscription.Manager) TestSuiteRunUpdater {
return subscriptionTransactionUpdater{manager}
}

Expand Down
10 changes: 7 additions & 3 deletions server/executor/test_suite_runner.go
Expand Up @@ -26,7 +26,7 @@ type testRunner interface {
func NewTestSuiteRunner(
testRunner testRunner,
transactionRuns testSuiteRunRepository,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
) *persistentTransactionRunner {
updater := (CompositeTransactionUpdater{}).
Add(NewDBTranasctionUpdater(transactionRuns)).
Expand All @@ -44,7 +44,7 @@ type persistentTransactionRunner struct {
testRunner testRunner
transactionRuns testSuiteRunRepository
updater TestSuiteRunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
}

func (r *persistentTransactionRunner) SetOutputQueue(_ pipeline.Enqueuer[Job]) {
Expand Down Expand Up @@ -104,7 +104,11 @@ func (r persistentTransactionRunner) runTransactionStep(ctx context.Context, tr
// listen for updates and propagate them as if they were transaction updates
r.subscriptionManager.Subscribe(testRun.ResourceID(), subscription.NewSubscriberFunction(
func(m subscription.Message) error {
testRun := m.Content.(test.Run)
testRun := test.Run{}
err := m.DecodeContent(&testRun)
if err != nil {
return fmt.Errorf("cannot decode Run message: %w", err)
}
if testRun.LastError != nil {
tr.State = testsuite.TestSuiteStateFailed
tr.LastError = testRun.LastError
Expand Down
8 changes: 6 additions & 2 deletions server/executor/test_suite_runner_test.go
Expand Up @@ -26,7 +26,7 @@ import (

type fakeTestRunner struct {
db test.RunRepository
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
returnErr bool
uid int
}
Expand Down Expand Up @@ -181,7 +181,11 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

done := make(chan testsuite.TestSuiteRun, 1)
sf := subscription.NewSubscriberFunction(func(m subscription.Message) error {
tr := m.Content.(testsuite.TestSuiteRun)
tr := testsuite.TestSuiteRun{}
err := m.DecodeContent(&tr)
if err != nil {
panic(fmt.Errorf("cannot decode TestSuiteRun message: %w", err))
}
if tr.State.IsFinal() {
done <- tr
}
Expand Down
4 changes: 2 additions & 2 deletions server/executor/trace_poller.go
Expand Up @@ -65,7 +65,7 @@ type TraceFetcher interface {
func NewTracePoller(
pe pollerExecutor,
updater RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
) *tracePoller {
return &tracePoller{
Expand All @@ -79,7 +79,7 @@ func NewTracePoller(
type tracePoller struct {
updater RunUpdater
pollerExecutor pollerExecutor
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
inputQueue pipeline.Enqueuer[Job]
outputQueue pipeline.Enqueuer[Job]
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/common.go
Expand Up @@ -24,7 +24,7 @@ type workerState struct {
newTraceDBFn tracedb.FactoryFunc
dsRepo resourcemanager.Current[datastore.DataStore]
updater executor.RunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
tracer trace.Tracer
inputQueue pipeline.Enqueuer[executor.Job]
}
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/evaluator_worker.go
Expand Up @@ -36,7 +36,7 @@ func NewEvaluatorWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
stopStrategy PollingStopStrategy,
tracer trace.Tracer,
) *tracePollerEvaluatorWorker {
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/fetcher_worker.go
Expand Up @@ -24,7 +24,7 @@ func NewFetcherWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
tracer trace.Tracer,
enabled bool,
) *traceFetcherWorker {
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/starter_worker.go
Expand Up @@ -26,7 +26,7 @@ func NewStarterWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
tracer trace.Tracer,
) *tracePollerStarterWorker {
state := &workerState{
Expand Down
4 changes: 2 additions & 2 deletions server/executor/trigger_result_processor_worker.go
Expand Up @@ -21,7 +21,7 @@ type RunResult struct {

func NewTriggerResultProcessorWorker(
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
updater RunUpdater,
) *triggerResultProcessorWorker {
Expand All @@ -35,7 +35,7 @@ func NewTriggerResultProcessorWorker(

type triggerResultProcessorWorker struct {
tracer trace.Tracer
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
outputQueue pipeline.Enqueuer[Job]
updater RunUpdater
Expand Down

0 comments on commit 29ae31c

Please sign in to comment.