-
-
Notifications
You must be signed in to change notification settings - Fork 382
/
record.go
executable file
·314 lines (287 loc) · 9.24 KB
/
record.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// Package record provides functionality for recording and managing test cases and mocks.
package record
import (
"context"
"errors"
"fmt"
"time"
"go.keploy.io/server/v2/config"
"go.keploy.io/server/v2/pkg"
"go.keploy.io/server/v2/pkg/models"
"go.keploy.io/server/v2/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// Recorder is the record service implementation returned by New. It bundles
// the collaborators that Start and StartMock drive during a recording session.
type Recorder struct {
// logger receives informational, warning and error output from the recorder.
logger *zap.Logger
// testDB lists existing test set IDs and stores captured test cases.
testDB TestDB
// mockDB stores captured mocks.
mockDB MockDB
// telemetry is notified about recorded test cases, mocks and test suites.
telemetry Telemetry
// instrumentation sets up the application, installs the hooks, exposes the
// incoming/outgoing capture channels and runs the user application.
instrumentation Instrumentation
// config carries the command, container/network settings, build delay,
// testing flag and record timer read while recording.
config config.Config
}
// New wires the given logger, stores, telemetry sink, instrumentation layer
// and configuration into a fresh Recorder and hands it back as a Service.
func New(logger *zap.Logger, testDB TestDB, mockDB MockDB, telemetry Telemetry, instrumentation Instrumentation, config config.Config) Service {
	rec := new(Recorder)

	rec.logger = logger
	rec.testDB = testDB
	rec.mockDB = mockDB
	rec.telemetry = telemetry
	rec.instrumentation = instrumentation
	rec.config = config

	return rec
}
// Start runs a full recording session.
//
// It allocates a new test set ID, sets up the instrumentation for the
// configured command, installs the hooks, and then starts goroutines that
// persist every incoming test case and outgoing mock while the user
// application runs. If a record timer is configured, an extra goroutine asks
// keploy to stop once it elapses.
//
// Start blocks until one of the following happens:
//   - the application reports an error (some error types end the session
//     without being treated as a failure, see the switch below),
//   - storing a test case or a mock fails,
//   - ctx is cancelled.
//
// It returns nil when ctx is cancelled or the application stopped in a way
// that is not considered a recording failure, context.Canceled when a setup
// step was interrupted by cancellation, and otherwise an error whose text is
// the stop reason.
//
// On return the deferred shutdown stops keploy (unless ctx is already done),
// cancels and waits for the application and hook goroutines, waits for the
// persistence goroutines, and finally reports the recorded counts to telemetry
// when the session ended through context cancellation.
func (r *Recorder) Start(ctx context.Context) error {
	// errGrp tracks the persistence goroutines (and the optional timer); it is
	// also published on ctx under models.ErrGroupKey.
	errGrp, _ := errgroup.WithContext(ctx)
	ctx = context.WithValue(ctx, models.ErrGroupKey, errGrp)

	// The application and the hooks get contexts detached from ctx's
	// cancellation so that the deferred shutdown below decides when, and in
	// which order, they are cancelled.
	runAppErrGrp, _ := errgroup.WithContext(ctx)
	runAppCtx := context.WithoutCancel(ctx)
	runAppCtx, runAppCtxCancel := context.WithCancel(runAppCtx)

	hookErrGrp, _ := errgroup.WithContext(ctx)
	hookCtx := context.WithoutCancel(ctx)
	hookCtx, hookCtxCancel := context.WithCancel(hookCtx)
	hookCtx = context.WithValue(hookCtx, models.ErrGroupKey, hookErrGrp)

	var stopReason string

	// Channels and state shared between this function and its goroutines.
	// The error channels are intentionally never closed: their senders may
	// outlive the select at the bottom of this function, and a send on a
	// closed channel panics.
	var runAppError models.AppError
	var appErrChan = make(chan models.AppError, 1)
	var incomingChan <-chan *models.TestCase
	var outgoingChan <-chan *models.Mock
	var insertTestErrChan = make(chan error, 10)
	var insertMockErrChan = make(chan error, 10)
	var appID uint64
	var newTestSetID string
	var testCount = 0
	var mockCountMap = make(map[string]int)

	// reportErr hands err to the select at the bottom of this function without
	// ever blocking. Only the first failure is consumed there, so dropping an
	// error when the buffer is full loses nothing and keeps the persistence
	// goroutines from hanging once nobody is receiving any more.
	reportErr := func(ch chan<- error, err error) {
		select {
		case ch <- err:
		default:
		}
	}

	// Deferred shutdown: stop keploy on failure, then tear down the
	// application, the hooks and the persistence goroutines in that order.
	defer func() {
		ctxDone := false
		select {
		case <-ctx.Done():
			ctxDone = true
		default:
			err := utils.Stop(r.logger, stopReason)
			if err != nil {
				utils.LogError(r.logger, err, "failed to stop recording")
			}
		}

		runAppCtxCancel()
		if err := runAppErrGrp.Wait(); err != nil {
			utils.LogError(r.logger, err, "failed to stop application")
		}

		hookCtxCancel()
		if err := hookErrGrp.Wait(); err != nil {
			utils.LogError(r.logger, err, "failed to stop hooks")
		}

		if err := errGrp.Wait(); err != nil {
			utils.LogError(r.logger, err, "failed to stop recording")
		}

		// testCount and mockCountMap are written by the errGrp goroutines, so
		// they are only read once errGrp.Wait has returned; reading them
		// earlier races with those writers.
		if ctxDone {
			r.telemetry.RecordedTestSuite(newTestSetID, testCount, mockCountMap)
		}
	}()

	testSetIDs, err := r.testDB.GetAllTestSetIDs(ctx)
	if err != nil {
		stopReason = "failed to get testSetIds"
		utils.LogError(r.logger, err, stopReason)
		return fmt.Errorf("%s", stopReason)
	}

	newTestSetID = pkg.NewID(testSetIDs, models.TestSetPattern)

	// Set up the environment for recording.
	appID, err = r.instrumentation.Setup(ctx, r.config.Command, models.SetupOptions{Container: r.config.ContainerName, DockerNetwork: r.config.NetworkName, DockerDelay: r.config.BuildDelay})
	if err != nil {
		stopReason = "failed setting up the environment"
		utils.LogError(r.logger, err, stopReason)
		return fmt.Errorf("%s", stopReason)
	}

	// Do not start the hooks and proxy if the context was cancelled during setup.
	select {
	case <-ctx.Done():
		return nil
	default:
	}

	// Start the hooks and proxy.
	err = r.instrumentation.Hook(hookCtx, appID, models.HookOptions{Mode: models.MODE_RECORD, EnableTesting: r.config.EnableTesting})
	if err != nil {
		stopReason = "failed to start the hooks and proxy"
		utils.LogError(r.logger, err, stopReason)
		if errors.Is(err, context.Canceled) {
			return err
		}
		return fmt.Errorf("%s", stopReason)
	}

	// Fetch test cases from the application and insert them into the database.
	incomingChan, err = r.instrumentation.GetIncoming(ctx, appID, models.IncomingOptions{})
	if err != nil {
		stopReason = "failed to get incoming frames"
		utils.LogError(r.logger, err, stopReason)
		if errors.Is(err, context.Canceled) {
			return err
		}
		return fmt.Errorf("%s", stopReason)
	}

	errGrp.Go(func() error {
		for testCase := range incomingChan {
			err := r.testDB.InsertTestCase(ctx, testCase, newTestSetID)
			if err != nil {
				// Cancellation is part of a normal shutdown, not an insert failure.
				if errors.Is(err, context.Canceled) {
					continue
				}
				reportErr(insertTestErrChan, err)
				continue
			}
			testCount++
			r.telemetry.RecordedTestAndMocks()
		}
		return nil
	})

	// Fetch mocks from the application and insert them into the database.
	outgoingChan, err = r.instrumentation.GetOutgoing(ctx, appID, models.OutgoingOptions{})
	if err != nil {
		stopReason = "failed to get outgoing frames"
		utils.LogError(r.logger, err, stopReason)
		if errors.Is(err, context.Canceled) {
			return err
		}
		return fmt.Errorf("%s", stopReason)
	}

	errGrp.Go(func() error {
		for mock := range outgoingChan {
			err := r.mockDB.InsertMock(ctx, mock, newTestSetID)
			if err != nil {
				// Cancellation is part of a normal shutdown, not an insert failure.
				if errors.Is(err, context.Canceled) {
					continue
				}
				reportErr(insertMockErrChan, err)
				continue
			}
			mockCountMap[mock.GetKind()]++
			r.telemetry.RecordedTestCaseMock(mock.GetKind())
		}
		return nil
	})

	// Run the user application. appErrChan has room for the single value this
	// goroutine may send, so the send cannot block even after Start returned.
	runAppErrGrp.Go(func() error {
		runAppError = r.instrumentation.Run(runAppCtx, appID, models.RunOptions{})
		if runAppError.AppErrorType == models.ErrCtxCanceled {
			return nil
		}
		appErrChan <- runAppError
		return nil
	})

	// Optional timer that stops the recording after the configured duration.
	if r.config.Record.RecordTimer != 0 {
		errGrp.Go(func() error {
			r.logger.Info("Setting a timer of " + r.config.Record.RecordTimer.String() + " for recording")
			timer := time.After(r.config.Record.RecordTimer)
			select {
			case <-timer:
				r.logger.Warn("Time up! Stopping keploy")
				err := utils.Stop(r.logger, "Time up! Stopping keploy")
				if err != nil {
					utils.LogError(r.logger, err, "failed to stop recording")
					return errors.New("failed to stop recording")
				}
			case <-ctx.Done():
				return nil
			}
			return nil
		})
	}

	// Wait for the first failure from any goroutine, or for cancellation.
	select {
	case appErr := <-appErrChan:
		switch appErr.AppErrorType {
		case models.ErrCommandError:
			stopReason = "error in running the user application, hence stopping keploy"
		case models.ErrUnExpected:
			stopReason = "user application terminated unexpectedly hence stopping keploy, please check application logs if this behaviour is not expected"
		case models.ErrInternal:
			stopReason = "internal error occured while hooking into the application, hence stopping keploy"
		case models.ErrAppStopped:
			stopReason = "user application terminated unexpectedly hence stopping keploy, please check application logs if this behaviour is not expected"
			r.logger.Warn(stopReason, zap.Error(appErr))
			return nil
		case models.ErrCtxCanceled:
			return nil
		case models.ErrTestBinStopped:
			stopReason = "keploy test mode binary stopped, hence stopping keploy"
			return nil
		default:
			stopReason = "unknown error recieved from application, hence stopping keploy"
		}
		// Keep the application error as the cause so the log entry below
		// carries it instead of a stale nil err.
		err = appErr
	case err = <-insertTestErrChan:
		stopReason = "error while inserting test case into db, hence stopping keploy"
	case err = <-insertMockErrChan:
		stopReason = "error while inserting mock into db, hence stopping keploy"
	case <-ctx.Done():
		return nil
	}

	utils.LogError(r.logger, err, stopReason)
	return fmt.Errorf("%s", stopReason)
}
// StartMock records only outgoing mocks.
//
// It sets up the instrumentation for the configured command, installs the
// hooks in record mode and stores every mock received on the outgoing channel
// with an empty test set ID, one goroutine per mock.
//
// StartMock blocks until storing a mock fails or the errgroup-derived context
// is done. It returns nil when that context is done, context.Canceled when
// fetching the outgoing channel was interrupted by cancellation, and otherwise
// an error whose text is the stop reason.
//
// On return the deferred shutdown stops keploy (unless the context is already
// done) and waits for all goroutines started here.
func (r *Recorder) StartMock(ctx context.Context) error {
	// From here on ctx is the errgroup's context; the group is also published
	// on it under models.ErrGroupKey.
	g, ctx := errgroup.WithContext(ctx)
	ctx = context.WithValue(ctx, models.ErrGroupKey, g)

	var stopReason string
	defer func() {
		select {
		case <-ctx.Done():
		default:
			err := utils.Stop(r.logger, stopReason)
			if err != nil {
				utils.LogError(r.logger, err, "failed to stop recording")
			}
		}
		err := g.Wait()
		if err != nil {
			utils.LogError(r.logger, err, "failed to stop recording")
		}
	}()

	var outgoingChan <-chan *models.Mock
	// One buffered slot plus non-blocking sends: the select below consumes at
	// most one error, and with an unbuffered channel every later failing
	// insert would block forever and hang the deferred g.Wait().
	insertMockErrChan := make(chan error, 1)

	appID, err := r.instrumentation.Setup(ctx, r.config.Command, models.SetupOptions{Container: r.config.ContainerName, DockerNetwork: r.config.NetworkName, DockerDelay: r.config.BuildDelay})
	if err != nil {
		stopReason = "failed to exeute mock record due to error while setting up the environment"
		utils.LogError(r.logger, err, stopReason)
		return fmt.Errorf("%s", stopReason)
	}

	err = r.instrumentation.Hook(ctx, appID, models.HookOptions{Mode: models.MODE_RECORD})
	if err != nil {
		stopReason = "failed to start the hooks and proxy"
		utils.LogError(r.logger, err, stopReason)
		return fmt.Errorf("%s", stopReason)
	}

	outgoingChan, err = r.instrumentation.GetOutgoing(ctx, appID, models.OutgoingOptions{})
	if err != nil {
		stopReason = "failed to get outgoing frames"
		utils.LogError(r.logger, err, stopReason)
		if errors.Is(err, context.Canceled) {
			return err
		}
		return fmt.Errorf("%s", stopReason)
	}

	g.Go(func() error {
		for mock := range outgoingChan {
			mock := mock // capture range variable
			g.Go(func() error {
				if err := r.mockDB.InsertMock(ctx, mock, ""); err != nil {
					// Never block: if a failure is already pending, this one
					// is redundant for the single receiver below.
					select {
					case insertMockErrChan <- err:
					default:
					}
				}
				return nil
			})
		}
		return nil
	})

	select {
	case err = <-insertMockErrChan:
		stopReason = "error while inserting mock into db, hence stopping keploy"
	case <-ctx.Done():
		return nil
	}

	utils.LogError(r.logger, err, stopReason)
	return fmt.Errorf("%s", stopReason)
}