forked from moby/buildkit
/
export.go
148 lines (127 loc) · 3.04 KB
/
export.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package local
import (
"context"
"io/ioutil"
"os"
"strings"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/exporter"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)
// Opt holds the dependencies required to construct the local exporter via New.
type Opt struct {
// SessionManager is used by Resolve to look up the client session whose ID
// is carried in the request context.
SessionManager *session.Manager
}
// localExporter is the exporter value returned by New.
type localExporter struct {
// opt holds the construction options, including the session manager that
// Resolve uses to obtain the client session.
opt Opt
}
// New builds a local exporter that keeps the supplied options for later use
// by Resolve. The returned error is always nil.
func New(opt Opt) (exporter.Exporter, error) {
	return &localExporter{opt: opt}, nil
}
// Resolve creates an exporter instance bound to the client session found in ctx.
//
// It fails when ctx carries no session ID, or when the session manager cannot
// return the session within five seconds. The opt map is not consulted.
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
	sessionID := session.FromContext(ctx)
	if sessionID == "" {
		return nil, errors.New("could not access local files without session")
	}

	// Bound the session lookup so a missing client cannot block the export forever.
	lookupCtx, stop := context.WithTimeout(ctx, 5*time.Second)
	defer stop()

	sessionCaller, err := e.opt.SessionManager.Get(lookupCtx, sessionID)
	if err != nil {
		return nil, err
	}

	return &localExporterInstance{
		localExporter: e,
		caller:        sessionCaller,
	}, nil
}
// localExporterInstance is the exporter instance created by Resolve.
type localExporterInstance struct {
// Embedded so the instance keeps a reference to the exporter that created it.
*localExporter
// caller is the session caller obtained in Resolve; Export hands it to
// filesync.CopyToCaller.
caller session.Caller
}
// Name returns the fixed, human-readable label for this export step.
func (e *localExporterInstance) Name() string {
	const label = "exporting to client"
	return label
}
// Export sends the build result to the client through filesync.CopyToCaller.
//
// When inp.Refs is non-empty, every entry is copied concurrently; each one is
// wrapped with fsutil.SubDirFS using its key (with "/" replaced by "_") as the
// directory path. Otherwise only inp.Ref is copied. A nil ref is exported as
// an empty temporary directory. The first error from any copy is returned;
// the returned map is always nil.
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
	multi := len(inp.Refs) > 0

	// copyRef transfers the contents of a single ref to the client.
	copyRef := func(ctx context.Context, key string, ref cache.ImmutableRef) error {
		var root string
		if ref != nil {
			mnt, err := ref.Mount(ctx, true)
			if err != nil {
				return err
			}
			mounter := snapshot.LocalMounter(mnt)
			dir, err := mounter.Mount()
			if err != nil {
				return err
			}
			defer mounter.Unmount()
			root = dir
		} else {
			// Nothing to export: use an empty scratch directory as the source.
			dir, err := ioutil.TempDir("", "buildkit")
			if err != nil {
				return err
			}
			defer os.RemoveAll(dir)
			root = dir
		}

		tree := fsutil.NewFS(root, nil)
		label := "copying files"
		if multi {
			label += " " + key
			tree = fsutil.SubDirFS(tree, fstypes.Stat{
				Mode: uint32(os.ModeDir | 0755),
				Path: strings.Replace(key, "/", "_", -1),
			})
		}

		return filesync.CopyToCaller(ctx, tree, e.caller, newProgressHandler(ctx, label))
	}

	eg, ctx := errgroup.WithContext(ctx)
	if !multi {
		eg.Go(func() error { return copyRef(ctx, "", inp.Ref) })
	} else {
		for key, ref := range inp.Refs {
			key, ref := key, ref // per-iteration copies for the goroutine below
			eg.Go(func() error { return copyRef(ctx, key, ref) })
		}
	}

	if err := eg.Wait(); err != nil {
		return nil, err
	}
	return nil, nil
}
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
pw, _, _ := progress.FromContext(ctx)
now := time.Now()
st := progress.Status{
Started: &now,
Action: "transferring",
}
pw.Write(id, st)
return func(s int, last bool) {
if last || limiter.Allow() {
st.Current = s
if last {
now := time.Now()
st.Completed = &now
}
pw.Write(id, st)
if last {
pw.Close()
}
}
}
}