forked from Lucretius/vault_raft_snapshot_agent
/
azure.go
85 lines (75 loc) · 2.42 KB
/
azure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package snapshot_agent
import (
"context"
"fmt"
"io"
"sort"
"github.com/Azure/azure-storage-blob-go/azblob"
appconfig "github.com/wasilak/vault_raft_snapshot_agent/config"
"golang.org/x/exp/slog"
)
// CreateAzureSnapshot uploads the snapshot read from reader to Azure Blob
// Storage as "raft_snapshot-<currentTs>.snap" and returns that blob name.
//
// When config.Retain is greater than zero, every blob whose name starts with
// "raft_snapshot-" is listed, ordered by last-modified time, and all but the
// newest config.Retain of them are deleted. An error from listing or deleting
// is returned together with the blob name, because the upload itself has
// already succeeded at that point. A failed upload returns an empty name.
func (s *Snapshotter) CreateAzureSnapshot(reader io.ReadWriter, config *appconfig.Configuration, currentTs int64) (string, error) {
	ctx := context.Background()
	url := fmt.Sprintf("raft_snapshot-%d.snap", currentTs)
	blob := s.AzureUploader.NewBlockBlobURL(url)
	_, err := azblob.UploadStreamToBlockBlob(ctx, reader, blob, azblob.UploadStreamToBlockBlobOptions{
		BufferSize: 4 * 1024 * 1024,
		MaxBuffers: 16,
	})
	if err != nil {
		return "", err
	}
	if config.Retain <= 0 {
		// Retention is disabled; nothing to prune.
		return url, nil
	}

	// A single listing call returns at most MaxResults items, so follow the
	// continuation marker until the service reports there are no more
	// segments. Otherwise retention would be computed on a partial view.
	var blobs []azblob.BlobItemInternal
	for marker := (azblob.Marker{}); marker.NotDone(); {
		res, err := s.AzureUploader.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{
			Prefix:     "raft_snapshot-",
			MaxResults: 500,
		})
		if err != nil {
			slog.Info("Unable to iterate through bucket to find old snapshots to delete")
			return url, err
		}
		blobs = append(blobs, res.Segment.BlobItems...)
		marker = res.NextMarker
	}

	// Oldest first, so the blobs to remove form a prefix of the slice.
	timestamp := func(o1, o2 *azblob.BlobItemInternal) bool {
		return o1.Properties.LastModified.Before(o2.Properties.LastModified)
	}
	AzureBy(timestamp).Sort(blobs)

	excess := len(blobs) - int(config.Retain)
	if excess <= 0 {
		return url, nil
	}
	for _, b := range blobs[:excess] {
		old := s.AzureUploader.NewBlockBlobURL(b.Name)
		// Check the error returned by this Delete call, not an earlier one.
		if _, err := old.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
			slog.Info("Cannot delete old snapshot")
			return url, err
		}
	}
	return url, nil
}
// AzureBy is an ordering predicate over Azure blob items: it reports whether
// f1 should be placed before f2.
type AzureBy func(f1, f2 *azblob.BlobItemInternal) bool

// Sort reorders objects in place with sort.Sort, using the receiver as the
// less-than comparison.
func (by AzureBy) Sort(objects []azblob.BlobItemInternal) {
	sort.Sort(&azObjectSorter{objects: objects, by: by})
}
// azObjectSorter pairs a slice of blob items with the predicate that orders
// them. Its Len, Less and Swap methods let it be passed to sort.Sort.
type azObjectSorter struct {
	objects []azblob.BlobItemInternal
	by      func(f1, f2 *azblob.BlobItemInternal) bool // ordering predicate consulted by Less
}

// Len returns the number of blob items held by the sorter.
func (o *azObjectSorter) Len() int { return len(o.objects) }

// Less reports whether the item at index i orders before the item at index j
// according to the sorter's predicate.
func (o *azObjectSorter) Less(i, j int) bool {
	left, right := &o.objects[i], &o.objects[j]
	return o.by(left, right)
}

// Swap exchanges the items at indices i and j in place.
func (o *azObjectSorter) Swap(i, j int) {
	items := o.objects
	items[i], items[j] = items[j], items[i]
}