fix panic when under pressure #3098

Merged · 25 commits · Aug 30, 2023

Changes from all commits

33 changes: 33 additions & 0 deletions .github/workflows/pull-request.yaml
Expand Up @@ -338,6 +338,39 @@ jobs:
cd ./testing/cli-e2etest
make test

# TODO: this would be a great idea but it doesn't work on GHA with docker
# it can probably be implemented with k8s in a separate job
# I'm leaving this as a reference and as a reminder to do it
# k6-test:
# name: k6 Load Test
# needs: [build-docker]
# runs-on: ubuntu-latest
# steps:
# - name: Checkout
# uses: actions/checkout@v3
# - name: Setup go
# uses: actions/setup-go@v3
# with:
# go-version-file: "go.work"
# cache: true
# cache-dependency-path: go.work
# - uses: actions/download-artifact@v3
# with:
# name: tracetest-dist
# path: dist/

# - name: Import image
# run: |
# docker load --input dist/image.tar
# - name: Run tests
# run: |
# chmod +x ./dist/tracetest ./testing/load/run.bash

# cd ./testing/load

# export TRACETEST_CLI="../../dist/tracetest"
# ./run.bash || (cat /tmp/docker-log; exit 1)

config-e2e:
runs-on: ubuntu-latest
outputs:
2 changes: 1 addition & 1 deletion Makefile
@@ -1,6 +1,6 @@
VERSION?="dev"
TAG?=$(VERSION)
GORELEASER_VERSION=1.19.2-pro
GORELEASER_VERSION=1.20.0-pro

PROJECT_ROOT=${PWD}

1,173 changes: 2 additions & 1,171 deletions go.work.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions scripts/wait-for-port.sh
@@ -1,10 +1,10 @@
#!/bin/bash
PORT=$1

TIME_OUT=30s
TIMEOUT=${TIMEOUT:-"30s"}
CONDITION='nc -z -w 1 localhost '$PORT' > /dev/null 2>&1'
IF_TRUE='echo "port '$PORT' ready"'
IF_FALSE='echo "port '$PORT' not available, retry"'

ROOT_DIR=$(cd $(dirname "${BASH_SOURCE:-$0}")/.. && pwd)
$ROOT_DIR/scripts/wait.sh "$TIME_OUT" "$CONDITION" "$IF_TRUE" "$IF_FALSE"
$ROOT_DIR/scripts/wait.sh "$TIMEOUT" "$CONDITION" "$IF_TRUE" "$IF_FALSE"
10 changes: 9 additions & 1 deletion server/app/app.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/openapi"
"github.com/kubeshop/tracetest/server/otlp"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/kubeshop/tracetest/server/provisioning"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/subscription"
Expand Down Expand Up @@ -133,7 +134,11 @@ func (app *App) initAnalytics(configFromDB config.Config) error {
return analytics.Init(configFromDB.IsAnalyticsEnabled(), app.serverID, Version, Env)
}

var instanceID = id.GenerateID().String()

func (app *App) Start(opts ...appOption) error {
// instanceID is a temporary ID for this instance of the server
// it is regenerated on every start intentionally
for _, opt := range opts {
opt(app)
}
Expand All @@ -145,6 +150,7 @@ func (app *App) Start(opts ...appOption) error {
if err != nil {
return err
}
poolcfg.MaxConns = 20

pool, err := pgxpool.NewWithConfig(context.Background(), poolcfg)
if err != nil {
Expand All @@ -155,6 +161,8 @@ func (app *App) Start(opts ...appOption) error {
if err != nil {
return err
}
db.SetMaxOpenConns(80)

testDB, err := testdb.Postgres(
testdb.WithDB(db),
)
Expand Down Expand Up @@ -210,7 +218,7 @@ func (app *App) Start(opts ...appOption) error {
variableSetRepo := variableset.NewRepository(db)
linterRepo := analyzer.NewRepository(db)
testRepo := test.NewRepository(db)
runRepo := test.NewRunRepository(db)
runRepo := test.NewRunRepository(db, test.NewCache(instanceID))
testRunnerRepo := testrunner.NewRepository(db)
tracesRepo := traces.NewTraceRepository(db)

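
Two connection caps appear in the hunks above: the pgx pool that also backs the Postgres queue driver is limited to 20 connections (poolcfg.MaxConns = 20), and the database/sql handle shared by the repositories is limited to 80 (db.SetMaxOpenConns(80)), so a burst of queued work contends for a bounded set of connections instead of exhausting Postgres. A minimal standalone sketch of those caps — not part of this diff, assuming pgx v5 and the lib/pq driver, with a hypothetical connection string:

    package main

    import (
        "context"
        "database/sql"
        "log"

        "github.com/jackc/pgx/v5/pgxpool"
        _ "github.com/lib/pq"
    )

    func main() {
        // Hypothetical connection string, for illustration only.
        dsn := "postgres://tracetest:secret@localhost:5432/tracetest?sslmode=disable"

        // Cap the pgx pool handed to the Postgres queue driver.
        poolcfg, err := pgxpool.ParseConfig(dsn)
        if err != nil {
            log.Fatal(err)
        }
        poolcfg.MaxConns = 20

        pool, err := pgxpool.NewWithConfig(context.Background(), poolcfg)
        if err != nil {
            log.Fatal(err)
        }
        defer pool.Close()

        // Cap the database/sql handle shared by the repositories.
        db, err := sql.Open("postgres", dsn)
        if err != nil {
            log.Fatal(err)
        }
        db.SetMaxOpenConns(80)
        defer db.Close()
    }
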
3 changes: 2 additions & 1 deletion server/app/test_pipeline.go
Expand Up @@ -86,7 +86,8 @@ func buildTestPipeline(
WithDataStoreGetter(dsRepo).
WithPollingProfileGetter(ppRepo).
WithTestGetter(testRepo).
WithRunGetter(runRepo)
WithRunGetter(runRepo).
WithInstanceID(instanceID)

pgQueue := executor.NewPostgresQueueDriver(pool)

91 changes: 71 additions & 20 deletions server/executor/queue.go
Expand Up @@ -6,8 +6,11 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strconv"

"github.com/alitto/pond"

"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/http/middleware"
Expand All @@ -19,7 +22,8 @@ import (
)

const (
QueueWorkerCount = 5
QueueWorkerCount = 20
QueueWorkerBufferSize = QueueWorkerCount * 1_000 // 1k jobs per worker

JobCountHeader string = "X-Tracetest-Job-Count"
)
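
QueueWorkerCount grows from 5 to 20, and the new QueueWorkerBufferSize gives the pond worker pool (imported above) a bounded backlog of 1,000 jobs per worker, so submitters are released quickly while excess work waits in the buffer rather than piling up as goroutines. A rough sketch of that pattern — not taken from this PR, assuming github.com/alitto/pond, with illustrative values:

    package main

    import (
        "fmt"
        "time"

        "github.com/alitto/pond"
    )

    func main() {
        const workers = 20
        const bufferSize = workers * 1_000 // bounded backlog, mirroring QueueWorkerBufferSize

        // pond.New(maxWorkers, maxCapacity) starts a pool with a fixed number of
        // workers and a buffered task queue of maxCapacity entries.
        pool := pond.New(workers, bufferSize)

        for i := 0; i < 100; i++ {
            n := i
            // Submit returns quickly while the buffer has room, so the caller is
            // released even when every worker is busy.
            pool.Submit(func() {
                time.Sleep(10 * time.Millisecond) // stand-in for ProcessItem
                fmt.Println("processed job", n)
            })
        }

        // StopAndWait drains the backlog and waits for in-flight tasks; Queue.Stop
        // below relies on the same call for a clean shutdown.
        pool.StopAndWait()
    }
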
Expand Down Expand Up @@ -170,9 +174,13 @@ type subscriptor interface {
Subscribe(string, subscription.Subscriber)
}

type Listener interface {
Listen(Job)
}

type QueueDriver interface {
Enqueue(Job)
SetQueue(*Queue)
SetListener(Listener)
}

type QueueBuilder struct {
Expand All @@ -187,6 +195,8 @@ type QueueBuilder struct {

pollingProfiles pollingProfileGetter
dataStores dataStoreGetter

instanceID string
}

func NewQueueBuilder() *QueueBuilder {
Expand All @@ -208,6 +218,11 @@ func (qb *QueueBuilder) WithRunGetter(runs testRunGetter) *QueueBuilder {
return qb
}

func (qb *QueueBuilder) WithInstanceID(id string) *QueueBuilder {
qb.instanceID = id
return qb
}

func (qb *QueueBuilder) WithTestGetter(tests testGetter) *QueueBuilder {
qb.tests = tests
return qb
Expand Down Expand Up @@ -249,9 +264,12 @@ func (qb *QueueBuilder) Build(driver QueueDriver, itemProcessor QueueItemProcess

driver: driver,
itemProcessor: itemProcessor,
workerPool: pond.New(QueueWorkerCount, QueueWorkerBufferSize),

instanceID: qb.instanceID,
}

driver.SetQueue(queue)
driver.SetListener(queue)

return queue
}
Expand All @@ -271,11 +289,14 @@ type Queue struct {

itemProcessor QueueItemProcessor
driver QueueDriver
workerPool *pond.WorkerPool

instanceID string
}

func (q *Queue) SetDriver(driver QueueDriver) {
q.driver = driver
driver.SetQueue(q)
driver.SetListener(q)
}

func propagator() propagation.TextMapPropagator {
Expand All @@ -289,31 +310,40 @@ func (q Queue) Enqueue(ctx context.Context, job Job) {
return
}

if job.Headers == nil {
job.Headers = &headers{}
}
propagator().Inject(ctx, propagation.MapCarrier(*job.Headers))
// use a worker to enqueue the job in case the driver takes a bit to actually enqueue
// this way we release the caller as soon as possible
q.workerPool.Submit(func() {
if job.Headers == nil {
job.Headers = &headers{}
}
propagator().Inject(ctx, propagation.MapCarrier(*job.Headers))
job.Headers.Set("InstanceID", q.instanceID)

newJob := Job{
Headers: job.Headers,
newJob := Job{
Headers: job.Headers,

Test: test.Test{ID: job.Test.ID},
Run: test.Run{ID: job.Run.ID},
Test: test.Test{ID: job.Test.ID},
Run: test.Run{ID: job.Run.ID},

TestSuite: testsuite.TestSuite{ID: job.TestSuite.ID},
TestSuiteRun: testsuite.TestSuiteRun{ID: job.TestSuiteRun.ID},
TestSuite: testsuite.TestSuite{ID: job.TestSuite.ID},
TestSuiteRun: testsuite.TestSuiteRun{ID: job.TestSuiteRun.ID},

PollingProfile: pollingprofile.PollingProfile{ID: job.PollingProfile.ID},
DataStore: datastore.DataStore{ID: job.DataStore.ID},
}
PollingProfile: pollingprofile.PollingProfile{ID: job.PollingProfile.ID},
DataStore: datastore.DataStore{ID: job.DataStore.ID},
}
log.Printf("queue: enqueuing job for run %d", job.Run.ID)

q.driver.Enqueue(newJob)
q.driver.Enqueue(newJob)
})
}

func (q Queue) Listen(job Job) {
log.Printf("queue: received job for run %d", job.Run.ID)
// this is called when a new job is put in the queue and we need to process it
ctx := propagator().Extract(context.Background(), propagation.MapCarrier(*job.Headers))

ctx = context.WithValue(ctx, "LastInstanceID", job.Headers.Get("InstanceID"))

ctx, cancelCtx := context.WithCancel(ctx)
q.listenForStopRequests(context.Background(), cancelCtx, job)

Expand All @@ -338,7 +368,15 @@ func (q Queue) Listen(job Job) {
case <-ctx.Done():
return
}
q.itemProcessor.ProcessItem(ctx, newJob)

q.workerPool.Submit(func() {
log.Printf("queue: submit to processItem fn for run %d", job.Run.ID)
q.itemProcessor.ProcessItem(ctx, newJob)
})
}

func (q *Queue) Stop() {
q.workerPool.StopAndWait()
}

type StopRequest struct {
Expand Down Expand Up @@ -483,6 +521,11 @@ func (b tenantPropagator) Inject(ctx context.Context, carrier propagation.TextMa
if tenantID != "" {
carrier.Set(string(middleware.TenantIDKey), tenantID)
}

instanceID := ctx.Value("instanceID")
if instanceID != nil {
carrier.Set("instanceID", instanceID.(string))
}
}

// Extract returns a copy of parent with the baggage from the carrier added.
Expand All @@ -492,7 +535,15 @@ func (b tenantPropagator) Extract(parent context.Context, carrier propagation.Te
return parent
}

return context.WithValue(parent, middleware.TenantIDKey, tenantID)
resultingCtx := context.WithValue(parent, middleware.TenantIDKey, tenantID)

instanceID := carrier.Get("instanceID")
if instanceID != "" {
resultingCtx = context.WithValue(resultingCtx, "instanceID", instanceID)
}

return resultingCtx

}

// Fields returns the keys whose values are set with Inject.
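
The tenantPropagator now carries the instance ID alongside the tenant ID, and Enqueue/Listen stamp an InstanceID header on every job, so the consuming side can tell which instance enqueued a run (the run repository cache created with test.NewCache(instanceID) appears to key off this). A simplified sketch of that inject/extract round trip using only the OpenTelemetry propagation types — the context key and header name here are illustrative, not the server's own:

    package main

    import (
        "context"
        "fmt"

        "go.opentelemetry.io/otel/propagation"
    )

    type ctxKey string

    const instanceIDKey ctxKey = "instanceID" // illustrative key, not the middleware key used by the server

    func main() {
        // Producer side: the enqueuing instance injects its ID into the carrier
        // (the job headers travel with the job through the queue driver).
        producerCtx := context.WithValue(context.Background(), instanceIDKey, "instance-abc123")
        carrier := propagation.MapCarrier{}
        if id, ok := producerCtx.Value(instanceIDKey).(string); ok && id != "" {
            carrier.Set("instanceID", id)
        }

        // Consumer side: whichever instance picks the job up extracts the value
        // into a fresh context before processing.
        consumerCtx := context.Background()
        if id := carrier.Get("instanceID"); id != "" {
            consumerCtx = context.WithValue(consumerCtx, instanceIDKey, id)
        }

        fmt.Println("enqueued by:", consumerCtx.Value(instanceIDKey)) // instance-abc123
    }
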
38 changes: 18 additions & 20 deletions server/executor/queue_driver_in_memory.go
Expand Up @@ -23,36 +23,34 @@ func NewInMemoryQueueDriver(name string) *InMemoryQueueDriver {
}

type InMemoryQueueDriver struct {
log loggerFn
queue chan Job
exit chan bool
q *Queue
name string
log loggerFn
queue chan Job
exit chan bool
listener Listener
name string
}

func (qd *InMemoryQueueDriver) SetQueue(q *Queue) {
qd.q = q
func (qd *InMemoryQueueDriver) SetListener(l Listener) {
qd.listener = l
}

func (qd InMemoryQueueDriver) Enqueue(job Job) {
qd.queue <- job
}

func (qd InMemoryQueueDriver) Start() {
for i := 0; i < QueueWorkerCount; i++ {
go func() {
qd.log("start")
for {
select {
case <-qd.exit:
qd.log("exit")
return
case job := <-qd.queue:
qd.q.Listen(job)
}
go func() {
qd.log("start")
for {
select {
case <-qd.exit:
qd.log("exit")
return
case job := <-qd.queue:
qd.listener.Listen(job)
}
}()
}
}
}()
}

func (qd InMemoryQueueDriver) Stop() {
Expand Down