/
starter.go
307 lines (281 loc) · 10.1 KB
/
starter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package starter provides a single function that starts up servers for a
// mounttable and a device manager that is mounted on it.
package starter
import (
"encoding/base64"
"os"
"path/filepath"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
displib "v.io/x/ref/lib/dispatcher"
_ "v.io/x/ref/runtime/factories/roaming"
"v.io/x/ref/services/debug/debuglib"
"v.io/x/ref/services/device/deviced/internal/impl"
"v.io/x/ref/services/device/deviced/internal/versioning"
"v.io/x/ref/services/device/internal/claim"
"v.io/x/ref/services/device/internal/config"
"v.io/x/ref/services/internal/pathperms"
"v.io/x/ref/services/mounttable/mounttablelib"
)
const pkgPath = "v.io/x/ref/services/device/deviced/internal/starter"
var (
errCantSaveInfo = verror.Register(pkgPath+".errCantSaveInfo", verror.NoRetry, "{1:}{2:} failed to save info{:_}")
)
type NamespaceArgs struct {
Name string // Name to publish the mounttable service under (after claiming).
ListenSpec rpc.ListenSpec // ListenSpec for the server.
PermissionsFile string // Path to the Permissions file used by the mounttable.
PersistenceDir string // Path to the directory holding persistent acls.
// Name in the local neighborhood on which to make the mounttable
// visible. If empty, the mounttable will not be visible in the local
// neighborhood.
Neighborhood string
}
type DeviceArgs struct {
Name string // Name to publish the device service under (after claiming).
ListenSpec rpc.ListenSpec // ListenSpec for the device server.
ConfigState *config.State // Configuration for the device.
TestMode bool // Whether the device is running in test mode or not.
RestartCallback func() // Callback invoked when the device service is restarted.
PairingToken string // PairingToken that a claimer needs to provide.
}
func (d *DeviceArgs) name(mt string) string {
if d.Name != "" {
return d.Name
}
return naming.Join(mt, "devmgr")
}
type Args struct {
Namespace NamespaceArgs
Device DeviceArgs
// If true, the global namespace will be made available on the
// mounttable server under "global/".
MountGlobalNamespaceInLocalNamespace bool
}
// Start creates servers for the mounttable and device services and links them together.
//
// Returns the endpoints for the claimable service (empty if already claimed),
// a callback to be invoked to shutdown the services on success, or an error on
// failure.
func Start(ctx *context.T, args Args) ([]naming.Endpoint, func(), error) {
// Is this binary compatible with the state on disk?
if err := versioning.CheckCompatibility(ctx, args.Device.ConfigState.Root); err != nil {
return nil, nil, err
}
// In test mode, we skip writing the info file to disk, and we skip
// attempting to start the claimable service: the device must have been
// claimed already to enable updates anyway, and checking for perms in
// NewClaimableDispatcher needlessly prints a perms signature
// verification error to the logs.
if args.Device.TestMode {
cleanup, err := startClaimedDevice(ctx, args)
return nil, cleanup, err
}
// TODO(caprita): use some mechanism (a file lock or presence of entry
// in mounttable) to ensure only one device manager is running in an
// installation?
mi := &impl.ManagerInfo{
Pid: os.Getpid(),
}
if err := impl.SaveManagerInfo(filepath.Join(args.Device.ConfigState.Root, "device-manager"), mi); err != nil {
return nil, nil, verror.New(errCantSaveInfo, ctx, err)
}
// If the device has not yet been claimed, start the mounttable and
// claimable service and wait for it to be claimed.
// Once a device is claimed, close any previously running servers and
// start a new mounttable and device service.
claimable, claimed := claim.NewClaimableDispatcher(ctx, impl.PermsDir(args.Device.ConfigState), args.Device.PairingToken, security.AllowEveryone())
if claimable == nil {
// Device has already been claimed, bypass claimable service
// stage.
cleanup, err := startClaimedDevice(ctx, args)
return nil, cleanup, err
}
eps, stopClaimable, err := startClaimableDevice(ctx, claimable, args)
if err != nil {
return nil, nil, err
}
stop := make(chan struct{})
stopped := make(chan struct{})
go waitToBeClaimedAndStartClaimedDevice(ctx, stopClaimable, claimed, stop, stopped, args)
return eps, func() {
close(stop)
<-stopped
}, nil
}
func startClaimableDevice(ctx *context.T, dispatcher rpc.Dispatcher, args Args) ([]naming.Endpoint, func(), error) {
ctx, cancel := context.WithCancel(ctx)
ctx = v23.WithListenSpec(ctx, args.Device.ListenSpec)
ctx, server, err := v23.WithNewDispatchingServer(ctx, "", dispatcher, options.LameDuckTimeout(30*time.Second))
if err != nil {
cancel()
return nil, nil, err
}
shutdown := func() {
cancel()
ctx.Infof("Stopping claimable server...")
<-server.Closed()
ctx.Infof("Stopped claimable server.")
}
publicKey, err := v23.GetPrincipal(ctx).PublicKey().MarshalBinary()
if err != nil {
shutdown()
return nil, nil, err
}
var eps []naming.Endpoint
if proxy := args.Device.ListenSpec.Proxy; proxy != "" {
for {
status := server.Status()
if err, ok := status.ProxyErrors[proxy]; ok && err == nil {
eps = status.Endpoints
break
}
ctx.Infof("Waiting for proxy address to appear...")
<-status.Dirty
}
} else {
eps = server.Status().Endpoints
}
ctx.Infof("Unclaimed device manager with public_key: %s", base64.URLEncoding.EncodeToString(publicKey))
for _, ep := range eps {
ctx.Infof("Unclaimed device manager endpoint: %v", ep.Name())
}
ctx.FlushLog()
return eps, shutdown, nil
}
func waitToBeClaimedAndStartClaimedDevice(ctx *context.T, stopClaimable func(), claimed, stop <-chan struct{}, stopped chan<- struct{}, args Args) {
// Wait for either the claimable service to complete, or be stopped
defer close(stopped)
select {
case <-claimed:
stopClaimable()
case <-stop:
stopClaimable()
return
}
shutdown, err := startClaimedDevice(ctx, args)
if err != nil {
ctx.Errorf("Failed to start device service after it was claimed: %v", err)
v23.GetAppCycle(ctx).Stop(ctx)
return
}
defer shutdown()
<-stop // Wait to be stopped
}
func startClaimedDevice(ctx *context.T, args Args) (func(), error) {
ctx.Infof("Starting claimed device services...")
permStore := pathperms.NewPathStore(ctx)
permsDir := impl.PermsDir(args.Device.ConfigState)
debugAuth, err := pathperms.NewHierarchicalAuthorizer(permsDir, permsDir, permStore)
if err != nil {
return nil, err
}
debugDisp := debuglib.NewDispatcher(debugAuth)
ctx = v23.WithReservedNameDispatcher(ctx, debugDisp)
ctx.Infof("Starting mount table...")
mtName, stopMT, err := startMounttable(ctx, args.Namespace)
if err != nil {
ctx.Errorf("Failed to start mounttable service: %v", err)
return nil, err
} else {
ctx.Infof("Started mount table.")
}
ctx.Infof("Starting device service...")
stopDevice, err := startDeviceServer(ctx, args.Device, mtName, permStore)
if err != nil {
ctx.Errorf("Failed to start device service: %v", err)
stopMT()
return nil, err
} else {
ctx.Infof("Started device service.")
}
if args.MountGlobalNamespaceInLocalNamespace {
ctx.Infof("Mounting %v ...", mtName)
mountGlobalNamespaceInLocalNamespace(ctx, mtName)
ctx.Infof("Mounted %v", mtName)
}
impl.InvokeCallback(ctx, args.Device.ConfigState.Name)
ctx.Infof("Started claimed device services.")
return func() {
stopDevice()
stopMT()
}, nil
}
func startMounttable(ctx *context.T, n NamespaceArgs) (string, func(), error) {
mtName, stopMT, err := mounttablelib.StartServers(ctx, n.ListenSpec, n.Name, n.Neighborhood, n.PermissionsFile, n.PersistenceDir, "mounttable")
if err != nil {
ctx.Errorf("mounttablelib.StartServers(%#v) failed: %v", n, err)
} else {
ctx.Infof("Local mounttable (%v) published as %q", mtName, n.Name)
}
return mtName, func() {
ctx.Infof("Stopping mounttable...")
stopMT()
ctx.Infof("Stopped mounttable.")
}, err
}
// startDeviceServer creates an rpc.Server and sets it up to server the Device service.
//
// ls: ListenSpec for the server
// configState: configuration for the Device service dispatcher
// mt: Object address of the mounttable
// dm: Name to publish the device service under
// testMode: whether the service is to be run in test mode
// restarted: callback invoked when the device manager is restarted.
//
// Returns:
// (1) Function to be called to force the service to shutdown
// (2) Any errors in starting the service (in which case, (1) will be nil)
func startDeviceServer(ctx *context.T, args DeviceArgs, mt string, permStore *pathperms.PathStore) (shutdown func(), err error) {
ctx = v23.WithListenSpec(ctx, args.ListenSpec)
wrapper := displib.NewDispatcherWrapper()
ctx, cancel := context.WithCancel(ctx)
ctx, server, err := v23.WithNewDispatchingServer(ctx, args.name(mt), wrapper)
if err != nil {
cancel()
return nil, err
}
args.ConfigState.Name = server.Status().Endpoints[0].Name()
dispatcher, dShutdown, err := impl.NewDispatcher(ctx, args.ConfigState, mt, args.TestMode, args.RestartCallback, permStore)
if err != nil {
cancel()
<-server.Closed()
return nil, err
}
shutdown = func() {
// TODO(caprita): Capture the Dying state by feeding it back to
// the dispatcher and exposing it in Status.
ctx.Infof("Stopping device server...")
cancel()
<-server.Closed()
dShutdown()
ctx.Infof("Stopped device.")
}
wrapper.SetDispatcher(dispatcher)
ctx.Infof("Device manager (%v) published as %v", args.ConfigState.Name, args.name(mt))
return shutdown, nil
}
func mountGlobalNamespaceInLocalNamespace(ctx *context.T, localMT string) {
ns := v23.GetNamespace(ctx)
for _, root := range ns.Roots() {
go func(r string) {
for {
err := ns.Mount(ctx, naming.Join(localMT, "global"), r, 0 /* forever */, naming.ServesMountTable(true))
if err == nil {
break
}
ctx.Infof("Failed to Mount global namespace: %v", err)
time.Sleep(time.Second)
}
}(root)
}
}