forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 1
/
instance.go
159 lines (132 loc) · 4.31 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package csimanager
import (
"context"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/plugins/csi"
)
const managerFingerprintInterval = 30 * time.Second
// instanceManager is used to manage the fingerprinting and supervision of a
// single CSI Plugin.
type instanceManager struct {
info *dynamicplugins.PluginInfo
logger hclog.Logger
eventer TriggerNodeEvent
updater UpdateNodeCSIInfoFunc
shutdownCtx context.Context
shutdownCtxCancelFn context.CancelFunc
shutdownCh chan struct{}
// mountPoint is the root of the mount dir where plugin specific data may be
// stored and where mount points will be created
mountPoint string
// containerMountPoint is the location _inside_ the plugin container that the
// `mountPoint` is bound in to.
containerMountPoint string
// AllocID is the allocation id of the task group running the dynamic plugin
allocID string
fp *pluginFingerprinter
volumeManager *volumeManager
volumeManagerSetupCh chan struct{}
client csi.CSIPlugin
}
func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater UpdateNodeCSIInfoFunc, p *dynamicplugins.PluginInfo) *instanceManager {
ctx, cancelFn := context.WithCancel(context.Background())
logger = logger.Named(p.Name)
return &instanceManager{
logger: logger,
eventer: eventer,
info: p,
updater: updater,
fp: &pluginFingerprinter{
logger: logger.Named("fingerprinter"),
info: p,
fingerprintNode: p.Type == dynamicplugins.PluginTypeCSINode,
fingerprintController: p.Type == dynamicplugins.PluginTypeCSIController,
hadFirstSuccessfulFingerprintCh: make(chan struct{}),
},
mountPoint: p.Options["MountPoint"],
containerMountPoint: p.Options["ContainerMountPoint"],
allocID: p.AllocID,
volumeManagerSetupCh: make(chan struct{}),
shutdownCtx: ctx,
shutdownCtxCancelFn: cancelFn,
shutdownCh: make(chan struct{}),
}
}
func (i *instanceManager) run() {
c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
if err != nil {
i.logger.Error("failed to setup instance manager client", "error", err)
close(i.shutdownCh)
return
}
i.client = c
i.fp.client = c
go i.setupVolumeManager()
go i.runLoop()
}
func (i *instanceManager) setupVolumeManager() {
if i.info.Type != dynamicplugins.PluginTypeCSINode {
i.logger.Debug("not a node plugin, skipping volume manager setup", "type", i.info.Type)
return
}
select {
case <-i.shutdownCtx.Done():
return
case <-i.fp.hadFirstSuccessfulFingerprintCh:
i.volumeManager = newVolumeManager(i.logger, i.eventer, i.client, i.mountPoint, i.containerMountPoint, i.fp.requiresStaging)
i.logger.Debug("volume manager setup complete")
close(i.volumeManagerSetupCh)
return
}
}
// VolumeMounter returns the volume manager that is configured for the given plugin
// instance. If called before the volume manager has been setup, it will block until
// the volume manager is ready or the context is closed.
func (i *instanceManager) VolumeMounter(ctx context.Context) (VolumeMounter, error) {
select {
case <-i.volumeManagerSetupCh:
return i.volumeManager, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (i *instanceManager) requestCtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(i.shutdownCtx, timeout)
}
func (i *instanceManager) runLoop() {
timer := time.NewTimer(0)
for {
select {
case <-i.shutdownCtx.Done():
if i.client != nil {
i.client.Close()
i.client = nil
}
// run one last fingerprint so that we mark the plugin as unhealthy.
// the client has been closed so this will return quickly with the
// plugin's basic info
ctx, cancelFn := i.requestCtxWithTimeout(time.Second)
info := i.fp.fingerprint(ctx)
cancelFn()
if info != nil {
i.updater(i.info.Name, info)
}
close(i.shutdownCh)
return
case <-timer.C:
ctx, cancelFn := i.requestCtxWithTimeout(managerFingerprintInterval)
info := i.fp.fingerprint(ctx)
cancelFn()
if info != nil {
i.updater(i.info.Name, info)
}
timer.Reset(managerFingerprintInterval)
}
}
}
func (i *instanceManager) shutdown() {
i.shutdownCtxCancelFn()
<-i.shutdownCh
}