diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 94ad0b1e8a..4e451070a4 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -304,9 +304,9 @@ jobs: TEST_ENV="${{ matrix.test_env }}" \ ./run.bash || (cat /tmp/docker-log; exit 1) - trace-testing: + trace-testing-memory: needs: [build-docker] - name: Tracetesting API Server + name: Tracetesting API Server (InMemory) runs-on: ubuntu-latest steps: @@ -333,6 +333,35 @@ jobs: ./scripts/wait-for-port.sh 11633 ./run.sh tracetests || (cat /tmp/docker-log; exit 1) + trace-testing-nats: + needs: [build-docker] + name: Tracetesting API Server (NATS) + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - uses: actions/download-artifact@v3 + with: + name: tracetest-dist + path: dist/ + + - name: Import image + run: | + docker load --input dist/image.tar + + - name: Start services + run: | + ./run.sh down up + ./run.sh tracetest-logs > /tmp/docker-log & + - name: Run tests + run: | + chmod +x ./dist/tracetest ./dist/tracetest-server + + ./scripts/wait-for-port.sh 11633 + NATS=true ./run.sh tracetests || (cat /tmp/docker-log; exit 1) + e2e-cli: name: CLI e2e tests needs: [build-docker] diff --git a/docker-compose.nats.yaml b/docker-compose.nats.yaml new file mode 100644 index 0000000000..1a341f1b43 --- /dev/null +++ b/docker-compose.nats.yaml @@ -0,0 +1,67 @@ +version: "3.2" +services: + tracetest: + restart: unless-stopped + image: kubeshop/tracetest:${TAG:-latest} + extra_hosts: + - "host.docker.internal:host-gateway" + build: + context: . + volumes: + - type: bind + source: ./local-config/tracetest.config.nats.yaml + target: /app/tracetest.yaml + - type: bind + source: ./local-config/tracetest.provision.yaml + target: /app/provisioning.yaml + ports: + - 11633:11633 + command: --provisioning-file /app/provisioning.yaml + healthcheck: + test: ["CMD", "wget", "--spider", "localhost:11633"] + interval: 1s + timeout: 3s + retries: 60 + depends_on: + postgres: + condition: service_healthy + environment: + TRACETEST_DEV: ${TRACETEST_DEV} + TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED: ${TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED} + TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED: ${TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED} + TRACETEST_DATASTOREPIPELINES_TESTCONNECTION_ENABLED: ${TRACETEST_DATASTOREPIPELINES_TESTCONNECTION_ENABLED} + + postgres: + image: postgres:15.2 + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + ports: + - 5432:5432 + healthcheck: + test: pg_isready -U "$$POSTGRES_USER" -d "$$POSTGRES_DB" + interval: 1s + timeout: 5s + retries: 60 + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.59.0 + extra_hosts: + - "host.docker.internal:host-gateway" + ports: + - "55679:55679" + - "4317:4317" + - "8888:8888" + command: + - "--config" + - "/otel-local-config.yaml" + volumes: + - ./local-config/collector.config.yaml:/otel-local-config.yaml + depends_on: + - tracetest + + nats: + image: nats:2.10-alpine + ports: + - "4222:4222" # connecting + - "8222:8222" # reporting server diff --git a/go.mod b/go.mod index 5812fbc6c2..25a3407036 100644 --- a/go.mod +++ b/go.mod @@ -37,11 +37,11 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/j2gg0s/otsql v0.14.0 github.com/jackc/pgx/v5 v5.4.2 - github.com/jdvr/go-again v1.0.0 github.com/jhump/protoreflect v1.12.0 github.com/json-iterator/go v1.1.12 github.com/labstack/gommon v0.3.0 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 + github.com/nats-io/nats.go v1.31.0 github.com/ohler55/ojg v1.14.4 github.com/opensearch-project/opensearch-go v1.1.0 github.com/orlangure/gnomock v0.20.0 @@ -148,6 +148,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.1 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect diff --git a/go.sum b/go.sum index 360f503cb8..44dbc22507 100644 --- a/go.sum +++ b/go.sum @@ -1170,8 +1170,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/jdvr/go-again v1.0.0 h1:eNHlD8mFc9Rq7aLMKhbyaFEz68zBuU9YsfPet99dZnA= -github.com/jdvr/go-again v1.0.0/go.mod h1:QCsfhX2LgPTeaHyC7xtqQ/a5LKtAp4Ukb0eEXV8I+7M= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= @@ -1425,8 +1423,13 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI= diff --git a/local-config/tracetest.config.nats.yaml b/local-config/tracetest.config.nats.yaml new file mode 100644 index 0000000000..be3343af64 --- /dev/null +++ b/local-config/tracetest.config.nats.yaml @@ -0,0 +1,24 @@ +postgres: + host: postgres + user: postgres + password: postgres + port: 5432 + dbname: postgres + params: sslmode=disable + +nats: + endpoint: nats:4222 + +telemetry: + exporters: + collector: + serviceName: tracetest + sampling: 100 # 100% + exporter: + type: collector + collector: + endpoint: otel-collector:4317 + +server: + telemetry: + exporter: collector diff --git a/run.sh b/run.sh index 8830aef652..41c2d58467 100755 --- a/run.sh +++ b/run.sh @@ -5,6 +5,10 @@ set -e export TAG=${TAG:-dev} opts="-f docker-compose.yaml -f examples/docker-compose.demo.yaml" +# use nats version of docker-compose if NATS is set to true +if [ "$NATS" == "true" ]; then + opts="-f docker-compose.nats.yaml -f examples/docker-compose.demo.yaml" +fi help_message() { echo "usage: ./run.sh [cypress|tracetests|up|stop|build|down|tracetest-logs|logs|ps|restart]" diff --git a/server/app/app.go b/server/app/app.go index 7e980b7323..a53ddc0e42 100644 --- a/server/app/app.go +++ b/server/app/app.go @@ -30,6 +30,7 @@ import ( "github.com/kubeshop/tracetest/server/openapi" "github.com/kubeshop/tracetest/server/otlp" "github.com/kubeshop/tracetest/server/pkg/id" + "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/provisioning" "github.com/kubeshop/tracetest/server/resourcemanager" "github.com/kubeshop/tracetest/server/subscription" @@ -42,6 +43,7 @@ import ( "github.com/kubeshop/tracetest/server/traces" "github.com/kubeshop/tracetest/server/variableset" "github.com/kubeshop/tracetest/server/version" + "github.com/nats-io/nats.go" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" ) @@ -240,7 +242,14 @@ func (app *App) Start(opts ...appOption) error { registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer) } + natsConn, err := nats.Connect(app.cfg.NATSEndpoint()) + if err != nil { + log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err) + } + + executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn) testPipeline := buildTestPipeline( + executorDriverFactory, pool, pollingProfileRepo, dataStoreRepo, @@ -274,9 +283,10 @@ func (app *App) Start(opts ...appOption) error { testSuitePipeline.Stop() }) + testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn) dsTestListener := testconnection.NewListener() dsTestPipeline := buildDataStoreTestPipeline( - pool, + testConnectionDriverFactory, dsTestListener, tracer, tracedbFactory, diff --git a/server/app/ds_test_connection_pipeline.go b/server/app/ds_test_connection_pipeline.go index 536ea6faeb..8ce6a3b652 100644 --- a/server/app/ds_test_connection_pipeline.go +++ b/server/app/ds_test_connection_pipeline.go @@ -1,7 +1,6 @@ package app import ( - "github.com/jackc/pgx/v5/pgxpool" "github.com/kubeshop/tracetest/server/config" "github.com/kubeshop/tracetest/server/pkg/pipeline" "github.com/kubeshop/tracetest/server/testconnection" @@ -11,7 +10,7 @@ import ( ) func buildDataStoreTestPipeline( - pool *pgxpool.Pool, + driverFactory pipeline.DriverFactory[testconnection.Job], dsTestListener *testconnection.Listener, tracer trace.Tracer, newTraceDBFn tracedb.FactoryFunc, @@ -21,11 +20,9 @@ func buildDataStoreTestPipeline( requestWorker := testconnection.NewDsTestConnectionRequest(tracer, newTraceDBFn, appConfig.DataStorePipelineTestConnectionEnabled()) notifyWorker := testconnection.NewDsTestConnectionNotify(dsTestListener, tracer) - pgQueue := pipeline.NewPostgresQueueDriver[testconnection.Job](pool, pgChannelName) - pipeline := pipeline.New(testconnection.NewConfigurer(meter), - pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: pgQueue.Channel("datastore_test_connection_request")}, - pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: pgQueue.Channel("datastore_test_connection_notify")}, + pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: driverFactory.NewDriver("datastore_test_connection_request")}, + pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: driverFactory.NewDriver("datastore_test_connection_notify")}, ) return testconnection.NewDataStoreTestPipeline(pipeline, dsTestListener) diff --git a/server/app/test_pipeline.go b/server/app/test_pipeline.go index 19b4ea7e17..0f4c18aa3c 100644 --- a/server/app/test_pipeline.go +++ b/server/app/test_pipeline.go @@ -20,6 +20,7 @@ import ( ) func buildTestPipeline( + driverFactory pipeline.DriverFactory[executor.Job], pool *pgxpool.Pool, ppRepo *pollingprofile.Repository, dsRepo *datastore.Repository, @@ -123,17 +124,15 @@ func buildTestPipeline( WithInstanceID(instanceID). WithMetricMeter(meter) - pgQueue := pipeline.NewPostgresQueueDriver[executor.Job](pool, pgChannelName) - pipeline := pipeline.New(queueBuilder, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_resolver", triggerResolverWorker), Driver: pgQueue.Channel("trigger_resolve")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_executer", triggerExecuterWorker), Driver: pgQueue.Channel("trigger_execute")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_result_processor", triggerResultProcessorWorker), Driver: pgQueue.Channel("trigger_result")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_starter", tracePollerStarterWorker), Driver: pgQueue.Channel("tracePoller_start")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_fetcher", traceFetcherWorker), Driver: pgQueue.Channel("tracePoller_fetch")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_evaluator", tracePollerEvaluatorWorker), Driver: pgQueue.Channel("tracePoller_evaluate"), InputQueueOffset: -1}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("linter_runner", linterRunner), Driver: pgQueue.Channel("linterRunner")}, - pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("assertion_runner", assertionRunner), Driver: pgQueue.Channel("assertionRunner")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_resolver", triggerResolverWorker), Driver: driverFactory.NewDriver("trigger_resolve")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_executer", triggerExecuterWorker), Driver: driverFactory.NewDriver("trigger_execute")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_result_processor", triggerResultProcessorWorker), Driver: driverFactory.NewDriver("trigger_result")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_starter", tracePollerStarterWorker), Driver: driverFactory.NewDriver("tracePoller_start")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_fetcher", traceFetcherWorker), Driver: driverFactory.NewDriver("tracePoller_fetch")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_evaluator", tracePollerEvaluatorWorker), Driver: driverFactory.NewDriver("tracePoller_evaluate"), InputQueueOffset: -1}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("linter_runner", linterRunner), Driver: driverFactory.NewDriver("linterRunner")}, + pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("assertion_runner", assertionRunner), Driver: driverFactory.NewDriver("assertionRunner")}, ) const assertionRunnerStepIndex = 7 diff --git a/server/config/server.go b/server/config/server.go index d7511a7106..1f6adc91b6 100644 --- a/server/config/server.go +++ b/server/config/server.go @@ -205,3 +205,10 @@ func (c *AppConfig) AnalyticsFrontendKey() string { return c.vp.GetString("analytics.frontendKey") } + +func (c *AppConfig) NATSEndpoint() string { + c.mu.Lock() + defer c.mu.Unlock() + + return c.vp.GetString("nats.endpoint") +} diff --git a/server/pkg/pipeline/factory.go b/server/pkg/pipeline/factory.go new file mode 100644 index 0000000000..af7f821558 --- /dev/null +++ b/server/pkg/pipeline/factory.go @@ -0,0 +1,21 @@ +package pipeline + +import "github.com/nats-io/nats.go" + +func NewDriverFactory[T any](natsConn *nats.Conn) DriverFactory[T] { + return DriverFactory[T]{ + natsConn: natsConn, + } +} + +type DriverFactory[T any] struct { + natsConn *nats.Conn +} + +func (df DriverFactory[T]) NewDriver(channelName string) WorkerDriver[T] { + if df.natsConn != nil { + return NewNatsDriver[T](df.natsConn, channelName) + } + + return NewInMemoryQueueDriver[T](channelName) +} diff --git a/server/pkg/pipeline/nats_driver.go b/server/pkg/pipeline/nats_driver.go new file mode 100644 index 0000000000..b00efdfcfb --- /dev/null +++ b/server/pkg/pipeline/nats_driver.go @@ -0,0 +1,73 @@ +package pipeline + +import ( + "encoding/json" + "fmt" + + "github.com/nats-io/nats.go" +) + +type NatsDriver[T any] struct { + log loggerFn + conn *nats.Conn + topic string + subscription *nats.Subscription +} + +func NewNatsDriver[T any](conn *nats.Conn, topic string) *NatsDriver[T] { + return &NatsDriver[T]{ + log: newLoggerFn(fmt.Sprintf("NatsDriver - %s", topic)), + conn: conn, + topic: topic, + } +} + +func (d *NatsDriver[T]) Enqueue(msg T) { + msgJson, err := json.Marshal(msg) + if err != nil { + fmt.Printf("could not marshal message: %s\n", err.Error()) + } + + err = d.conn.PublishMsg(&nats.Msg{ + Subject: d.topic, + Data: msgJson, + }) + + if err != nil { + fmt.Printf("could not send publish message request: %s\n", err.Error()) + } +} + +// SetListener implements QueueDriver. +func (d *NatsDriver[T]) SetListener(listener Listener[T]) { + subscription, err := d.conn.Subscribe(d.topic, func(msg *nats.Msg) { + var target T + err := json.Unmarshal(msg.Data, &target) + if err != nil { + fmt.Printf(`could not unmarshal message got in queue "%s": %s\n`, d.topic, err.Error()) + } + + // TODO: We probably should return an error for acking or nacking this message + listener.Listen(target) + + msg.Ack() + }) + + if err != nil { + panic(err) + } + + d.subscription = subscription +} + +func (d *NatsDriver[T]) Start() { + d.log("start") +} + +func (d *NatsDriver[T]) Stop() { + err := d.subscription.Unsubscribe() + if err != nil { + d.log(`could not unsubscribe to topic "%s"\n`, d.topic) + } + d.subscription = nil +} diff --git a/server/pkg/pipeline/pipeline.go b/server/pkg/pipeline/pipeline.go index 271da9b0f8..714ed7fe77 100644 --- a/server/pkg/pipeline/pipeline.go +++ b/server/pkg/pipeline/pipeline.go @@ -20,7 +20,7 @@ type Pipeline[T any] struct { queues []*Queue[T] // N + 1 } -type workerDriver[T any] interface { +type WorkerDriver[T any] interface { QueueDriver[T] Start() Stop() @@ -40,7 +40,7 @@ type InputQueueSetter[T any] interface { } type Step[T any] struct { - Driver workerDriver[T] + Driver WorkerDriver[T] Processor StepProcessor[T] InputQueueOffset int } diff --git a/server/pkg/pipeline/postgres_driver.go b/server/pkg/pipeline/postgres_driver.go deleted file mode 100644 index d84b3f44cb..0000000000 --- a/server/pkg/pipeline/postgres_driver.go +++ /dev/null @@ -1,209 +0,0 @@ -package pipeline - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/jackc/pgx/v5/pgxpool" - "github.com/jdvr/go-again" - "github.com/kubeshop/tracetest/server/pkg/id" -) - -func NewPostgresQueueDriver[T any](pool *pgxpool.Pool, channelName string) *postgresQueueDriver[T] { - id := id.GenerateID() - return &postgresQueueDriver[T]{ - log: newLoggerFn("PostgresQueueDriver - " + id.String()), - channelName: channelName, - pool: pool, - channels: map[string]*channel[T]{}, - exit: make(chan bool), - } -} - -// postgresQueueDriver is a queue driver that uses Postgres LISTEN/NOTIFY -// Since each queue needs its own connection, it's not practical/scalable -// to create a new Driver instance for each queue. Instead, we create a -// single Driver instance and use it to create channels for each queue. -// -// This driver requires one connection that listens to messages in any queue -// and routes them to the correct worker. -type postgresQueueDriver[T any] struct { - log loggerFn - channelName string - pool *pgxpool.Pool - channels map[string]*channel[T] - running bool - exit chan bool -} - -func (qd *postgresQueueDriver[T]) getChannel(name string) (*channel[T], error) { - ch, ok := qd.channels[name] - if !ok { - return nil, fmt.Errorf("channel %s not found", name) - } - - return ch, nil -} - -type pgJob[T any] struct { - Channel string `json:"channel"` - Item T `json:"job"` -} - -func (qd *postgresQueueDriver[T]) Start() { - if qd.running { - // we want only 1 worker here - qd.log("already running") - return - } - qd.running = true - - go func(qd *postgresQueueDriver[T]) { - qd.log("start") - - for { - select { - case <-qd.exit: - qd.log("exit") - return - default: - _, err := again.Retry(context.Background(), func(_ context.Context) (bool, error) { - qd.log("acquiring connection") - conn, err := qd.pool.Acquire(context.Background()) - if err != nil { - err = fmt.Errorf("error acquiring connection: %w", err) - qd.log("%s", err.Error()) - return false, err - } - defer conn.Release() - - err = qd.worker(conn) - if err != nil { - err = fmt.Errorf("error in worker: %w", err) - qd.log("%s", err.Error()) - return false, err - } - return true, nil - }) - if err != nil { - // this panic is intentional. forces the app to crash and restart - panic(err) - } - } - } - }(qd) -} - -func (qd *postgresQueueDriver[T]) worker(conn *pgxpool.Conn) error { - qd.log("listening for notifications") - _, err := conn.Exec(context.Background(), "listen "+qd.channelName) - if err != nil { - return fmt.Errorf("error listening for notifications: %w", err) - } - qd.log("waiting for notification") - notification, err := conn.Conn().WaitForNotification(context.Background()) - if err != nil { - return fmt.Errorf("error waiting for notification: %w", err) - } - - job := pgJob[T]{} - err = json.Unmarshal([]byte(notification.Payload), &job) - if err != nil { - // this error is not fatal. we can ignore it and continue - qd.log("error unmarshalling pgJob: %s", err.Error()) - return nil - } - - qd.log("received job for channel: %s", job.Channel) - - channel, err := qd.getChannel(job.Channel) - if err != nil { - // this error is not fatal. we can ignore it and continue - qd.log("error getting channel: %s", err.Error()) - return nil - } - - qd.log("processing job for channel: %s", job.Channel) - channel.listener.Listen(job.Item) - return nil -} - -func (qd *postgresQueueDriver[T]) Stop() { - qd.log("stopping") - qd.exit <- true -} - -// Channel registers a new queue channel and returns it -func (qd *postgresQueueDriver[T]) Channel(name string) *channel[T] { - if _, channelNameExists := qd.channels[name]; channelNameExists { - panic(fmt.Errorf("channel %s already exists", name)) - } - - ch := &channel[T]{ - postgresQueueDriver: qd, - name: name, - log: newLoggerFn(fmt.Sprintf("PostgresQueueDriver - %s", name)), - pool: qd.pool, - } - - qd.channels[name] = ch - - return ch -} - -type channel[T any] struct { - *postgresQueueDriver[T] - name string - log loggerFn - pool *pgxpool.Pool - listener Listener[T] -} - -func (ch *channel[T]) SetListener(l Listener[T]) { - ch.listener = l -} - -const enqueueTimeout = 5 * time.Minute - -func (ch *channel[T]) Enqueue(item T) { - ch.log("enqueue item") - - jj, err := json.Marshal(pgJob[T]{ - Channel: ch.name, - Item: item, - }) - - if err != nil { - ch.log("error marshalling pgJob: %s", err.Error()) - return - } - - ctx, cancelCtx := context.WithTimeout(context.Background(), enqueueTimeout) - defer cancelCtx() - - conn, err := again.Retry[*pgxpool.Conn](ctx, func(ctx context.Context) (*pgxpool.Conn, error) { - ch.log("trying to acquire connection") - return ch.pool.Acquire(context.Background()) - }) - - if err != nil { - ch.log("error acquiring connection: %s", err.Error()) - return - } - ch.log("acquired connection for") - defer conn.Release() - - _, err = conn.Query(ctx, fmt.Sprintf(`select pg_notify('%s', $1)`, ch.postgresQueueDriver.channelName), jj) - if err != nil { - ch.log("error notifying postgres: %s", err.Error()) - return - } - - ch.log("notified postgres") -} - -func (ch *channel[T]) Name() string { - return ch.name -} diff --git a/server/test/run_repository.go b/server/test/run_repository.go index 0cd7497566..3a836f49c9 100644 --- a/server/test/run_repository.go +++ b/server/test/run_repository.go @@ -655,6 +655,11 @@ func readRunRow(row scanner) (Run, error) { return Run{}, fmt.Errorf("cannot parse TriggerResult: %w", err) } + err = json.Unmarshal(jsonResolvedTrigger, &r.ResolvedTrigger) + if err != nil { + return Run{}, fmt.Errorf("cannot parse ResolvedTrigger: %w", err) + } + err = json.Unmarshal(jsonTestResults, &r.Results) if err != nil { return Run{}, fmt.Errorf("cannot parse Results: %w", err)