forked from snapcore/snapd
/
backend.go
1036 lines (917 loc) · 27.9 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// -*- Mode: Go; indent-tabs-mode: t -*-
/*
* Copyright (C) 2018-2020 Canonical Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 3 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package backend
import (
"archive/tar"
"archive/zip"
"bytes"
"context"
"crypto"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/snapcore/snapd/client"
"github.com/snapcore/snapd/dirs"
"github.com/snapcore/snapd/logger"
"github.com/snapcore/snapd/osutil"
"github.com/snapcore/snapd/snap"
"github.com/snapcore/snapd/snapdenv"
"github.com/snapcore/snapd/strutil"
)
const (
// member names inside the snapshot zip file
archiveName = "archive.tgz"
metadataName = "meta.json"
metaHashName = "meta.sha3_384"
// prefix/suffix used for per-user archive entries in the zip
userArchivePrefix = "user/"
userArchiveSuffix = ".tgz"
)
var (
// Stop is used to ask Iter to stop iteration, without it being an error.
Stop = errors.New("stop iteration")
// the following indirections exist so tests can swap them out
osOpen = os.Open
dirNames = (*os.File).Readdirnames
backendOpen = Open
timeNow = time.Now
usersForUsernames = usersForUsernamesImpl
)
// LastSnapshotSetID returns the highest set id number for the snapshots stored
// in snapshots directory; set ids are inferred from the filenames.
func LastSnapshotSetID() (uint64, error) {
	dir, err := osOpen(dirs.SnapshotsDir)
	if err != nil {
		if osutil.IsDirNotExist(err) {
			// no snapshots directory at all means no snapshots
			return 0, nil
		}
		return 0, fmt.Errorf("cannot open snapshots directory: %v", err)
	}
	defer dir.Close()
	var highest uint64
	var readErr error
	for readErr == nil {
		// read the directory in batches of 100 entries; note that
		// Readdirnames can return entries together with a non-nil error
		var batch []string
		batch, readErr = dirNames(dir, 100)
		for _, entry := range batch {
			ok, setID := isSnapshotFilename(entry)
			if ok && setID > highest {
				highest = setID
			}
		}
	}
	if readErr != nil && readErr != io.EOF {
		return 0, readErr
	}
	return highest, nil
}
// Iter loops over all snapshots in the snapshots directory, applying the given
// function to each. The snapshot will be closed after the function returns. If
// the function returns an error, iteration is stopped (and if the error isn't
// Stop, it's returned as the error of the iterator).
func Iter(ctx context.Context, f func(*Reader) error) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	dir, err := osOpen(dirs.SnapshotsDir)
	if err != nil {
		if osutil.IsDirNotExist(err) {
			// no dir -> no snapshots
			return nil
		}
		return fmt.Errorf("cannot open snapshots directory: %v", err)
	}
	defer dir.Close()
	// set ids for which an import was seen to be in progress at any
	// point during this iteration; see the race note below
	importsInProgress := map[uint64]bool{}
	var names []string
	var readErr error
	for readErr == nil && err == nil {
		names, readErr = dirNames(dir, 100)
		// note os.Readdirnames can return a non-empty names and a non-nil err
		for _, name := range names {
			// honor context cancellation between entries
			if err = ctx.Err(); err != nil {
				break
			}
			// filter out non-snapshot directory entries
			ok, setID := isSnapshotFilename(name)
			if !ok {
				continue
			}
			// keep track of in-progress in a map as well
			// to avoid races. E.g.:
			// 1. The dirNames() are read
			// 2. 99_some-snap_1.0_x1.zip is returned
			// 3. the code checks if 99_importing is there,
			//    it is so 99_some-snap is skipped
			// 4. other snapshots are examined
			// 5. in-parallel 99_importing finishes
			// 7. 99_other-snap_1.0_x1.zip is now examined
			// 8. code checks if 99_importing is there, but it
			//    is no longer there because import
			//    finished in the meantime. We still
			//    want to not call the callback with
			//    99_other-snap or the callback would get
			//    an incomplete view about 99_snapshot.
			if importsInProgress[setID] {
				continue
			}
			if importInProgressFor(setID) {
				importsInProgress[setID] = true
				continue
			}
			filename := filepath.Join(dirs.SnapshotsDir, name)
			reader, openError := backendOpen(filename, setID)
			// reader can be non-nil even when openError is not nil (in
			// which case reader.Broken will have a reason). f can
			// check and either ignore or return an error when
			// finding a broken snapshot.
			if reader != nil {
				err = f(reader)
			} else {
				// TODO: use warnings instead
				logger.Noticef("Cannot open snapshot %q: %v.", name, openError)
			}
			if openError == nil {
				// if openError was nil the snapshot was opened and needs closing
				if closeError := reader.Close(); err == nil {
					err = closeError
				}
			}
			if err != nil {
				break
			}
		}
	}
	if readErr != nil && readErr != io.EOF {
		return readErr
	}
	// Stop is the callback's way to end iteration early without error
	if err == Stop {
		err = nil
	}
	return err
}
// List valid snapshots sets.
func List(ctx context.Context, setID uint64, snapNames []string) ([]client.SnapshotSet, error) {
	// group the matching snapshots by their set id
	bySetID := map[uint64][]*client.Snapshot{}
	err := Iter(ctx, func(reader *Reader) error {
		// setID == 0 means "any set"; empty snapNames means "any snap"
		if setID != 0 && reader.SetID != setID {
			return nil
		}
		if len(snapNames) > 0 && !strutil.ListContains(snapNames, reader.Snap) {
			return nil
		}
		bySetID[reader.SetID] = append(bySetID[reader.SetID], &reader.Snapshot)
		return nil
	})
	// produce a deterministic ordering: snapshots by snap name, sets by id
	sets := make([]client.SnapshotSet, 0, len(bySetID))
	for id, shots := range bySetID {
		sort.Sort(bySnap(shots))
		sets = append(sets, client.SnapshotSet{ID: id, Snapshots: shots})
	}
	sort.Sort(byID(sets))
	return sets, err
}
// Filename of the given client.Snapshot in this backend.
func Filename(snapshot *client.Snapshot) string {
	// this _needs_ the snap name and version to be valid
	fn := fmt.Sprintf("%d_%s_%s_%s.zip", snapshot.SetID, snapshot.Snap, snapshot.Version, snapshot.Revision)
	return filepath.Join(dirs.SnapshotsDir, fn)
}
// isSnapshotFilename checks if the given filePath is a snapshot file name, i.e.
// if it starts with a numeric set id and ends with .zip extension;
// filePath can be just a file name, or a full path.
func isSnapshotFilename(filePath string) (ok bool, setID uint64) {
	fname := filepath.Base(filePath)
	// XXX: we could use a regexp here to match very precisely all the elements
	// of the filename following Filename() above, but perhaps it's better not to
	// go overboard with it in case the format evolves in the future. Only check
	// if the name starts with a set-id and ends with .zip.
	//
	// Filename is "<sid>_<snapName>_version_revision.zip", e.g. "16_snapcraft_4.2_5407.zip"
	ext := filepath.Ext(fname)
	if ext != ".zip" {
		return false, 0
	}
	parts := strings.SplitN(fname, "_", 2)
	if len(parts) != 2 {
		return false, 0
	}
	// invalid: no parts following <sid>_
	if parts[1] == ext {
		return false, 0
	}
	// Parse the set id as unsigned: a name such as "-1_foo.zip" is not a
	// valid snapshot and must be rejected rather than silently wrapped
	// around to a huge uint64 (as an Atoi + uint64() conversion would
	// do). This also matches how importingFnRegexp treats set ids.
	id, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return false, 0
	}
	return true, id
}
// EstimateSnapshotSize calculates estimated size of the snapshot by
// summing the sizes of all regular files under the snap's system data
// directories and the data directories of the given users. The value
// is the raw (uncompressed) total.
func EstimateSnapshotSize(si *snap.Info, usernames []string) (uint64, error) {
	var total uint64
	calculateSize := func(path string, finfo os.FileInfo, err error) error {
		// when Walk fails to stat an entry it passes a nil finfo
		// together with the error; propagate the error before
		// touching finfo to avoid a nil dereference
		if err != nil {
			return err
		}
		if finfo.Mode().IsRegular() {
			total += uint64(finfo.Size())
		}
		return nil
	}
	// visitDir sums a directory tree; a missing dir counts as empty
	visitDir := func(dir string) error {
		exists, isDir, err := osutil.DirExists(dir)
		if err != nil {
			return err
		}
		if !(exists && isDir) {
			return nil
		}
		return filepath.Walk(dir, calculateSize)
	}
	for _, dir := range []string{si.DataDir(), si.CommonDataDir()} {
		if err := visitDir(dir); err != nil {
			return 0, err
		}
	}
	users, err := usersForUsernames(usernames)
	if err != nil {
		return 0, err
	}
	for _, usr := range users {
		if err := visitDir(si.UserDataDir(usr.HomeDir)); err != nil {
			return 0, err
		}
		if err := visitDir(si.UserCommonDataDir(usr.HomeDir)); err != nil {
			return 0, err
		}
	}
	// XXX: we could use a typical compression factor here
	return total, nil
}
// Save a snapshot: writes a zip under dirs.SnapshotsDir containing one
// gzipped tarball for the system data and one per user, followed by the
// snapshot metadata (meta.json) and its SHA3-384 (meta.sha3_384).
// The zip is written via an atomic file, so a partial write never
// appears under the final name.
func Save(ctx context.Context, id uint64, si *snap.Info, cfg map[string]interface{}, usernames []string) (*client.Snapshot, error) {
	if err := os.MkdirAll(dirs.SnapshotsDir, 0700); err != nil {
		return nil, err
	}
	snapshot := &client.Snapshot{
		SetID: id,
		Snap: si.InstanceName(),
		SnapID: si.SnapID,
		Revision: si.Revision,
		Version: si.Version,
		Epoch: si.Epoch,
		Time: timeNow(),
		SHA3_384: make(map[string]string),
		Size: 0,
		Conf: cfg,
		// Note: Auto is no longer set in the Snapshot.
	}
	aw, err := osutil.NewAtomicFile(Filename(snapshot), 0600, 0, osutil.NoChown, osutil.NoChown)
	if err != nil {
		return nil, err
	}
	// if things worked, we'll commit (and Cancel becomes a NOP)
	defer aw.Cancel()
	w := zip.NewWriter(aw)
	defer w.Close() // note this does not close the file descriptor (that's done by hand on the atomic writer, above)
	// system data (the snap's revisioned and common dirs) is archived as "root"
	if err := addDirToZip(ctx, snapshot, w, "root", archiveName, si.DataDir()); err != nil {
		return nil, err
	}
	users, err := usersForUsernames(usernames)
	if err != nil {
		return nil, err
	}
	// one archive per user home
	for _, usr := range users {
		if err := addDirToZip(ctx, snapshot, w, usr.Username, userArchiveName(usr), si.UserDataDir(usr.HomeDir)); err != nil {
			return nil, err
		}
	}
	metaWriter, err := w.Create(metadataName)
	if err != nil {
		return nil, err
	}
	// hash the metadata while writing it, so meta.sha3_384 can be
	// emitted right after from the same pass
	hasher := crypto.SHA3_384.New()
	enc := json.NewEncoder(io.MultiWriter(metaWriter, hasher))
	if err := enc.Encode(snapshot); err != nil {
		return nil, err
	}
	hashWriter, err := w.Create(metaHashName)
	if err != nil {
		return nil, err
	}
	fmt.Fprintf(hashWriter, "%x\n", hasher.Sum(nil))
	if err := w.Close(); err != nil {
		return nil, err
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if err := aw.Commit(); err != nil {
		return nil, err
	}
	return snapshot, nil
}
var isTesting = snapdenv.Testing()
// addDirToZip writes one zip member named `entry` containing a gzipped
// tarball of the snap data directory `dir` (by its basename) plus its
// sibling "common" directory, when they exist. The tarball is produced
// by an external tar process run via tarAsUser (presumably with the
// given user's privileges — "root" for system data). The archive's
// SHA3-384 and size are recorded into the snapshot.
func addDirToZip(ctx context.Context, snapshot *client.Snapshot, w *zip.Writer, username string, entry, dir string) error {
	// split into the containing dir (e.g. ~user/snap/foo) and the
	// revision subdir (e.g. x1)
	parent, revdir := filepath.Split(dir)
	exists, isDir, err := osutil.DirExists(parent)
	if err != nil {
		return err
	}
	if exists && !isDir {
		logger.Noticef("Not saving directories under %q in snapshot #%d of %q as it is not a directory.", parent, snapshot.SetID, snapshot.Snap)
		return nil
	}
	if !exists {
		logger.Debugf("Not saving directories under %q in snapshot #%d of %q as it is does not exist.", parent, snapshot.SetID, snapshot.Snap)
		return nil
	}
	// tar runs with parent as cwd so member paths are relative
	tarArgs := []string{
		"--create",
		"--sparse", "--gzip",
		"--format", "gnu",
		"--directory", parent,
	}
	noRev, noCommon := true, true
	exists, isDir, err = osutil.DirExists(dir)
	if err != nil {
		return err
	}
	switch {
	case exists && isDir:
		tarArgs = append(tarArgs, revdir)
		noRev = false
	case exists && !isDir:
		logger.Noticef("Not saving %q in snapshot #%d of %q as it is not a directory.", dir, snapshot.SetID, snapshot.Snap)
	case !exists:
		logger.Debugf("Not saving %q in snapshot #%d of %q as it is does not exist.", dir, snapshot.SetID, snapshot.Snap)
	}
	common := filepath.Join(parent, "common")
	exists, isDir, err = osutil.DirExists(common)
	if err != nil {
		return err
	}
	switch {
	case exists && isDir:
		tarArgs = append(tarArgs, "common")
		noCommon = false
	case exists && !isDir:
		logger.Noticef("Not saving %q in snapshot #%d of %q as it is not a directory.", common, snapshot.SetID, snapshot.Snap)
	case !exists:
		logger.Debugf("Not saving %q in snapshot #%d of %q as it is does not exist.", common, snapshot.SetID, snapshot.Snap)
	}
	// nothing to archive at all -> no zip entry for this dir
	if noCommon && noRev {
		return nil
	}
	archiveWriter, err := w.CreateHeader(&zip.FileHeader{Name: entry})
	if err != nil {
		return err
	}
	// hash and size the compressed stream as it is written into the zip
	var sz osutil.Sizer
	hasher := crypto.SHA3_384.New()
	cmd := tarAsUser(username, tarArgs...)
	cmd.Stdout = io.MultiWriter(archiveWriter, hasher, &sz)
	// collect a bounded amount of tar's stderr for error reporting
	matchCounter := &strutil.MatchCounter{
		// keep at most 5 matches
		N: 5,
		// keep the last lines only, those likely contain the reason for
		// fatal errors
		LastN: true,
	}
	cmd.Stderr = matchCounter
	if isTesting {
		// in tests keep every stderr line and also show them live
		matchCounter.N = -1
		cmd.Stderr = io.MultiWriter(os.Stderr, matchCounter)
	}
	if err := osutil.RunWithContext(ctx, cmd); err != nil {
		matches, count := matchCounter.Matches()
		if count > 0 {
			note := ""
			if count > 5 {
				note = fmt.Sprintf(" (showing last 5 lines out of %d)", count)
			}
			// we have at most 5 matches here
			errStr := strings.Join(matches, "\n")
			return fmt.Errorf("cannot create archive%s:\n%s", note, errStr)
		}
		return fmt.Errorf("tar failed: %v", err)
	}
	snapshot.SHA3_384[entry] = fmt.Sprintf("%x", hasher.Sum(nil))
	snapshot.Size += sz.Size()
	return nil
}
var ErrCannotCancel = errors.New("cannot cancel: import already finished")
// multiError collects multiple errors that affected an operation.
type multiError struct {
header string
errs []error
}
// newMultiError returns a new multiError struct initialized with
// the given format string that explains what operation potentially
// went wrong. multiError can be nested and will render correctly
// in these cases.
func newMultiError(header string, errs []error) error {
return &multiError{header: header, errs: errs}
}
// Error formats the error string.
func (me *multiError) Error() string {
return me.nestedError(0)
}
// helper to ensure formating of nested multiErrors works.
func (me *multiError) nestedError(level int) string {
indent := strings.Repeat(" ", level)
buf := bytes.NewBufferString(fmt.Sprintf("%s:\n", me.header))
if level > 8 {
return "circular or too deep error nesting (max 8)?!"
}
for i, err := range me.errs {
switch v := err.(type) {
case *multiError:
fmt.Fprintf(buf, "%s- %v", indent, v.nestedError(level+1))
default:
fmt.Fprintf(buf, "%s- %v", indent, err)
}
if i < len(me.errs)-1 {
fmt.Fprintf(buf, "\n")
}
}
return buf.String()
}
var (
// importingFnRegexp matches a "<setID>_importing" lock file name and
// captures the set id.
importingFnRegexp = regexp.MustCompile("^([0-9]+)_importing$")
// importingFnGlob matches all import lock files.
importingFnGlob = "[0-9]*_importing"
// importingFnFmt builds a lock file name from a set id.
importingFnFmt = "%d_importing"
// importingForIDFmt builds a glob matching all snapshot zips of a set id.
importingForIDFmt = "%d_*.zip"
)
// importInProgressFor return true if the given snapshot id has an import
// that is in progress.
func importInProgressFor(setID uint64) bool {
	// an import is in progress iff its "<setID>_importing" lock file exists
	return newImportTransaction(setID).InProgress()
}
// importTransaction keeps track of the given snapshot ID import and
// ensures it can be committed/cancelled in an atomic way.
//
// Start() must be called before the first data is imported. When the
// import is successful Commit() should be called.
//
// Cancel() will cancel the given import and cleanup. It's always safe
// to defer a Cancel() it will just return a "ErrCannotCancel" after
// a commit.
type importTransaction struct {
	// id is the snapshot set id being imported
	id uint64
	// lockPath is the "<id>_importing" marker file whose existence
	// marks this import as in progress
	lockPath string
	// committed is set by Commit; it turns Cancel into ErrCannotCancel
	committed bool
}
// newImportTransaction creates a new importTransaction for the given
// snapshot id.
func newImportTransaction(setID uint64) *importTransaction {
	// the lock file lives next to the snapshot zips themselves
	lockPath := filepath.Join(dirs.SnapshotsDir, fmt.Sprintf(importingFnFmt, setID))
	return &importTransaction{id: setID, lockPath: lockPath}
}
// newImportTransactionFromImportFile creates a new importTransaction
// for the given import file path. It may return an error if an
// invalid file was specified.
func newImportTransactionFromImportFile(p string) (*importTransaction, error) {
	// the base name must look like "<setID>_importing"
	m := importingFnRegexp.FindStringSubmatch(path.Base(p))
	if len(m) != 2 {
		return nil, fmt.Errorf("cannot determine snapshot id from %q", p)
	}
	setID, err := strconv.ParseUint(m[1], 10, 64)
	if err != nil {
		return nil, err
	}
	return newImportTransaction(setID), nil
}
// Start marks the start of a snapshot import
func (t *importTransaction) Start() error {
	// creating the lock file makes the import visible as in-progress
	return t.lock()
}
// InProgress returns true if there is an import for this transactions
// snapshot ID already.
func (t *importTransaction) InProgress() bool {
	// the lock file is the single source of truth for in-progress state
	return osutil.FileExists(t.lockPath)
}
// Cancel cancels a snapshot import and cleanups any files on disk belonging
// to this snapshot ID.
func (t *importTransaction) Cancel() error {
	if t.committed {
		return ErrCannotCancel
	}
	// Collect any snapshot zips already written for this set id. Use
	// the mockable filepathGlob (as CleanupAbandondedImports, which
	// calls into Cancel, already does) so tests can intercept the
	// globbing here as well.
	inProgressImports, err := filepathGlob(filepath.Join(dirs.SnapshotsDir, fmt.Sprintf(importingForIDFmt, t.id)))
	if err != nil {
		return err
	}
	var errs []error
	for _, p := range inProgressImports {
		if err := os.Remove(p); err != nil {
			errs = append(errs, err)
		}
	}
	// drop the lock file last, so a crash mid-cleanup still leaves the
	// import marked as in progress
	if err := t.unlock(); err != nil {
		errs = append(errs, err)
	}
	if len(errs) > 0 {
		return newMultiError(fmt.Sprintf("cannot cancel import for set id %d", t.id), errs)
	}
	return nil
}
// Commit will commit a given transaction
func (t *importTransaction) Commit() error {
	// remove the lock file first; only a fully unlocked import
	// counts as committed
	if err := t.unlock(); err != nil {
		return err
	}
	t.committed = true
	return nil
}
// lock creates the (empty) in-progress marker file for this import.
func (t *importTransaction) lock() error {
	return ioutil.WriteFile(t.lockPath, nil, 0644)
}
// unlock removes the in-progress marker file for this import.
func (t *importTransaction) unlock() error {
	return os.Remove(t.lockPath)
}
var filepathGlob = filepath.Glob
// CleanupAbandondedImports will clean any import that is in progress.
// This is meant to be called at startup of snapd before any real imports
// happen. It is not safe to run this concurrently with any other snapshot
// operation.
//
// The amount of snapshots cleaned is returned and an error if one or
// more cleanups did not succeed.
// (Note: the name keeps its historical spelling as it is exported.)
func CleanupAbandondedImports() (cleaned int, err error) {
	// every leftover "<id>_importing" lock file is an abandoned import
	lockFiles, err := filepathGlob(filepath.Join(dirs.SnapshotsDir, importingFnGlob))
	if err != nil {
		return 0, err
	}
	var errs []error
	for _, lockFile := range lockFiles {
		tr, err := newImportTransactionFromImportFile(lockFile)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		// cancelling removes the partial zips and the lock file
		if err := tr.Cancel(); err != nil {
			errs = append(errs, err)
			continue
		}
		cleaned++
	}
	if len(errs) == 0 {
		return cleaned, nil
	}
	return cleaned, newMultiError("cannot cleanup imports", errs)
}
// ImportFlags carries extra flags to drive import behavior.
type ImportFlags struct {
	// NoDuplicatedImportCheck tells import not to check for existing snapshot
	// with same content hash (and not report DuplicatedSnapshotImportError).
	NoDuplicatedImportCheck bool
}
// Import a snapshot from the export file format
func Import(ctx context.Context, id uint64, r io.Reader, flags *ImportFlags) (snapNames []string, err error) {
	if err := os.MkdirAll(dirs.SnapshotsDir, 0700); err != nil {
		return nil, err
	}
	errPrefix := fmt.Sprintf("cannot import snapshot %d", id)
	// the transaction's lock file both marks the set as in-progress
	// (so Iter skips the partially written zips) and rejects a
	// concurrent import with the same set id
	tr := newImportTransaction(id)
	if tr.InProgress() {
		return nil, fmt.Errorf("%s: already in progress for this set id", errPrefix)
	}
	if err := tr.Start(); err != nil {
		return nil, err
	}
	// Cancel once Committed is a NOP
	defer tr.Cancel()
	// Unpack and validate the streamed data
	//
	// XXX: this will leak snapshot IDs, i.e. we allocate a new
	// snapshot ID before but then we error here because of e.g.
	// duplicated import attempts
	snapNames, err = unpackVerifySnapshotImport(ctx, r, id, flags)
	if err != nil {
		if _, ok := err.(DuplicatedSnapshotImportError); ok {
			// pass the duplicate error through unwrapped so the
			// caller can detect it by type
			return nil, err
		}
		return nil, fmt.Errorf("%s: %v", errPrefix, err)
	}
	if err := tr.Commit(); err != nil {
		return nil, err
	}
	return snapNames, nil
}
func writeOneSnapshotFile(targetPath string, tr io.Reader) error {
t, err := os.OpenFile(targetPath, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return fmt.Errorf("cannot create snapshot file %q: %v", targetPath, err)
}
defer t.Close()
if _, err := io.Copy(t, tr); err != nil {
return fmt.Errorf("cannot write snapshot file %q: %v", targetPath, err)
}
return nil
}
// DuplicatedSnapshotImportError is reported when an imported snapshot
// set has the same content hash as a set already present on disk.
type DuplicatedSnapshotImportError struct {
	// SetID of the already-present, matching snapshot set.
	SetID uint64
	// SnapNames of the snaps in that set.
	SnapNames []string
}
// Error implements the error interface.
func (e DuplicatedSnapshotImportError) Error() string {
	return fmt.Sprintf("cannot import snapshot, already available as snapshot id %d", e.SetID)
}
// checkDuplicatedSnapshotSetWithContentHash returns a
// DuplicatedSnapshotImportError if any snapshot set already on disk
// has the given content hash, and nil otherwise.
func checkDuplicatedSnapshotSetWithContentHash(ctx context.Context, contentHash []byte) error {
	snapshotSetMap := map[uint64]client.SnapshotSet{}
	// XXX: deal with import in progress here
	// get all current snapshotSets
	err := Iter(ctx, func(reader *Reader) error {
		ss := snapshotSetMap[reader.SetID]
		ss.Snapshots = append(ss.Snapshots, &reader.Snapshot)
		snapshotSetMap[reader.SetID] = ss
		return nil
	})
	if err != nil {
		return fmt.Errorf("cannot calculate snapshot set hashes: %v", err)
	}
	for setID, ss := range snapshotSetMap {
		h, err := ss.ContentHash()
		if err != nil {
			return fmt.Errorf("cannot calculate content hash for %v: %v", setID, err)
		}
		if bytes.Equal(h, contentHash) {
			// duplicate found: report which set and snaps match
			var snapNames []string
			for _, snapshot := range ss.Snapshots {
				snapNames = append(snapNames, snapshot.Snap)
			}
			return DuplicatedSnapshotImportError{SetID: setID, SnapNames: snapNames}
		}
	}
	return nil
}
// unpackVerifySnapshotImport reads a snapshot-import tar stream from r,
// writes the contained snapshot zips into dirs.SnapshotsDir under the
// new, locally allocated set id realSetID, and validates each one.
// It returns the names of the snaps whose snapshots were written.
func unpackVerifySnapshotImport(ctx context.Context, r io.Reader, realSetID uint64, flags *ImportFlags) (snapNames []string, err error) {
	var exportFound bool
	tr := tar.NewReader(r)
	var tarErr error
	var header *tar.Header
	if flags == nil {
		flags = &ImportFlags{}
	}
	for tarErr == nil {
		header, tarErr = tr.Next()
		if tarErr == io.EOF {
			break
		}
		switch {
		case tarErr != nil:
			return nil, fmt.Errorf("cannot read snapshot import: %v", tarErr)
		case header == nil:
			// should not happen
			return nil, fmt.Errorf("tar header not found")
		case header.Typeflag == tar.TypeDir:
			return nil, errors.New("unexpected directory in import file")
		}
		if header.Name == "content.json" {
			var ej contentJSON
			dec := json.NewDecoder(tr)
			if err := dec.Decode(&ej); err != nil {
				return nil, err
			}
			if !flags.NoDuplicatedImportCheck {
				// XXX: this is potentially slow as it needs
				// to open all snapshots files and read a
				// small amount of data from them
				if err := checkDuplicatedSnapshotSetWithContentHash(ctx, ej.ContentHash); err != nil {
					return nil, err
				}
			}
			continue
		}
		if header.Name == "export.json" {
			// XXX: read into memory and validate once we
			// hashes in export.json
			exportFound = true
			continue
		}
		// Format of the snapshot import is:
		//    $setID_.....
		// But because the setID is local this will not be correct
		// for our system and we need to discard this setID.
		//
		// So chop off the incorrect (old) setID and just use
		// the rest that is still valid.
		l := strings.SplitN(header.Name, "_", 2)
		if len(l) != 2 {
			return nil, fmt.Errorf("unexpected filename in import stream: %v", header.Name)
		}
		// NOTE(review): l[1] comes straight from the (untrusted) tar
		// header; a name containing "/../" segments would make
		// path.Join escape dirs.SnapshotsDir — consider rejecting
		// names containing path separators here.
		targetPath := path.Join(dirs.SnapshotsDir, fmt.Sprintf("%d_%s", realSetID, l[1]))
		if err := writeOneSnapshotFile(targetPath, tr); err != nil {
			return snapNames, err
		}
		r, err := backendOpen(targetPath, realSetID)
		if err != nil {
			return snapNames, fmt.Errorf("cannot open snapshot: %v", err)
		}
		err = r.Check(context.TODO(), nil)
		r.Close()
		// record the snap name before acting on the validation error,
		// so the caller knows which files were written regardless
		snapNames = append(snapNames, r.Snap)
		if err != nil {
			return snapNames, fmt.Errorf("validation failed for %q: %v", targetPath, err)
		}
	}
	if !exportFound {
		return nil, fmt.Errorf("no export.json file in uploaded data")
	}
	// XXX: validate using the unmarshalled export.json hashes here
	return snapNames, nil
}
// exportMetadata is the payload of the export.json member of a
// snapshot export (presumably listing the exported files — confirm
// against the writer of export.json, which is not in this view).
type exportMetadata struct {
	Format int `json:"format"`
	Date time.Time `json:"date"`
	Files []string `json:"files"`
}
// SnapshotExport holds the state of an in-progress snapshot export:
// the snapshot files held open, the set's content hash and the cached
// total size of the export stream.
type SnapshotExport struct {
	// open snapshot files
	snapshotFiles []*os.File
	// contentHash of the full snapshot
	contentHash []byte
	// remember setID mostly for nicer errors
	setID uint64
	// cached size, needs to be calculated with Init
	size int64
}
// NewSnapshotExport will return a SnapshotExport structure. It must be
// Close()ed after use to avoid leaking file descriptors.
func NewSnapshotExport(ctx context.Context, setID uint64) (se *SnapshotExport, err error) {
	var snapshotFiles []*os.File
	var snapshotSet client.SnapshotSet
	defer func() {
		// cleanup any open FDs if anything goes wrong
		if err != nil {
			for _, f := range snapshotFiles {
				f.Close()
			}
		}
	}()
	// Open all files first and keep the file descriptors
	// open. The caller should have locked the state so that no
	// delete/change snapshot operations can happen while the
	// files are getting opened.
	err = Iter(ctx, func(reader *Reader) error {
		if reader.SetID == setID {
			snapshotSet.Snapshots = append(snapshotSet.Snapshots, &reader.Snapshot)
			// Duplicate the file descriptor of the reader
			// we were handed as Iter() closes those as
			// soon as this unnamed returns. We re-package
			// the file descriptor into snapshotFiles
			// below.
			fd, err := syscall.Dup(int(reader.Fd()))
			if err != nil {
				return fmt.Errorf("cannot duplicate descriptor: %v", err)
			}
			f := os.NewFile(uintptr(fd), reader.Name())
			if f == nil {
				return fmt.Errorf("cannot open file from descriptor %d", fd)
			}
			snapshotFiles = append(snapshotFiles, f)
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("cannot export snapshot %v: %v", setID, err)
	}
	if len(snapshotFiles) == 0 {
		return nil, fmt.Errorf("no snapshot data found for %v", setID)
	}
	// content hash of the whole set; exported as content.json and used
	// on import for duplicate detection
	h, err := snapshotSet.ContentHash()
	if err != nil {
		return nil, fmt.Errorf("cannot calculate content hash for snapshot export %v: %v", setID, err)
	}
	se = &SnapshotExport{snapshotFiles: snapshotFiles, setID: setID, contentHash: h}
	// ensure we never leak FDs even if the user does not call close
	runtime.SetFinalizer(se, (*SnapshotExport).Close)
	return se, nil
}
// Init will calculate the snapshot size. This can take some time
// so it should be called without any locks. The SnapshotExport
// keeps the FDs open so even files moved/deleted will be found.
func (se *SnapshotExport) Init() error {
	// Export once into a dummy writer so that we can set the size
	// of the export. This is then used to set the Content-Length
	// in the response correctly.
	//
	// Note that the size of the generated tar could change if the
	// time switches between this export and the export we stream
	// to the client to a time after the year 2242. This is unlikely
	// but a known issue with this approach here.
	var sz osutil.Sizer
	if err := se.StreamTo(&sz); err != nil {
		// (typo "calculcate" fixed in the user-visible message)
		return fmt.Errorf("cannot calculate the size for %v: %s", se.setID, err)
	}
	se.size = sz.Size()
	return nil
}
// Size returns the cached size of the export stream as computed by Init.
func (se *SnapshotExport) Size() int64 {
	return se.size
}
// Close closes all snapshot files held open by the export. It is safe
// to call more than once; it is also registered as a finalizer in
// NewSnapshotExport so FDs are reclaimed even without an explicit call.
func (se *SnapshotExport) Close() {
	for _, f := range se.snapshotFiles {
		f.Close()
	}
	// drop the references so later Close calls are no-ops
	se.snapshotFiles = nil
}
// contentJSON is the payload of the content.json member of an export;
// it carries the content hash of the whole exported snapshot set.
type contentJSON struct {
	ContentHash []byte `json:"content-hash"`
}
func (se *SnapshotExport) StreamTo(w io.Writer) error {
// write out a tar
var files []string
tw := tar.NewWriter(w)
defer tw.Close()
// export contentHash as content.json
h, err := json.Marshal(contentJSON{se.contentHash})
if err != nil {
return err
}
hdr := &tar.Header{
Typeflag: tar.TypeReg,
Name: "content.json",
Size: int64(len(h)),
Mode: 0640,
ModTime: timeNow(),
}
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := tw.Write(h); err != nil {
return err
}
// write out the individual snapshots
for _, snapshotFile := range se.snapshotFiles {
stat, err := snapshotFile.Stat()
if err != nil {
return err
}
if !stat.Mode().IsRegular() {
// should never happen
return fmt.Errorf("unexported special file %q in snapshot: %s", stat.Name(), stat.Mode())
}
if _, err := snapshotFile.Seek(0, 0); err != nil {
return fmt.Errorf("cannot seek on %v: %v", stat.Name(), err)
}
hdr, err := tar.FileInfoHeader(stat, "")
if err != nil {
return fmt.Errorf("symlink: %v", stat.Name())
}
if err = tw.WriteHeader(hdr); err != nil {