Skip to content

Commit 76f242d

Browse files
committed
Deduplicate uploads of packages and indices
Fixes zombiezen#1
1 parent a81c421 commit 76f242d

File tree

3 files changed

+221
-9
lines changed

3 files changed

+221
-9
lines changed

aptblob.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"os"
2324
"path/filepath"
@@ -155,7 +156,10 @@ func appendToIndex(ctx context.Context, bucket *blob.Bucket, dist distribution,
155156
}
156157

157158
// Append packages to index.
158-
packages = append(packages, newParagraphs...)
159+
packages, err = dedupePackages(append(packages, newParagraphs...))
160+
if err != nil {
161+
return err
162+
}
159163
indexHashes, gzipIndexHashes, err := uploadIndex(ctx, bucket, key, packages)
160164
if err != nil {
161165
return err
@@ -233,6 +237,32 @@ func downloadIndex(ctx context.Context, bucket *blob.Bucket, key string, fields
233237
return paragraphs, nil
234238
}
235239

240+
func dedupePackages(packages []deb.Paragraph) ([]deb.Paragraph, error) {
241+
type packageVersion struct {
242+
name string
243+
version string
244+
}
245+
index := make(map[packageVersion]int)
246+
n := 0
247+
for _, pkg := range packages {
248+
v := packageVersion{
249+
name: pkg.Get("Package"),
250+
version: pkg.Get("Version"),
251+
}
252+
if v.name == "" || v.version == "" {
253+
return nil, errors.New("package found without Package or Version")
254+
}
255+
i, seen := index[v]
256+
if !seen {
257+
i = n
258+
n++
259+
}
260+
packages[i] = pkg
261+
index[v] = i
262+
}
263+
return packages[:n], nil
264+
}
265+
236266
func updateSignature(para *deb.Paragraph, key string, newSigs ...deb.IndexSignature) error {
237267
if len(newSigs) == 0 {
238268
return nil

aptblob_test.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2020 Ross Light
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
17+
package main
18+
19+
import (
20+
"testing"
21+
22+
"github.com/google/go-cmp/cmp"
23+
"zombiezen.com/go/aptblob/internal/deb"
24+
)
25+
26+
func TestDedupePackages(t *testing.T) {
27+
tests := []struct {
28+
name string
29+
packages []deb.Paragraph
30+
want []deb.Paragraph
31+
wantError bool
32+
}{
33+
{
34+
name: "Empty",
35+
want: nil,
36+
},
37+
{
38+
name: "SinglePackage",
39+
packages: []deb.Paragraph{
40+
{
41+
{Name: "Package", Value: "libc6"},
42+
{Name: "Version", Value: "6.1"},
43+
},
44+
},
45+
want: []deb.Paragraph{
46+
{
47+
{Name: "Package", Value: "libc6"},
48+
{Name: "Version", Value: "6.1"},
49+
},
50+
},
51+
},
52+
{
53+
name: "UnrelatedPackages",
54+
packages: []deb.Paragraph{
55+
{
56+
{Name: "Package", Value: "libc6"},
57+
{Name: "Version", Value: "6.1"},
58+
},
59+
{
60+
{Name: "Package", Value: "git"},
61+
{Name: "Version", Value: "2.20"},
62+
},
63+
},
64+
want: []deb.Paragraph{
65+
{
66+
{Name: "Package", Value: "libc6"},
67+
{Name: "Version", Value: "6.1"},
68+
},
69+
{
70+
{Name: "Package", Value: "git"},
71+
{Name: "Version", Value: "2.20"},
72+
},
73+
},
74+
},
75+
{
76+
name: "DifferentVersionsOfSamePackage",
77+
packages: []deb.Paragraph{
78+
{
79+
{Name: "Package", Value: "libc6"},
80+
{Name: "Version", Value: "6.1"},
81+
{Name: "Foo", Value: "bar"},
82+
},
83+
{
84+
{Name: "Package", Value: "libc6"},
85+
{Name: "Version", Value: "6.2"},
86+
{Name: "Baz", Value: "quux"},
87+
},
88+
},
89+
want: []deb.Paragraph{
90+
{
91+
{Name: "Package", Value: "libc6"},
92+
{Name: "Version", Value: "6.1"},
93+
{Name: "Foo", Value: "bar"},
94+
},
95+
{
96+
{Name: "Package", Value: "libc6"},
97+
{Name: "Version", Value: "6.2"},
98+
{Name: "Baz", Value: "quux"},
99+
},
100+
},
101+
},
102+
{
103+
name: "SamePackageAndVersion",
104+
packages: []deb.Paragraph{
105+
{
106+
{Name: "Package", Value: "libc6"},
107+
{Name: "Version", Value: "6.1"},
108+
{Name: "Foo", Value: "bar"},
109+
},
110+
{
111+
{Name: "Package", Value: "libc6"},
112+
{Name: "Version", Value: "6.1"},
113+
{Name: "Baz", Value: "quux"},
114+
},
115+
},
116+
want: []deb.Paragraph{
117+
{
118+
{Name: "Package", Value: "libc6"},
119+
{Name: "Version", Value: "6.1"},
120+
{Name: "Baz", Value: "quux"},
121+
},
122+
},
123+
},
124+
}
125+
for _, test := range tests {
126+
t.Run(test.name, func(t *testing.T) {
127+
got, err := dedupePackages(test.packages)
128+
if err != nil {
129+
t.Log("dedupePackages:", err)
130+
if !test.wantError {
131+
t.Fail()
132+
}
133+
return
134+
}
135+
if test.wantError {
136+
t.Fatalf("packages = %v; want error", got)
137+
}
138+
if diff := cmp.Diff(test.want, got); diff != "" {
139+
t.Errorf("packages (-want +got):\n%s", diff)
140+
}
141+
})
142+
}
143+
}

upload.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"strings"
3737

3838
"gocloud.dev/blob"
39+
"gocloud.dev/gcerrors"
3940
"golang.org/x/crypto/openpgp/clearsign"
4041
"zombiezen.com/go/aptblob/internal/deb"
4142
)
@@ -135,7 +136,9 @@ func uploadIndex(ctx context.Context, bucket *blob.Bucket, key string, packages
135136
if err := deb.Save(buf, packages); err != nil {
136137
return indexHashes{}, indexHashes{}, err
137138
}
138-
uncompressed, err = upload(ctx, bucket, key, "text/plain; charset=utf-8", "", bytes.NewReader(buf.Bytes()))
139+
uncompressed, err = upload(ctx, bucket, key, bytes.NewReader(buf.Bytes()), uploadOptions{
140+
contentType: "text/plain; charset=utf-8",
141+
})
139142
if err != nil {
140143
return indexHashes{}, indexHashes{}, err
141144
}
@@ -147,7 +150,9 @@ func uploadIndex(ctx context.Context, bucket *blob.Bucket, key string, packages
147150
if err := zw.Close(); err != nil {
148151
return indexHashes{}, indexHashes{}, fmt.Errorf("compress %s: %w", key, err)
149152
}
150-
gzipped, err = upload(ctx, bucket, key+gzipExtension, "application/gzip", "", bytes.NewReader(gzipBuf.Bytes()))
153+
gzipped, err = upload(ctx, bucket, key+gzipExtension, bytes.NewReader(gzipBuf.Bytes()), uploadOptions{
154+
contentType: "application/gzip",
155+
})
151156
if err != nil {
152157
return indexHashes{}, indexHashes{}, err
153158
}
@@ -179,7 +184,10 @@ func uploadBinaryPackage(ctx context.Context, bucket *blob.Bucket, debPath strin
179184
if arch == "" {
180185
return nil, fmt.Errorf("upload binary package %s: missing Architecture field", debName)
181186
}
182-
packageHashes, err := upload(ctx, bucket, poolPath(debName), "application/vnd.debian.binary-package", "immutable", debFile)
187+
packageHashes, err := upload(ctx, bucket, poolPath(debName), debFile, uploadOptions{
188+
contentType: "application/vnd.debian.binary-package",
189+
cacheControl: immutable,
190+
})
183191
if err != nil {
184192
return nil, fmt.Errorf("upload binary package %s: %w", debName, err)
185193
}
@@ -213,7 +221,10 @@ func uploadSourcePackage(ctx context.Context, bucket *blob.Bucket, dscPath strin
213221
return nil, fmt.Errorf("upload source package %s: files: %w", packageName, err)
214222
}
215223

216-
_, err = upload(ctx, bucket, dir+"/"+filepath.Base(dscPath), "text/plain; charset=utf-8", "immutable", bytes.NewReader(dsc))
224+
_, err = upload(ctx, bucket, dir+"/"+filepath.Base(dscPath), bytes.NewReader(dsc), uploadOptions{
225+
contentType: "text/plain; charset=utf-8",
226+
cacheControl: immutable,
227+
})
217228
if err != nil {
218229
return nil, fmt.Errorf("upload source package %s: %s: %w", packageName, filepath.Base(dscPath), err)
219230
}
@@ -227,7 +238,10 @@ func uploadSourcePackage(ctx context.Context, bucket *blob.Bucket, dscPath strin
227238
if err != nil {
228239
return nil, fmt.Errorf("upload source package %s: %s: %w", packageName, fname, err)
229240
}
230-
_, uploadErr := upload(ctx, bucket, dir+"/"+fname, contentType, "immutable", content)
241+
_, uploadErr := upload(ctx, bucket, dir+"/"+fname, content, uploadOptions{
242+
contentType: contentType,
243+
cacheControl: immutable,
244+
})
231245
content.Close()
232246
if uploadErr != nil {
233247
return nil, fmt.Errorf("upload source package %s: %s: %w", packageName, fname, err)
@@ -276,7 +290,15 @@ func poolPath(name string) string {
276290
return "pool/" + name
277291
}
278292

279-
func upload(ctx context.Context, bucket *blob.Bucket, key string, contentType, cacheControl string, content io.ReadSeeker) (indexHashes, error) {
293+
// immutable is the Cache-Control value used for objects whose content never
// changes once they have been written under a given key.
const immutable = "immutable"

// uploadOptions carries the optional metadata for a single call to upload.
type uploadOptions struct {
	// contentType is sent as the Content-Type of the written object.
	contentType string

	// cacheControl is sent as the Cache-Control of the written object.
	// upload substitutes a short max-age when it is empty, and when it equals
	// immutable an already existing object with the same size and MD5 is
	// reused instead of being written again.
	cacheControl string
}
300+
301+
func upload(ctx context.Context, bucket *blob.Bucket, key string, content io.ReadSeeker, opts uploadOptions) (indexHashes, error) {
280302
if _, err := content.Seek(0, io.SeekStart); err != nil {
281303
return indexHashes{}, fmt.Errorf("upload %s: %w", key, err)
282304
}
@@ -296,10 +318,27 @@ func upload(ctx context.Context, bucket *blob.Bucket, key string, contentType, c
296318
md5Hash.Sum(h.md5[:0])
297319
sha1Hash.Sum(h.sha1[:0])
298320
sha256Hash.Sum(h.sha256[:0])
321+
if opts.cacheControl == immutable {
322+
attr, err := bucket.Attributes(ctx, key)
323+
if err == nil {
324+
// Immutable objects don't have to be uploaded if they already exist,
325+
// but they must match the existing object.
326+
if attr.Size != h.size || !bytes.Equal(h.md5[:], attr.MD5) {
327+
return indexHashes{}, fmt.Errorf("upload %s: immutable object differs", key)
328+
}
329+
return h, nil
330+
} else if gcerrors.Code(err) != gcerrors.NotFound {
331+
return indexHashes{}, fmt.Errorf("upload %s: %w", key, err)
332+
}
333+
}
334+
if opts.cacheControl == "" {
335+
// Default to 5 minute cache.
336+
opts.cacheControl = "max-age=300"
337+
}
299338
w, err := bucket.NewWriter(ctx, key, &blob.WriterOptions{
300-
ContentType: contentType,
339+
ContentType: opts.contentType,
301340
ContentMD5: h.md5[:],
302-
CacheControl: cacheControl,
341+
CacheControl: opts.cacheControl,
303342
})
304343
if err != nil {
305344
return indexHashes{}, fmt.Errorf("upload %s: %w", key, err)

0 commit comments

Comments
 (0)