Skip to content

Commit

Permalink
feat(server): use NATS queues instead of postgres notify/listen (#3329)
Browse files Browse the repository at this point in the history
* feat: add nats driver

* have a factory to decide which driver to use

* delete postgres driver

* update gomod

* fix: unmarshal resolved trigger in test run

---------

Co-authored-by: Matheus Nogueira <matheus.nogueira2008@gmail.com>
  • Loading branch information
schoren and mathnogueira committed Nov 3, 2023
1 parent e7e8d21 commit 29936fd
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 233 deletions.
33 changes: 31 additions & 2 deletions .github/workflows/pull-request.yaml
Expand Up @@ -304,9 +304,9 @@ jobs:
TEST_ENV="${{ matrix.test_env }}" \
./run.bash || (cat /tmp/docker-log; exit 1)
trace-testing:
trace-testing-memory:
needs: [build-docker]
name: Tracetesting API Server
name: Tracetesting API Server (InMemory)
runs-on: ubuntu-latest

steps:
Expand All @@ -333,6 +333,35 @@ jobs:
./scripts/wait-for-port.sh 11633
./run.sh tracetests || (cat /tmp/docker-log; exit 1)
trace-testing-nats:
needs: [build-docker]
name: Tracetesting API Server (NATS)
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3

- uses: actions/download-artifact@v3
with:
name: tracetest-dist
path: dist/

- name: Import image
run: |
docker load --input dist/image.tar
- name: Start services
run: |
./run.sh down up
./run.sh tracetest-logs > /tmp/docker-log &
- name: Run tests
run: |
chmod +x ./dist/tracetest ./dist/tracetest-server
./scripts/wait-for-port.sh 11633
NATS=true ./run.sh tracetests || (cat /tmp/docker-log; exit 1)
e2e-cli:
name: CLI e2e tests
needs: [build-docker]
Expand Down
67 changes: 67 additions & 0 deletions docker-compose.nats.yaml
@@ -0,0 +1,67 @@
version: "3.2"
services:
tracetest:
restart: unless-stopped
image: kubeshop/tracetest:${TAG:-latest}
extra_hosts:
- "host.docker.internal:host-gateway"
build:
context: .
volumes:
- type: bind
source: ./local-config/tracetest.config.nats.yaml
target: /app/tracetest.yaml
- type: bind
source: ./local-config/tracetest.provision.yaml
target: /app/provisioning.yaml
ports:
- 11633:11633
command: --provisioning-file /app/provisioning.yaml
healthcheck:
test: ["CMD", "wget", "--spider", "localhost:11633"]
interval: 1s
timeout: 3s
retries: 60
depends_on:
postgres:
condition: service_healthy
environment:
TRACETEST_DEV: ${TRACETEST_DEV}
TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED: ${TRACETEST_TESTPIPELINES_TRIGGEREXECUTE_ENABLED}
TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED: ${TRACETEST_TESTPIPELINES_TRACEFETCH_ENABLED}
TRACETEST_DATASTOREPIPELINES_TESTCONNECTION_ENABLED: ${TRACETEST_DATASTOREPIPELINES_TESTCONNECTION_ENABLED}

postgres:
image: postgres:15.2
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
ports:
- 5432:5432
healthcheck:
test: pg_isready -U "$$POSTGRES_USER" -d "$$POSTGRES_DB"
interval: 1s
timeout: 5s
retries: 60

otel-collector:
image: otel/opentelemetry-collector-contrib:0.59.0
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
- "55679:55679"
- "4317:4317"
- "8888:8888"
command:
- "--config"
- "/otel-local-config.yaml"
volumes:
- ./local-config/collector.config.yaml:/otel-local-config.yaml
depends_on:
- tracetest

nats:
image: nats:2.10-alpine
ports:
- "4222:4222" # connecting
- "8222:8222" # reporting server
4 changes: 3 additions & 1 deletion go.mod
Expand Up @@ -37,11 +37,11 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/j2gg0s/otsql v0.14.0
github.com/jackc/pgx/v5 v5.4.2
github.com/jdvr/go-again v1.0.0
github.com/jhump/protoreflect v1.12.0
github.com/json-iterator/go v1.1.12
github.com/labstack/gommon v0.3.0
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/nats-io/nats.go v1.31.0
github.com/ohler55/ojg v1.14.4
github.com/opensearch-project/opensearch-go v1.1.0
github.com/orlangure/gnomock v0.20.0
Expand Down Expand Up @@ -148,6 +148,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.1 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Expand Up @@ -1170,8 +1170,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jdvr/go-again v1.0.0 h1:eNHlD8mFc9Rq7aLMKhbyaFEz68zBuU9YsfPet99dZnA=
github.com/jdvr/go-again v1.0.0/go.mod h1:QCsfhX2LgPTeaHyC7xtqQ/a5LKtAp4Ukb0eEXV8I+7M=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
Expand Down Expand Up @@ -1425,8 +1423,13 @@ github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5Vgl
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI=
Expand Down
24 changes: 24 additions & 0 deletions local-config/tracetest.config.nats.yaml
@@ -0,0 +1,24 @@
postgres:
host: postgres
user: postgres
password: postgres
port: 5432
dbname: postgres
params: sslmode=disable

nats:
endpoint: nats:4222

telemetry:
exporters:
collector:
serviceName: tracetest
sampling: 100 # 100%
exporter:
type: collector
collector:
endpoint: otel-collector:4317

server:
telemetry:
exporter: collector
4 changes: 4 additions & 0 deletions run.sh
Expand Up @@ -5,6 +5,10 @@ set -e
export TAG=${TAG:-dev}

opts="-f docker-compose.yaml -f examples/docker-compose.demo.yaml"
# use nats version of docker-compose if NATS is set to true
if [ "$NATS" == "true" ]; then
opts="-f docker-compose.nats.yaml -f examples/docker-compose.demo.yaml"
fi

help_message() {
echo "usage: ./run.sh [cypress|tracetests|up|stop|build|down|tracetest-logs|logs|ps|restart]"
Expand Down
12 changes: 11 additions & 1 deletion server/app/app.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kubeshop/tracetest/server/openapi"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/provisioning"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/subscription"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/kubeshop/tracetest/server/traces"
"github.com/kubeshop/tracetest/server/variableset"
"github.com/kubeshop/tracetest/server/version"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -240,7 +242,14 @@ func (app *App) Start(opts ...appOption) error {
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo, tracer)
}

natsConn, err := nats.Connect(app.cfg.NATSEndpoint())
if err != nil {
log.Printf("could not connect to NATS: %s. Defaulting to InMemory Queues", err)
}

executorDriverFactory := pipeline.NewDriverFactory[executor.Job](natsConn)
testPipeline := buildTestPipeline(
executorDriverFactory,
pool,
pollingProfileRepo,
dataStoreRepo,
Expand Down Expand Up @@ -274,9 +283,10 @@ func (app *App) Start(opts ...appOption) error {
testSuitePipeline.Stop()
})

testConnectionDriverFactory := pipeline.NewDriverFactory[testconnection.Job](natsConn)
dsTestListener := testconnection.NewListener()
dsTestPipeline := buildDataStoreTestPipeline(
pool,
testConnectionDriverFactory,
dsTestListener,
tracer,
tracedbFactory,
Expand Down
9 changes: 3 additions & 6 deletions server/app/ds_test_connection_pipeline.go
@@ -1,7 +1,6 @@
package app

import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/testconnection"
Expand All @@ -11,7 +10,7 @@ import (
)

func buildDataStoreTestPipeline(
pool *pgxpool.Pool,
driverFactory pipeline.DriverFactory[testconnection.Job],
dsTestListener *testconnection.Listener,
tracer trace.Tracer,
newTraceDBFn tracedb.FactoryFunc,
Expand All @@ -21,11 +20,9 @@ func buildDataStoreTestPipeline(
requestWorker := testconnection.NewDsTestConnectionRequest(tracer, newTraceDBFn, appConfig.DataStorePipelineTestConnectionEnabled())
notifyWorker := testconnection.NewDsTestConnectionNotify(dsTestListener, tracer)

pgQueue := pipeline.NewPostgresQueueDriver[testconnection.Job](pool, pgChannelName)

pipeline := pipeline.New(testconnection.NewConfigurer(meter),
pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: pgQueue.Channel("datastore_test_connection_request")},
pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: pgQueue.Channel("datastore_test_connection_notify")},
pipeline.Step[testconnection.Job]{Processor: requestWorker, Driver: driverFactory.NewDriver("datastore_test_connection_request")},
pipeline.Step[testconnection.Job]{Processor: notifyWorker, Driver: driverFactory.NewDriver("datastore_test_connection_notify")},
)

return testconnection.NewDataStoreTestPipeline(pipeline, dsTestListener)
Expand Down
19 changes: 9 additions & 10 deletions server/app/test_pipeline.go
Expand Up @@ -20,6 +20,7 @@ import (
)

func buildTestPipeline(
driverFactory pipeline.DriverFactory[executor.Job],
pool *pgxpool.Pool,
ppRepo *pollingprofile.Repository,
dsRepo *datastore.Repository,
Expand Down Expand Up @@ -123,17 +124,15 @@ func buildTestPipeline(
WithInstanceID(instanceID).
WithMetricMeter(meter)

pgQueue := pipeline.NewPostgresQueueDriver[executor.Job](pool, pgChannelName)

pipeline := pipeline.New(queueBuilder,
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_resolver", triggerResolverWorker), Driver: pgQueue.Channel("trigger_resolve")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_executer", triggerExecuterWorker), Driver: pgQueue.Channel("trigger_execute")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_result_processor", triggerResultProcessorWorker), Driver: pgQueue.Channel("trigger_result")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_starter", tracePollerStarterWorker), Driver: pgQueue.Channel("tracePoller_start")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_fetcher", traceFetcherWorker), Driver: pgQueue.Channel("tracePoller_fetch")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_evaluator", tracePollerEvaluatorWorker), Driver: pgQueue.Channel("tracePoller_evaluate"), InputQueueOffset: -1},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("linter_runner", linterRunner), Driver: pgQueue.Channel("linterRunner")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("assertion_runner", assertionRunner), Driver: pgQueue.Channel("assertionRunner")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_resolver", triggerResolverWorker), Driver: driverFactory.NewDriver("trigger_resolve")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_executer", triggerExecuterWorker), Driver: driverFactory.NewDriver("trigger_execute")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trigger_result_processor", triggerResultProcessorWorker), Driver: driverFactory.NewDriver("trigger_result")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_starter", tracePollerStarterWorker), Driver: driverFactory.NewDriver("tracePoller_start")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_fetcher", traceFetcherWorker), Driver: driverFactory.NewDriver("tracePoller_fetch")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("trace_poller_evaluator", tracePollerEvaluatorWorker), Driver: driverFactory.NewDriver("tracePoller_evaluate"), InputQueueOffset: -1},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("linter_runner", linterRunner), Driver: driverFactory.NewDriver("linterRunner")},
pipeline.Step[executor.Job]{Processor: workerMetricMiddlewareBuilder.New("assertion_runner", assertionRunner), Driver: driverFactory.NewDriver("assertionRunner")},
)

const assertionRunnerStepIndex = 7
Expand Down
7 changes: 7 additions & 0 deletions server/config/server.go
Expand Up @@ -205,3 +205,10 @@ func (c *AppConfig) AnalyticsFrontendKey() string {

return c.vp.GetString("analytics.frontendKey")
}

func (c *AppConfig) NATSEndpoint() string {
c.mu.Lock()
defer c.mu.Unlock()

return c.vp.GetString("nats.endpoint")
}
21 changes: 21 additions & 0 deletions server/pkg/pipeline/factory.go
@@ -0,0 +1,21 @@
package pipeline

import "github.com/nats-io/nats.go"

func NewDriverFactory[T any](natsConn *nats.Conn) DriverFactory[T] {
return DriverFactory[T]{
natsConn: natsConn,
}
}

type DriverFactory[T any] struct {
natsConn *nats.Conn
}

func (df DriverFactory[T]) NewDriver(channelName string) WorkerDriver[T] {
if df.natsConn != nil {
return NewNatsDriver[T](df.natsConn, channelName)
}

return NewInMemoryQueueDriver[T](channelName)
}

0 comments on commit 29936fd

Please sign in to comment.