-
Notifications
You must be signed in to change notification settings - Fork 495
/
binarystorage.go
199 lines (178 loc) · 5.46 KB
/
binarystorage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2014 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package binarystorage
import (
"context"
"fmt"
"io"
"github.com/juju/errors"
"github.com/juju/mgo/v3"
"github.com/juju/mgo/v3/bson"
"github.com/juju/mgo/v3/txn"
jujutxn "github.com/juju/txn/v3"
internallogger "github.com/juju/juju/internal/logger"
"github.com/juju/juju/internal/mongo"
)
// logger is the package-level logger, obtained from internallogger under the
// name "juju.state.binarystorage".
var logger = internallogger.GetLogger("juju.state.binarystorage")
// ManagedStorage is the blob store that binaryStorage reads from and writes
// to. An instance persists data for a bucket, for a user, or globally; only
// bucket storage is currently implemented.
type ManagedStorage interface {
	// Get opens the data stored at path, namespaced to the bucket, and
	// returns a reader for it along with a length.
	//
	// ErrUploadPending is returned while the data is still being uploaded
	// and has not been fully written yet. In that case the path is valid
	// and the caller should retry the read.
	Get(ctx context.Context, path string) (r io.ReadCloser, length int64, err error)

	// Put writes the data read from r to path, namespaced to the bucket.
	Put(ctx context.Context, path string, r io.Reader, length int64) error

	// Remove deletes the data stored at path, namespaced to the bucket.
	Remove(ctx context.Context, path string) error
}
// binaryStorage is the Storage implementation returned by New. Blobs are kept
// in managedStorage, while one metadata document per version is kept in
// metadataCollection.
type binaryStorage struct {
	// managedStorage holds the binary blobs themselves.
	managedStorage ManagedStorage
	// metadataCollection holds the metadataDoc records describing the blobs.
	metadataCollection mongo.Collection
	// txnRunner executes the transactions that insert or update metadata.
	txnRunner jujutxn.Runner
}

