/
swift.go
160 lines (144 loc) · 4.91 KB
/
swift.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2017 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package blobstore // import "gopkg.in/juju/charmstore.v5/internal/blobstore"
import (
"bytes"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/juju/loggo"
"gopkg.in/errgo.v1"
"gopkg.in/goose.v2/client"
"gopkg.in/goose.v2/errors"
"gopkg.in/goose.v2/identity"
"gopkg.in/goose.v2/swift"
)
// maxBufferSize is the largest upload size, in bytes (10 MiB), that Put
// will buffer in memory via putBuffer. Non-seekable uploads larger than
// this are spooled to a temporary file by putFileBuffer instead.
const maxBufferSize = 10 * 1024 * 1024
// swiftBackend is a blob store backend that keeps its objects in a
// single Swift container.
type swiftBackend struct {
// client is the Swift client used for every object operation.
client *swift.Client
// container is the name of the Swift container holding the objects.
container string
// tmpdir is the directory in which putFileBuffer creates the
// temporary file it uses to spool large uploads.
tmpdir string
}
// NewSwiftBackend returns a backend which uses OpenStack's Swift for
// its operations with the given credentials and auth mode. It stores
// all the data objects in the container with the given name.
//
// tmpdir is the directory used for the temporary files that are
// created when an upload that cannot be rewound is too large to be
// buffered in memory (see maxBufferSize).
func NewSwiftBackend(cred *identity.Credentials, authmode identity.AuthMode, container, tmpdir string) Backend {
	c := client.NewClient(cred, authmode, gooseLogger{})
	c.SetRequiredServiceTypes([]string{"object-store"})
	return &swiftBackend{
		client:    swift.New(c),
		container: container,
		// Previously the tmpdir argument was silently dropped, so
		// large uploads were spooled relative to the process's
		// working directory rather than the requested directory.
		tmpdir: tmpdir,
	}
}
// Get opens the object with the given name for reading and returns a
// reader for its contents together with its size as reported by the
// Content-Length header of the Swift response.
//
// If the object does not exist the returned error has the cause
// ErrNotFound. On any error the returned reader is nil and no
// resources are left open.
func (s *swiftBackend) Get(name string) (r ReadSeekCloser, size int64, err error) {
	// Use infinite read-ahead here as the goose implementation of
	// byte range handling seems to differ from swift's.
	r2, headers, err := s.client.OpenObject(s.container, name, -1)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil, 0, errgo.WithCausef(nil, ErrNotFound, "")
		}
		return nil, 0, errgo.Mask(err)
	}
	size, err = strconv.ParseInt(headers.Get("Content-Length"), 10, 64)
	if err != nil {
		// Don't hand back a reader alongside an error: callers will
		// ignore it and the underlying response would be leaked.
		r2.Close()
		return nil, 0, errgo.Notef(err, "invalid Content-Length for object %q", name)
	}
	rsc, ok := r2.(ReadSeekCloser)
	if !ok {
		// Report an error rather than panicking if goose ever
		// returns a reader that cannot seek.
		r2.Close()
		return nil, 0, errgo.Newf("swift object reader of type %T does not support seeking", r2)
	}
	return swiftBackendReader{rsc}, size, nil
}
// Put stores the contents of r, which is expected to hold size bytes
// with the given hash, as the object with the given name.
//
// The upload strategy depends on the reader: seekable readers are
// uploaded directly, other readers are first copied to a temporary
// file when size exceeds maxBufferSize and to memory otherwise. Goose
// would buffer the data anyway, so buffering here gives more control
// over how it is done.
//
// An error with the cause io.ErrUnexpectedEOF keeps that cause; all
// other causes are masked.
func (s *swiftBackend) Put(name string, r io.Reader, size int64, hash string) error {
	var err error
	switch rs, seekable := r.(io.ReadSeeker); {
	case seekable:
		err = s.putReadSeeker(name, rs, size, hash)
	case size > maxBufferSize:
		err = s.putFileBuffer(name, r, size, hash)
	default:
		err = s.putBuffer(name, r, size, hash)
	}
	return errgo.Mask(err, errgo.Is(io.ErrUnexpectedEOF))
}
// putReadSeeker uploads the contents of a seekable reader as the
// object with the given name. The data is read twice: once into
// ioutil.Discard through copyAndCheckHash (presumably to validate size
// and hash before anything is sent), and then, after rewinding to the
// start, by the Swift client for the upload itself.
func (s *swiftBackend) putReadSeeker(name string, rs io.ReadSeeker, size int64, hash string) error {
	err := copyAndCheckHash(ioutil.Discard, rs, size, hash)
	if err != nil {
		return errgo.Mask(err, errgo.Is(io.ErrUnexpectedEOF))
	}
	_, err = rs.Seek(0, seekStart)
	if err != nil {
		return errgo.Mask(err)
	}
	// TODO: investigate if PutReader can return err but the object still be
	// written. Should there be cleanup here?
	err = s.client.PutReader(s.container, name, rs, size)
	return errgo.Mask(err)
}
// putFileBuffer uploads the contents of r as the object with the given
// name, spooling the data through a temporary file in s.tmpdir so that
// it can be checked with copyAndCheckHash before it is sent to Swift.
//
// The temporary file gets a unique name, so concurrent uploads of the
// same object name do not collide and an object name containing path
// separators cannot place the file outside s.tmpdir. If s.tmpdir is
// empty the system default temporary directory is used. The file is
// always closed and removed before the function returns.
func (s *swiftBackend) putFileBuffer(name string, r io.Reader, size int64, hash string) error {
	// Keep the final element of the object name in the file name to
	// make stray files easier to identify.
	f, err := ioutil.TempFile(s.tmpdir, "blob-"+filepath.Base(name)+"-")
	if err != nil {
		return errgo.Mask(err)
	}
	defer func() {
		// Close before removing: removing a file that is still open
		// fails on some platforms. The close error is of no interest
		// because the file is about to be discarded.
		f.Close()
		if err := os.Remove(f.Name()); err != nil {
			logger.Warningf("error removing temporary file: %s", err)
		}
	}()
	if err := copyAndCheckHash(f, r, size, hash); err != nil {
		return errgo.Mask(err, errgo.Is(io.ErrUnexpectedEOF))
	}
	// Rewind so that the upload reads the spooled data from the start.
	if _, err := f.Seek(0, seekStart); err != nil {
		return errgo.Mask(err)
	}
	if err := s.client.PutReader(s.container, name, f, size); err != nil {
		// TODO: investigate if PutReader can return err but the object still be
		// written. Should there be cleanup here?
		return errgo.Mask(err)
	}
	return nil
}
// putBuffer uploads the contents of r as the object with the given
// name, holding the whole payload in an in-memory buffer pre-sized to
// size bytes. The data is passed through copyAndCheckHash into the
// buffer first, and only uploaded if that succeeds.
func (s *swiftBackend) putBuffer(name string, r io.Reader, size int64, hash string) error {
	data := bytes.NewBuffer(make([]byte, 0, size))
	err := copyAndCheckHash(data, r, size, hash)
	if err != nil {
		return errgo.Mask(err, errgo.Is(io.ErrUnexpectedEOF))
	}
	body := bytes.NewReader(data.Bytes())
	// TODO: investigate if PutReader can return err but the object still be
	// written. Should there be cleanup here?
	err = s.client.PutReader(s.container, name, body, size)
	return errgo.Mask(err)
}
// Remove deletes the object with the given name from the container.
// If Swift reports that the object does not exist, the returned error
// has the cause ErrNotFound; any other failure is returned masked.
func (s *swiftBackend) Remove(name string) error {
	switch err := s.client.DeleteObject(s.container, name); {
	case err == nil:
		return nil
	case errors.IsNotFound(err):
		return errgo.WithCausef(nil, ErrNotFound, "")
	default:
		return errgo.Mask(err)
	}
}
// swiftBackendReader translates not-found errors as
// produced by Swift into not-found errors as expected
// by the Backend.Get interface contract.
//
// It embeds the wrapped ReadSeekCloser, so Seek and Close are
// forwarded unchanged; only Read is overridden.
type swiftBackendReader struct {
ReadSeekCloser
}
// Read reads from the wrapped reader. A nil error and io.EOF are
// passed through untouched; a goose not-found error is replaced by an
// error with the cause ErrNotFound; every other error is masked. The
// byte count from the underlying read is returned in all cases.
func (r swiftBackendReader) Read(buf []byte) (int, error) {
	n, err := r.ReadSeekCloser.Read(buf)
	switch {
	case err == nil, err == io.EOF:
		return n, err
	case errors.IsNotFound(err):
		return n, errgo.WithCausef(nil, ErrNotFound, "")
	}
	return n, errgo.Mask(err)
}
// gooseLogger implements the logger interface required
// by goose, using the loggo logger to do the actual
// logging. It carries no state, so the zero value is
// ready to use.
// TODO: Patch goose to use loggo directly.
type gooseLogger struct{}
// Printf logs the formatted message at DEBUG level through the
// package logger.
func (gooseLogger) Printf(f string, a ...interface{}) {
// NOTE(review): the call depth of 2 presumably makes loggo record the
// location of goose's call into Printf rather than this wrapper —
// confirm against loggo's LogCallf documentation.
logger.LogCallf(2, loggo.DEBUG, f, a...)
}