-
Notifications
You must be signed in to change notification settings - Fork 53
/
scheduler.go
288 lines (228 loc) · 9.67 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package scheduler
import (
"fmt"
"time"
apapi "github.com/libopenstorage/autopilot-api/pkg/apis/autopilot/v1alpha1"
"github.com/portworx/torpedo/drivers/api"
"github.com/portworx/torpedo/drivers/node"
"github.com/portworx/torpedo/drivers/scheduler/spec"
"github.com/portworx/torpedo/drivers/volume"
"github.com/portworx/torpedo/pkg/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// Option keys that can be passed as key-value pairs to scheduler methods.
const (
	// OptionsWaitForDestroy is the option key requesting that the call wait
	// for the destroy to finish before returning.
	OptionsWaitForDestroy = "WAIT_FOR_DESTROY"
	// OptionsWaitForResourceLeakCleanup is the option key requesting that the
	// call wait for all resources to be cleaned up after destroying.
	OptionsWaitForResourceLeakCleanup = "WAIT_FOR_RESOURCE_LEAK_CLEANUP"
)

// Secret type identifiers.
// NOTE(review): these appear to be the accepted values of
// InitOptions.SecretType — confirm against the driver implementations.
const (
	// SecretVault identifies the "vault" secret type.
	SecretVault = "vault"
	// SecretK8S identifies the "k8s" secret type.
	SecretK8S = "k8s"
)
// Context holds the execution context of a test task.
type Context struct {
// UID is the unique object identifier; GetID combines it with the app spec ID.
UID string
// App is the k8s application specification for this context.
App *spec.AppSpec
// ScheduleOptions are the options the caller passed to influence the apps that get scheduled.
ScheduleOptions ScheduleOptions
// SkipVolumeValidation is set for cases where a volume driver other than portworx is used.
SkipVolumeValidation bool
// SkipClusterScopedObject is set for multi-cluster backup cases where the storage class is not restored.
SkipClusterScopedObject bool
// RefreshStorageEndpoint forces a refresh of the storage driver endpoint.
RefreshStorageEndpoint bool
// ReadinessTimeout is the time within which the context is expected to be up.
ReadinessTimeout time.Duration
}
// DeepCopy returns an independent copy of the Context, or nil when called on
// a nil receiver.
//
// Every field is carried over:
//   - The scalar fields (UID, the Skip*/Refresh* flags, ReadinessTimeout) are
//     copied by value.
//   - App is duplicated with AppSpec.DeepCopy; it stays nil if the original is
//     nil.
//   - ScheduleOptions is copied with its AppKeys, Nodes, Labels and
//     AutopilotRule duplicated, so mutating the copy does not affect the
//     original.
//
// Elements of ScheduleOptions.Nodes are copied by value; any reference-typed
// fields inside node.Node itself remain shared.
func (in *Context) DeepCopy() *Context {
	if in == nil {
		return nil
	}
	out := new(Context)
	// Struct assignment copies every field, so no field is silently dropped
	// (including fields added to Context in the future).
	*out = *in
	if in.App != nil {
		out.App = in.App.DeepCopy()
	}
	out.ScheduleOptions = deepCopyScheduleOptions(&in.ScheduleOptions)
	return out
}

// deepCopyScheduleOptions returns a copy of src whose slices, map and
// AutopilotRule do not alias those of src.
func deepCopyScheduleOptions(src *ScheduleOptions) ScheduleOptions {
	dst := *src
	// nil stays nil so callers relying on nil-vs-empty see the same thing.
	if src.AppKeys != nil {
		dst.AppKeys = make([]string, len(src.AppKeys))
		copy(dst.AppKeys, src.AppKeys)
	}
	if src.Nodes != nil {
		dst.Nodes = make([]node.Node, len(src.Nodes))
		copy(dst.Nodes, src.Nodes)
	}
	if src.Labels != nil {
		dst.Labels = make(map[string]string, len(src.Labels))
		for k, v := range src.Labels {
			dst.Labels[k] = v
		}
	}
	// NOTE(review): relies on the generated (deepcopy-gen) DeepCopyInto of the
	// apapi AutopilotRule type to clone its nested slices/maps.
	src.AutopilotRule.DeepCopyInto(&dst.AutopilotRule)
	return dst
}
// GetID reports the context's unique identifier. The app spec derives it from
// the instance UID supplied when the context was scheduled, combined with the
// spec's own ID.
func (in *Context) GetID() string {
	instanceID := in.UID
	return in.App.GetID(instanceID)
}
// AppConfig holds custom per-app settings, decoded from YAML using the keys in the field tags.
type AppConfig struct {
// Replicas is the requested replica count (yaml key "replicas").
Replicas int `yaml:"replicas"`
// VolumeSize is the requested volume size (yaml key "volume_size"); NOTE(review): unit/format not visible here — confirm.
VolumeSize string `yaml:"volume_size"`
// WorkloadSize is the requested workload size (yaml key "workload_size"); NOTE(review): unit/format not visible here — confirm.
WorkloadSize string `yaml:"workload_size"`
}
// InitOptions are the options passed to Driver.Init.
type InitOptions struct {
// SpecDir is the app spec directory.
SpecDir string
// VolDriverName is the volume driver name.
VolDriverName string
// NodeDriverName is the node driver name.
NodeDriverName string
// SecretConfigMapName names the config map to use.
// NOTE(review): the original comment was truncated; presumably this is the config map holding secrets — confirm.
SecretConfigMapName string
// CustomAppConfig holds custom settings for apps, keyed by app.
CustomAppConfig map[string]AppConfig
// StorageProvisioner is the storage provisioner name.
StorageProvisioner string
// SecretType is the secret type used for encryption keys.
SecretType string
// VaultAddress is the vault API address.
VaultAddress string
// VaultToken is the vault authentication token.
VaultToken string
}
// ScheduleOptions are options that callers pass to influence the apps that get scheduled.
type ScheduleOptions struct {
// AppKeys identifies a list of application keys that users want to schedule (Optional).
AppKeys []string
// Nodes restricts the applications to get scheduled only on these nodes (Optional).
Nodes []node.Node
// StorageProvisioner identifies what storage provider should be used.
StorageProvisioner string
// ConfigMap identifies what config map should be used.
// NOTE(review): the original comment was truncated ("should be used to") — confirm its purpose.
ConfigMap string
// AutopilotRule identifies options for autopilot (Optional).
AutopilotRule apapi.AutopilotRule
// Scheduler identifies what scheduler will be used.
Scheduler string
// Labels is a map of {key,value} pairs for labeling spec objects.
Labels map[string]string
}
// Driver must be implemented to provide test support to various schedulers.
type Driver interface {
// spec.Parser is embedded, so implementations must also satisfy that interface.
spec.Parser
// Init initializes the scheduler driver with the given options.
Init(schedOpts InitOptions) error
// String returns the string name of this driver.
String() string
// IsNodeReady checks if node is in ready state. Returns nil if ready.
IsNodeReady(n node.Node) error
// GetNodesForApp returns nodes on which the given app context is running.
GetNodesForApp(*Context) ([]node.Node, error)
// Schedule starts applications and returns a context for each one of them.
Schedule(instanceID string, opts ScheduleOptions) ([]*Context, error)
// WaitForRunning waits for the application in cc to start running; timeout and
// retryInterval presumably bound the wait and set the polling period.
WaitForRunning(cc *Context, timeout, retryInterval time.Duration) error
// AddTasks adds tasks to an existing context.
AddTasks(*Context, ScheduleOptions) error
// UpdateTasksID updates task IDs in the given context.
UpdateTasksID(*Context, string) error
// Destroy removes an application. It does not delete the volumes of the task.
// NOTE(review): the map[string]bool argument appears to carry option flags such as
// OptionsWaitForDestroy — confirm.
Destroy(*Context, map[string]bool) error
// WaitForDestroy waits for the application to be destroyed.
WaitForDestroy(*Context, time.Duration) error
// DeleteTasks deletes all tasks of the application (not the application). DeleteTasksOptions is optional.
DeleteTasks(*Context, *DeleteTasksOptions) error
// GetVolumeDriverVolumeName returns the name of the volume as referred to by the volume driver.
GetVolumeDriverVolumeName(name string, namespace string) (string, error)
// GetVolumeParameters returns a map in which each entry is a volume and its options.
GetVolumeParameters(*Context) (map[string]map[string]string, error)
// ValidateVolumes validates storage volumes in the provided context.
ValidateVolumes(cc *Context, timeout, retryInterval time.Duration, options *VolumeOptions) error
// DeleteVolumes deletes all storage volumes for the given context.
DeleteVolumes(*Context, *VolumeOptions) ([]*volume.Volume, error)
// GetVolumes returns all storage volumes for the given context.
GetVolumes(*Context) ([]*volume.Volume, error)
// ResizeVolume resizes all the volumes of a given context.
ResizeVolume(*Context, string) ([]*volume.Volume, error)
// GetSnapshots returns all storage snapshots for the given context.
GetSnapshots(*Context) ([]*volume.Snapshot, error)
// Describe generates a bundle that can be used by support - logs, cores, states, etc.
Describe(*Context) (string, error)
// ScaleApplication scales the current applications using the new scales from GetScaleFactorMap.
ScaleApplication(*Context, map[string]int32) error
// GetScaleFactorMap gets a map of current applications to their new scales, based on "factor".
GetScaleFactorMap(*Context) (map[string]int32, error)
// StopSchedOnNode stops the scheduler service on the given node.
StopSchedOnNode(n node.Node) error
// StartSchedOnNode starts the scheduler service on the given node.
StartSchedOnNode(n node.Node) error
// RefreshNodeRegistry refreshes the node registry.
RefreshNodeRegistry() error
// RescanSpecs rescans the specs in specDir for the given storage driver.
RescanSpecs(specDir, storageDriver string) error
// EnableSchedulingOnNode enables apps to be scheduled on a given node.
EnableSchedulingOnNode(n node.Node) error
// DisableSchedulingOnNode disables apps from being scheduled on a given node.
DisableSchedulingOnNode(n node.Node) error
// PrepareNodeToDecommission prepares a given node for decommissioning.
PrepareNodeToDecommission(n node.Node, provisioner string) error
// IsScalable checks if a given spec is scalable or not.
IsScalable(spec interface{}) bool
// ValidateVolumeSnapshotRestore returns nil if the snapshot is restored successfully to
// the parent volumes.
ValidateVolumeSnapshotRestore(*Context, time.Time) error
// GetTokenFromConfigMap gets the token for a volume.
GetTokenFromConfigMap(string) (string, error)
// AddLabelOnNode adds a key/value label on the node.
AddLabelOnNode(node.Node, string, string) error
// IsAutopilotEnabledForVolume checks if autopilot is enabled for a given volume.
IsAutopilotEnabledForVolume(*volume.Volume) bool
// SaveSchedulerLogsToFile gathers all scheduler logs into a file.
SaveSchedulerLogsToFile(node.Node, string) error
// CreateAutopilotRule creates the AutopilotRule object.
CreateAutopilotRule(apRule apapi.AutopilotRule) (*apapi.AutopilotRule, error)
// UpdateAutopilotRule updates the AutopilotRule.
UpdateAutopilotRule(apapi.AutopilotRule) (*apapi.AutopilotRule, error)
// ListAutopilotRules lists AutopilotRules.
ListAutopilotRules() (*apapi.AutopilotRuleList, error)
// GetEvents should return all the events from the scheduler since the time torpedo started.
GetEvents() map[string][]Event
// ValidateAutopilotEvents validates events for PVCs injected by autopilot.
ValidateAutopilotEvents(ctx *Context) error
// GetWorkloadSizeFromAppSpec gets the workload size from an application spec.
GetWorkloadSizeFromAppSpec(ctx *Context) (uint64, error)
// SetConfig sets the connection config (e.g. kubeconfig in case of k8s) for the scheduler driver.
SetConfig(configPath string) error
}
// schedulers is the registry of scheduler drivers, keyed by the name they
// were registered under via Register. It starts empty but non-nil so writes
// are safe.
var schedulers = map[string]Driver{}
// DeleteTasksOptions are options supplied to the DeleteTasks API.
type DeleteTasksOptions struct {
// TriggerOptions is embedded, so its fields are promoted onto DeleteTasksOptions.
api.TriggerOptions
}
// UpgradeAutopilotOptions are options supplied to the UpgradeAutopilot API.
type UpgradeAutopilotOptions struct {
// TriggerOptions is embedded, so its fields are promoted onto UpgradeAutopilotOptions.
api.TriggerOptions
}
// VolumeOptions are options supplied to the scheduler volume APIs (e.g. ValidateVolumes, DeleteVolumes).
type VolumeOptions struct {
// SkipClusterScopedObjects skips volume operations on cluster-scoped objects such as storage classes.
SkipClusterScopedObjects bool
}
// Event collects Kubernetes event data for further validation (see Driver.GetEvents).
// NOTE(review): fields presumably mirror the same-named fields of the Kubernetes Event object — confirm.
type Event struct {
// Message is the event's message text.
Message string
// EventTime is the event time, with microsecond precision (v1.MicroTime).
EventTime v1.MicroTime
// Count is the number of times the event was observed.
Count int32
// LastSeen is the time the event was last observed.
LastSeen v1.Time
// Kind is presumably the kind of the object the event refers to — confirm.
Kind string
// Type is the event type (presumably e.g. Normal or Warning) — confirm.
Type string
}
// Register adds the scheduler driver d to the registry under name.
// A name can be registered only once. A second attempt returns an error
// and leaves the previously registered driver in place.
func Register(name string, d Driver) error {
	if _, taken := schedulers[name]; taken {
		return fmt.Errorf("scheduler driver: %s is already registered", name)
	}
	schedulers[name] = d
	return nil
}
// Get looks up the scheduler driver registered under name. When nothing is
// registered under that name, it returns an *errors.ErrNotFound with Type
// "Scheduler".
func Get(name string) (Driver, error) {
	d, found := schedulers[name]
	if !found {
		return nil, &errors.ErrNotFound{
			ID:   name,
			Type: "Scheduler",
		}
	}
	return d, nil
}