fix panic when under pressure (#3098)
schoren committed Aug 30, 2023
1 parent 8b7a8d6 commit 1093d04
Showing 29 changed files with 552 additions and 1,274 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/pull-request.yaml
@@ -338,6 +338,39 @@ jobs:
cd ./testing/cli-e2etest
make test
# TODO: this would be a great idea but it doesn't work on GHA with docker
# it can probably be implemented with k8s in a separate job
# I'm leaving this as a reference and as a reminder to do it
# k6-test:
# name: k6 Load Test
# needs: [build-docker]
# runs-on: ubuntu-latest
# steps:
# - name: Checkout
# uses: actions/checkout@v3
# - name: Setup go
# uses: actions/setup-go@v3
# with:
# go-version-file: "go.work"
# cache: true
# cache-dependency-path: go.work
# - uses: actions/download-artifact@v3
# with:
# name: tracetest-dist
# path: dist/

# - name: Import image
# run: |
# docker load --input dist/image.tar
# - name: Run tests
# run: |
# chmod +x ./dist/tracetest ./testing/load/run.bash

# cd ./testing/load

# export TRACETEST_CLI="../../dist/tracetest"
# ./run.bash || (cat /tmp/docker-log; exit 1)

config-e2e:
runs-on: ubuntu-latest
outputs:
2 changes: 1 addition & 1 deletion Makefile
@@ -1,6 +1,6 @@
VERSION?="dev"
TAG?=$(VERSION)
GORELEASER_VERSION=1.19.2-pro
GORELEASER_VERSION=1.20.0-pro

PROJECT_ROOT=${PWD}

1,173 changes: 2 additions & 1,171 deletions go.work.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions scripts/wait-for-port.sh
@@ -1,10 +1,10 @@
#!/bin/bash
PORT=$1

TIME_OUT=30s
TIMEOUT=${TIMEOUT:-"30s"}
CONDITION='nc -z -w 1 localhost '$PORT' > /dev/null 2>&1'
IF_TRUE='echo "port '$PORT' ready"'
IF_FALSE='echo "port '$PORT' not available, retry"'

ROOT_DIR=$(cd $(dirname "${BASH_SOURCE:-$0}")/.. && pwd)
$ROOT_DIR/scripts/wait.sh "$TIME_OUT" "$CONDITION" "$IF_TRUE" "$IF_FALSE"
$ROOT_DIR/scripts/wait.sh "$TIMEOUT" "$CONDITION" "$IF_TRUE" "$IF_FALSE"
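
Note: the change from TIME_OUT=30s to TIMEOUT=${TIMEOUT:-"30s"} lets callers override the wait time from the environment while keeping the 30s default, e.g. TIMEOUT=60s ./scripts/wait-for-port.sh 8080 (the port number here is illustrative).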
10 changes: 9 additions & 1 deletion server/app/app.go
@@ -29,6 +29,7 @@ import (
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/openapi"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/provisioning"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/subscription"
@@ -133,7 +134,11 @@ func (app *App) initAnalytics(configFromDB config.Config) error {
return analytics.Init(configFromDB.IsAnalyticsEnabled(), app.serverID, Version, Env)
}

var instanceID = id.GenerateID().String()

func (app *App) Start(opts ...appOption) error {
// instanceID is a temporary ID for this instance of the server
// it is intentionally regenerated on every start
for _, opt := range opts {
opt(app)
}
@@ -145,6 +150,7 @@ func (app *App) Start(opts ...appOption) error {
if err != nil {
return err
}
poolcfg.MaxConns = 20

pool, err := pgxpool.NewWithConfig(context.Background(), poolcfg)
if err != nil {
@@ -155,6 +161,8 @@
if err != nil {
return err
}
db.SetMaxOpenConns(80)

testDB, err := testdb.Postgres(
testdb.WithDB(db),
)
@@ -210,7 +218,7 @@ func (app *App) Start(opts ...appOption) error {
variableSetRepo := variableset.NewRepository(db)
linterRepo := analyzer.NewRepository(db)
testRepo := test.NewRepository(db)
runRepo := test.NewRunRepository(db)
runRepo := test.NewRunRepository(db, test.NewCache(instanceID))
testRunnerRepo := testrunner.NewRepository(db)
tracesRepo := traces.NewTraceRepository(db)

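
The two caps above (poolcfg.MaxConns = 20 for pgx, db.SetMaxOpenConns(80) for database/sql) bound how many Postgres connections the server can open under load. A minimal, self-contained sketch of the same knobs, assuming a hypothetical DSN and the lib/pq driver for the database/sql side:

package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
	_ "github.com/lib/pq"
)

func main() {
	dsn := "postgres://user:pass@localhost:5432/tracetest" // hypothetical DSN
	poolcfg, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		log.Fatal(err)
	}
	// cap the pgx pool so a burst of queued jobs cannot exhaust Postgres
	poolcfg.MaxConns = 20

	pool, err := pgxpool.NewWithConfig(context.Background(), poolcfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatal(err)
	}
	// database/sql opens connections lazily; bound that side as well
	db.SetMaxOpenConns(80)
}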
3 changes: 2 additions & 1 deletion server/app/test_pipeline.go
@@ -86,7 +86,8 @@ func buildTestPipeline(
WithDataStoreGetter(dsRepo).
WithPollingProfileGetter(ppRepo).
WithTestGetter(testRepo).
WithRunGetter(runRepo)
WithRunGetter(runRepo).
WithInstanceID(instanceID)

pgQueue := executor.NewPostgresQueueDriver(pool)

91 changes: 71 additions & 20 deletions server/executor/queue.go
@@ -6,8 +6,11 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strconv"

"github.com/alitto/pond"

"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/http/middleware"
@@ -19,7 +22,8 @@
)

const (
QueueWorkerCount = 5
QueueWorkerCount = 20
QueueWorkerBufferSize = QueueWorkerCount * 1_000 // 1k jobs per worker

JobCountHeader string = "X-Tracetest-Job-Count"
)
@@ -170,9 +174,13 @@ type subscriptor interface {
Subscribe(string, subscription.Subscriber)
}

type Listener interface {
Listen(Job)
}

type QueueDriver interface {
Enqueue(Job)
SetQueue(*Queue)
SetListener(Listener)
}

type QueueBuilder struct {
@@ -187,6 +195,8 @@

pollingProfiles pollingProfileGetter
dataStores dataStoreGetter

instanceID string
}

func NewQueueBuilder() *QueueBuilder {
@@ -208,6 +218,11 @@ func (qb *QueueBuilder) WithRunGetter(runs testRunGetter) *QueueBuilder {
return qb
}

func (qb *QueueBuilder) WithInstanceID(id string) *QueueBuilder {
qb.instanceID = id
return qb
}

func (qb *QueueBuilder) WithTestGetter(tests testGetter) *QueueBuilder {
qb.tests = tests
return qb
@@ -249,9 +264,12 @@ func (qb *QueueBuilder) Build(driver QueueDriver, itemProcessor QueueItemProcess

driver: driver,
itemProcessor: itemProcessor,
workerPool: pond.New(QueueWorkerCount, QueueWorkerBufferSize),

instanceID: qb.instanceID,
}

driver.SetQueue(queue)
driver.SetListener(queue)

return queue
}
@@ -271,11 +289,14 @@ type Queue struct {

itemProcessor QueueItemProcessor
driver QueueDriver
workerPool *pond.WorkerPool

instanceID string
}

func (q *Queue) SetDriver(driver QueueDriver) {
q.driver = driver
driver.SetQueue(q)
driver.SetListener(q)
}

func propagator() propagation.TextMapPropagator {
@@ -289,31 +310,40 @@ func (q Queue) Enqueue(ctx context.Context, job Job) {
return
}

if job.Headers == nil {
job.Headers = &headers{}
}
propagator().Inject(ctx, propagation.MapCarrier(*job.Headers))
// use a worker to enqueue the job in case the driver takes a while to actually enqueue;
// this way we release the caller as soon as possible
q.workerPool.Submit(func() {
if job.Headers == nil {
job.Headers = &headers{}
}
propagator().Inject(ctx, propagation.MapCarrier(*job.Headers))
job.Headers.Set("InstanceID", q.instanceID)

newJob := Job{
Headers: job.Headers,
newJob := Job{
Headers: job.Headers,

Test: test.Test{ID: job.Test.ID},
Run: test.Run{ID: job.Run.ID},
Test: test.Test{ID: job.Test.ID},
Run: test.Run{ID: job.Run.ID},

TestSuite: testsuite.TestSuite{ID: job.TestSuite.ID},
TestSuiteRun: testsuite.TestSuiteRun{ID: job.TestSuiteRun.ID},
TestSuite: testsuite.TestSuite{ID: job.TestSuite.ID},
TestSuiteRun: testsuite.TestSuiteRun{ID: job.TestSuiteRun.ID},

PollingProfile: pollingprofile.PollingProfile{ID: job.PollingProfile.ID},
DataStore: datastore.DataStore{ID: job.DataStore.ID},
}
PollingProfile: pollingprofile.PollingProfile{ID: job.PollingProfile.ID},
DataStore: datastore.DataStore{ID: job.DataStore.ID},
}
log.Printf("queue: enqueuing job for run %d", job.Run.ID)

q.driver.Enqueue(newJob)
q.driver.Enqueue(newJob)
})
}

func (q Queue) Listen(job Job) {
log.Printf("queue: received job for run %d", job.Run.ID)
// this is called when a new job is put in the queue and we need to process it
ctx := propagator().Extract(context.Background(), propagation.MapCarrier(*job.Headers))

ctx = context.WithValue(ctx, "LastInstanceID", job.Headers.Get("InstanceID"))

ctx, cancelCtx := context.WithCancel(ctx)
q.listenForStopRequests(context.Background(), cancelCtx, job)

@@ -338,7 +368,15 @@ func (q Queue) Listen(job Job) {
case <-ctx.Done():
return
}
q.itemProcessor.ProcessItem(ctx, newJob)

q.workerPool.Submit(func() {
log.Printf("queue: submit to processItem fn for run %d", job.Run.ID)
q.itemProcessor.ProcessItem(ctx, newJob)
})
}

func (q *Queue) Stop() {
q.workerPool.StopAndWait()
}

type StopRequest struct {
@@ -483,6 +521,11 @@ func (b tenantPropagator) Inject(ctx context.Context, carrier propagation.TextMa
if tenantID != "" {
carrier.Set(string(middleware.TenantIDKey), tenantID)
}

instanceID := ctx.Value("instanceID")
if instanceID != nil {
carrier.Set("instanceID", instanceID.(string))
}
}

// Extract returns a copy of parent with the baggage from the carrier added.
@@ -492,7 +535,15 @@ func (b tenantPropagator) Extract(parent context.Context, carrier propagation.Te
return parent
}

return context.WithValue(parent, middleware.TenantIDKey, tenantID)
resultingCtx := context.WithValue(parent, middleware.TenantIDKey, tenantID)

instanceID := carrier.Get("instanceID")
if instanceID != "" {
resultingCtx = context.WithValue(resultingCtx, "instanceID", instanceID)
}

return resultingCtx
}

// Fields returns the keys whose values are set with Inject.
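
The tenantPropagator changes above ride on OpenTelemetry's TextMapPropagator: Inject copies values from the context into a string carrier (here, the job headers), and Extract restores them on the consuming side. A standalone sketch of those mechanics, using a typed context key for illustration (the commit itself uses plain string keys):

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/propagation"
)

type ctxKey string

// instancePropagator carries an instance ID across a process boundary.
type instancePropagator struct{}

func (instancePropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
	if v, ok := ctx.Value(ctxKey("instanceID")).(string); ok && v != "" {
		carrier.Set("instanceID", v)
	}
}

func (instancePropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
	if v := carrier.Get("instanceID"); v != "" {
		ctx = context.WithValue(ctx, ctxKey("instanceID"), v)
	}
	return ctx
}

// Fields lists the carrier keys this propagator sets.
func (instancePropagator) Fields() []string { return []string{"instanceID"} }

func main() {
	ctx := context.WithValue(context.Background(), ctxKey("instanceID"), "abc123")
	carrier := propagation.MapCarrier{}

	instancePropagator{}.Inject(ctx, carrier)                          // producer side
	out := instancePropagator{}.Extract(context.Background(), carrier) // consumer side

	fmt.Println(out.Value(ctxKey("instanceID"))) // abc123
}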
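
The queue changes hinge on pond's bounded worker pool: Submit returns almost immediately and the task runs when a worker frees up, so a slow driver or item processor no longer blocks the caller, and concurrency stays capped at the pool size instead of growing one goroutine per job. A minimal sketch of that behavior, mirroring QueueWorkerCount and QueueWorkerBufferSize from this commit (not Tracetest code):

package main

import (
	"fmt"
	"time"

	"github.com/alitto/pond"
)

func main() {
	// 20 workers with a 20k-task buffer, like the new queue constants
	pool := pond.New(20, 20*1_000)

	for i := 0; i < 100; i++ {
		n := i
		// Submit hands the closure to the pool and returns; the caller is
		// released even if processing is slow
		pool.Submit(func() {
			time.Sleep(10 * time.Millisecond) // simulate slow processing
			fmt.Println("processed job", n)
		})
	}

	// like Queue.Stop(): stop accepting tasks, then wait for in-flight ones
	pool.StopAndWait()
}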
38 changes: 18 additions & 20 deletions server/executor/queue_driver_in_memory.go
@@ -23,36 +23,34 @@ func NewInMemoryQueueDriver(name string) *InMemoryQueueDriver {
}

type InMemoryQueueDriver struct {
log loggerFn
queue chan Job
exit chan bool
q *Queue
name string
log loggerFn
queue chan Job
exit chan bool
listener Listener
name string
}

func (qd *InMemoryQueueDriver) SetQueue(q *Queue) {
qd.q = q
func (qd *InMemoryQueueDriver) SetListener(l Listener) {
qd.listener = l
}

func (qd InMemoryQueueDriver) Enqueue(job Job) {
qd.queue <- job
}

func (qd InMemoryQueueDriver) Start() {
	for i := 0; i < QueueWorkerCount; i++ {
		go func() {
			qd.log("start")
			for {
				select {
				case <-qd.exit:
					qd.log("exit")
					return
				case job := <-qd.queue:
					qd.q.Listen(job)
				}
			}
		}()
	}
}
func (qd InMemoryQueueDriver) Start() {
	go func() {
		qd.log("start")
		for {
			select {
			case <-qd.exit:
				qd.log("exit")
				return
			case job := <-qd.queue:
				qd.listener.Listen(job)
			}
		}
	}()
}

func (qd InMemoryQueueDriver) Stop() {
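
With SetQueue replaced by SetListener, the driver now depends on the narrow Listener interface instead of the concrete *Queue, and a single dispatcher goroutine suffices because Listen hands each job to the queue's worker pool. A standalone sketch of that shape, with simplified hypothetical types (not Tracetest code):

package main

import (
	"fmt"
	"sync"
)

// Simplified shapes mirroring the commit: the driver only needs Listen.
type Job struct{ RunID int }

type Listener interface {
	Listen(Job)
}

type printListener struct{ wg *sync.WaitGroup }

func (p printListener) Listen(j Job) {
	fmt.Println("received job for run", j.RunID)
	p.wg.Done()
}

func main() {
	queue := make(chan Job, 10)
	exit := make(chan bool)
	var wg sync.WaitGroup
	l := printListener{wg: &wg}

	// one dispatcher goroutine is enough: Listen offloads the real work,
	// so the driver no longer spawns QueueWorkerCount goroutines itself
	go func() {
		for {
			select {
			case <-exit:
				return
			case job := <-queue:
				l.Listen(job)
			}
		}
	}()

	wg.Add(3)
	for i := 1; i <= 3; i++ {
		queue <- Job{RunID: i}
	}
	wg.Wait()
	close(exit)
}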
