/
manager.go
122 lines (102 loc) · 2.96 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package tester
import (
"context"
"github.com/orensho/thin-slack-blackbox-tester/service/config"
"github.com/orensho/thin-slack-blackbox-tester/service/service"
"github.com/orensho/thin-slack-blackbox-tester/service/steps"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
)
type ManagerInterface interface {
Init(conf *config.TesterConfig) error
Start()
Stop()
}
type managerImpl struct {
testerSettings *config.TesterSettings
stepsFactory steps.StepFactoryInterface
cron *cron.Cron
testContext context.Context
testCancel context.CancelFunc
metricsService service.MetricsServiceInterface
}
func NewManager(
rootCtx context.Context,
stepsFactory steps.StepFactoryInterface,
testerSettings *config.TesterSettings,
metricsService service.MetricsServiceInterface,
) ManagerInterface {
testContext, testCancel := context.WithCancel(rootCtx)
cronLogger := cron.PrintfLogger(log.StandardLogger())
return &managerImpl{
testerSettings: testerSettings,
metricsService: metricsService,
stepsFactory: stepsFactory,
cron: cron.New(cron.WithChain(
cron.Recover(cronLogger),
cron.SkipIfStillRunning(cronLogger),
)),
testContext: testContext,
testCancel: testCancel,
}
}
func (m *managerImpl) Start() {
m.cron.Start()
}
func (m *managerImpl) Stop() {
// cancel all running flows
m.testCancel()
// stop the cron
doneCtx := m.cron.Stop()
// wait for cron jobs to stop
<-doneCtx.Done()
}
func (m *managerImpl) Init(conf *config.TesterConfig) error {
// create all test flows
for flowName, flowDefinition := range conf.Flows {
// create flow steps
flowSteps, err := m.createFlowSteps(conf.Definitions, flowDefinition.Steps)
if err != nil {
return errors.Wrapf(err, "Failed creating flow '%s' steps", flowName)
}
// create the flow
flow, err := newFlow(
m.testContext,
flowName,
flowDefinition.Config,
flowSteps,
m.metricsService,
)
if err != nil {
return errors.Wrapf(err, "Failed creating flow '%s'", flowName)
}
// add flow to the scheduler
_, err = m.cron.AddJob(flowDefinition.Config.Frequency, flow)
if err != nil {
return errors.Wrapf(err, "Failed scheduling flow '%s'", flowName)
}
}
return nil
}
func (m *managerImpl) createFlowSteps(stepsDefinition map[string]config.Definition, flowStepNames []string) ([]steps.StepInterface, error) {
var flowSteps []steps.StepInterface
for _, stepName := range flowStepNames {
stepDefinition, ok := stepsDefinition[stepName]
if !ok {
return nil, errors.Errorf("undefined step '%s'", stepName)
}
// create the step
step, err := m.stepsFactory.NewStep(stepDefinition.Type)
if err != nil {
return nil, errors.Wrapf(err, "Failed creating step '%s'", stepName)
}
// init the step configuration
err = step.Init(stepName, stepDefinition.Config)
if err != nil {
return nil, errors.Wrapf(err, "Failed initializing step '%s'", stepName)
}
flowSteps = append(flowSteps, step)
}
return flowSteps, nil
}