Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): add nats subscription manager #3522

Merged
merged 12 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions agent/workers/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/client/mocks"
"github.com/kubeshop/tracetest/agent/collector"
Expand Down Expand Up @@ -72,8 +71,6 @@ func TestPollerWorker(t *testing.T) {
}
}

spew.Dump(pollingResponse)

assert.Len(t, spans, 2)
assert.Equal(t, "", spans[0].ParentId)
assert.Equal(t, spans[0].Id, spans[1].ParentId)
Expand Down
23 changes: 12 additions & 11 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ func provision(provisioner *provisioning.Provisioner, file string) {
fmt.Println("[Provisioning]: success")
}

func (app *App) subscribeToConfigChanges(sm *subscription.Manager) {
func (app *App) subscribeToConfigChanges(sm subscription.Manager) {
sm.Subscribe(config.ResourceID, subscription.NewSubscriberFunction(
func(m subscription.Message) error {
configFromDB, ok := m.Content.(config.Config)
if !ok {
return fmt.Errorf("cannot read update to configFromDB. unexpected type %T", m.Content)
configFromDB := config.Config{}
err := m.DecodeContent(&configFromDB)
if err != nil {
return fmt.Errorf("cannot read update to configFromDB: %w", err)
}

return app.initAnalytics(configFromDB)
Expand Down Expand Up @@ -174,7 +175,12 @@ func (app *App) Start(opts ...appOption) error {
log.Fatal(err)
}

subscriptionManager := subscription.NewManager()
natsConn, err := nats.Connect(app.cfg.NATSEndpoint())
if err != nil {
log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err)
}

subscriptionManager := subscription.NewManager(subscription.WithNats(natsConn))
app.subscribeToConfigChanges(subscriptionManager)

configRepo := config.NewRepository(db, config.WithPublisher(subscriptionManager))
Expand Down Expand Up @@ -241,11 +247,6 @@ func (app *App) Start(opts ...appOption) error {
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer)
}

natsConn, err := nats.Connect(app.cfg.NATSEndpoint())
if err != nil {
log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err)
}

executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn)
testPipeline := buildTestPipeline(
executorDriverFactory,
Expand Down Expand Up @@ -527,7 +528,7 @@ type httpServerConfig interface {
ExperimentalFeatures() []string
}

func registerWSHandler(router *mux.Router, mappers mappings.Mappings, subscriptionManager *subscription.Manager) {
func registerWSHandler(router *mux.Router, mappers mappings.Mappings, subscriptionManager subscription.Manager) {
wsRouter := websocket.NewRouter()
wsRouter.Add("subscribe", websocket.NewSubscribeCommandExecutor(subscriptionManager, mappers))
wsRouter.Add("unsubscribe", websocket.NewUnsubscribeCommandExecutor(subscriptionManager))
Expand Down
2 changes: 1 addition & 1 deletion server/app/test_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func buildTestPipeline(
testRepo test.Repository,
runRepo test.RunRepository,
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
triggerRegistry *trigger.Registry,
tracedbFactory tracedb.FactoryFunc,
appConfig *config.AppConfig,
Expand Down
2 changes: 1 addition & 1 deletion server/app/test_suite_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func buildTestSuitePipeline(
tranRepo *testsuite.Repository,
runRepo *testsuite.RunRepository,
testRunner *executor.TestPipeline,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
meter metric.Meter,
) *executor.TestSuitesPipeline {
tranRunner := executor.NewTestSuiteRunner(testRunner, runRepo, subscriptionManager)
Expand Down
4 changes: 2 additions & 2 deletions server/executor/assertion_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ type defaultAssertionRunner struct {
updater RunUpdater
assertionExecutor AssertionExecutor
outputsProcessor OutputsProcessorFn
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
}

func NewAssertionRunner(
updater RunUpdater,
assertionExecutor AssertionExecutor,
op OutputsProcessorFn,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
) *defaultAssertionRunner {
return &defaultAssertionRunner{
Expand Down
11 changes: 8 additions & 3 deletions server/executor/eventemitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executor_test
import (
"context"
"errors"
"fmt"
"testing"

"github.com/kubeshop/tracetest/server/executor"
Expand Down Expand Up @@ -135,13 +136,17 @@ func (s *testRunEventSubscriber) ID() string {
return "some-id"
}

func (s *testRunEventSubscriber) Notify(message subscription.Message) error {
event := message.Content.(model.TestRunEvent)
func (s *testRunEventSubscriber) Notify(m subscription.Message) error {
event := model.TestRunEvent{}
err := m.DecodeContent(&event)
if err != nil {
panic(fmt.Errorf("cannot read testRunEvent: %w", err))
}
s.events = append(s.events, event)
return nil
}

func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (*subscription.Manager, *testRunEventSubscriber) {
func getSubscriptionManagerMock(t *testing.T, event model.TestRunEvent) (subscription.Manager, *testRunEventSubscriber) {
t.Helper()

subscriptionManager := subscription.NewManager()
Expand Down
4 changes: 2 additions & 2 deletions server/executor/linter_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ type AnalyzerGetter interface {

type defaultLinterRunner struct {
updater RunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
analyzerGetter AnalyzerGetter
outputQueue pipeline.Enqueuer[Job]
}

func NewLinterRunner(
updater RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
analyzerGetter AnalyzerGetter,
) *defaultLinterRunner {
Expand Down
32 changes: 20 additions & 12 deletions server/executor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,10 @@ func (q Queue) listenForUserRequests(ctx context.Context, cancelCtx context.Canc

sfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
cancelCtx(nil)
request, ok := m.Content.(UserRequest)
if !ok {
return nil
request := UserRequest{}
err := m.DecodeContent(&request)
if err != nil {
return fmt.Errorf("cannot decode UserRequest message: %w", err)
}

run, err := q.runs.GetRun(ctx, request.TestID, request.RunID)
Expand All @@ -384,9 +385,10 @@ func (q Queue) listenForUserRequests(ctx context.Context, cancelCtx context.Canc
})

spfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
request, ok := m.Content.(UserRequest)
if !ok {
return nil
request := UserRequest{}
err := m.DecodeContent(&request)
if err != nil {
return fmt.Errorf("cannot decode UserRequest message: %w", err)
}

run, err := q.runs.GetRun(ctx, request.TestID, request.RunID)
Expand Down Expand Up @@ -416,7 +418,8 @@ func (q Queue) resolveTestSuite(ctx context.Context, job Job) testsuite.TestSuit
return testsuite.TestSuite{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve TestSuite: %s", err.Error())
return testsuite.TestSuite{}
}

return tran
Expand All @@ -431,7 +434,8 @@ func (q Queue) resolveTestSuiteRun(ctx context.Context, job Job) testsuite.TestS
return testsuite.TestSuiteRun{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve TestSuiteRun: %s", err.Error())
return testsuite.TestSuiteRun{}
}

return tranRun
Expand All @@ -447,7 +451,8 @@ func (q Queue) resolveTest(ctx context.Context, job Job) test.Test {
return test.Test{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve Test: %s", err.Error())
return test.Test{}
}

return t
Expand All @@ -463,7 +468,8 @@ func (q Queue) resolveTestRun(ctx context.Context, job Job) test.Run {
return test.Run{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve test run: %s", err.Error())
return test.Run{}
}

return run
Expand All @@ -479,7 +485,8 @@ func (q Queue) resolvePollingProfile(ctx context.Context, job Job) pollingprofil
return pollingprofile.PollingProfile{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve PollingProfile: %s", err.Error())
return pollingprofile.PollingProfile{}
}

return profile
Expand All @@ -495,7 +502,8 @@ func (q Queue) resolveDataStore(ctx context.Context, job Job) datastore.DataStor
return datastore.DataStore{}
}
if err != nil {
panic(err)
log.Printf("cannot resolve DataStore: %s", err.Error())
return datastore.DataStore{}
}

return ds
Expand Down
4 changes: 2 additions & 2 deletions server/executor/run_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (u dbUpdater) Update(ctx context.Context, run test.Run) error {
}

type subscriptionUpdater struct {
manager *subscription.Manager
manager subscription.Manager
}

func NewSubscriptionUpdater(manager *subscription.Manager) RunUpdater {
func NewSubscriptionUpdater(manager subscription.Manager) RunUpdater {
return subscriptionUpdater{manager}
}

Expand Down
4 changes: 2 additions & 2 deletions server/executor/test_suite_run_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (u dbTransactionUpdater) Update(ctx context.Context, run testsuite.TestSuit
}

type subscriptionTransactionUpdater struct {
manager *subscription.Manager
manager subscription.Manager
}

func NewSubscriptionTransactionUpdater(manager *subscription.Manager) TestSuiteRunUpdater {
func NewSubscriptionTransactionUpdater(manager subscription.Manager) TestSuiteRunUpdater {
return subscriptionTransactionUpdater{manager}
}

Expand Down
10 changes: 7 additions & 3 deletions server/executor/test_suite_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type testRunner interface {
func NewTestSuiteRunner(
testRunner testRunner,
transactionRuns testSuiteRunRepository,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
) *persistentTransactionRunner {
updater := (CompositeTransactionUpdater{}).
Add(NewDBTranasctionUpdater(transactionRuns)).
Expand All @@ -44,7 +44,7 @@ type persistentTransactionRunner struct {
testRunner testRunner
transactionRuns testSuiteRunRepository
updater TestSuiteRunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
}

func (r *persistentTransactionRunner) SetOutputQueue(_ pipeline.Enqueuer[Job]) {
Expand Down Expand Up @@ -104,7 +104,11 @@ func (r persistentTransactionRunner) runTransactionStep(ctx context.Context, tr
// listen for updates and propagate them as if they were transaction updates
r.subscriptionManager.Subscribe(testRun.ResourceID(), subscription.NewSubscriberFunction(
func(m subscription.Message) error {
testRun := m.Content.(test.Run)
testRun := test.Run{}
err := m.DecodeContent(&testRun)
if err != nil {
return fmt.Errorf("cannot decode Run message: %w", err)
}
if testRun.LastError != nil {
tr.State = testsuite.TestSuiteStateFailed
tr.LastError = testRun.LastError
Expand Down
8 changes: 6 additions & 2 deletions server/executor/test_suite_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type fakeTestRunner struct {
db test.RunRepository
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
returnErr bool
uid int
}
Expand Down Expand Up @@ -181,7 +181,11 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

done := make(chan testsuite.TestSuiteRun, 1)
sf := subscription.NewSubscriberFunction(func(m subscription.Message) error {
tr := m.Content.(testsuite.TestSuiteRun)
tr := testsuite.TestSuiteRun{}
err := m.DecodeContent(&tr)
if err != nil {
panic(fmt.Errorf("cannot decode TestSuiteRun message: %w", err))
}
if tr.State.IsFinal() {
done <- tr
}
Expand Down
4 changes: 2 additions & 2 deletions server/executor/trace_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type TraceFetcher interface {
func NewTracePoller(
pe pollerExecutor,
updater RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
) *tracePoller {
return &tracePoller{
Expand All @@ -79,7 +79,7 @@ func NewTracePoller(
type tracePoller struct {
updater RunUpdater
pollerExecutor pollerExecutor
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
inputQueue pipeline.Enqueuer[Job]
outputQueue pipeline.Enqueuer[Job]
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type workerState struct {
newTraceDBFn tracedb.FactoryFunc
dsRepo resourcemanager.Current[datastore.DataStore]
updater executor.RunUpdater
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
tracer trace.Tracer
inputQueue pipeline.Enqueuer[executor.Job]
}
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/evaluator_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewEvaluatorWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
stopStrategy PollingStopStrategy,
tracer trace.Tracer,
) *tracePollerEvaluatorWorker {
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/fetcher_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewFetcherWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
tracer trace.Tracer,
enabled bool,
) *traceFetcherWorker {
Expand Down
2 changes: 1 addition & 1 deletion server/executor/tracepollerworker/starter_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewStarterWorker(
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
tracer trace.Tracer,
) *tracePollerStarterWorker {
state := &workerState{
Expand Down
4 changes: 2 additions & 2 deletions server/executor/trigger_result_processor_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type RunResult struct {

func NewTriggerResultProcessorWorker(
tracer trace.Tracer,
subscriptionManager *subscription.Manager,
subscriptionManager subscription.Manager,
eventEmitter EventEmitter,
updater RunUpdater,
) *triggerResultProcessorWorker {
Expand All @@ -35,7 +35,7 @@ func NewTriggerResultProcessorWorker(

type triggerResultProcessorWorker struct {
tracer trace.Tracer
subscriptionManager *subscription.Manager
subscriptionManager subscription.Manager
eventEmitter EventEmitter
outputQueue pipeline.Enqueuer[Job]
updater RunUpdater
Expand Down