/
backupsync.go
159 lines (147 loc) · 4.41 KB
/
backupsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package controllers
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"time"
storkv1 "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1"
"github.com/libopenstorage/stork/pkg/crypto"
"github.com/libopenstorage/stork/pkg/log"
"github.com/libopenstorage/stork/pkg/objectstore"
storkops "github.com/portworx/sched-ops/k8s/stork"
"github.com/sirupsen/logrus"
"gocloud.dev/blob"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
)
// BackupSyncController periodically scans backup locations and creates
// ApplicationBackup objects on this cluster for the backups it finds in
// locations that have sync enabled.
type BackupSyncController struct {
// Recorder is an event recorder. None of the methods in this file use it.
Recorder record.EventRecorder
// SyncInterval is how long the sync loop waits before each sync pass.
SyncInterval time.Duration
// stopChannel is stored by Init; the sync loop returns once a receive on
// this channel succeeds.
stopChannel chan os.Signal
}
// Init stores the stop channel on the controller and launches the periodic
// backup sync loop in a background goroutine.
//
// The loop keeps running until a receive on stopChannel succeeds. Init does
// not wait for the loop and always returns nil.
func (b *BackupSyncController) Init(stopChannel chan os.Signal) error {
	// The channel has to be in place before the goroutine starts, since the
	// loop reads it from the controller.
	b.stopChannel = stopChannel

	go b.startBackupSync()

	return nil
}
// startBackupSync is the controller's background loop. Each iteration waits
// for SyncInterval, lists the backup locations (with an empty namespace
// argument, presumably meaning all namespaces) and syncs each of them.
//
// Failures to list or to sync a location are logged and never end the loop;
// it only returns once a receive on the stop channel succeeds.
func (b *BackupSyncController) startBackupSync() {
	for {
		// Block until it is either time for the next pass or we are told to
		// stop. The interval is counted from the end of the previous pass.
		select {
		case <-b.stopChannel:
			return
		case <-time.After(b.SyncInterval):
		}

		locations, listErr := storkops.Instance().ListBackupLocations("")
		if listErr != nil {
			logrus.Errorf("Error getting backup location to sync: %v", listErr)
			continue
		}

		for _, location := range locations.Items {
			if syncErr := b.syncBackupsFromLocation(&location); syncErr != nil {
				// One bad location must not prevent the others from syncing.
				log.BackupLocationLog(&location).Errorf("Error syncing backups from location: %v", syncErr)
			}
		}
	}
}
// syncBackupsFromLocation imports the backups stored in a backup location
// into this cluster as ApplicationBackup objects.
//
// Locations without Location.Sync set are skipped. Otherwise the bucket is
// listed two directory levels deep below "<location namespace>/". Every
// second-level directory is expected to contain a metadata object (named by
// metadataObjectName) holding a JSON-encoded ApplicationBackup, which is
// decrypted first when the location has an encryption key.
//
// A backup found in the bucket is skipped when:
//   - its metadata cannot be read, decrypted or parsed (logged),
//   - a local ApplicationBackup with the same name, namespace and UID exists
//     (it was created on this cluster),
//   - an ApplicationBackup with the synced name already exists, or
//   - a lookup against the API fails with anything other than NotFound
//     (logged).
//
// Otherwise a copy is created under the synced name with its server-assigned
// metadata cleared and its reclaim policy set to Retain.
//
// It returns an error when the bucket cannot be opened or listed, or when
// creating an ApplicationBackup fails; in those cases the remaining backups
// of this location are not processed in this pass.
func (b *BackupSyncController) syncBackupsFromLocation(location *storkv1.BackupLocation) error {
	if !location.Location.Sync {
		return nil
	}

	bucket, err := objectstore.GetBucket(location)
	if err != nil {
		return err
	}
	// A bucket is opened on every pass for every location, so release it
	// when done instead of leaking the handle.
	defer func() {
		if closeErr := bucket.Close(); closeErr != nil {
			log.BackupLocationLog(location).Errorf("Error closing bucket after sync: %v", closeErr)
		}
	}()

	ctx := context.TODO()

	// First level: one directory per backup name under the location's
	// namespace.
	backupDirs := make(map[string]struct{})
	iterator := bucket.List(&blob.ListOptions{
		Prefix:    location.Namespace + "/",
		Delimiter: "/",
	})
	for {
		object, err := iterator.Next(ctx)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		if object.IsDir {
			backupDirs[object.Key] = struct{}{}
		}
	}

	for backupName := range backupDirs {
		// Second level: the directories below a backup name, each expected
		// to carry its own metadata object.
		iterator := bucket.List(&blob.ListOptions{
			Prefix:    backupName,
			Delimiter: "/",
		})
		for {
			object, err := iterator.Next(ctx)
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if !object.IsDir {
				continue
			}

			// NOTE(review): object keys are slash-separated while
			// filepath.Join uses the OS separator; the two only agree on
			// platforms whose separator is "/".
			data, err := bucket.ReadAll(ctx, filepath.Join(object.Key, metadataObjectName))
			if err != nil {
				log.BackupLocationLog(location).Errorf("Error syncing backup %v: %v", backupName, err)
				continue
			}
			if location.Location.EncryptionKey != "" {
				if data, err = crypto.Decrypt(data, location.Location.EncryptionKey); err != nil {
					log.BackupLocationLog(location).Errorf("Error decrypting backup %v during sync: %v", backupName, err)
					continue
				}
			}

			backupInfo := storkv1.ApplicationBackup{}
			if err = json.Unmarshal(data, &backupInfo); err != nil {
				log.BackupLocationLog(location).Errorf("Error parsing backup %v during sync: %v", backupName, err)
				continue
			}

			localBackupInfo, err := storkops.Instance().GetApplicationBackup(backupInfo.Name, backupInfo.Namespace)
			if err == nil {
				// The UIDs will match if it was originally created on this
				// cluster. We don't want to sync those backups
				if localBackupInfo.UID == backupInfo.UID {
					continue
				}
			} else if !errors.IsNotFound(err) {
				// Only NotFound means "safe to go on"; skip the backup on
				// any other error, but leave a trace of why.
				log.BackupLocationLog(location).Errorf("Error getting backup %v/%v during sync: %v",
					backupInfo.Namespace, backupInfo.Name, err)
				continue
			}

			// Now check if we've synced this backup to this cluster
			// already using the generated name
			syncedBackupName := b.getSyncedBackupName(&backupInfo)
			_, err = storkops.Instance().GetApplicationBackup(syncedBackupName, backupInfo.Namespace)
			if err == nil {
				// Already synced on an earlier pass.
				continue
			}
			if !errors.IsNotFound(err) {
				log.BackupLocationLog(location).Errorf("Error getting synced backup %v/%v during sync: %v",
					backupInfo.Namespace, syncedBackupName, err)
				continue
			}

			// Clear the metadata assigned by the originating cluster so the
			// object can be created here, and retain the backup data when
			// the synced object is deleted.
			backupInfo.Name = syncedBackupName
			backupInfo.UID = ""
			backupInfo.ResourceVersion = ""
			backupInfo.SelfLink = ""
			backupInfo.OwnerReferences = nil
			backupInfo.Spec.ReclaimPolicy = storkv1.ApplicationBackupReclaimPolicyRetain
			if _, err = storkops.Instance().CreateApplicationBackup(&backupInfo); err != nil {
				return err
			}
		}
	}
	return nil
}
// getSyncedBackupName returns the name under which a backup found in a
// backup location is stored on this cluster.
//
// Backups carrying the schedule-name annotation keep their original name.
// All other backups get "-" plus their trigger timestamp, formatted with
// nameTimeSuffixFormat, appended to the name.
func (b *BackupSyncController) getSyncedBackupName(backup *storkv1.ApplicationBackup) string {
	_, scheduled := backup.Annotations[ApplicationBackupScheduleNameAnnotation]
	if !scheduled {
		suffix := backup.Status.TriggerTimestamp.Time.Format(nameTimeSuffixFormat)
		return backup.Name + "-" + suffix
	}
	return backup.Name
}