Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow users to stop a running test #2367

Merged
merged 18 commits into from
Apr 13, 2023
59 changes: 30 additions & 29 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,6 @@ jobs:
location: ${{ secrets.GKE_ZONE }}
credentials: ${{ secrets.GKE_SA_KEY }}

- name: Forward port
run: |
kubectl port-forward \
--namespace tracetest-pr-${{ github.event.pull_request.number }} \
svc/tracetest-pr-${{ github.event.pull_request.number }} 3000:11633 &

- name: Cache dependencies
id: cache-nodemodules
uses: actions/cache@v3
Expand All @@ -382,33 +376,40 @@ jobs:
if: steps.cache-nodemodules.outputs.cache-hit != 'true'
run: cd web/; npm ci


- name: Forward port
run: |
kubectl port-forward \
--namespace tracetest-pr-${{ github.event.pull_request.number }} \
svc/tracetest-pr-${{ github.event.pull_request.number }} 3000:11633 &

- name: Run integration tests
run: |
./scripts/wait-for-port.sh 3000
./scripts/wait-for-port.sh 3000 &
cd web/
npm run cy:ci
env:
CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}

cleanup:
name: Cleanup test infra
runs-on: ubuntu-latest
needs: [trace-testing, e2e]
if: always()
steps:
- uses: google-github-actions/setup-gcloud@94337306dda8180d967a56932ceb4ddcf01edae7
with:
service_account_key: ${{ secrets.GKE_SA_KEY }}
project_id: ${{ secrets.GKE_PROJECT }}

- uses: google-github-actions/get-gke-credentials@fb08709ba27618c31c09e014e1d8364b02e5042e
with:
cluster_name: ${{ secrets.GKE_CLUSTER }}
location: ${{ secrets.GKE_ZONE }}
credentials: ${{ secrets.GKE_SA_KEY }}

- name: Uninstall tracetest
run: |
helm delete tracetest-pr-${{ github.event.pull_request.number }} \
--namespace tracetest-pr-${{ github.event.pull_request.number }}
kubectl delete ns tracetest-pr-${{ github.event.pull_request.number }}
# cleanup:
# name: Cleanup test infra
# runs-on: ubuntu-latest
# needs: [trace-testing, e2e]
# if: always()
# steps:
# - uses: google-github-actions/setup-gcloud@94337306dda8180d967a56932ceb4ddcf01edae7
# with:
# service_account_key: ${{ secrets.GKE_SA_KEY }}
# project_id: ${{ secrets.GKE_PROJECT }}

# - uses: google-github-actions/get-gke-credentials@fb08709ba27618c31c09e014e1d8364b02e5042e
# with:
# cluster_name: ${{ secrets.GKE_CLUSTER }}
# location: ${{ secrets.GKE_ZONE }}
# credentials: ${{ secrets.GKE_SA_KEY }}

# - name: Uninstall tracetest
# run: |
# helm delete tracetest-pr-${{ github.event.pull_request.number }} \
# --namespace tracetest-pr-${{ github.event.pull_request.number }}
# kubectl delete ns tracetest-pr-${{ github.event.pull_request.number }}
2 changes: 1 addition & 1 deletion api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ paths:
application/yaml:
schema:
type: string
/test/{testId}/run/{runId}/stop:
/tests/{testId}/run/{runId}/stop:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all test endpoints are actually /tests with a trailing s

post:
tags:
- api
Expand Down
1 change: 1 addition & 0 deletions api/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ components:
AWAITING_TRACE,
AWAITING_TEST_RESULTS,
FINISHED,
STOPPED,
TRIGGER_FAILED,
TRACE_FAILED,
ASSERTION_FAILED,
Expand Down
2 changes: 1 addition & 1 deletion cli/openapi/api_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion local-config/collector.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ exporters:
loglevel: debug

otlp/1:
endpoint: host.docker.internal:21321
endpoint: tracetest:21321
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was probablly a leftover from someone doing docker tests, it should point to the internal url

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: is this file intended just for local testing, or do we use it in some pipelines?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local testing. It's used when you run the ./run.sh script for example, that is mainly targetted to run an env similar to CI, so you can run the tracetesting tests locally easily.

tls:
insecure: true

Expand Down
9 changes: 9 additions & 0 deletions local-config/tracetest.provision.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ spec:
name: OpenTelemetry Collector
type: otlp
isdefault: true
---
type: Demo
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addin a default demo locally eased the testing process

spec:
name: "Pokeshop"
enabled: true
type: pokeshop
pokeshop:
grpcEndpoint: demo-api:8082
httpEndpoint: http://demo-api:8081
9 changes: 9 additions & 0 deletions scripts/check-port.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
PORT=$1

CONDITION='nc -z -w 1 localhost '$PORT' > /dev/null 2>&1'
IF_TRUE='echo "port '$PORT' ready"'
IF_FALSE='echo "port '$PORT' not available, retry"'

