-
Notifications
You must be signed in to change notification settings - Fork 67
/
test_suite_runner.go
170 lines (142 loc) · 4.97 KB
/
test_suite_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package executor
import (
"context"
"fmt"
"log"
"github.com/kubeshop/tracetest/server/executor/testrunner"
"github.com/kubeshop/tracetest/server/pkg/maps"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/test"
"github.com/kubeshop/tracetest/server/testsuite"
"github.com/kubeshop/tracetest/server/variableset"
)
// testSuiteRunRepository is the storage dependency of the test suite runner.
// It embeds transactionUpdater and adds the ability to create runs. The runner
// in this file also calls UpdateRun on it, which is presumably provided by the
// embedded transactionUpdater (declared elsewhere).
type testSuiteRunRepository interface {
transactionUpdater
// CreateRun receives a test suite run and returns a test suite run, or an
// error if the operation failed.
CreateRun(context.Context, testsuite.TestSuiteRun) (testsuite.TestSuiteRun, error)
}
// testRunner is the dependency used to run the individual tests that make up
// a test suite.
type testRunner interface {
// Run takes the test to run, the run metadata, the variable set available to
// the test and a pointer to a list of required gates, and returns the
// resulting test.Run. The test suite runner subscribes to updates of the
// returned run afterwards, so the run is presumably not yet final when Run
// returns (verify against the implementation).
Run(context.Context, test.Test, test.RunMetadata, variableset.VariableSet, *[]testrunner.RequiredGate) test.Run
}
// NewTestSuiteRunner wires up a test suite runner from its dependencies.
//
// The runner's updater is a composite that forwards every run update to both
// a database-backed updater (built from transactionRuns) and a subscription
// updater (built from subscriptionManager), in that order of registration.
func NewTestSuiteRunner(
	testRunner testRunner,
	transactionRuns testSuiteRunRepository,
	subscriptionManager subscription.Manager,
) *persistentTransactionRunner {
	runner := &persistentTransactionRunner{
		testRunner:          testRunner,
		transactionRuns:     transactionRuns,
		subscriptionManager: subscriptionManager,
	}

	// Register the DB updater first, then the subscription updater.
	runner.updater = (CompositeTransactionUpdater{}).
		Add(NewDBTranasctionUpdater(transactionRuns)).
		Add(NewSubscriptionTransactionUpdater(subscriptionManager))

	return runner
}
// persistentTransactionRunner runs the steps of a test suite one after the
// other, recording the state of the run as it progresses.
type persistentTransactionRunner struct {
// testRunner runs the test behind each step.
testRunner testRunner
// transactionRuns is used to persist the run after each completed step.
transactionRuns testSuiteRunRepository
// updater receives every state change of the run; NewTestSuiteRunner sets it
// to a composite of a DB updater and a subscription updater.
updater TestSuiteRunUpdater
// subscriptionManager is used to subscribe to updates of each step's test
// run and to publish the resulting test suite run updates.
subscriptionManager subscription.Manager
}
// SetOutputQueue accepts an output queue and ignores it. The method has an
// empty body; it presumably exists only to satisfy a pipeline interface that
// requires it (confirm against the pipeline package).
func (r *persistentTransactionRunner) SetOutputQueue(_ pipeline.Enqueuer[Job]) {
// this is a no-op, as transaction runner does not need to enqueue anything
}
// ProcessItem executes, in order, every step of the test suite carried by job.
//
// It first marks the run as executing and publishes that state. Each step is
// then run through runTransactionStep; after a successful step the step's
// outputs are merged into the run's variable set, so later steps can read
// them, and the run is persisted. The loop stops at the first step that
// leaves the run in the failed state. Unless the run failed, it is marked as
// finished before the final update is published.
//
// Nothing is returned: any error is logged and aborts processing.
// NOTE(review): when processing aborts on an error, the run is left in
// whatever state was last published (usually "executing").
func (r persistentTransactionRunner) ProcessItem(ctx context.Context, job Job) {
	suite := job.TestSuite
	run := job.TestSuiteRun

	run.State = testsuite.TestSuiteStateExecuting
	if err := r.updater.Update(ctx, run); err != nil {
		log.Printf("[TransactionRunner] could not update transaction run: %s", err.Error())
		return
	}

	log.Printf("[TransactionRunner] running transaction %s with %d steps", run.TestSuiteID, len(suite.Steps))
	// The loop variable is deliberately not called "test": that name would
	// shadow the imported test package.
	for step, stepTest := range suite.Steps {
		// Keep the result in its own variable. On error runTransactionStep
		// returns a zero-value run, and assigning it to run before logging
		// would blank out the test suite ID in the message below.
		stepRun, err := r.runTransactionStep(ctx, run, step, stepTest)
		if err != nil {
			log.Printf("[TransactionRunner] could not execute step %d of transaction %s: %s", step, run.TestSuiteID, err.Error())
			return
		}
		run = stepRun

		// A failed step ends the suite; the failed state is published below.
		if run.State == testsuite.TestSuiteStateFailed {
			break
		}

		// Expose this step's outputs to the following steps.
		run.VariableSet = mergeOutputsIntoEnv(run.VariableSet, run.Steps[step].Outputs)
		if err := r.transactionRuns.UpdateRun(ctx, run); err != nil {
			log.Printf("[TransactionRunner] could not update transaction step: %s", err.Error())
			return
		}
	}

	if run.State != testsuite.TestSuiteStateFailed {
		run.State = testsuite.TestSuiteStateFinished
	}

	if err := r.updater.Update(ctx, run); err != nil {
		log.Printf("[TransactionRunner] could not update transaction run: %s", err.Error())
	}
}
// runTransactionStep runs one step of the test suite and waits until the
// step's test run reaches a final state.
//
// The test is started through the test runner and recorded as step number
// step of tr. The function then subscribes to updates of that test run; each
// update is stored in the test suite run and re-published as a
// "result_update" message for the test suite run itself. An update carrying a
// LastError marks the test suite run as failed.
//
// It returns the updated test suite run once an update reports a final
// state. It returns a zero-value run and an error when the run cannot be
// updated or when ctx is done before the step finishes.
//
// NOTE(review): the subscription is not removed when this function returns,
// so the subscriber may still be invoked afterwards. Messages that cannot be
// decoded are reported to the subscription manager but do not end the wait.
func (r persistentTransactionRunner) runTransactionStep(ctx context.Context, tr testsuite.TestSuiteRun, step int, testObj test.Test) (testsuite.TestSuiteRun, error) {
	testRun := r.testRunner.Run(ctx, testObj, tr.RunMetadata(step), tr.VariableSet, tr.RequiredGates)
	tr, err := r.updateStepRun(ctx, tr, step, testRun)
	if err != nil {
		return testsuite.TestSuiteRun{}, fmt.Errorf("could not update transaction run: %w", err)
	}

	// The outcome travels through the channel instead of through captured
	// variables, so an error raised inside the subscriber cannot get lost in a
	// shadowed variable and the waiting goroutine never reads state that the
	// subscriber may still be writing.
	type stepOutcome struct {
		run testsuite.TestSuiteRun
		err error
	}

	// Buffer of one plus a non-blocking send: only the first outcome matters,
	// and the subscriber must never block if more messages arrive after this
	// function has returned.
	done := make(chan stepOutcome, 1)
	finish := func(outcome stepOutcome) {
		select {
		case done <- outcome:
		default:
		}
	}

	// listen for updates and propagate them as if they were transaction updates
	r.subscriptionManager.Subscribe(testRun.ResourceID(), subscription.NewSubscriberFunction(
		func(m subscription.Message) error {
			testRun := test.Run{}
			if err := m.DecodeContent(&testRun); err != nil {
				return fmt.Errorf("cannot decode Run message: %w", err)
			}

			if testRun.LastError != nil {
				tr.State = testsuite.TestSuiteStateFailed
				tr.LastError = testRun.LastError
			}

			updated, err := r.updateStepRun(ctx, tr, step, testRun)
			if err != nil {
				finish(stepOutcome{err: err})
				return err
			}
			tr = updated

			r.subscriptionManager.PublishUpdate(subscription.Message{
				ResourceID: tr.ResourceID(),
				Type:       "result_update",
				Content:    tr,
			})

			if testRun.State.IsFinal() {
				finish(stepOutcome{run: tr})
			}

			return nil
		}),
	)

	// Wait for the step to finish, but stop waiting if the context is done.
	select {
	case outcome := <-done:
		if outcome.err != nil {
			return testsuite.TestSuiteRun{}, outcome.err
		}
		return outcome.run, nil
	case <-ctx.Done():
		return testsuite.TestSuiteRun{}, ctx.Err()
	}
}
// updateStepRun stores run as step number step of the test suite run tr and
// pushes the modified test suite run through the updater.
//
// When tr has no slot for the step yet, one slot is appended before the run
// is stored. It returns the modified test suite run, or a zero-value run and
// a wrapped error when the updater fails.
func (r persistentTransactionRunner) updateStepRun(ctx context.Context, tr testsuite.TestSuiteRun, step int, run test.Run) (testsuite.TestSuiteRun, error) {
	steps := tr.Steps
	if step >= len(steps) {
		// Make room for a step that is being recorded for the first time.
		steps = append(steps, test.Run{})
	}
	steps[step] = run
	tr.Steps = steps

	if err := r.updater.Update(ctx, tr); err != nil {
		return testsuite.TestSuiteRun{}, fmt.Errorf("could not update transaction run: %w", err)
	}

	return tr, nil
}
// mergeOutputsIntoEnv turns every test output into a variable set value
// (output name as key, output value as value) and merges the resulting
// variable set into variableSet, returning the result of the merge.
func mergeOutputsIntoEnv(variableSet variableset.VariableSet, outputs maps.Ordered[string, test.RunOutput]) variableset.VariableSet {
	values := make([]variableset.VariableSetValue, 0, outputs.Len())

	// Collect the outputs in the order ForEach visits them.
	collect := func(name string, output test.RunOutput) error {
		values = append(values, variableset.VariableSetValue{
			Key:   name,
			Value: output.Value,
		})
		return nil
	}
	outputs.ForEach(collect)

	fromOutputs := variableset.VariableSet{
		Values: values,
	}
	return variableSet.Merge(fromOutputs)
}