Skip to content

Commit

Permalink
feat: allow users to stop a running test (#2367)
Browse files Browse the repository at this point in the history
  • Loading branch information
schoren committed Apr 13, 2023
1 parent 2d92e45 commit ba47e42
Show file tree
Hide file tree
Showing 21 changed files with 207 additions and 78 deletions.
59 changes: 30 additions & 29 deletions .github/workflows/pull-request.yaml
Expand Up @@ -362,12 +362,6 @@ jobs:
location: ${{ secrets.GKE_ZONE }}
credentials: ${{ secrets.GKE_SA_KEY }}

- name: Forward port
run: |
kubectl port-forward \
--namespace tracetest-pr-${{ github.event.pull_request.number }} \
svc/tracetest-pr-${{ github.event.pull_request.number }} 3000:11633 &
- name: Cache dependencies
id: cache-nodemodules
uses: actions/cache@v3
Expand All @@ -382,33 +376,40 @@ jobs:
if: steps.cache-nodemodules.outputs.cache-hit != 'true'
run: cd web/; npm ci


- name: Forward port
run: |
kubectl port-forward \
--namespace tracetest-pr-${{ github.event.pull_request.number }} \
svc/tracetest-pr-${{ github.event.pull_request.number }} 3000:11633 &
- name: Run integration tests
run: |
./scripts/wait-for-port.sh 3000
./scripts/wait-for-port.sh 3000 &
cd web/
npm run cy:ci
env:
CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}

cleanup:
name: Cleanup test infra
runs-on: ubuntu-latest
needs: [trace-testing, e2e]
if: always()
steps:
- uses: google-github-actions/setup-gcloud@94337306dda8180d967a56932ceb4ddcf01edae7
with:
service_account_key: ${{ secrets.GKE_SA_KEY }}
project_id: ${{ secrets.GKE_PROJECT }}

- uses: google-github-actions/get-gke-credentials@fb08709ba27618c31c09e014e1d8364b02e5042e
with:
cluster_name: ${{ secrets.GKE_CLUSTER }}
location: ${{ secrets.GKE_ZONE }}
credentials: ${{ secrets.GKE_SA_KEY }}

- name: Uninstall tracetest
run: |
helm delete tracetest-pr-${{ github.event.pull_request.number }} \
--namespace tracetest-pr-${{ github.event.pull_request.number }}
kubectl delete ns tracetest-pr-${{ github.event.pull_request.number }}
# cleanup:
# name: Cleanup test infra
# runs-on: ubuntu-latest
# needs: [trace-testing, e2e]
# if: always()
# steps:
# - uses: google-github-actions/setup-gcloud@94337306dda8180d967a56932ceb4ddcf01edae7
# with:
# service_account_key: ${{ secrets.GKE_SA_KEY }}
# project_id: ${{ secrets.GKE_PROJECT }}

# - uses: google-github-actions/get-gke-credentials@fb08709ba27618c31c09e014e1d8364b02e5042e
# with:
# cluster_name: ${{ secrets.GKE_CLUSTER }}
# location: ${{ secrets.GKE_ZONE }}
# credentials: ${{ secrets.GKE_SA_KEY }}

# - name: Uninstall tracetest
# run: |
# helm delete tracetest-pr-${{ github.event.pull_request.number }} \
# --namespace tracetest-pr-${{ github.event.pull_request.number }}
# kubectl delete ns tracetest-pr-${{ github.event.pull_request.number }}
2 changes: 1 addition & 1 deletion api/openapi.yaml
Expand Up @@ -825,7 +825,7 @@ paths:
application/yaml:
schema:
type: string
/test/{testId}/run/{runId}/stop:
/tests/{testId}/run/{runId}/stop:
post:
tags:
- api
Expand Down
1 change: 1 addition & 0 deletions api/tests.yaml
Expand Up @@ -128,6 +128,7 @@ components:
AWAITING_TRACE,
AWAITING_TEST_RESULTS,
FINISHED,
STOPPED,
TRIGGER_FAILED,
TRACE_FAILED,
ASSERTION_FAILED,
Expand Down
2 changes: 1 addition & 1 deletion cli/openapi/api_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion local-config/collector.config.yaml
Expand Up @@ -13,7 +13,7 @@ exporters:
loglevel: debug

otlp/1:
endpoint: host.docker.internal:21321
endpoint: tracetest:21321
tls:
insecure: true

Expand Down
9 changes: 9 additions & 0 deletions local-config/tracetest.provision.yaml
Expand Up @@ -4,3 +4,12 @@ spec:
name: OpenTelemetry Collector
type: otlp
isdefault: true
---
type: Demo
spec:
name: "Pokeshop"
enabled: true
type: pokeshop
pokeshop:
grpcEndpoint: demo-api:8082
httpEndpoint: http://demo-api:8081
9 changes: 9 additions & 0 deletions scripts/check-port.sh
@@ -0,0 +1,9 @@
#!/bin/bash
# Block until a TCP port on localhost accepts connections.
#
# Usage: check-port.sh PORT
#
# Probes localhost:PORT with `nc -z` (1 second connect timeout) and retries
# once per second, with no upper bound, until a probe succeeds. Prints one
# "not available" line per failed probe and a final "ready" line.
PORT=$1