set -ex
bash -c "until ${CONDITION}; do ${IF_FALSE}; sleep 1; done; ${IF_TRUE}"
15 changes: 15 additions & 0 deletions server/app/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,33 @@ import (
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

type runnerFacade struct {
sm *subscription.Manager
runner executor.PersistentRunner
transactionRunner executor.PersistentTransactionRunner
assertionRunner executor.AssertionRunner
tracePoller executor.PersistentTracePoller
}

func (rf runnerFacade) StopTest(testID id.ID, runID int) {
sr := executor.StopRequest{
TestID: testID,
RunID: runID,
}

rf.sm.PublishUpdate(subscription.Message{
ResourceID: sr.ResourceID(),
Content: sr,
})
}

func (rf runnerFacade) RunTest(ctx context.Context, test model.Test, rm model.RunMetadata, env model.Environment) model.Run {
return rf.runner.Run(ctx, test, rm, env)
}
Expand Down Expand Up @@ -90,6 +104,7 @@ func newRunnerFacades(
)

return &runnerFacade{
sm: subscriptionManager,
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
Expand Down
2 changes: 1 addition & 1 deletion server/executor/assertion_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type AssertionExecutor interface {

type defaultAssertionExecutor struct{}

func (e defaultAssertionExecutor) Assert(ctx context.Context, defs model.OrderedMap[model.SpanQuery, model.NamedAssertions], trace model.Trace, ds []expression.DataStore) (model.OrderedMap[model.SpanQuery, []model.AssertionResult], bool) {
func (e defaultAssertionExecutor) Assert(_ context.Context, defs model.OrderedMap[model.SpanQuery, model.NamedAssertions], trace model.Trace, ds []expression.DataStore) (model.OrderedMap[model.SpanQuery, []model.AssertionResult], bool) {
testResult := model.OrderedMap[model.SpanQuery, []model.AssertionResult]{}
allPassed := true
defs.ForEach(func(spanQuery model.SpanQuery, asserts model.NamedAssertions) error {
Expand Down
1 change: 1 addition & 0 deletions server/executor/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func NewEventEmitter(repository model.TestRunEventRepository, publisher publishe
return &internalEventEmitter{
repository: repository,
publisher: publisher,
mutex: sync.Mutex{},
}
}

Expand Down
54 changes: 54 additions & 0 deletions server/executor/run_stop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package executor

import (
"context"
"log"

"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/subscription"
)

type StopRequest struct {
TestID id.ID
RunID int
}

func (sr StopRequest) ResourceID() string {
runID := (model.Run{ID: sr.RunID, TestID: sr.TestID}).ResourceID()
return runID + "/stop"
}

func (r persistentRunner) listenForStopRequests(ctx context.Context, cancelCtx context.CancelFunc, run model.Run) {
sfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
stopRequest, ok := m.Content.(StopRequest)
if !ok {
return nil
}

ctx, _ := r.tracer.Start(ctx, "User Requested Stop Run")
// refresh data from DB to make sure we have the latest possible data before updating
run, err := r.runs.GetRun(ctx, stopRequest.TestID, stopRequest.RunID)
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to get test run data: %s \n", stopRequest.TestID, stopRequest.RunID, err.Error())
return err
}

run = run.Stopped()
r.handleDBError(run, r.updater.Update(ctx, run))

evt := events.TraceStoppedInfo(stopRequest.TestID, stopRequest.RunID)
err = r.eventEmitter.Emit(ctx, evt)
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: fail to emit TraceStoppedInfo event: %s \n", stopRequest.TestID, stopRequest.RunID, err.Error())
return err
}

cancelCtx()

return nil
})

r.subscriptionManager.Subscribe((StopRequest{run.TestID, run.ID}).ResourceID(), sfn)
}
17 changes: 10 additions & 7 deletions server/executor/runner.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to think about in another PR: is there a way to add a unit test that checks if the runner behaved correctly on a cancel case?

Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ type persistentRunner struct {
}

type execReq struct {
ctx context.Context
test model.Test
run model.Run
subscriptionManager *subscription.Manager
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this var was unused

Headers propagation.MapCarrier
executor expression.Executor
ctx context.Context
test model.Test
run model.Run
Headers propagation.MapCarrier
executor expression.Executor
}

func (r persistentRunner) handleDBError(run model.Run, err error) {
Expand Down Expand Up @@ -133,14 +132,18 @@ func getNewCtx(ctx context.Context) context.Context {
}

func (r persistentRunner) Run(ctx context.Context, test model.Test, metadata model.RunMetadata, environment model.Environment) model.Run {
ctx = getNewCtx(ctx)
ctx, cancelCtx := context.WithCancel(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to make the context cancellable, since it isn't by default

getNewCtx(ctx),
)

run := model.NewRun()
run.Metadata = metadata
run.Environment = environment
run, err := r.runs.CreateRun(ctx, test, run)
r.handleDBError(run, err)

r.listenForStopRequests(ctx, cancelCtx, run)

ds := []expression.DataStore{expression.EnvironmentDataStore{
Values: environment.Values,
}}
Expand Down
15 changes: 14 additions & 1 deletion server/executor/trace_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (tp tracePoller) enqueueJob(job PollingRequest) {
}

func (tp tracePoller) processJob(job PollingRequest) {
select {
default:
case <-job.ctx.Done():
log.Printf("[TracePoller] Context cancelled.")
return
}

if job.IsFirstRequest() {
err := tp.eventEmitter.Emit(job.ctx, events.TracePollingStart(job.test.ID, job.run.ID))
if err != nil {
Expand Down Expand Up @@ -191,7 +198,13 @@ func (tp tracePoller) runAssertions(job PollingRequest) {
func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) (bool, string) {
run := job.run

pp := *tp.ppGetter.GetDefault(job.ctx).Periodic
profile := tp.ppGetter.GetDefault(job.ctx)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sometimes the context can get cancelled in the middle of this func call, causing a nil pointer panic. This new approach handles the case more gracefully

if profile.Periodic == nil {
log.Println("[TracePoller] cannot get polling profile.")
return true, "Cannot get polling profile"
}

pp := *profile.Periodic

// Edge case: the trace still not avaiable on Data Store during polling
if errors.Is(err, connection.ErrTraceNotFound) && time.Since(run.ServiceTriggeredAt) < pp.TimeoutDuration() {
Expand Down