-
Notifications
You must be signed in to change notification settings - Fork 128
/
job.go
113 lines (94 loc) · 2.93 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package client
import (
"context"
"fmt"
"time"
"github.com/kubeshop/testkube/internal/pkg/api/repository/result"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/jobs"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/runner/output"
"go.uber.org/zap"
)
func NewJobExecutor(repo result.Repository) (client JobExecutor, err error) {
jobClient, err := jobs.NewJobClient()
if err != nil {
return client, fmt.Errorf("can't get k8s jobs client: %w", err)
}
return JobExecutor{
Client: jobClient,
Repository: repo,
Log: log.DefaultLogger,
}, nil
}
type JobExecutor struct {
Client *jobs.JobClient
Repository result.Repository
Log *zap.SugaredLogger
}
// Watch will get valid execution after async Execute, execution will be returned when success or error occurs
// Worker should set valid state for success or error after script completion
// TODO add timeout
func (c JobExecutor) Watch(id string) (events chan ResultEvent) {
events = make(chan ResultEvent)
go func() {
ticker := time.NewTicker(WatchInterval)
for range ticker.C {
result, err := c.Get(id)
events <- ResultEvent{
Result: result,
Error: err,
}
if err != nil || result.IsCompleted() {
close(events)
return
}
}
}()
return events
}
func (c JobExecutor) Get(id string) (execution testkube.ExecutionResult, err error) {
exec, err := c.Repository.Get(context.Background(), id)
if err != nil {
return testkube.ExecutionResult{}, err
}
return *exec.ExecutionResult, nil
}
// Logs returns job logs using kubernetes api
func (c JobExecutor) Logs(id string) (out chan output.Output, err error) {
out = make(chan output.Output)
logs := make(chan []byte)
go func() {
defer func() {
c.Log.Debug("closing JobExecutor.Logs out log")
close(out)
}()
if err := c.Client.TailJobLogs(id, logs); err != nil {
out <- output.NewOutputError(err)
return
}
for l := range logs {
entry, err := output.GetLogEntry(l)
if err != nil {
out <- output.NewOutputError(err)
return
}
out <- entry
}
}()
return
}
// Execute starts new external script execution, reads data and returns ID
// Execution is started asynchronously client can check later for results
func (c JobExecutor) Execute(execution testkube.Execution, options ExecuteOptions) (result testkube.ExecutionResult, err error) {
return c.Client.LaunchK8sJob(options.ExecutorSpec.Image, c.Repository, execution, options.HasSecrets)
}
// Execute starts new external script execution, reads data and returns ID
// Execution is started synchronously client will be blocked
func (c JobExecutor) ExecuteSync(execution testkube.Execution, options ExecuteOptions) (result testkube.ExecutionResult, err error) {
return c.Client.LaunchK8sJobSync(options.ExecutorSpec.Image, c.Repository, execution, options.HasSecrets)
}
func (c JobExecutor) Abort(id string) error {
c.Client.AbortK8sJob(id)
return nil
}