-
Notifications
You must be signed in to change notification settings - Fork 0
/
azureservice.go
308 lines (261 loc) · 10.6 KB
/
azureservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
// Package azurestore provides an Azure Blob Storage based backend.
// AzureStore is a storage backend that uses the AzService interface in order to store uploads in Azure Blob Storage.
// It stores each upload in the specified container as two different BlockBlobs: The `[id].info` blobs are used to store the fileinfo in JSON format. The `[id]` blobs without an extension contain the raw binary data uploaded.
// If the upload is not finished within a week, the uncommitted blocks will be discarded.
// Support for setting the default Container access type and Blob access tier varies depending on your Azure Storage Account and its limits.
// More information about Container access types and limits
// https://docs.microsoft.com/en-us/azure/storage/blobs/anonymous-read-access-configure?tabs=portal
// More information about Blob access tiers and limits
// https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-performance-tiers
// https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#access-tiers-for-block-blob-data
package azurestore
import (
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"net/url"
"sort"
"strings"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/jimydavis/tusd/pkg/handler"
)
const (
// InfoBlobSuffix is the name suffix that NewBlob uses to recognise an info blob.
InfoBlobSuffix string = ".info"
// MaxBlockBlobSize is the maximum number of blocks in a block blob multiplied by the maximum size of one staged block.
MaxBlockBlobSize int64 = azblob.BlockBlobMaxBlocks * azblob.BlockBlobMaxStageBlockBytes
// MaxBlockBlobChunkSize is the maximum size of one staged block.
MaxBlockBlobChunkSize int64 = azblob.BlockBlobMaxStageBlockBytes
)
// azService is the AzService implementation returned by NewAzureService.
type azService struct {
// BlobAccessTier is the access tier given to every BlockBlob created by NewBlob.
BlobAccessTier azblob.AccessTierType
// ContainerURL is the container in which NewBlob creates block blob URLs.
ContainerURL *azblob.ContainerURL
// ContainerName is copied from AzConfig.ContainerName.
ContainerName string
}
// AzService creates handles to the blobs stored in the configured container.
type AzService interface {
// NewBlob returns an AzBlob handle for the blob with the given name.
NewBlob(ctx context.Context, name string) (AzBlob, error)
}
// AzConfig holds the settings read by NewAzureService.
type AzConfig struct {
// AccountName and AccountKey are used to build the shared key credential.
AccountName string
AccountKey string
// BlobAccessTier selects the blob access tier: "archive", "cool" or "hot".
// Any other non-empty value selects azblob.DefaultAccessTier; "" leaves the tier at its zero value.
BlobAccessTier string
// ContainerName is appended to Endpoint to form the container URL.
ContainerName string
// ContainerAccessType selects the public access type: "container" or "blob".
// Any other non-empty value selects azblob.PublicAccessNone; "" leaves the type at its zero value.
ContainerAccessType string
// Endpoint is the base URL that ContainerName is appended to.
Endpoint string
}
// AzBlob is the set of operations available on a single blob.
// It is implemented by BlockBlob (upload data) and InfoBlob (upload info).
type AzBlob interface {
// Delete the blob
Delete(ctx context.Context) error
// Upload the blob. BlockBlob stages body as one additional block; InfoBlob uploads body as the whole blob.
Upload(ctx context.Context, body io.ReadSeeker) error
// Download returns a readcloser to download the contents of the blob
Download(ctx context.Context) (io.ReadCloser, error)
// Get the offset of the blob and its indexes. InfoBlob always reports 0.
GetOffset(ctx context.Context) (int64, error)
// Commit the uploaded blocks to the BlockBlob. This is a no-op for InfoBlob.
Commit(ctx context.Context) error
}
// BlockBlob is the AzBlob implementation used for blobs whose name does not end in InfoBlobSuffix.
type BlockBlob struct {
// Blob is the URL handle of the block blob.
Blob *azblob.BlockBlobURL
// AccessTier is the access tier passed to CommitBlockList by Commit.
AccessTier azblob.AccessTierType
// Indexes holds the integer block indexes known for this blob. Upload appends to it,
// GetOffset replaces it with the sorted indexes reported by Azure, and Commit
// converts it to base64 block IDs.
Indexes []int
}
// InfoBlob is the AzBlob implementation used for blobs whose name ends in InfoBlobSuffix.
type InfoBlob struct {
// Blob is the URL handle of the info blob.
Blob *azblob.BlockBlobURL
}
// NewAzureService creates an AzService for communication with the Azure
// BlockBlob Storage API.
//
// It builds a shared key credential from config.AccountName and
// config.AccountKey, maps the textual access type and access tier settings to
// their azblob values, and attempts to create the container
// config.Endpoint/config.ContainerName.
//
// An error is returned when the credential cannot be created or when the
// container URL cannot be parsed. Errors from creating the container are
// deliberately ignored.
func NewAzureService(config *AzConfig) (AzService, error) {
	// struct to store your credentials.
	credential, err := azblob.NewSharedKeyCredential(config.AccountName, config.AccountKey)
	if err != nil {
		return nil, err
	}

	// Might be limited by the storage account
	// "" or default inherits the access type from the Storage Account
	var containerAccessType azblob.PublicAccessType
	switch config.ContainerAccessType {
	case "container":
		containerAccessType = azblob.PublicAccessContainer
	case "blob":
		containerAccessType = azblob.PublicAccessBlob
	case "":
		// Keep the zero value.
	default:
		containerAccessType = azblob.PublicAccessNone
	}

	// Does not support the premium access tiers
	var blobAccessTierType azblob.AccessTierType
	switch config.BlobAccessTier {
	case "archive":
		blobAccessTierType = azblob.AccessTierArchive
	case "cool":
		blobAccessTierType = azblob.AccessTierCool
	case "hot":
		blobAccessTierType = azblob.AccessTierHot
	case "":
		// Keep the zero value.
	default:
		blobAccessTierType = azblob.DefaultAccessTier
	}

	// The pipeline specifies things like retry policies, logging, deserialization of HTTP response payloads, and more.
	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})

	// A malformed endpoint must be reported to the caller; dereferencing the
	// nil URL returned alongside a parse error would panic.
	cURL, err := url.Parse(fmt.Sprintf("%s/%s", config.Endpoint, config.ContainerName))
	if err != nil {
		return nil, err
	}

	// Get the ContainerURL URL
	containerURL := azblob.NewContainerURL(*cURL, p)

	// Do not care about response since it will fail if container exists and create if it does not.
	_, _ = containerURL.Create(context.Background(), azblob.Metadata{}, containerAccessType)

	return &azService{
		BlobAccessTier: blobAccessTierType,
		ContainerURL:   &containerURL,
		ContainerName:  config.ContainerName,
	}, nil
}
// NewBlob returns the AzBlob handle for the blob called name. Names ending in
// InfoBlobSuffix yield an InfoBlob; every other name yields a BlockBlob that
// starts with an empty index list and the service's access tier.
// The returned error is always nil.
func (service *azService) NewBlob(ctx context.Context, name string) (AzBlob, error) {
	blobURL := service.ContainerURL.NewBlockBlobURL(name)

	if !strings.HasSuffix(name, InfoBlobSuffix) {
		return &BlockBlob{
			Blob:       &blobURL,
			AccessTier: service.BlobAccessTier,
			Indexes:    []int{},
		}, nil
	}

	return &InfoBlob{Blob: &blobURL}, nil
}
// Delete removes the blockBlob, including its snapshots, from Azure Blob
// Storage and returns any error reported by the delete request.
func (blockBlob *BlockBlob) Delete(ctx context.Context) error {
	if _, err := blockBlob.Blob.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
		return err
	}
	return nil
}
// Upload stages body as the next block of the blob in Azure Blob Storage.
//
// The block index is one greater than the last entry of Indexes, or 0 when
// Indexes is empty. The index is recorded in Indexes only after the block was
// staged successfully, so a failed upload does not leave behind an index that
// Commit would later reference. A retry after a failure reuses the same index.
// Returns the error reported by StageBlock, if any.
func (blockBlob *BlockBlob) Upload(ctx context.Context, body io.ReadSeeker) error {
	// Work out the index of the block about to be staged.
	index := 0
	if n := len(blockBlob.Indexes); n > 0 {
		index = blockBlob.Indexes[n-1] + 1
	}

	if _, err := blockBlob.Blob.StageBlock(ctx, blockIDIntToBase64(index), body, azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}); err != nil {
		return err
	}

	// Keep track of the index now that the block exists.
	blockBlob.Indexes = append(blockBlob.Indexes, index)
	return nil
}
// Download requests the whole blockBlob from Azure Blob Storage and returns a
// reader over its body that retries failed reads up to 20 times.
// handler.ErrNotFound is returned when the response status is 404 or when
// Azure reports BlobNotFound; any other error is returned unchanged.
func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error) {
	resp, err := blockBlob.Blob.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})

	switch {
	case resp != nil && resp.StatusCode() == 404:
		// A missing file may be reported as a 404 status and body instead of an error.
		return nil, handler.ErrNotFound
	case err != nil && isAzureError(err, "BlobNotFound"):
		// This might occur when the blob is being uploaded, but a block list has not been committed yet
		return nil, handler.ErrNotFound
	case err != nil:
		return nil, err
	}

	return resp.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20}), nil
}
// GetOffset fetches the block list of the blob from Azure Blob Storage and
// returns the summed size of all committed and uncommitted blocks.
// As a side effect it replaces Indexes with the decoded block indexes of
// those blocks, sorted in ascending order.
// handler.ErrNotFound is returned when Azure reports BlobNotFound.
func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {
	blockList, err := blockBlob.Blob.GetBlockList(ctx, azblob.BlockListAll, azblob.LeaseAccessConditions{})
	if err != nil {
		if isAzureError(err, "BlobNotFound") {
			return 0, handler.ErrNotFound
		}
		return 0, err
	}

	var (
		total int64
		ids   []int
	)
	// record adds one block's size to the offset and remembers its index.
	record := func(name string, size int64) {
		total += size
		ids = append(ids, blockIDBase64ToInt(name))
	}

	// Committed blocks count towards the offset to know how big the file really is.
	for _, committed := range blockList.CommittedBlocks {
		record(committed.Name, int64(committed.Size))
	}
	// Uncommitted blocks are needed as well so that they can be committed later.
	for _, staged := range blockList.UncommittedBlocks {
		record(staged.Name, int64(staged.Size))
	}

	// Azure returns the block lists alphabetically and the indexes are stored
	// as base64 encoded ints, so restore ascending numeric order.
	sort.Ints(ids)
	blockBlob.Indexes = ids

	return total, nil
}
// Commit finalises the blob once all blocks have been uploaded: every entry
// of Indexes is converted to its base64 block ID and the resulting block list
// is sent to Azure together with the blob's access tier.
func (blockBlob *BlockBlob) Commit(ctx context.Context) error {
	blockIDs := make([]string, 0, len(blockBlob.Indexes))
	for _, blockIndex := range blockBlob.Indexes {
		blockIDs = append(blockIDs, blockIDIntToBase64(blockIndex))
	}

	if _, err := blockBlob.Blob.CommitBlockList(ctx, blockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, blockBlob.AccessTier, nil, azblob.ClientProvidedKeyOptions{}); err != nil {
		return err
	}
	return nil
}
// Delete removes the infoBlob, including its snapshots, from Azure Blob
// Storage and returns any error reported by the delete request.
func (infoBlob *InfoBlob) Delete(ctx context.Context) error {
	if _, err := infoBlob.Blob.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
		return err
	}
	return nil
}
// Upload writes body as the complete content of the infoBlob.
// The info file is presumed to be smaller than azblob.BlockBlobMaxUploadBlobBytes (256MiB),
// so it is sent in a single request. Uploading again creates a new block blob
// or overwrites the existing one.
func (infoBlob *InfoBlob) Upload(ctx context.Context, body io.ReadSeeker) error {
	if _, err := infoBlob.Blob.Upload(ctx, body, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.DefaultAccessTier, nil, azblob.ClientProvidedKeyOptions{}); err != nil {
		return err
	}
	return nil
}
// Download requests the whole infoBlob from Azure Blob Storage and returns a
// reader over its body that retries failed reads up to 20 times.
//
// handler.ErrNotFound is returned both when the response status is 404 and
// when Azure reports BlobNotFound, matching BlockBlob.Download, so callers see
// one not-found error regardless of how the service signals it. Any other
// error is returned unchanged.
func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
	downloadResponse, err := infoBlob.Blob.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})

	// If the file does not exist, it may not return an error, but a 404 status and body
	if downloadResponse != nil && downloadResponse.StatusCode() == 404 {
		return nil, handler.ErrNotFound
	}
	if err != nil {
		if isAzureError(err, "BlobNotFound") {
			err = handler.ErrNotFound
		}
		return nil, err
	}

	return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20}), nil
}
// GetOffset exists to satisfy AzBlob. An infoBlob does not utilise an offset,
// so it makes no request and always returns 0 and a nil error.
func (infoBlob *InfoBlob) GetOffset(ctx context.Context) (int64, error) {
return 0, nil
}
// Commit exists to satisfy AzBlob. An infoBlob does not have uncommitted
// blocks, so it makes no request and always returns nil.
func (infoBlob *InfoBlob) Commit(ctx context.Context) error {
return nil
}
// === Helper Functions ===

