-
Notifications
You must be signed in to change notification settings - Fork 568
/
sync.go
130 lines (121 loc) · 3.32 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package pfssync
import (
"archive/tar"
"io"
"os"
"path"
"syscall"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/tarutil"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"golang.org/x/sync/errgroup"
)
// Downloader is the standard interface for a PFS downloader.
// Instances are handed to callers by WithDownloader and are only meant to be
// used for the duration of that callback.
type Downloader interface {
// Download a PFS file to a location on the local filesystem.
// storageRoot is the local directory the content is written under, file
// identifies the PFS commit and path to fetch, and opts adjust how the
// download is performed.
Download(storageRoot string, file *pfs.File, opts ...DownloadOption) error
}
// downloader is the Downloader implementation constructed by WithDownloader.
type downloader struct {
// pachClient is the client used to fetch file contents (GetFileTAR) and to
// walk file metadata (WalkFile).
pachClient *CacheClient
// pipes is the set of named-pipe paths that closePipes opens, closes and
// removes. NOTE(review): entries are added outside this view, presumably by
// makePipe — confirm.
pipes map[string]struct{}
// eg is the error group that closePipes waits on. NOTE(review): goroutines
// are presumably registered by makePipe — confirm.
eg *errgroup.Group
// done is set to true when the WithDownloader scope ends.
done bool
}
// WithDownloader provides a scoped environment for a Downloader.
//
// It builds a downloader backed by pachClient, passes it to cb, and once cb
// has returned marks the downloader as done and closes its pipes. An error
// returned by cb takes precedence; the cleanup error is reported only when cb
// itself succeeded.
func WithDownloader(pachClient *CacheClient, cb func(Downloader) error) (retErr error) {
	dl := new(downloader)
	dl.pachClient = pachClient
	dl.pipes = map[string]struct{}{}
	dl.eg = new(errgroup.Group)

	defer func() {
		dl.done = true
		// Cleanup always runs, but must not mask an error from cb.
		cleanupErr := dl.closePipes()
		if retErr != nil {
			return
		}
		retErr = cleanupErr
	}()

	return cb(dl)
}
// closePipes opens every path recorded in d.pipes for non-blocking reading,
// waits for the downloader's error group to finish, and then closes and
// removes each pipe that was opened.
//
// The first error encountered is returned. If opening a pipe fails, the
// function returns immediately without waiting on the error group; pipes
// opened before the failure are still closed and removed.
func (d *downloader) closePipes() (retErr error) {
	opened := make(map[string]io.Closer)
	defer func() {
		// Tear down everything that was opened, keeping only the first error.
		for pipePath, closer := range opened {
			closeErr := closer.Close()
			if retErr == nil {
				retErr = errors.EnsureStack(closeErr)
			}
			removeErr := os.Remove(pipePath)
			if retErr == nil {
				retErr = errors.EnsureStack(removeErr)
			}
		}
	}()

	// Open all the pipes to unblock the goroutines.
	const readFlags = syscall.O_NONBLOCK | os.O_RDONLY
	for pipePath := range d.pipes {
		reader, err := os.OpenFile(pipePath, readFlags, os.ModeNamedPipe)
		if err != nil {
			return errors.EnsureStack(err)
		}
		opened[pipePath] = reader
	}

	waitErr := d.eg.Wait()
	return errors.EnsureStack(waitErr)
}
// downloadConfig holds the settings that DownloadOption values apply before a
// download starts.
type downloadConfig struct {
	// lazy makes Download create a pipe per file (via makePipe) instead of
	// importing the content directly.
	lazy bool
	// empty makes Download create an empty local file for each PFS file
	// (when lazy is not set).
	empty bool
	// headerCallback, when non-nil, is invoked with the tar header of each
	// downloaded file; a non-nil error from it aborts the download.
	headerCallback func(*tar.Header) error
}
// Download a PFS file to a location on the local filesystem.
//
// storageRoot is created if it does not exist. The options are then applied
// to a fresh downloadConfig. When the lazy or empty option is set, the work is
// delegated to downloadInfo; otherwise the file is fetched as a tar stream and
// imported under storageRoot, passing the header callback along when one was
// configured.
func (d *downloader) Download(storageRoot string, file *pfs.File, opts ...DownloadOption) error {
	if err := os.MkdirAll(storageRoot, 0700); err != nil {
		return errors.EnsureStack(err)
	}

	var cfg downloadConfig
	for i := range opts {
		opts[i](&cfg)
	}

	// Lazy and empty downloads never materialize content here.
	if cfg.lazy || cfg.empty {
		return d.downloadInfo(storageRoot, file, &cfg)
	}

	tarStream, err := d.pachClient.GetFileTAR(file.Commit, file.Path)
	if err != nil {
		return err
	}
	if cfg.headerCallback == nil {
		return tarutil.Import(storageRoot, tarStream)
	}
	return tarutil.Import(storageRoot, tarStream, cfg.headerCallback)
}
// downloadInfo walks the PFS files under file.Path and, for every
// non-directory entry, prepares a local stand-in beneath storageRoot without
// eagerly downloading its content.
//
// With config.lazy set, each file becomes a pipe (created through makePipe)
// whose writer callback fetches the file as a tar stream and copies its
// content into the pipe, calling config.headerCallback on each header first
// when one is configured. Otherwise an empty file is created at the target
// path.
func (d *downloader) downloadInfo(storageRoot string, file *pfs.File, config *downloadConfig) error {
	visit := func(info *pfs.FileInfo) error {
		// Directories are implied by the parent directories created below.
		if info.FileType == pfs.FileType_DIR {
			return nil
		}

		target := path.Join(storageRoot, info.File.Path)
		if err := os.MkdirAll(path.Dir(target), 0700); err != nil {
			return errors.EnsureStack(err)
		}

		if !config.lazy {
			// Empty mode: only a zero-length file is created.
			placeholder, err := os.Create(target)
			if err != nil {
				return errors.EnsureStack(err)
			}
			return errors.EnsureStack(placeholder.Close())
		}

		// Lazy mode: the content is fetched only when this callback runs.
		fill := func(w io.Writer) error {
			tarStream, err := d.pachClient.GetFileTAR(file.Commit, info.File.Path)
			if err != nil {
				return err
			}
			copyEntry := func(entry tarutil.File) error {
				if config.headerCallback != nil {
					hdr, err := entry.Header()
					if err != nil {
						return errors.EnsureStack(err)
					}
					if err := config.headerCallback(hdr); err != nil {
						return err
					}
				}
				return errors.EnsureStack(entry.Content(w))
			}
			return tarutil.Iterate(tarStream, copyEntry, true)
		}
		return d.makePipe(target, fill)
	}

	return d.pachClient.WalkFile(file.Commit, file.Path, visit)
}