diff --git a/go.mod b/go.mod index 5516a0a10..62856b6dc 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( kmodules.xyz/client-go v0.30.44 kmodules.xyz/custom-resources v0.30.0 kmodules.xyz/offshoot-api v0.30.1 - stash.appscode.dev/apimachinery v0.40.0 + stash.appscode.dev/apimachinery v0.40.1-0.20250729081845-3cc491bb0ada ) require ( diff --git a/go.sum b/go.sum index 0134e3ebc..6fcbb15f1 100644 --- a/go.sum +++ b/go.sum @@ -554,5 +554,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= -stash.appscode.dev/apimachinery v0.40.0 h1:U6oNI0Ivx+Wo74GVnMDv9VoI1zMwdIGgd5HK2rs5oKc= -stash.appscode.dev/apimachinery v0.40.0/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc= +stash.appscode.dev/apimachinery v0.40.1-0.20250729081845-3cc491bb0ada h1:P/mwSqUJKNlW67ZCJJmGCgGZyACktHeD85d+zw47DtA= +stash.appscode.dev/apimachinery v0.40.1-0.20250729081845-3cc491bb0ada/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc= diff --git a/pkg/backup.go b/pkg/backup.go index cc58a2d5e..84bc0a848 100644 --- a/pkg/backup.go +++ b/pkg/backup.go @@ -234,5 +234,9 @@ func (opt *postgresOptions) backupPostgreSQL(targetRef api_v1beta1.TargetRef) (* return nil, err } + err = resticWrapper.EnsureNoExclusiveLock(opt.kubeClient, opt.namespace) + if err != nil { + return nil, err + } return resticWrapper.RunBackup(opt.backupOptions, targetRef) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 9d98f8026..9595150af 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -821,7 +821,7 @@ sigs.k8s.io/structured-merge-diff/v4/value ## explicit; go 1.12 sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 -# stash.appscode.dev/apimachinery v0.40.0 +# stash.appscode.dev/apimachinery 
v0.40.1-0.20250729081845-3cc491bb0ada
 ## explicit; go 1.23.0
 stash.appscode.dev/apimachinery/apis
 stash.appscode.dev/apimachinery/apis/repositories
diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go
index f36100ded..8304b8f77 100644
--- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go
+++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go
@@ -494,7 +494,7 @@ func (w *ResticWrapper) run(commands ...Command) ([]byte, error) {
 	if err != nil {
 		return nil, formatError(err, errBuff.String())
 	}
-	klog.Infoln("sh-output:", string(out))
+	klog.V(2).Infoln("sh-output:", string(out))
 	return out, nil
 }
 
@@ -595,6 +595,40 @@ func (w *ResticWrapper) listKey() ([]byte, error) {
 	return w.run(Command{Name: ResticCMD, Args: args})
 }
 
+// listLocks runs `restic list locks --no-lock` and returns the result of
+// w.run for that command. The base arguments are passed through the
+// wrapper's cache-dir, max-connections, CA-cert and insecure-TLS flag helpers
+// before the command is executed.
+func (w *ResticWrapper) listLocks() ([]byte, error) {
+	klog.Infoln("Listing restic locks")
+
+	cmdArgs := []interface{}{"list", "locks", "--no-lock"}
+	cmdArgs = w.appendCacheDirFlag(cmdArgs)
+	cmdArgs = w.appendMaxConnectionsFlag(cmdArgs)
+	cmdArgs = w.appendCaCertFlag(cmdArgs)
+	cmdArgs = w.appendInsecureTLSFlag(cmdArgs)
+
+	listCmd := Command{Name: ResticCMD, Args: cmdArgs}
+	return w.run(listCmd)
+}
+
+// lockStats runs `restic cat lock <lockID> --no-lock` and returns the result
+// of w.run for that command. The base arguments are passed through the same
+// cache-dir, max-connections, CA-cert and insecure-TLS flag helpers that
+// listLocks uses.
+func (w *ResticWrapper) lockStats(lockID string) ([]byte, error) {
+	klog.Infoln("Getting stats of restic lock")
+
+	cmdArgs := []interface{}{"cat", "lock", lockID, "--no-lock"}
+	cmdArgs = w.appendCacheDirFlag(cmdArgs)
+	cmdArgs = w.appendMaxConnectionsFlag(cmdArgs)
+	cmdArgs = w.appendCaCertFlag(cmdArgs)
+	cmdArgs = w.appendInsecureTLSFlag(cmdArgs)
+
+	catCmd := Command{Name: ResticCMD, Args: cmdArgs}
+	return w.run(catCmd)
+}
+
 func (w *ResticWrapper) updateKey(params keyParams) ([]byte, error) {
 	klog.Infoln("Updating restic key")
 
diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go
index bc4d5f5ff..e6de5fec8 100644
--- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go
+++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go
@@ 
-20,11 +20,13 @@ import (
 	"bufio"
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"regexp"
 	"strings"
+	"time"
 
 	api_v1beta1 "stash.appscode.dev/apimachinery/apis/stash/v1beta1"
 )
@@ -241,3 +243,64 @@ type ForgetGroup struct {
 type StatsContainer struct {
 	TotalSize uint64 `json:"total_size"`
 }
+
+// LockStats holds the fields decoded from the JSON description of a single
+// restic lock.
+type LockStats struct {
+	Time      time.Time `json:"time"`
+	Exclusive bool      `json:"exclusive"` // true if the lock is exclusive, false if it is non-exclusive
+	Hostname  string    `json:"hostname"`  // host on which the lock was created; in our case the Pod name
+	Username  string    `json:"username"`
+	PID       int       `json:"pid"`
+	UID       int       `json:"uid"`
+	GID       int       `json:"gid"`
+}
+
+// extractLockStats decodes raw as a JSON-encoded LockStats and wraps any
+// decoding failure with context.
+func extractLockStats(raw []byte) (*LockStats, error) {
+	var stats LockStats
+	if err := json.Unmarshal(raw, &stats); err != nil {
+		return nil, fmt.Errorf("cannot decode lock JSON: %w", err)
+	}
+	return &stats, nil
+}
+
+// lockIDLen is the number of leading characters of a line that make up a lock ID.
+const lockIDLen = 64
+
+// extractLockIDs reads r line by line and collects the lock IDs it finds.
+// A line contributes an ID only when, after trimming surrounding whitespace,
+// its first lockIDLen characters are all hexadecimal digits. Blank lines,
+// short lines and other non-ID text are skipped so they are never handed on
+// as lock IDs. The IDs collected so far are returned together with any
+// scanner error.
+func extractLockIDs(r io.Reader) ([]string, error) {
+	sc := bufio.NewScanner(r)
+	var ids []string
+
+	for sc.Scan() {
+		line := strings.TrimSpace(sc.Text())
+		if len(line) < lockIDLen {
+			continue
+		}
+		if id := line[:lockIDLen]; isHexString(id) {
+			ids = append(ids, id)
+		}
+	}
+	return ids, sc.Err()
+}
+
+// isHexString reports whether every byte of s is an ASCII hexadecimal digit.
+func isHexString(s string) bool {
+	for i := 0; i < len(s); i++ {
+		c := s[i]
+		isDigit := '0' <= c && c <= '9'
+		isLower := 'a' <= c && c <= 'f'
+		isUpper := 'A' <= c && c <= 'F'
+		if !isDigit && !isLower && !isUpper {
+			return false
+		}
+	}
+	return true
+}
diff --git a/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go b/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go
index ae9b74b3e..6914b6634 100644
--- a/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go
+++ b/vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go
@@ -16,7 +16,102 @@ limitations under the License.
 
 package restic
 
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+	kutil "kmodules.xyz/client-go"
+)
+
 func (w *ResticWrapper) UnlockRepository() error {
 	_, err := w.unlock()
 	return err
 }
+
+// getLockIDs lists every lock ID currently held in the repository.
+func (w *ResticWrapper) getLockIDs() ([]string, error) {
+	// Set ShowCMD on w.sh before invoking restic (presumably so the command
+	// line is echoed — confirm against the shell session type).
+	w.sh.ShowCMD = true
+
+	listing, err := w.listLocks()
+	if err != nil {
+		return nil, err
+	}
+	// Parse the raw command output into individual lock IDs.
+	return extractLockIDs(bytes.NewReader(listing))
+}
+
+// getLockStats fetches the description of the lock identified by lockID via
+// lockStats and decodes it into a LockStats value.
+func (w *ResticWrapper) getLockStats(lockID string) (*LockStats, error) {
+	w.sh.ShowCMD = true
+
+	encoded, err := w.lockStats(lockID)
+	if err != nil {
+		return nil, err
+	}
+	return extractLockStats(encoded)
+}
+
+// getPodNameIfAnyExclusiveLock inspects the repository's locks one by one and
+// returns the Hostname recorded on the first exclusive lock, which is treated
+// as the name of the Pod holding it. It returns "" when no lock is exclusive.
+func (w *ResticWrapper) getPodNameIfAnyExclusiveLock() (string, error) {
+	klog.Infoln("Checking for exclusive locks in the repository...")
+
+	lockIDs, err := w.getLockIDs()
+	if err != nil {
+		return "", fmt.Errorf("failed to list locks: %w", err)
+	}
+
+	for _, lockID := range lockIDs {
+		stats, err := w.getLockStats(lockID)
+		if err != nil {
+			return "", fmt.Errorf("failed to inspect lock %s: %w", lockID, err)
+		}
+		if !stats.Exclusive {
+			continue
+		}
+		// At most one exclusive lock is expected, so the first hit is the answer.
+		return stats.Hostname, nil
+	}
+	return "", nil
+}
+
+// EnsureNoExclusiveLock blocks until any exclusive lock is released.
+// If a lock is held by a Running Pod, it waits; otherwise it unlocks.
+//
+// While the holding Pod is still active the repository is re-inspected on every
+// poll, so the wait ends as soon as the exclusive lock disappears instead of
+// only when the Pod itself terminates. Polling happens every 5 seconds for at
+// most kutil.ReadinessTimeout; any failure is returned wrapped with the Pod
+// that was being waited on.
+func (w *ResticWrapper) EnsureNoExclusiveLock(k8sClient kubernetes.Interface, namespace string) error {
+	klog.Infoln("Ensuring no exclusive lock is held in the repository...")
+	podName, err := w.getPodNameIfAnyExclusiveLock()
+	if err != nil {
+		return fmt.Errorf("failed to query exclusive lock: %w", err)
+	}
+	if podName == "" {
+		klog.Infoln("No exclusive lock found, nothing to do.")
+		return nil // nothing to do
+	}
+
+	err = wait.PollUntilContextTimeout(
+		context.Background(),
+		5*time.Second,
+		kutil.ReadinessTimeout,
+		true,
+		func(ctx context.Context) (bool, error) {
+			klog.Infoln("Getting Pod:", podName, "to check if it's finished...")
+			pod, err := k8sClient.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
+			switch {
+			case errors.IsNotFound(err): // Pod gone → unlock
+				klog.Infoln("Pod:", podName, "not found, unlocking repository...")
+				_, err := w.unlock()
+				return true, err
+			case err != nil: // API error → stop
+				return false, err
+			case pod.Status.Phase == corev1.PodSucceeded ||
+				pod.Status.Phase == corev1.PodFailed: // Pod finished → unlock
+				klog.Infoln("Pod:", podName, "finished with phase", pod.Status.Phase, ", unlocking repository...")
+				_, err := w.unlock()
+				return true, err
+			}
+
+			// The Pod is still active, but it may already have released its
+			// lock. Re-check the repository so we stop waiting as soon as the
+			// lock is gone rather than when the Pod terminates.
+			holder, err := w.getPodNameIfAnyExclusiveLock()
+			if err != nil {
+				return false, fmt.Errorf("failed to query exclusive lock: %w", err)
+			}
+			if holder == "" {
+				klog.Infoln("Exclusive lock has been released, nothing more to wait for.")
+				return true, nil
+			}
+			// Follow the current holder in case a different Pod took the lock.
+			podName = holder
+			klog.Infoln("Pod:", podName, "is in phase", pod.Status.Phase, ", waiting for it to finish...")
+			return false, nil
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to wait for exclusive lock held by Pod %s/%s: %w", namespace, podName, err)
+	}
+	return nil
+}