Skip to content

Commit

Permalink
feat(server): implement postgres queue driver (#3030)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Aug 4, 2023
1 parent 64da9a7 commit f296ab5
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 60 deletions.
12 changes: 12 additions & 0 deletions server/app/app.go
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kubeshop/tracetest/server/analytics"
"github.com/kubeshop/tracetest/server/assertions/comparator"
"github.com/kubeshop/tracetest/server/config"
Expand Down Expand Up @@ -140,6 +141,16 @@ func (app *App) Start(opts ...appOption) error {
fmt.Println("Starting")
ctx := context.Background()

poolcfg, err := pgxpool.ParseConfig(app.cfg.PostgresConnString())
if err != nil {
return err
}

pool, err := pgxpool.NewWithConfig(context.Background(), poolcfg)
if err != nil {
return err
}

db, err := testdb.Connect(app.cfg.PostgresConnString())
if err != nil {
return err
Expand Down Expand Up @@ -212,6 +223,7 @@ func (app *App) Start(opts ...appOption) error {
registerOtlpServer(app, tracesRepo, runRepo, eventEmitter, dataStoreRepo)

testPipeline := buildTestPipeline(
pool,
pollingProfileRepo,
dataStoreRepo,
linterRepo,
Expand Down
14 changes: 8 additions & 6 deletions server/app/test_pipeline.go
@@ -1,6 +1,7 @@
package app

import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
Expand All @@ -15,6 +16,7 @@ import (
)

func buildTestPipeline(
pool *pgxpool.Pool,
ppRepo *pollingprofile.Repository,
dsRepo *datastore.Repository,
lintRepo *analyzer.Repository,
Expand Down Expand Up @@ -86,15 +88,15 @@ func buildTestPipeline(
WithTestGetter(testRepo).
WithRunGetter(runRepo)

pgQueue := executor.NewPostgresQueueDriver(pool)

pipeline := executor.NewPipeline(queueBuilder,
executor.PipelineStep{Processor: runner, Driver: executor.NewInMemoryQueueDriver("runner")},
executor.PipelineStep{Processor: tracePoller, Driver: executor.NewInMemoryQueueDriver("tracePoller")},
executor.PipelineStep{Processor: linterRunner, Driver: executor.NewInMemoryQueueDriver("linterRunner")},
executor.PipelineStep{Processor: assertionRunner, Driver: executor.NewInMemoryQueueDriver("assertionRunner")},
executor.PipelineStep{Processor: runner, Driver: pgQueue.Channel("runner")},
executor.PipelineStep{Processor: tracePoller, Driver: pgQueue.Channel("tracePoller")},
executor.PipelineStep{Processor: linterRunner, Driver: pgQueue.Channel("linterRunner")},
executor.PipelineStep{Processor: assertionRunner, Driver: pgQueue.Channel("assertionRunner")},
)

pipeline.Start()

const assertionRunnerStepIndex = 3

return executor.NewTestPipeline(
Expand Down
98 changes: 47 additions & 51 deletions server/executor/queue.go
Expand Up @@ -3,9 +3,9 @@ package executor
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"strconv"

"github.com/kubeshop/tracetest/server/datastore"
Expand All @@ -18,6 +18,8 @@ import (
)

const (
QueueWorkerCount = 5

JobCountHeader string = "X-Tracetest-Job-Count"
)

Expand Down Expand Up @@ -77,6 +79,46 @@ type Job struct {
DataStore datastore.DataStore
}

type jsonJob struct {
Headers *headers `json:"headers"`
TransactionID string `json:"transaction_id"`
TransactionRunID int `json:"transaction_run_id"`
TestID string `json:"test_id"`
RunID int `json:"run_id"`
PollingProfileID string `json:"polling_profile_id"`
DataStoreID string `json:"data_store_id"`
}

func (job Job) MarshalJSON() ([]byte, error) {
return json.Marshal(jsonJob{
Headers: job.Headers,
TransactionID: job.Transaction.ID.String(),
TransactionRunID: job.TransactionRun.ID,
TestID: job.Test.ID.String(),
RunID: job.Run.ID,
PollingProfileID: job.PollingProfile.ID.String(),
DataStoreID: job.DataStore.ID.String(),
})
}

func (job *Job) UnmarshalJSON(data []byte) error {
var jj jsonJob
err := json.Unmarshal(data, &jj)
if err != nil {
return err
}

job.Headers = jj.Headers
job.Transaction.ID = id.ID(jj.TransactionID)
job.TransactionRun.ID = jj.TransactionRunID
job.Test.ID = id.ID(jj.TestID)
job.Run.ID = jj.RunID
job.PollingProfile.ID = id.ID(jj.PollingProfileID)
job.DataStore.ID = id.ID(jj.DataStoreID)

return nil
}

func NewJob() Job {
return Job{
Headers: &headers{},
Expand Down Expand Up @@ -252,7 +294,7 @@ func (q Queue) Enqueue(ctx context.Context, job Job) {
Headers: job.Headers,

Test: test.Test{ID: job.Test.ID},
Run: job.Run,
Run: test.Run{ID: job.Run.ID},

Transaction: transaction.Transaction{ID: job.Transaction.ID},
TransactionRun: transaction.TransactionRun{ID: job.TransactionRun.ID},
Expand All @@ -277,8 +319,9 @@ func (q Queue) Listen(job Job) {
}
newJob.Test = q.resolveTest(ctx, job)
// todo: revert when using actual queues
// newJob.Run = q.resolveTestRun(ctx, job)
newJob.Run = job.Run
newJob.Run = q.resolveTestRun(ctx, job)
// todo: change the otlp server to have its own table
// newJob.Run = job.Run

newJob.Transaction = q.resolveTransaction(ctx, job)
newJob.TransactionRun = q.resolveTransactionRun(ctx, job)
Expand Down Expand Up @@ -427,50 +470,3 @@ func (q Queue) resolveDataStore(ctx context.Context, job Job) datastore.DataStor

return ds
}

func NewInMemoryQueueDriver(name string) *InMemoryQueueDriver {
return &InMemoryQueueDriver{
queue: make(chan Job),
exit: make(chan bool),
name: name,
}
}

type InMemoryQueueDriver struct {
queue chan Job
exit chan bool
q *Queue
name string
}

func (r *InMemoryQueueDriver) SetQueue(q *Queue) {
r.q = q
}

func (r InMemoryQueueDriver) Enqueue(job Job) {
r.queue <- job
}

const inMemoryQueueWorkerCount = 5

func (r InMemoryQueueDriver) Start() {
for i := 0; i < inMemoryQueueWorkerCount; i++ {
go func() {
log.Printf("[InMemoryQueueDriver - %s] start", r.name)
for {
select {
case <-r.exit:
log.Printf("[InMemoryQueueDriver - %s] exit", r.name)
return
case job := <-r.queue:
r.q.Listen(job)
}
}
}()
}
}

func (r InMemoryQueueDriver) Stop() {
log.Printf("[InMemoryQueueDriver - %s] stopping", r.name)
r.exit <- true
}
61 changes: 61 additions & 0 deletions server/executor/queue_driver_in_memory.go
@@ -0,0 +1,61 @@
package executor

import (
"fmt"
"log"
)

type loggerFn func(string, ...any)

func newLoggerFn(name string) loggerFn {
return func(format string, params ...any) {
log.Printf("[%s] %s", name, fmt.Sprintf(format, params...))
}
}

func NewInMemoryQueueDriver(name string) *InMemoryQueueDriver {
return &InMemoryQueueDriver{
log: newLoggerFn(fmt.Sprintf("InMemoryQueueDriver - %s", name)),
queue: make(chan Job),
exit: make(chan bool),
name: name,
}
}

type InMemoryQueueDriver struct {
log loggerFn
queue chan Job
exit chan bool
q *Queue
name string
}

func (qd *InMemoryQueueDriver) SetQueue(q *Queue) {
qd.q = q
}

func (qd InMemoryQueueDriver) Enqueue(job Job) {
qd.queue <- job
}

func (qd InMemoryQueueDriver) Start() {
for i := 0; i < QueueWorkerCount; i++ {
go func() {
qd.log("start")
for {
select {
case <-qd.exit:
qd.log("exit")
return
case job := <-qd.queue:
qd.q.Listen(job)
}
}
}()
}
}

func (qd InMemoryQueueDriver) Stop() {
qd.log("stopping")
qd.exit <- true
}

0 comments on commit f296ab5

Please sign in to comment.