# Refuse a missing or non-numeric port. With no port `nc` can never succeed and
# the loop below would spin forever; restricting to digits also guarantees the
# argument cannot smuggle extra shell syntax into the probe command.
case "$PORT" in
  '' | *[!0-9]*)
    echo "usage: $0 PORT" >&2
    exit 1
    ;;
esac

set -ex
# The failing probe is the `until` condition, so `set -e` does not abort on it.
until nc -z -w 1 localhost "$PORT" > /dev/null 2>&1; do
  echo "port $PORT not available, retry"
  sleep 1
done
echo "port $PORT ready"
15 changes: 15 additions & 0 deletions server/app/facade.go
Expand Up @@ -6,19 +6,33 @@ import (
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)

// runnerFacade groups the subscription manager and the executor components
// (test runner, transaction runner, assertion runner and trace poller) behind
// a single value.
type runnerFacade struct {
// sm is the subscription manager StopTest publishes stop requests through.
sm *subscription.Manager
// runner is the test runner RunTest delegates to.
runner executor.PersistentRunner
transactionRunner executor.PersistentTransactionRunner
assertionRunner executor.AssertionRunner
tracePoller executor.PersistentTracePoller
}

// StopTest asks for the run identified by testID and runID to be stopped.
//
// It does not stop anything itself: it builds an executor.StopRequest and
// publishes it through the subscription manager on the request's resource ID,
// with the request as the message content.
func (rf runnerFacade) StopTest(testID id.ID, runID int) {
	req := executor.StopRequest{TestID: testID, RunID: runID}

	msg := subscription.Message{
		ResourceID: req.ResourceID(),
		Content:    req,
	}
	rf.sm.PublishUpdate(msg)
}

// RunTest delegates to rf.runner.Run, forwarding ctx, test, the run metadata
// rm and the environment env unchanged, and returns the model.Run it yields.
func (rf runnerFacade) RunTest(ctx context.Context, test model.Test, rm model.RunMetadata, env model.Environment) model.Run {
return rf.runner.Run(ctx, test, rm, env)
}
Expand Down Expand Up @@ -90,6 +104,7 @@ func newRunnerFacades(
)