// blockIDBinaryToBase64 converts a binary block ID to its standard base-64
// string form.
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the block must be the same length
func blockIDBinaryToBase64(blockID []byte) string {
	encoded := make([]byte, base64.StdEncoding.EncodedLen(len(blockID)))
	base64.StdEncoding.Encode(encoded, blockID)
	return string(encoded)
}
// blockIDBase64ToBinary converts a standard base-64 block ID back to its
// binary form. The decode error is discarded: for malformed input the bytes
// decoded before the error are returned.
func blockIDBase64ToBinary(blockID string) []byte {
	decoded, _ := base64.StdEncoding.DecodeString(blockID)
	return decoded
}
// blockIDIntToBase64 converts an int block ID to its base-64 string form.
// The ID is truncated to 32 bits and written little-endian before encoding.
func blockIDIntToBase64(blockID int) string {
	var raw [4]byte // All block IDs are 4 bytes long
	binary.LittleEndian.PutUint32(raw[:], uint32(blockID))
	return blockIDBinaryToBase64(raw[:])
}
// blockIDBase64ToInt converts a base-64 block ID back to the int it encodes.
//
// The ID is decoded once and its first four bytes are read as a little-endian
// uint32. A decoded ID shorter than four bytes (for example an undecodable
// string) is zero-padded instead of causing an out-of-range panic. Bytes
// beyond the fourth are ignored.
func blockIDBase64ToInt(blockID string) int {
	var raw [4]byte // All block IDs written by this package are 4 bytes long
	copy(raw[:], blockIDBase64ToBinary(blockID))
	return int(binary.LittleEndian.Uint32(raw[:]))
}
// isAzureError reports whether err is an azblob.StorageError whose service
// code equals code. Errors of any other type yield false.
func isAzureError(err error, code string) bool {
	storageErr, ok := err.(azblob.StorageError)
	return ok && string(storageErr.ServiceCode()) == code
}