-
Notifications
You must be signed in to change notification settings - Fork 105
/
deferred_package_manager.go
175 lines (155 loc) · 6.2 KB
/
deferred_package_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package packages
// DeferredPackageManager wraps a CloudPackageManager and a goroutine that is establishing a connection to app. It starts up
// instantly and behaves as a noop package manager until a connection to app has been established.
//
// Raison d'être: AquireConnection will waste 5 seconds timing out on robots that have no internet connection but have downloaded
// all of their cloud_packages
//
// This puts an optimization on top of that behavior: On the first start of the package manager, if all of the expected packages are
// present on the system, put the app-connection establishment in a goroutine and use a noopManager for the first Sync.
// If there are missing packages, this will still block to prevent a half-started robot.
import (
"context"
"os"
"sync"
"github.com/pkg/errors"
pb "go.viam.com/api/app/packages/v1"
goutils "go.viam.com/utils"
"go.viam.com/rdk/config"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
)
// deferredPackageManager is a ManagerSyncer that forwards every call to whichever
// manager was selected by the most recent Sync. It starts out forwarding to a
// noopManager and switches to a cloud manager once one has been created.
type deferredPackageManager struct {
	resource.Named
	resource.TriviallyReconfigurable

	// ctx is the context handed to NewDeferredPackageManager.
	// NOTE(review): no method in this file reads it — confirm whether it is still needed.
	ctx context.Context
	// establishConnection is called by createCloudManager to obtain the package service client.
	establishConnection func(ctx context.Context) (pb.PackageServiceClient, error)

	// cloudManager is the cached cloud manager; it stays nil until createCloudManager succeeds.
	cloudManager ManagerSyncer
	// cloudManagerArgs holds the arguments forwarded to NewCloudManager.
	cloudManagerArgs cloudManagerConstructorArgs
	// cloudManagerLock guards cloudManager. While a background connection attempt is
	// running, the lock is held by that goroutine rather than by the caller.
	cloudManagerLock sync.Mutex

	// lastSyncedManager is the manager chosen by the most recent Sync; Cleanup,
	// PackagePath and Close are forwarded to it.
	lastSyncedManager ManagerSyncer
	// lastSyncedManagerLock guards lastSyncedManager and serializes the public methods.
	lastSyncedManagerLock sync.Mutex

	logger logging.Logger
}
// cloudManagerConstructorArgs bundles the values that createCloudManager passes to
// NewCloudManager (in addition to the package service client).
//
// Keep the field order stable: this type may be constructed with a positional literal.
type cloudManagerConstructorArgs struct {
	// cloudConfig is the cloud configuration forwarded to NewCloudManager.
	cloudConfig *config.Cloud
	// packagesDir is the local packages directory; it is also the root that
	// isMissingPackages resolves package directories against.
	packagesDir string
	// logger is the logger forwarded to NewCloudManager.
	logger logging.Logger
}
// Compile-time checks that *deferredPackageManager satisfies Manager and ManagerSyncer.
var _ Manager = (*deferredPackageManager)(nil)

