-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
progress.go
136 lines (116 loc) · 3.06 KB
/
progress.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package pullprogress
import (
"context"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/progress"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type PullManager interface {
content.IngestManager
content.Manager
}
type ProviderWithProgress struct {
Provider content.Provider
Manager PullManager
}
func (p *ProviderWithProgress) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
ra, err := p.Provider.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, p.Manager, doneCh)
return readerAtWithCancel{ReaderAt: ra, cancel: cancel, doneCh: doneCh}, nil
}
type readerAtWithCancel struct {
content.ReaderAt
cancel func()
doneCh <-chan struct{}
}
func (ra readerAtWithCancel) Close() error {
ra.cancel()
select {
case <-ra.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return ra.ReaderAt.Close()
}
type FetcherWithProgress struct {
Fetcher remotes.Fetcher
Manager PullManager
}
func (f *FetcherWithProgress) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
doneCh := make(chan struct{})
go trackProgress(ctx, desc, f.Manager, doneCh)
return readerWithCancel{ReadCloser: rc, cancel: cancel, doneCh: doneCh}, nil
}
type readerWithCancel struct {
io.ReadCloser
cancel func()
doneCh <-chan struct{}
}
func (r readerWithCancel) Close() error {
r.cancel()
select {
case <-r.doneCh:
case <-time.After(time.Second):
logrus.Warn("timeout waiting for pull progress to complete")
}
return r.ReadCloser.Close()
}
func trackProgress(ctx context.Context, desc ocispecs.Descriptor, manager PullManager, doneCh chan<- struct{}) {
defer close(doneCh)
ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop()
go func() {
<-ctx.Done()
ticker.Stop()
}()
pw, _, _ := progress.NewFromContext(ctx)
defer pw.Close()
ingestRef := remotes.MakeRefKey(ctx, desc)
started := time.Now()
onFinalStatus := false
for !onFinalStatus {
select {
case <-ctx.Done():
onFinalStatus = true
case <-ticker.C:
}
status, err := manager.Status(ctx, ingestRef)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(status.Offset),
Total: int(status.Total),
Started: &started,
})
continue
} else if !errors.Is(err, errdefs.ErrNotFound) {
logrus.Errorf("unexpected error getting ingest status of %q: %v", ingestRef, err)
return
}
info, err := manager.Info(ctx, desc.Digest)
if err == nil {
pw.Write(desc.Digest.String(), progress.Status{
Current: int(info.Size),
Total: int(info.Size),
Started: &started,
Completed: &info.CreatedAt,
})
return
}
}
}