/
backup.go
114 lines (97 loc) · 2.58 KB
/
backup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package backup
import (
"context"
"github.com/pkg/errors"
client "sigs.k8s.io/controller-runtime/pkg/client"
log "sigs.k8s.io/controller-runtime/pkg/log"
api "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
)
type JobType int
const (
TypeBackup JobType = iota
TypeRestore
TypePITRestore
)
type Job struct {
Name string
Type JobType
}
func NewBackupJob(name string) Job {
return Job{
Name: name,
Type: TypeBackup,
}
}
func NewRestoreJob(cr *api.PerconaServerMongoDBRestore) Job {
j := Job{
Name: cr.Name,
Type: TypeRestore,
}
if cr.Spec.PITR != nil {
j.Type = TypePITRestore
}
return j
}
// HasActiveJobs returns true if there are running backups or restores
// in given cluster and namespace
func HasActiveJobs(ctx context.Context, newPBMFunc NewPBMFunc, cl client.Client, cluster *api.PerconaServerMongoDB, current Job, allowLock ...LockHeaderPredicate) (bool, error) {
l := log.FromContext(ctx)
bcps := &api.PerconaServerMongoDBBackupList{}
err := cl.List(ctx,
bcps,
&client.ListOptions{
Namespace: cluster.Namespace,
},
)
if err != nil {
return false, errors.Wrap(err, "get backup list")
}
for _, b := range bcps.Items {
if b.Name == current.Name && current.Type == TypeBackup {
continue
}
if b.Spec.GetClusterName() == cluster.Name &&
b.Status.State != api.BackupStateReady &&
b.Status.State != api.BackupStateError &&
b.Status.State != api.BackupStateWaiting {
l.Info("Waiting for backup to complete", "backup", b.Name, "status", b.Status.State)
return true, nil
}
}
rstrs := &api.PerconaServerMongoDBRestoreList{}
err = cl.List(ctx,
rstrs,
&client.ListOptions{
Namespace: cluster.Namespace,
},
)
if err != nil {
return false, errors.Wrap(err, "get restore list")
}
for _, r := range rstrs.Items {
if r.Name == current.Name && (current.Type == TypeRestore || current.Type == TypePITRestore) {
continue
}
if r.Spec.ClusterName == cluster.Name &&
r.Status.State != api.RestoreStateReady &&
r.Status.State != api.RestoreStateError &&
r.Status.State != api.RestoreStateWaiting {
l.Info("Waiting for restore to complete", "restore", r.Name, "status", r.Status.State)
return true, nil
}
}
pbm, err := newPBMFunc(ctx, cl, cluster)
if err != nil {
return false, errors.Wrap(err, "getting PBM object")
}
defer pbm.Close(ctx)
allowLock = append(allowLock, NotJobLock(current))
hasLocks, err := pbm.HasLocks(allowLock...)
if err != nil {
return false, errors.Wrap(err, "check PBM locks")
}
if hasLocks {
l.Info("Waiting for PBM locks to be relased")
}
return hasLocks, nil
}