forked from buildbarn/bb-storage
/
remote_blob_access.go
104 lines (91 loc) · 2.88 KB
/
remote_blob_access.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package blobstore
import (
"context"
"fmt"
"net/http"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/digest"
// TODO: Migrate this code away from ctxhttp. Use the HTTPClient
// interface that's in this package instead. This allows us to
// add unit testing coverage.
"golang.org/x/net/context/ctxhttp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type remoteBlobAccess struct {
address string
prefix string
storageType StorageType
}
func convertHTTPUnexpectedStatus(resp *http.Response) error {
return status.Errorf(codes.Unknown, "Unexpected status code from remote cache: %d - %s", resp.StatusCode, http.StatusText(resp.StatusCode))
}
// NewRemoteBlobAccess for use of HTTP/1.1 cache backend.
//
// See: https://docs.bazel.build/versions/master/remote-caching.html#http-caching-protocol
func NewRemoteBlobAccess(address string, prefix string, storageType StorageType) BlobAccess {
return &remoteBlobAccess{
address: address,
prefix: prefix,
storageType: storageType,
}
}
func (ba *remoteBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
url := fmt.Sprintf("%s/%s/%s", ba.address, ba.prefix, digest.GetHashString())
resp, err := ctxhttp.Get(ctx, http.DefaultClient, url)
if err != nil {
return buffer.NewBufferFromError(err)
}
switch resp.StatusCode {
case http.StatusNotFound:
resp.Body.Close()
return buffer.NewBufferFromError(status.Error(codes.NotFound, url))
case http.StatusOK:
return ba.storageType.NewBufferFromReader(digest, resp.Body, buffer.Irreparable)
default:
resp.Body.Close()
return buffer.NewBufferFromError(convertHTTPUnexpectedStatus(resp))
}
}
func (ba *remoteBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
sizeBytes, err := b.GetSizeBytes()
if err != nil {
b.Discard()
return err
}
url := fmt.Sprintf("%s/%s/%s", ba.address, ba.prefix, digest.GetHashString())
r := b.ToReader()
req, err := http.NewRequest(http.MethodPut, url, r)
if err != nil {
r.Close()
return err
}
req.ContentLength = sizeBytes
resp, err := ctxhttp.Do(ctx, http.DefaultClient, req)
if err != nil {
return err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return convertHTTPUnexpectedStatus(resp)
}
return nil
}
func (ba *remoteBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
missing := digest.NewSetBuilder()
for _, blobDigest := range digests.Items() {
url := fmt.Sprintf("%s/%s/%s", ba.address, ba.prefix, blobDigest.GetHashString())
resp, err := ctxhttp.Head(ctx, http.DefaultClient, url)
if err != nil {
return digest.EmptySet, err
}
switch resp.StatusCode {
case http.StatusNotFound:
missing.Add(blobDigest)
case http.StatusOK:
continue
default:
return digest.EmptySet, convertHTTPUnexpectedStatus(resp)
}
}
return missing.Build(), nil
}