/
worker.go
88 lines (70 loc) · 2.38 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package queue
import (
"encoding/json"
"fmt"
"github.com/gomodule/redigo/redis"
"github.com/gocraft/work"
log "github.com/sirupsen/logrus"
"github.com/vmware-tanzu/cloud-native-security-inspector/cnsi-scanner-trivy/pkg/etc"
"github.com/vmware-tanzu/cloud-native-security-inspector/cnsi-scanner-trivy/pkg/harbor"
"github.com/vmware-tanzu/cloud-native-security-inspector/cnsi-scanner-trivy/pkg/scan"
)
const (
scanJobDefaultPriority = 1 // The highest
scanJobMaxFailures = 1
)
type Worker interface {
Start()
Stop()
}
type worker struct {
workerPool *work.WorkerPool
}
func NewWorker(config etc.JobQueue, redisPool *redis.Pool, controller scan.Controller) Worker {
workerPool := work.NewWorkerPool(workerContext{}, uint(config.WorkerConcurrency), config.Namespace, redisPool)
// Note: For each scan job a new instance of the workerContext struct is created.
// Therefore, the only way to do a proper dependency injection is to use such closure
// and the following middleware as the first step in the processing chain.
workerPool.Middleware(func(ctx *workerContext, job *work.Job, next work.NextMiddlewareFunc) error {
ctx.controller = controller
return next()
})
workerPool.JobWithOptions(scanArtifactJobName,
work.JobOptions{
Priority: scanJobDefaultPriority,
MaxFails: scanJobMaxFailures,
}, (*workerContext).ScanArtifact)
return &worker{
workerPool: workerPool,
}
}
func (w *worker) Start() {
w.workerPool.Start()
}
func (w *worker) Stop() {
log.Trace("Job queue shutdown started")
w.workerPool.Stop()
log.Trace("Job queue shutdown completed")
}
// workerContext is a context for running scan jobs.
type workerContext struct {
controller scan.Controller
}
// ScanArtifact is a handler function for the specified scan Job with the given workerContext.
func (s *workerContext) ScanArtifact(job *work.Job) (err error) {
log.WithField("scan_job_id", job.ID).Debug("Executing enqueued scan job")
request, err := s.unmarshalScanRequest(job)
if err != nil {
return
}
err = s.controller.Scan(job.ID, request)
return
}
func (s *workerContext) unmarshalScanRequest(job *work.Job) (request harbor.ScanRequest, err error) {
// TODO Fail fast and assert that the scan_request arg was set by the enqueuer.
err = json.Unmarshal([]byte(job.ArgString(scanRequestJobArg)), &request)
if err != nil {
return request, fmt.Errorf("unmarshalling scan request: %v", err)
}
return
}