-
Notifications
You must be signed in to change notification settings - Fork 3
/
soci.go
212 lines (184 loc) · 5.57 KB
/
soci.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package explore
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/google/go-containerregistry/pkg/logs"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/v1/types"
httpserve "github.com/jonjohnsonjr/dagdotdev/internal/forks/http"
"github.com/jonjohnsonjr/dagdotdev/internal/soci"
)
// Size limits used when building and loading indexes.
const (
	// threshold is 5 MB. Indexes no larger than this are loaded directly;
	// bigger ones get an index of their own.
	threshold = 5 * (1 << 20)

	// spanSize is the span size handed to soci.NewIndexer (1 << 22 bytes).
	spanSize = 1 << 22
)
// indexKey returns the cache key for the idx-th index level under prefix,
// formatted as "<prefix>.<idx>".
func indexKey(prefix string, idx int) string {
	suffix := fmt.Sprintf(".%d", idx)
	return prefix + suffix
}
// tryNewIndex attempts to build a new index for blob, serving the request
// from the layer contents while the index is being generated.
//
// When soci.NewIndexer yields no indexer, its kind, readclosers, and error are
// returned unchanged. On every other path both readclosers are nil.
// TODO: Dedupe with createIndex.
func (h *handler) tryNewIndex(w http.ResponseWriter, r *http.Request, dig name.Digest, ref string, blob *sizeBlob) (string, io.ReadCloser, io.ReadCloser, error) {
	cacheKey := indexKey(dig.Identifier(), 0)

	// TODO: Plumb this down into NewIndexer so we don't create it until we need to.
	out, err := h.indexCache.Writer(r.Context(), cacheKey)
	if err != nil {
		return "", nil, nil, fmt.Errorf("indexCache.Writer: %w", err)
	}
	defer out.Close()

	mediaType := r.URL.Query().Get("mt")
	indexer, kind, pr, tpr, err := soci.NewIndexer(blob, out, spanSize, mediaType)
	if indexer == nil {
		return kind, pr, tpr, err
	}

	// Render FS the old way while generating the index.
	layerFS := h.newLayerFS(indexer, blob.size, ref, dig.String(), indexer.Type(), types.MediaType(mediaType))
	httpserve.FileServer(layerFS).ServeHTTP(w, r)

	// Drain whatever the file server did not read so the index is complete.
	for {
		if _, nextErr := indexer.Next(); nextErr != nil {
			if errors.Is(nextErr, io.EOF) {
				break
			}
			return "", nil, nil, fmt.Errorf("indexer.Next: %w", nextErr)
		}
	}

	toc, err := indexer.TOC()
	if err != nil {
		return kind, nil, nil, err
	}

	// Caching the TOC is best-effort; a failed Put is only logged.
	if h.tocCache != nil {
		if putErr := h.tocCache.Put(r.Context(), cacheKey, toc); putErr != nil {
			logs.Debug.Printf("cache.Put(%q) = %v", cacheKey, putErr)
		}
	}

	logs.Debug.Printf("index size: %d", indexer.Size())

	return kind, nil, nil, nil
}
// getIndex looks up the top-level (level 0) index for prefix.
//
// It returns a nil index with a nil error when there is no index cache, when
// the lookup fails with io.EOF, or when the loaded index has no TOC. Any other
// lookup error is returned as-is.
func (h *handler) getIndex(ctx context.Context, prefix string) (soci.Index, error) {
	if h.indexCache == nil {
		return nil, nil
	}

	found, err := h.getIndexN(ctx, prefix, 0)
	switch {
	case errors.Is(err, io.EOF):
		// Incomplete index; treat it as absent.
		return nil, nil
	case err != nil:
		return nil, err
	case found.TOC() == nil:
		// TODO: Remove the need for this.
		return nil, nil
	}

	return found, nil
}
// getIndexN loads the index stored in h.indexCache under indexKey(prefix, idx).
//
// When h.tocCache is set, the TOC is looked up there first so its Size field
// can be used instead of calling h.indexCache.Size. If that lookup fails, a
// deferred function stores index.TOC() into h.tocCache, provided this call
// ends up returning a nil error.
//
// An index whose size is at most threshold is built with soci.NewIndex and no
// sub-index. A larger one is paired with the index at level idx+1, which is
// fetched recursively or, if that fails, created by createIndex from this
// index's cached bytes.
func (h *handler) getIndexN(ctx context.Context, prefix string, idx int) (index soci.Index, err error) {
key := indexKey(prefix, idx)
bs := &cacheSeeker{h.indexCache, key}
var (
toc *soci.TOC
size int64
)
// Avoid calling cache.Size if we can.
if h.tocCache != nil {
toc, err = h.tocCache.Get(ctx, key)
if err != nil {
logs.Debug.Printf("cache.Get(%q) = %v", key, err)
// index and err are the named results, so this closure sees the values
// that are finally returned, not the values at the time of the defer.
defer func() {
if err == nil {
if err := h.tocCache.Put(ctx, key, index.TOC()); err != nil {
logs.Debug.Printf("cache.Put(%q) = %v", key, err)
}
}
}()
} else {
// NOTE(review): assumes Get never returns a nil TOC together with a nil
// error; otherwise toc.Size would panic. Confirm against the cache.
size = toc.Size
logs.Debug.Printf("cache.Get(%q) = hit", key)
}
}
// Handle in-memory index under a certain size.
// size is still zero if there is no TOC cache, the lookup failed, or the
// cached TOC reported a zero Size; ask the index cache in those cases.
if size == 0 {
size, err = h.indexCache.Size(ctx, key)
if err != nil {
return nil, fmt.Errorf("indexCache.Size: %w", err)
}
}
if size <= threshold {
return soci.NewIndex(bs, toc, nil)
}
// Index is too big to hold in memory, fetch or create an index of the index.
// sub is new here; err is the named result being reassigned.
sub, err := h.getIndexN(ctx, prefix, idx+1)
if err != nil {
logs.Debug.Printf("getIndexN(%q, %d) = %v", prefix, idx+1, err)
// This := declares a new err that shadows the named result for the rest of
// this block. The final return sets both results explicitly.
rc, err := h.indexCache.Reader(ctx, key)
if err != nil {
return nil, fmt.Errorf("indexCache.Reader: %w", err)
}
// NOTE(review): rc is not closed in this function; presumably ownership
// passes to createIndex / soci.NewIndexer. Confirm.
sub, err = h.createIndex(ctx, rc, size, prefix, idx+1, "application/tar+gzip")
if err != nil {
return nil, fmt.Errorf("createIndex(%q, %d): %w", prefix, idx+1, err)
}
// createIndex returns a nil index with a nil error when no indexer could be
// created for the given media type.
if sub == nil {
return nil, fmt.Errorf("createIndex returned nil, not a tar.gz file")
}
}
return soci.NewIndex(bs, toc, sub)
}
// createIndex reads rc to the end, writing an index of it into h.indexCache
// under indexKey(prefix, idx), and then loads that index back via getIndexN.
//
// It returns a nil index with a nil error when soci.NewIndexer produces no
// indexer for mediaType. The TOC is also stored in h.tocCache when one is
// configured; a failure to do so is only logged.
func (h *handler) createIndex(ctx context.Context, rc io.ReadCloser, size int64, prefix string, idx int, mediaType string) (soci.Index, error) {
	cacheKey := indexKey(prefix, idx)

	out, err := h.indexCache.Writer(ctx, cacheKey)
	if err != nil {
		return nil, fmt.Errorf("indexCache.Writer: %w", err)
	}
	// Covers the early returns; the success path closes explicitly below so
	// that the Close error can be reported.
	defer out.Close()

	// TODO: Better?
	indexer, _, _, _, err := soci.NewIndexer(rc, out, spanSize, mediaType)
	switch {
	case err != nil:
		return nil, fmt.Errorf("TODO: don't return this error: %w", err)
	case indexer == nil:
		return nil, nil
	}

	// Make sure we hit the end.
	for {
		if _, nextErr := indexer.Next(); nextErr != nil {
			if errors.Is(nextErr, io.EOF) {
				break
			}
			return nil, fmt.Errorf("indexer.Next: %w", nextErr)
		}
	}

	toc, err := indexer.TOC()
	if err != nil {
		return nil, fmt.Errorf("TOC: %w", err)
	}

	if h.tocCache != nil {
		if putErr := h.tocCache.Put(ctx, cacheKey, toc); putErr != nil {
			logs.Debug.Printf("cache.Put(%q) = %v", cacheKey, putErr)
		}
	}

	logs.Debug.Printf("index size: %d", indexer.Size())

	if closeErr := out.Close(); closeErr != nil {
		return nil, fmt.Errorf("cw.Close: %w", closeErr)
	}

	return h.getIndexN(ctx, prefix, idx)
}
// createFs builds a soci.SociFS that serves the contents of the blob dig
// through index.
//
// If opts is nil, the options are taken from h.remoteOptions for dig's
// repository. A remote.WithSize(size) option is always appended; the caller's
// slice is never written to. The first entry of urls, if any, is passed to
// remote.LazyBlob (an empty string otherwise). ref, with one leading "/"
// removed, is used as the filesystem prefix.
//
// The returned error is currently always nil.
func (h *handler) createFs(w http.ResponseWriter, r *http.Request, ref string, dig name.Digest, index soci.Index, size int64, mt types.MediaType, urls []string, opts []remote.Option) (*soci.SociFS, error) {
	if opts == nil {
		opts = h.remoteOptions(w, r, dig.Context().Name())
	}

	// Cap the capacity at the length so append always allocates a new backing
	// array. A plain append could write into spare capacity of the caller's
	// slice and clobber an element shared with other users of that array.
	opts = append(opts[:len(opts):len(opts)], remote.WithSize(size))

	cachedStr := ""
	if len(urls) > 0 {
		cachedStr = urls[0]
	}

	blob := remote.LazyBlob(dig, cachedStr, nil, opts...)

	// We never saw a non-nil Body, we can do the range.
	prefix := strings.TrimPrefix(ref, "/")
	fs := soci.FS(index, blob, prefix, dig.String(), respTooBig, mt, renderHeader)

	return fs, nil
}