// Compile-time check that *binaryStorage satisfies Storage.
var _ Storage = (*binaryStorage)(nil)
// New returns a Storage that keeps binary files in managedStorage and their
// metadata in metadataCollection, running metadata transactions through
// runner.
func New(
	managedStorage ManagedStorage,
	metadataCollection mongo.Collection,
	runner jujutxn.Runner,
) Storage {
	storage := new(binaryStorage)
	storage.managedStorage = managedStorage
	storage.metadataCollection = metadataCollection
	storage.txnRunner = runner
	return storage
}
// Add implements Storage.Add.
//
// The binary read from r is written to managed storage under a path built
// from metadata.Version and metadata.SHA256. A metadata document keyed by
// metadata.Version is then inserted, or updated if one already exists. When
// an existing document pointed at a different path, that old blob is removed
// once the new metadata is in place; a failure to remove it is only logged.
//
// If any step after the upload fails, the newly written blob is removed again
// (failures are logged) and the annotated error is returned.
func (s *binaryStorage) Add(ctx context.Context, r io.Reader, metadata Metadata) (resultErr error) {
	// Add the binary file to storage. The caller's ctx is passed through so
	// that the upload can observe its cancellation and deadline.
	path := fmt.Sprintf("tools/%s-%s", metadata.Version, metadata.SHA256)
	if err := s.managedStorage.Put(ctx, path, r, metadata.Size); err != nil {
		return errors.Annotate(err, "cannot store binary file")
	}
	defer func() {
		if resultErr == nil {
			return
		}
		// Roll back the upload. This deliberately does not use ctx: the
		// failure may itself stem from ctx being cancelled, and the
		// orphaned blob should still be cleaned up in that case.
		// NOTE(review): if an existing document already references this
		// same path, this removes the blob that document points at —
		// confirm that is acceptable.
		if err := s.managedStorage.Remove(context.Background(), path); err != nil {
			logger.Errorf("failed to remove binary blob: %v", err)
		}
	}()

	newDoc := metadataDoc{
		Id:      metadata.Version,
		Version: metadata.Version,
		Size:    metadata.Size,
		SHA256:  metadata.SHA256,
		Path:    path,
	}

	// Add or replace metadata. If replacing, record the existing path so we
	// can remove it later.
	var oldPath string
	buildTxn := func(attempt int) ([]txn.Op, error) {
		op := txn.Op{
			C:  s.metadataCollection.Name(),
			Id: newDoc.Id,
		}

		// On the first attempt we assume we're adding new binary files.
		// Subsequent attempts to add files will fetch the existing
		// doc, record the old path, and attempt to update the
		// size, path and hash fields.
		if attempt == 0 {
			op.Assert = txn.DocMissing
			op.Insert = &newDoc
			return []txn.Op{op}, nil
		}

		oldDoc, err := s.findMetadata(metadata.Version)
		if err != nil {
			return nil, err
		}
		oldPath = oldDoc.Path
		// Guard against the document changing between the read above and
		// the transaction being applied.
		op.Assert = bson.D{{Name: "path", Value: oldPath}}
		if oldPath != path {
			op.Update = bson.D{{
				Name: "$set", Value: bson.D{
					{Name: "size", Value: metadata.Size},
					{Name: "sha256", Value: metadata.SHA256},
					{Name: "path", Value: path},
				},
			}}
		}
		return []txn.Op{op}, nil
	}
	if err := s.txnRunner.Run(buildTxn); err != nil {
		return errors.Annotate(err, "cannot store binary metadata")
	}

	if oldPath != "" && oldPath != path {
		// Attempt to remove the old path. Failure is non-fatal.
		if err := s.managedStorage.Remove(ctx, oldPath); err != nil {
			logger.Errorf("failed to remove old binary blob: %v", err)
		} else {
			logger.Debugf("removed old binary blob")
		}
	}
	return nil
}
// Open looks up the metadata document for version and opens the blob at the
// path it records. It returns the metadata together with a reader for the
// blob. A lookup failure is returned traced; a failure to open the blob is
// returned annotated with the blob path.
func (s *binaryStorage) Open(ctx context.Context, version string) (Metadata, io.ReadCloser, error) {
	doc, err := s.findMetadata(version)
	if err != nil {
		return Metadata{}, nil, errors.Trace(err)
	}

	blobPath := doc.Path
	// The length reported by Get is not needed; Size comes from the document.
	reader, _, err := s.managedStorage.Get(ctx, blobPath)
	if err != nil {
		return Metadata{}, nil, errors.Annotatef(err, "resource at %q", blobPath)
	}

	var meta Metadata
	meta.Version = doc.Version
	meta.Size = doc.Size
	meta.SHA256 = doc.SHA256
	return meta, reader, nil
}
// Metadata returns the version, size and SHA256 recorded in the metadata
// document for version. Any error from the lookup is returned unchanged,
// alongside a zero Metadata.
func (s *binaryStorage) Metadata(version string) (Metadata, error) {
	var result Metadata
	doc, err := s.findMetadata(version)
	if err != nil {
		return result, err
	}
	result.Version = doc.Version
	result.Size = doc.Size
	result.SHA256 = doc.SHA256
	return result, nil
}
// AllMetadata returns the version, size and SHA256 of every document in the
// metadata collection. The result is a non-nil, possibly empty slice on
// success; on a query error it is nil and the error is returned unchanged.
func (s *binaryStorage) AllMetadata() ([]Metadata, error) {
	var docs []metadataDoc
	err := s.metadataCollection.Find(nil).All(&docs)
	if err != nil {
		return nil, err
	}

	result := make([]Metadata, 0, len(docs))
	for i := range docs {
		result = append(result, Metadata{
			Version: docs[i].Version,
			Size:    docs[i].Size,
			SHA256:  docs[i].SHA256,
		})
	}
	return result, nil
}
// metadataDoc is the document kept in the metadata collection for a single
// stored binary.
type metadataDoc struct {
	// Id is the document key; Add sets it to the binary's version.
	Id string `bson:"_id"`
	// Version is the version of the stored binary.
	Version string `bson:"version"`
	// Size is the size recorded for the binary.
	Size int64 `bson:"size"`
	// SHA256 is the hash recorded for the binary; it is omitted from the
	// stored document when empty.
	SHA256 string `bson:"sha256,omitempty"`
	// Path is where the blob lives in managed storage.
	Path string `bson:"path"`
}
// findMetadata fetches the metadata document whose id is version. When the
// collection reports mgo.ErrNotFound, that is translated into a NotFound
// error naming the version; any other error is returned unchanged.
func (s *binaryStorage) findMetadata(version string) (metadataDoc, error) {
	var found metadataDoc
	switch err := s.metadataCollection.FindId(version).One(&found); err {
	case nil:
		return found, nil
	case mgo.ErrNotFound:
		return found, errors.NotFoundf("%v binary metadata", version)
	default:
		return found, err
	}
}