return &runnerFacade{
sm: subscriptionManager,
runner: runner,
transactionRunner: transactionRunner,
assertionRunner: assertionRunner,
Expand Down
2 changes: 1 addition & 1 deletion server/executor/assertion_executor.go
Expand Up @@ -16,7 +16,7 @@ type AssertionExecutor interface {

type defaultAssertionExecutor struct{}

func (e defaultAssertionExecutor) Assert(ctx context.Context, defs model.OrderedMap[model.SpanQuery, model.NamedAssertions], trace model.Trace, ds []expression.DataStore) (model.OrderedMap[model.SpanQuery, []model.AssertionResult], bool) {
func (e defaultAssertionExecutor) Assert(_ context.Context, defs model.OrderedMap[model.SpanQuery, model.NamedAssertions], trace model.Trace, ds []expression.DataStore) (model.OrderedMap[model.SpanQuery, []model.AssertionResult], bool) {
testResult := model.OrderedMap[model.SpanQuery, []model.AssertionResult]{}
allPassed := true
defs.ForEach(func(spanQuery model.SpanQuery, asserts model.NamedAssertions) error {
Expand Down
1 change: 1 addition & 0 deletions server/executor/eventemitter.go
Expand Up @@ -25,6 +25,7 @@ func NewEventEmitter(repository model.TestRunEventRepository, publisher publishe
return &internalEventEmitter{
repository: repository,
publisher: publisher,
mutex: sync.Mutex{},
}
}

Expand Down
54 changes: 54 additions & 0 deletions server/executor/run_stop.go
@@ -0,0 +1,54 @@
package executor

import (
"context"
"log"

"github.com/kubeshop/tracetest/server/id"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/subscription"
)

// StopRequest identifies the test run that should be stopped. It is the
// message content that listenForStopRequests looks for on the subscription
// it registers for a run.
type StopRequest struct {
// TestID is the ID of the test the run belongs to.
TestID id.ID
// RunID is the ID of the run within that test.
RunID int
}

// ResourceID returns the subscription resource ID for this stop request: the
// resource ID of the run it targets, followed by the "/stop" suffix.
func (sr StopRequest) ResourceID() string {
	target := model.Run{ID: sr.RunID, TestID: sr.TestID}
	return target.ResourceID() + "/stop"
}

// listenForStopRequests subscribes to the stop resource of run and, when a
// StopRequest message arrives, persists the run as stopped, emits a
// TraceStoppedInfo event and cancels the run's context.
//
// ctx parents the tracing span and is used for the persistence and event
// calls made by the handler. cancelCtx is invoked once the run has been
// persisted as stopped; it is expected to cancel the context the run executes
// under. run is only used to derive the resource ID to subscribe to; the
// handler reloads the run from storage before modifying it.
//
// The handler returns the error from loading the run or from emitting the
// event; messages whose content is not a StopRequest are ignored.
//
// NOTE(review): the subscription is never removed here — confirm whether the
// subscription manager needs an explicit unsubscribe once the run ends.
func (r persistentRunner) listenForStopRequests(ctx context.Context, cancelCtx context.CancelFunc, run model.Run) {
	sfn := subscription.NewSubscriberFunction(func(m subscription.Message) error {
		stopRequest, ok := m.Content.(StopRequest)
		if !ok {
			// Not a stop request; nothing to do.
			return nil
		}

		// End the span on every exit path so it is not left open.
		ctx, span := r.tracer.Start(ctx, "User Requested Stop Run")
		defer span.End()

		// refresh data from DB to make sure we have the latest possible data before updating
		run, err := r.runs.GetRun(ctx, stopRequest.TestID, stopRequest.RunID)
		if err != nil {
			log.Printf("[TracePoller] Test %s Run %d: fail to get test run data: %s \n", stopRequest.TestID, stopRequest.RunID, err.Error())
			return err
		}

		run = run.Stopped()
		r.handleDBError(run, r.updater.Update(ctx, run))

		// The run is now recorded as stopped, so its context must be cancelled
		// even if emitting the event below fails. Deferring keeps ctx alive
		// for the Emit call and cancels right after it.
		defer cancelCtx()

		evt := events.TraceStoppedInfo(stopRequest.TestID, stopRequest.RunID)
		if err := r.eventEmitter.Emit(ctx, evt); err != nil {
			log.Printf("[TracePoller] Test %s Run %d: fail to emit TraceStoppedInfo event: %s \n", stopRequest.TestID, stopRequest.RunID, err.Error())
			return err
		}

		return nil
	})

	r.subscriptionManager.Subscribe((StopRequest{run.TestID, run.ID}).ResourceID(), sfn)
}
17 changes: 10 additions & 7 deletions server/executor/runner.go
Expand Up @@ -76,12 +76,11 @@ type persistentRunner struct {
}

type execReq struct {
ctx context.Context
test model.Test
run model.Run
subscriptionManager *subscription.Manager
Headers propagation.MapCarrier
executor expression.Executor
ctx context.Context
test model.Test
run model.Run
Headers propagation.MapCarrier
executor expression.Executor
}

func (r persistentRunner) handleDBError(run model.Run, err error) {
Expand Down Expand Up @@ -133,14 +132,18 @@ func getNewCtx(ctx context.Context) context.Context {
}

func (r persistentRunner) Run(ctx context.Context, test model.Test, metadata model.RunMetadata, environment model.Environment) model.Run {
ctx = getNewCtx(ctx)
ctx, cancelCtx := context.WithCancel(
getNewCtx(ctx),
)

run := model.NewRun()
run.Metadata = metadata
run.Environment = environment
run, err := r.runs.CreateRun(ctx, test, run)
r.handleDBError(run, err)

r.listenForStopRequests(ctx, cancelCtx, run)

ds := []expression.DataStore{expression.EnvironmentDataStore{
Values: environment.Values,
}}
Expand Down
15 changes: 14 additions & 1 deletion server/executor/trace_poller.go
Expand Up @@ -133,6 +133,13 @@ func (tp tracePoller) enqueueJob(job PollingRequest) {
}

func (tp tracePoller) processJob(job PollingRequest) {
select {
default:
case <-job.ctx.Done():
log.Printf("[TracePoller] Context cancelled.")
return
}

if job.IsFirstRequest() {
err := tp.eventEmitter.Emit(job.ctx, events.TracePollingStart(job.test.ID, job.run.ID))
if err != nil {
Expand Down Expand Up @@ -191,7 +198,13 @@ func (tp tracePoller) runAssertions(job PollingRequest) {
func (tp tracePoller) handleTraceDBError(job PollingRequest, err error) (bool, string) {
run := job.run

pp := *tp.ppGetter.GetDefault(job.ctx).Periodic
profile := tp.ppGetter.GetDefault(job.ctx)
if profile.Periodic == nil {
log.Println("[TracePoller] cannot get polling profile.")
return true, "Cannot get polling profile"
}

pp := *profile.Periodic

// Edge case: the trace is still not available on the Data Store during polling
if errors.Is(err, connection.ErrTraceNotFound) && time.Since(run.ServiceTriggeredAt) < pp.TimeoutDuration() {
Expand Down

0 comments on commit ba47e42

Please sign in to comment.