var _ ManagerSyncer = (*deferredPackageManager)(nil)
// DeferredServiceName is the internal resource name of the deferred package manager;
// use it to refer to or depend on this service from inside the RDK.
var DeferredServiceName = resource.NewName(
	resource.APINamespaceRDKInternal.WithServiceType(SubtypeName),
	"deferred-manager",
)
// NewDeferredPackageManager builds a deferred package manager that initially forwards
// to a noop manager. No connection is attempted here; establishConnection is only
// invoked later, when a Sync needs a cloud manager.
//
// cloudConfig, packagesDir and logger are stored and handed to NewCloudManager once a
// connection exists. See the comment at the top of deferred_package_manager.go for the
// motivation.
func NewDeferredPackageManager(
	ctx context.Context,
	establishConnection func(ctx context.Context) (pb.PackageServiceClient, error),
	cloudConfig *config.Cloud,
	packagesDir string,
	logger logging.Logger,
) ManagerSyncer {
	constructorArgs := cloudManagerConstructorArgs{
		cloudConfig: cloudConfig,
		packagesDir: packagesDir,
		logger:      logger,
	}
	return &deferredPackageManager{
		Named:               DeferredServiceName.AsNamed(),
		ctx:                 ctx,
		establishConnection: establishConnection,
		cloudManagerArgs:    constructorArgs,
		// Until the first Sync picks a manager, every call is a no-op.
		lastSyncedManager: &noopManager{Named: InternalServiceName.AsNamed()},
		logger:            logger,
	}
}
// Sync picks the manager to use for this round (see getManagerForSync) and delegates
// the sync of packages and modules to it.
//
// The chosen manager is recorded as lastSyncedManager before delegating, so Cleanup,
// PackagePath and Close keep using the same manager until the next Sync. If no manager
// can be obtained, the error is returned and the previously recorded manager is kept.
func (m *deferredPackageManager) Sync(ctx context.Context, packages []config.PackageConfig, modules []config.Module) error {
	m.lastSyncedManagerLock.Lock()
	defer m.lastSyncedManagerLock.Unlock()

	selected, selectErr := m.getManagerForSync(ctx, packages)
	if selectErr != nil {
		return selectErr
	}

	m.lastSyncedManager = selected
	return selected.Sync(ctx, packages, modules)
}
// Cleanup forwards to the Cleanup method of the manager selected by the most recent Sync.
func (m *deferredPackageManager) Cleanup(ctx context.Context) error {
	m.lastSyncedManagerLock.Lock()
	defer m.lastSyncedManagerLock.Unlock()

	active := m.lastSyncedManager
	return active.Cleanup(ctx)
}
// PackagePath forwards the lookup of name to the manager selected by the most recent
// Sync and returns its result unchanged.
func (m *deferredPackageManager) PackagePath(name PackageName) (string, error) {
	m.lastSyncedManagerLock.Lock()
	defer m.lastSyncedManagerLock.Unlock()

	active := m.lastSyncedManager
	return active.PackagePath(name)
}
// Close closes the manager selected by the most recent Sync.
//
// NOTE(review): only lastSyncedManager is closed. A cloud manager that was created in
// the background but not yet selected by a Sync is not touched here — confirm that is
// acceptable.
func (m *deferredPackageManager) Close(ctx context.Context) error {
	m.lastSyncedManagerLock.Lock()
	defer m.lastSyncedManagerLock.Unlock()

	active := m.lastSyncedManager
	return active.Close(ctx)
}
// getManagerForSync decides which manager the current Sync should use:
//
//   - if a cloud manager is already cached, it is returned;
//   - if no packages are missing on disk, a fresh noopManager is returned immediately
//     and a goroutine is started to create the cloud manager in the background;
//   - otherwise the cloud manager is created synchronously and returned together with
//     any error from that attempt.
//
// Locking: cloudManagerLock is taken at the top and released explicitly on each path.
// On the background path the lock is NOT released by this function; ownership passes
// to the goroutine, which unlocks when its connection attempt finishes. A later call
// therefore blocks here until that attempt is done.
//
// NOTE(review): the goroutine uses the ctx of this call, not the ctx stored on the
// struct — confirm that a ctx cancelled after Sync returns is the intended way to
// abort the background attempt.
func (m *deferredPackageManager) getManagerForSync(ctx context.Context, packages []config.PackageConfig) (ManagerSyncer, error) {
	// Deliberately no deferred unlock: one path hands the lock to a goroutine.
	m.cloudManagerLock.Lock()

	// Fast path: reuse the cached cloud manager.
	if cached := m.cloudManager; cached != nil {
		m.cloudManagerLock.Unlock()
		return cached, nil
	}

	// Everything is already on disk: connect in the background, use a noop manager for now.
	if !m.isMissingPackages(packages) {
		goutils.PanicCapturingGo(func() {
			// The goroutine owns the lock from here on and releases it when done.
			defer m.cloudManagerLock.Unlock()

			created, connErr := m.createCloudManager(ctx)
			if connErr == nil {
				m.cloudManager = created
				m.logger.Info("cloud package manager created asyncronously")
				return
			}
			m.logger.Warnf("failed to create cloud package manager %v", connErr)
		})
		// No unlock on this path; see above.
		return &noopManager{Named: InternalServiceName.AsNamed()}, nil
	}

	// Packages are missing: block until the cloud manager is created (or fails).
	created, connErr := m.createCloudManager(ctx)
	if connErr == nil {
		// Only cache on success so a later Sync retries after a failure.
		m.cloudManager = created
		m.logger.Info("cloud package manager created synchronously")
	}
	m.cloudManagerLock.Unlock()
	return created, connErr
}
// createCloudManager calls the establishConnection function supplied at construction
// time to obtain a package service client, then builds a cloud manager around it using
// the stored constructor arguments.
//
// If the connection cannot be established, it returns a nil manager and the underlying
// error wrapped with context. Otherwise it returns whatever NewCloudManager returns.
func (m *deferredPackageManager) createCloudManager(ctx context.Context) (ManagerSyncer, error) {
	client, err := m.establishConnection(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to establish a connection to app.viam")
	}

	args := m.cloudManagerArgs
	return NewCloudManager(args.cloudConfig, client, args.packagesDir, args.logger)
}
// isMissingPackages reports whether any of the given packages lacks a local data
// directory under the configured packages dir. It is consulted before a sync to decide
// whether the sync must wait for the cloud connection.
//
// Any error from os.Stat (not only "does not exist") counts as missing. An empty
// package list yields false.
func (m *deferredPackageManager) isMissingPackages(packages []config.PackageConfig) bool {
	root := m.cloudManagerArgs.packagesDir
	for _, wanted := range packages {
		_, statErr := os.Stat(wanted.LocalDataDirectory(root))
		if statErr != nil {
			return true
		}
	}
	return false
}