-
Notifications
You must be signed in to change notification settings - Fork 39.6k
/
operation_executor.go
115 lines (95 loc) · 4.42 KB
/
operation_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// register and unregister operations with a
// goroutinemap so that more than one operation is never triggered
// on the same plugin.
package operationexecutor
import (
"time"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
// OperationExecutor defines a set of operations for registering and unregistering
// a plugin that are executed with a NewGoRoutineMap which
// prevents more than one operation from being triggered on the same socket path.
//
// These operations should be idempotent (for example, RegisterPlugin should
// still succeed if the plugin is already registered, etc.). However,
// they depend on the plugin handlers (for each plugin type) to implement this
// behavior.
//
// Once an operation completes successfully, the actualStateOfWorld is updated
// to indicate the plugin is registered/unregistered.
//
// Once the operation is started, since it is executed asynchronously,
// errors are simply logged and the goroutine is terminated without updating
// actualStateOfWorld.
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
}
// NewOperationExecutor returns a new instance of OperationExecutor.
func NewOperationExecutor(
operationGenerator OperationGenerator) OperationExecutor {
return &operationExecutor{
pendingOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
operationGenerator: operationGenerator,
}
}
// ActualStateOfWorldUpdater defines a set of operations updating the actual
// state of the world cache after successful registration/deregistration.
type ActualStateOfWorldUpdater interface {
// AddPlugin add the given plugin in the cache if no existing plugin
// in the cache has the same socket path.
// An error will be returned if socketPath is empty.
AddPlugin(pluginInfo cache.PluginInfo) error
// RemovePlugin deletes the plugin with the given socket path from the actual
// state of world.
// If a plugin does not exist with the given socket path, this is a no-op.
RemovePlugin(socketPath string)
}
type operationExecutor struct {
// pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume
pendingOperations goroutinemap.GoRoutineMap
// operationGenerator is an interface that provides implementations for
// generating volume function
operationGenerator OperationGenerator
}
var _ OperationExecutor = &operationExecutor{}
func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
return oe.pendingOperations.IsOperationPending(socketPath)
}
func (oe *operationExecutor) RegisterPlugin(
socketPath string,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)
}
func (oe *operationExecutor) UnregisterPlugin(
pluginInfo cache.PluginInfo,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)
return oe.pendingOperations.Run(
pluginInfo.SocketPath, generatedOperation)
}