-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
buffer.go
156 lines (138 loc) · 3.6 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package contentutil
import (
"bytes"
"context"
"io/ioutil"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
// Buffer is a content provider and ingester that keeps data in memory
type Buffer interface {
content.Provider
content.Ingester
}
// NewBuffer returns a new buffer
func NewBuffer() Buffer {
return &buffer{
buffers: map[digest.Digest][]byte{},
refs: map[string]struct{}{},
}
}
type buffer struct {
mu sync.Mutex
buffers map[digest.Digest][]byte
refs map[string]struct{}
}
func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
var wOpts content.WriterOpts
for _, opt := range opts {
if err := opt(&wOpts); err != nil {
return nil, err
}
}
b.mu.Lock()
if _, ok := b.refs[wOpts.Ref]; ok {
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
}
b.mu.Unlock()
return &bufferedWriter{
main: b,
digester: digest.Canonical.Digester(),
buffer: bytes.NewBuffer(nil),
expected: wOpts.Desc.Digest,
releaseRef: func() {
b.mu.Lock()
delete(b.refs, wOpts.Ref)
b.mu.Unlock()
},
}, nil
}
func (b *buffer) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
r, err := b.getBytesReader(ctx, desc.Digest)
if err != nil {
return nil, err
}
return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
}
func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
b.mu.Lock()
defer b.mu.Unlock()
if dt, ok := b.buffers[dgst]; ok {
return bytes.NewReader(dt), nil
}
return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
}
func (b *buffer) addValue(k digest.Digest, dt []byte) {
b.mu.Lock()
defer b.mu.Unlock()
b.buffers[k] = dt
}
type bufferedWriter struct {
main *buffer
ref string
offset int64
total int64
startedAt time.Time
updatedAt time.Time
buffer *bytes.Buffer
expected digest.Digest
digester digest.Digester
releaseRef func()
}
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
n, err = w.buffer.Write(p)
w.digester.Hash().Write(p[:n])
w.offset += int64(len(p))
w.updatedAt = time.Now()
return n, err
}
func (w *bufferedWriter) Close() error {
if w.buffer != nil {
w.releaseRef()
w.buffer = nil
}
return nil
}
func (w *bufferedWriter) Status() (content.Status, error) {
return content.Status{
Ref: w.ref,
Offset: w.offset,
Total: w.total,
StartedAt: w.startedAt,
UpdatedAt: w.updatedAt,
}, nil
}
func (w *bufferedWriter) Digest() digest.Digest {
return w.digester.Digest()
}
func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
if w.buffer == nil {
return errors.Errorf("can't commit already committed or closed")
}
if s := int64(w.buffer.Len()); size > 0 && size != s {
return errors.Errorf("unexpected commit size %d, expected %d", s, size)
}
dgst := w.digester.Digest()
if expected != "" && expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
}
if w.expected != "" && w.expected != dgst {
return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
}
w.main.addValue(dgst, w.buffer.Bytes())
return w.Close()
}
func (w *bufferedWriter) Truncate(size int64) error {
if size != 0 {
return errors.New("Truncate: unsupported size")
}
w.offset = 0
w.digester.Hash().Reset()
w.buffer.Reset()
return nil
}