-
Notifications
You must be signed in to change notification settings - Fork 0
/
folder.go
265 lines (234 loc) · 9.11 KB
/
folder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package azure
import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"strings"
"time"
"github.com/wal-g/storages/storage"
"github.com/wal-g/tracelog"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/pkg/errors"
)
const (
	// Environment-variable / config keys understood by the Azure backend.
	AccountSetting    = "AZURE_STORAGE_ACCOUNT"
	AccessKeySetting  = "AZURE_STORAGE_ACCESS_KEY"
	SasTokenSetting   = "AZURE_STORAGE_SAS_TOKEN"
	EnvironmentName   = "AZURE_ENVIRONMENT_NAME"
	BufferSizeSetting = "AZURE_BUFFER_SIZE"
	MaxBuffersSetting = "AZURE_MAX_BUFFERS"
	TryTimeoutSetting = "AZURE_TRY_TIMEOUT"

	// Upload buffer bounds and defaults (bytes / counts).
	minBufferSize     = 1024
	defaultBufferSize = 64 * 1024 * 1024
	minBuffers        = 1
	defaultBuffers    = 3
	// Per-try request timeout default; interpreted in minutes (see ConfigureFolder).
	defaultTryTimeout = 5
	// Fallback Azure environment when AZURE_ENVIRONMENT_NAME is unset.
	defaultEnvName = "AzurePublicCloud"
)
// SettingList enumerates every configuration setting recognized by this
// storage backend. TryTimeoutSetting was previously missing even though
// ConfigureFolder reads it; it is included so callers that validate or
// forward settings by this list do not drop it.
var SettingList = []string{
	AccountSetting,
	AccessKeySetting,
	SasTokenSetting,
	EnvironmentName,
	BufferSizeSetting,
	MaxBuffersSetting,
	TryTimeoutSetting,
}
// NewFolderError wraps err into a storage.Error tagged with the Azure
// backend name, formatting the message from format and args.
func NewFolderError(err error, format string, args ...interface{}) storage.Error {
	return storage.NewError(err, "Azure", format, args...)
}
// NewCredentialError reports that the required credential setting
// settingName was not provided.
func NewCredentialError(settingName string) storage.Error {
	baseErr := errors.New("Credential error")
	return NewFolderError(baseErr, "%s setting is not set", settingName)
}
// NewFolder constructs a Folder bound to the given container URL and path,
// carrying the upload options used for subsequent PutObject calls.
func NewFolder(
	uploadStreamToBlockBlobOptions azblob.UploadStreamToBlockBlobOptions,
	containerURL azblob.ContainerURL,
	path string) *Folder {
	return &Folder{
		uploadStreamToBlockBlobOptions: uploadStreamToBlockBlobOptions,
		containerURL:                   containerURL,
		path:                           path,
	}
}
// ConfigureFolder builds a storage.Folder for an Azure blob container from the
// given prefix and settings map. Authentication uses the shared access key
// (AZURE_STORAGE_ACCESS_KEY) when present, otherwise a SAS token
// (AZURE_STORAGE_SAS_TOKEN); the account name is always required.
func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder, error) {
	var accountName, accountKey, accountToken, environmentName string
	var ok, usingToken bool
	if accountName, ok = settings[AccountSetting]; !ok {
		return nil, NewCredentialError(AccountSetting)
	}
	// Prefer the shared key; fall back to a SAS token if the key is absent.
	if accountKey, ok = settings[AccessKeySetting]; !ok {
		if accountToken, usingToken = settings[SasTokenSetting]; !usingToken {
			return nil, NewCredentialError(AccessKeySetting)
		}
	}
	if environmentName, ok = settings[EnvironmentName]; !ok {
		environmentName = defaultEnvName
	}
	var credential azblob.Credential
	var err error
	if usingToken {
		// A SAS token authenticates via the URL query string, so the
		// pipeline credential itself is anonymous.
		credential = azblob.NewAnonymousCredential()
	} else {
		credential, err = azblob.NewSharedKeyCredential(accountName, accountKey)
		if err != nil {
			return nil, NewFolderError(err, "Unable to create credentials")
		}
	}
	var tryTimeout int
	if strTryTimeout, ok := settings[TryTimeoutSetting]; ok {
		tryTimeout, err = strconv.Atoi(strTryTimeout)
		if err != nil {
			return nil, NewFolderError(err, "Invalid azure try timeout setting")
		}
	} else {
		tryTimeout = defaultTryTimeout
	}
	// The configured try timeout is interpreted in minutes.
	pipeLine := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: time.Duration(tryTimeout) * time.Minute}})
	containerName, path, err := storage.GetPathFromPrefix(prefix)
	if err != nil {
		return nil, NewFolderError(err, "Unable to create container")
	}
	storageEndpointSuffix := getStorageEndpointSuffix(environmentName)
	var serviceURL *url.URL
	if usingToken {
		// The SAS token is appended verbatim to the container URL —
		// presumably it includes its leading '?'; TODO confirm with callers.
		serviceURL, err = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, containerName, accountToken))
		if err != nil {
			return nil, NewFolderError(err, "Unable to parse service URL with SAS token")
		}
	} else {
		serviceURL, err = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, containerName))
		if err != nil {
			return nil, NewFolderError(err, "Unable to parse service URL")
		}
	}
	containerURL := azblob.NewContainerURL(*serviceURL, pipeLine)
	path = storage.AddDelimiterToPath(path)
	return NewFolder(getUploadStreamToBlockBlobOptions(settings), containerURL, path), nil
}
// Folder is a storage.Folder implementation backed by an Azure blob container.
type Folder struct {
	uploadStreamToBlockBlobOptions azblob.UploadStreamToBlockBlobOptions // buffer options applied on PutObject uploads
	containerURL                   azblob.ContainerURL                   // URL handle of the backing container
	path                           string                                // path inside the container; kept with a trailing delimiter (see AddDelimiterToPath callers)
}
// GetPath returns the folder's path inside the container.
func (folder *Folder) GetPath() string {
	return folder.path
}
// Exists reports whether a blob is present at objectRelativePath.
// A BlobNotFound service code maps to (false, nil); any other error is wrapped.
func (folder *Folder) Exists(objectRelativePath string) (bool, error) {
	blobPath := storage.JoinPath(folder.path, objectRelativePath)
	blobURL := folder.containerURL.NewBlockBlobURL(blobPath)
	_, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{})
	if stgErr, ok := err.(azblob.StorageError); ok && stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
		return false, nil
	}
	if err != nil {
		return false, NewFolderError(err, "Unable to stat object %v", blobPath)
	}
	return true, nil
}
// ListFolder enumerates the immediate contents of the folder: blobs become
// storage.Objects (names relative to this folder) and "/"-delimited blob
// prefixes become sub-Folders. Results are fetched in segments.
func (folder *Folder) ListFolder() (objects []storage.Object, subFolders []storage.Folder, err error) {
	// Marker is used for segmented iteration.
	for marker := (azblob.Marker{}); marker.NotDone(); {
		blobs, err := folder.containerURL.ListBlobsHierarchySegment(context.Background(), marker, "/", azblob.ListBlobsSegmentOptions{Prefix: folder.path})
		if err != nil {
			return nil, nil, NewFolderError(err, "Unable to iterate %v", folder.path)
		}
		// Add blobs to the list of storage objects, stripping the folder prefix.
		for _, blob := range blobs.Segment.BlobItems {
			objName := strings.TrimPrefix(blob.Name, folder.path)
			updated := time.Time(blob.Properties.LastModified)
			objects = append(objects, storage.NewLocalObject(objName, updated, *blob.Properties.ContentLength))
		}
		// Advance to the next segment; NotDone() becomes false at the end.
		marker = blobs.NextMarker
		// Get subFolder names.
		blobPrefixes := blobs.Segment.BlobPrefixes
		// Add subFolders to the list of storage folders.
		for _, blobPrefix := range blobPrefixes {
			subFolderPath := blobPrefix.Name
			subFolders = append(subFolders, NewFolder(folder.uploadStreamToBlockBlobOptions, folder.containerURL, subFolderPath))
		}
	}
	return
}
// GetSubFolder returns a Folder rooted at the given relative sub-path,
// inheriting this folder's container handle and upload options.
func (folder *Folder) GetSubFolder(subFolderRelativePath string) storage.Folder {
	subPath := storage.AddDelimiterToPath(storage.JoinPath(folder.path, subFolderRelativePath))
	return NewFolder(folder.uploadStreamToBlockBlobOptions, folder.containerURL, subPath)
}
// ReadObject opens the blob at objectRelativePath for reading.
// Returns storage.ObjectNotFoundError when the service reports BlobNotFound.
func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, error) {
	blobPath := storage.JoinPath(folder.path, objectRelativePath)
	blobURL := folder.containerURL.NewBlockBlobURL(blobPath)
	// Offset 0 with count 0 requests the blob from the start to the end.
	downloadResponse, err := blobURL.Download(context.Background(), 0, 0, azblob.BlobAccessConditions{}, false)
	if stgErr, ok := err.(azblob.StorageError); ok && stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
		return nil, storage.NewObjectNotFoundError(blobPath)
	}
	if err != nil {
		return nil, NewFolderError(err, "Unable to download blob %s.", blobPath)
	}
	// Hand back the body as a retrying reader with default retry options.
	return downloadResponse.Body(azblob.RetryReaderOptions{}), nil
}
// PutObject streams content into a block blob named name inside this folder,
// using the folder's configured upload buffer options.
func (folder *Folder) PutObject(name string, content io.Reader) error {
	tracelog.DebugLogger.Printf("Put %v into %v\n", name, folder.path)
	blobPath := storage.JoinPath(folder.path, name)
	blobURL := folder.containerURL.NewBlockBlobURL(blobPath)
	if _, err := azblob.UploadStreamToBlockBlob(context.Background(), content, blobURL, folder.uploadStreamToBlockBlobOptions); err != nil {
		return NewFolderError(err, "Unable to upload blob %v", name)
	}
	tracelog.DebugLogger.Printf("Put %v done\n", name)
	return nil
}
// DeleteObjects removes each listed blob (including its snapshots).
// A blob that is already gone is skipped silently, making deletion
// idempotent; the first other error aborts the loop.
func (folder *Folder) DeleteObjects(objectRelativePaths []string) error {
	for _, objectRelativePath := range objectRelativePaths {
		// Delete blob using blobURL obtained from full path to blob.
		path := storage.JoinPath(folder.path, objectRelativePath)
		blobURL := folder.containerURL.NewBlockBlobURL(path)
		tracelog.DebugLogger.Printf("Delete %v\n", path)
		_, err := blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
		if stgErr, ok := err.(azblob.StorageError); ok && stgErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
			// Already deleted — nothing to do for this path.
			continue
		}
		if err != nil {
			return NewFolderError(err, "Unable to delete object %v", path)
		}
	}
	return nil
}
// getUploadStreamToBlockBlobOptions builds upload-stream options from the
// settings map. Each numeric setting falls back to its default when missing,
// unparsable, or below the allowed minimum. (The original duplicated the
// parse-and-clamp logic with a copy-pasted, misleading comment on the
// max-buffers branch; it is factored into one helper.)
func getUploadStreamToBlockBlobOptions(settings map[string]string) azblob.UploadStreamToBlockBlobOptions {
	// Size of each rotating upload buffer, in bytes.
	bufferSize := parseBoundedIntSetting(settings[BufferSizeSetting], minBufferSize, defaultBufferSize)
	// Number of rotating buffers used concurrently.
	maxBuffers := parseBoundedIntSetting(settings[MaxBuffersSetting], minBuffers, defaultBuffers)
	return azblob.UploadStreamToBlockBlobOptions{MaxBuffers: maxBuffers, BufferSize: bufferSize}
}

// parseBoundedIntSetting parses value as an int, returning fallback when
// parsing fails or the parsed value is below minimum.
func parseBoundedIntSetting(value string, minimum, fallback int) int {
	parsed, err := strconv.Atoi(value)
	if err != nil || parsed < minimum {
		return fallback
	}
	return parsed
}
// getStorageEndpointSuffix maps an Azure environment name to its storage
// account endpoint suffix. Recognized names are AzureUSGovernmentCloud,
// AzureChinaCloud and AzureGermanCloud; any other name yields the
// AzurePublicCloud suffix.
func getStorageEndpointSuffix(environmentName string) string {
	switch environmentName {
	case azure.USGovernmentCloud.Name:
		return azure.USGovernmentCloud.StorageEndpointSuffix
	case azure.ChinaCloud.Name:
		return azure.ChinaCloud.StorageEndpointSuffix
	case azure.GermanCloud.Name:
		return azure.GermanCloud.StorageEndpointSuffix
	default:
		return azure.PublicCloud.StorageEndpointSuffix
	}
}