forked from snapcore/snapd
/
bulk.go
325 lines (289 loc) · 9.04 KB
/
bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// -*- Mode: Go; indent-tabs-mode: t -*-
/*
* Copyright (C) 2020 Canonical Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package assertstate
import (
"context"
"fmt"
"sort"
"strings"
"github.com/snapcore/snapd/asserts"
"github.com/snapcore/snapd/asserts/snapasserts"
"github.com/snapcore/snapd/overlord/snapstate"
"github.com/snapcore/snapd/overlord/state"
"github.com/snapcore/snapd/release"
"github.com/snapcore/snapd/store"
)
// storeGroup is the pool group name under which the store assertion
// of the model is added for update or fetching.
const storeGroup = "store assertion"
// maxGroups is the maximum number of assertion groups we set with the
// asserts.Pool used to refresh snap assertions; it corresponds
// roughly to how many snaps we will request assertions for in
// one /v2/snaps/refresh request.
// Given that requesting assertions for ~500 snaps together with no
// updates can take around 900ms-1s, conservatively set it to half of
// that. Most systems should be done in one request anyway.
var maxGroups = 256
// bulkRefreshSnapDeclarations updates, through a single asserts.Pool,
// the snap-declaration (and prerequisites) of every snap in snapStates
// that has a snap-id, using the snap instance name as pool group. If
// the model of deviceCtx names a store, the store assertion is updated
// (or fetched if not in the database yet) as well.
//
// The pool is resolved each time maxGroups snaps have been queued and
// once more at the end. Per-group errors of all rounds are merged and
// returned as a single *resolvePoolError; a not-found or unresolved
// error for the store assertion alone is ignored. Failing to clear the
// pool groups between rounds is reported as *bulkAssertionFallbackError.
func bulkRefreshSnapDeclarations(s *state.State, snapStates map[string]*snapstate.SnapState, userID int, deviceCtx snapstate.DeviceContext, opts *RefreshAssertionsOptions) error {
	pool := asserts.NewPool(cachedDB(s), maxGroups)

	// accumulated collects the per-group errors across resolve rounds
	var accumulated *resolvePoolError
	resolveRound := func() error {
		err := resolvePool(s, pool, nil, userID, deviceCtx, opts)
		rpe, isPoolErr := err.(*resolvePoolError)
		if !isPoolErr {
			return err
		}
		// per-group errors do not stop the refresh, keep them for later
		if accumulated != nil {
			accumulated.merge(rpe)
		} else {
			accumulated = rpe
		}
		return nil
	}

	queued := 0
	for name, snapst := range snapStates {
		snapID := snapst.CurrentSideInfo().SnapID
		if snapID == "" {
			// no snap-id, nothing to refresh for this snap
			continue
		}

		// the snap-declaration (and prereqs) were originally added at
		// install time, bring them up to date
		ref := &asserts.Ref{
			Type:       asserts.SnapDeclarationType,
			PrimaryKey: []string{release.Series, snapID},
		}
		err := pool.AddToUpdate(ref, name)
		if err != nil {
			return fmt.Errorf("cannot prepare snap-declaration refresh for snap %q: %v", name, err)
		}

		queued++
		if queued%maxGroups != 0 {
			continue
		}
		// all groups are in use: resolve what was set up so far, then
		// clear the groups so that the pool can be reused
		if err := resolveRound(); err != nil {
			return err
		}
		if err := pool.ClearGroups(); err != nil {
			// not expected, but if it happens request the fallback
			return &bulkAssertionFallbackError{err}
		}
	}

	// refresh/fetch the store assertion when the model has a store
	if storeID := deviceCtx.Model().Store(); storeID != "" {
		storeRef := asserts.Ref{
			Type:       asserts.StoreType,
			PrimaryKey: []string{storeID},
		}
		err := pool.AddToUpdate(&storeRef, storeGroup)
		switch {
		case err == nil:
			// already in the db, queued for update
		case asserts.IsNotFound(err):
			// not in the db yet, try to resolve (fetch) it first
			at := &asserts.AtRevision{
				Ref:      storeRef,
				Revision: asserts.RevisionNotKnown,
			}
			if err := pool.AddUnresolved(at, storeGroup); err != nil {
				return fmt.Errorf("cannot prepare store assertion fetching: %v", err)
			}
		default:
			return fmt.Errorf("cannot prepare store assertion refresh: %v", err)
		}
	}

	if err := resolveRound(); err != nil {
		return err
	}
	if accumulated == nil {
		return nil
	}

	// a store assertion that is missing or could not be resolved is
	// not treated as an error
	storeErr := accumulated.errors[storeGroup]
	if asserts.IsNotFound(storeErr) || storeErr == asserts.ErrUnresolved {
		delete(accumulated.errors, storeGroup)
	}
	if len(accumulated.errors) == 0 {
		return nil
	}
	accumulated.message = "cannot refresh snap-declarations for snaps"
	return accumulated
}
// bulkRefreshValidationSetAsserts updates the validation-set assertions
// tracked in vsets through a single asserts.Pool, with every sequence
// in its own group. A set pinned to a sequence (PinnedAt > 0) is updated
// to the latest revision of that same sequence, otherwise the update
// starts from the current sequence and goes to the latest one.
//
// beforeCommitChecker is passed on to resolvePoolNoFallback. Not-found
// or unresolved errors for sets marked LocalOnly are ignored. A
// *snapasserts.ValidationSetsConflictError is returned as is, any other
// error is wrapped.
func bulkRefreshValidationSetAsserts(s *state.State, vsets map[string]*ValidationSetTracking, beforeCommitChecker func(*asserts.Database, asserts.Backstore) error, userID int, deviceCtx snapstate.DeviceContext, opts *RefreshAssertionsOptions) error {
	pool := asserts.NewPool(cachedDB(s), maxGroups)

	// groups of local-only validation sets, for which failing to find
	// the assertion is not an error
	ignoreNotFound := make(map[string]bool)

	for _, vs := range vsets {
		// by default (not pinned) update to the latest sequence
		atSeq := &asserts.AtSequence{
			Type:        asserts.ValidationSetType,
			SequenceKey: []string{release.Series, vs.AccountID, vs.Name},
			Sequence:    vs.Current,
		}
		if vs.PinnedAt > 0 {
			// pinned to a specific sequence, update to the latest
			// revision for that same sequence
			atSeq.Sequence = vs.PinnedAt
			atSeq.Pinned = true
		}

		// each sequence to resolve gets its own group
		group := atSeq.Unique()
		if vs.LocalOnly {
			ignoreNotFound[group] = true
		}
		if err := pool.AddSequenceToUpdate(atSeq, group); err != nil {
			return err
		}
	}

	err := resolvePoolNoFallback(s, pool, beforeCommitChecker, userID, deviceCtx, opts)
	switch rerr := err.(type) {
	case nil:
		return nil
	case *snapasserts.ValidationSetsConflictError:
		return err
	case *resolvePoolError:
		// drop resolving errors for validation sets that are local
		// only (no assertion in the store)
		for group := range ignoreNotFound {
			groupErr := rerr.errors[group]
			if asserts.IsNotFound(groupErr) || groupErr == asserts.ErrUnresolved {
				delete(rerr.errors, group)
			}
		}
		if len(rerr.errors) == 0 {
			return nil
		}
	}
	return fmt.Errorf("cannot refresh validation set assertions: %v", err)
}
// bulkAssertionFallbackError is a marker error used to request falling
// back to the old implementation for assertion refreshes; err is the
// error that made the bulk refresh unsuccessful.
type bulkAssertionFallbackError struct {
	err error
}

// Error implements the error interface, reporting the underlying error
// after a fixed prefix.
func (fallback *bulkAssertionFallbackError) Error() string {
	return fmt.Sprint("unsuccessful bulk assertion refresh, fallback: ", fallback.err)
}
// resolvePoolError carries the errors, keyed by pool group, that were
// hit while fetching and resolving assertions.
type resolvePoolError struct {
	// message is the leading line of the error text; when empty a
	// generic one is used by Error
	message string
	// errors maps groups to errors
	errors map[string]error
}

// merge copies the group errors of rpe1 into rpe. Usually the groups
// of rpe and rpe1 are expected to be disjoint, but it is also fine for
// the errors of rpe1 to win. A nil rpe1 is a no-op, and the errors map
// of rpe is allocated on demand so that merging into a zero value does
// not panic.
func (rpe *resolvePoolError) merge(rpe1 *resolvePoolError) {
	if rpe1 == nil || len(rpe1.errors) == 0 {
		return
	}
	if rpe.errors == nil {
		rpe.errors = make(map[string]error, len(rpe1.errors))
	}
	for k, e := range rpe1.errors {
		rpe.errors[k] = e
	}
}

// Error implements the error interface. The result is the message (or
// a generic default) followed by ":" and then one " - <group>: <error>"
// line per group, with the groups sorted to keep the text deterministic.
func (rpe *resolvePoolError) Error() string {
	message := rpe.message
	if message == "" {
		message = "cannot fetch and resolve assertions"
	}

	groups := make([]string, 0, len(rpe.errors))
	for g := range rpe.errors {
		groups = append(groups, g)
	}
	// map iteration order is random
	sort.Strings(groups)

	var b strings.Builder
	b.WriteString(message)
	b.WriteByte(':')
	for _, g := range groups {
		fmt.Fprintf(&b, "\n - %s: %v", g, rpe.errors[g])
	}
	return b.String()
}
// resolvePool drives pool to resolution against the store: it keeps
// issuing SnapAction requests with the pool and downloading the
// assertion streams of the returned results into the pool until the
// store has no more results. Then, if checkBeforeCommit is set, it is
// called with the database and the pool backstore and can veto the
// commit by returning an error. Finally the pool is committed to the
// database.
//
// The state lock is released around each store request and taken again
// afterwards, so callers are expected to hold it on entry.
//
// Unexpected store.SnapActionError values and unexpected HTTP statuses
// from 400 to 500 are returned wrapped in *bulkAssertionFallbackError
// to request falling back. Errors recorded by the pool for individual
// groups are returned as *resolvePoolError.
func resolvePool(s *state.State, pool *asserts.Pool, checkBeforeCommit func(*asserts.Database, asserts.Backstore) error, userID int, deviceCtx snapstate.DeviceContext, opts *RefreshAssertionsOptions) error {
	user, err := userFromUserID(s, userID)
	if err != nil {
		return err
	}

	sto := snapstate.Store(s, deviceCtx)
	db := cachedDB(s)
	unsupported := handleUnsupported(db)

	for {
		refreshOpts := &store.RefreshOptions{IsAutoRefresh: opts.IsAutoRefresh}

		s.Unlock()
		_, results, actionErr := sto.SnapAction(context.TODO(), nil, nil, pool, user, refreshOpts)
		s.Lock()

		// request fallback on
		// * unexpected SnapActionErrors or
		// * unexpected HTTP status of 4xx or 500
		switch stoErr := actionErr.(type) {
		case nil:
			// no error
		case *store.SnapActionError:
			if !stoErr.NoResults || len(stoErr.Other) != 0 {
				return &bulkAssertionFallbackError{stoErr}
			}
			// simply a no results error, we are likely done
		case *store.UnexpectedHTTPStatusError:
			if stoErr.StatusCode >= 400 && stoErr.StatusCode <= 500 {
				return &bulkAssertionFallbackError{stoErr}
			}
			return actionErr
		default:
			return actionErr
		}

		if len(results) == 0 {
			// nothing more from the store, everything is resolved
			// unless the pool recorded errors
			break
		}

		for _, res := range results {
			batch := asserts.NewBatch(unsupported)

			s.Unlock()
			dlErr := sto.DownloadAssertions(res.StreamURLs, batch, user)
			s.Lock()

			if dlErr != nil {
				// record the failure against the groups involved
				// and carry on with the other results
				pool.AddGroupingError(dlErr, res.Grouping)
				continue
			}
			if _, err := pool.AddBatch(batch, res.Grouping); err != nil {
				return err
			}
		}
	}

	if checkBeforeCommit != nil {
		if err := checkBeforeCommit(db, pool.Backstore()); err != nil {
			return err
		}
	}

	pool.CommitTo(db)

	if groupErrs := pool.Errors(); len(groupErrs) != 0 {
		return &resolvePoolError{errors: groupErrs}
	}
	return nil
}
// resolvePoolNoFallback behaves like resolvePool, except that a
// *bulkAssertionFallbackError is never returned: the error it wraps is
// reported instead, since there is no fallback for the caller to use.
func resolvePoolNoFallback(s *state.State, pool *asserts.Pool, checkBeforeCommit func(*asserts.Database, asserts.Backstore) error, userID int, deviceCtx snapstate.DeviceContext, opts *RefreshAssertionsOptions) error {
	err := resolvePool(s, pool, checkBeforeCommit, userID, deviceCtx, opts)
	if fallback, isFallback := err.(*bulkAssertionFallbackError); isFallback {
		// no fallback, report the inner error
		return fallback.err
	}
	return err
}