Skip to content

Commit 6c43bbd

Browse files
1gtmanisurrahman75
andauthored
[cherry-pick] Add Automatic Restic Unlock feature (#274) (#277)
/cherry-pick Signed-off-by: Anisur Rahman <anisur@appscode.com> Co-authored-by: Anisur Rahman <54911684+anisurrahman75@users.noreply.github.com>
1 parent 0f12e09 commit 6c43bbd

File tree

7 files changed

+163
-4
lines changed

7 files changed

+163
-4
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ require (
2323
kmodules.xyz/custom-resources v0.30.0
2424
kmodules.xyz/offshoot-api v0.30.1
2525
kubedb.dev/apimachinery v0.47.0
26-
stash.appscode.dev/apimachinery v0.40.0
26+
stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5
2727
)
2828

2929
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,5 +562,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
562562
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
563563
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
564564
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
565-
stash.appscode.dev/apimachinery v0.40.0 h1:U6oNI0Ivx+Wo74GVnMDv9VoI1zMwdIGgd5HK2rs5oKc=
566-
stash.appscode.dev/apimachinery v0.40.0/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc=
565+
stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5 h1:ymy/F4PBvEu5RFDWz6T5UwrRWiAjmWnLZ9GwPQyyg7Q=
566+
stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5/go.mod h1:y1VgM/7CT990qqHAtE0JGg1N0sFWzmrq/9HzUU5V8dc=

pkg/backup.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,5 +253,10 @@ func (opt *redisOptions) backupRedis(targetRef api_v1beta1.TargetRef) (*restic.B
253253
if err != nil {
254254
return nil, err
255255
}
256+
err = resticWrapper.EnsureNoExclusiveLock(opt.kubeClient, opt.namespace)
257+
if err != nil {
258+
return nil, err
259+
}
260+
256261
return resticWrapper.RunBackup(opt.backupOptions, targetRef)
257262
}

vendor/modules.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ sigs.k8s.io/structured-merge-diff/v4/value
840840
## explicit; go 1.12
841841
sigs.k8s.io/yaml
842842
sigs.k8s.io/yaml/goyaml.v2
843-
# stash.appscode.dev/apimachinery v0.40.0
843+
# stash.appscode.dev/apimachinery v0.40.1-0.20250731053416-f9825ca2e8f5
844844
## explicit; go 1.23.0
845845
stash.appscode.dev/apimachinery/apis
846846
stash.appscode.dev/apimachinery/apis/repositories

vendor/stash.appscode.dev/apimachinery/pkg/restic/commands.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,32 @@ func (w *ResticWrapper) listKey() ([]byte, error) {
595595
return w.run(Command{Name: ResticCMD, Args: args})
596596
}
597597

598+
func (w *ResticWrapper) listLocks() ([]byte, error) {
599+
klog.Infoln("Listing restic locks")
600+
601+
args := []interface{}{"list", "locks", "--no-lock"}
602+
603+
args = w.appendCacheDirFlag(args)
604+
args = w.appendMaxConnectionsFlag(args)
605+
args = w.appendCaCertFlag(args)
606+
args = w.appendInsecureTLSFlag(args)
607+
608+
return w.run(Command{Name: ResticCMD, Args: args})
609+
}
610+
611+
func (w *ResticWrapper) lockStats(lockID string) ([]byte, error) {
612+
klog.Infoln("Getting stats of restic lock")
613+
614+
args := []interface{}{"cat", "lock", lockID, "--no-lock"}
615+
616+
args = w.appendCacheDirFlag(args)
617+
args = w.appendMaxConnectionsFlag(args)
618+
args = w.appendCaCertFlag(args)
619+
args = w.appendInsecureTLSFlag(args)
620+
621+
return w.run(Command{Name: ResticCMD, Args: args})
622+
}
623+
598624
func (w *ResticWrapper) updateKey(params keyParams) ([]byte, error) {
599625
klog.Infoln("Updating restic key")
600626

vendor/stash.appscode.dev/apimachinery/pkg/restic/output.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"bufio"
2121
"bytes"
2222
"encoding/json"
23+
"fmt"
2324
"io"
2425
"os"
2526
"path/filepath"
2627
"regexp"
2728
"strings"
29+
"time"
2830

2931
api_v1beta1 "stash.appscode.dev/apimachinery/apis/stash/v1beta1"
3032
)
@@ -241,3 +243,34 @@ type ForgetGroup struct {
241243
type StatsContainer struct {
242244
TotalSize uint64 `json:"total_size"`
243245
}
246+
247+
type LockStats struct {
248+
Time time.Time `json:"time"`
249+
Exclusive bool `json:"exclusive"` // true if the lock is exclusive, false if it is non-exclusive
250+
Hostname string `json:"hostname"` // Hostname of the machine where the lock was created, our case PodName
251+
Username string `json:"username"`
252+
PID int `json:"pid"`
253+
UID int `json:"uid"`
254+
GID int `json:"gid"`
255+
}
256+
257+
func extractLockStats(raw []byte) (*LockStats, error) {
258+
var stats LockStats
259+
if err := json.Unmarshal(raw, &stats); err != nil {
260+
return nil, fmt.Errorf("cannot decode lock JSON: %w", err)
261+
}
262+
return &stats, nil
263+
}
264+
265+
func extractLockIDs(r io.Reader) ([]string, error) {
266+
sc := bufio.NewScanner(r)
267+
var ids []string
268+
269+
for sc.Scan() {
270+
line := strings.TrimSpace(sc.Text())
271+
if len(line) >= 64 {
272+
ids = append(ids, line[:64])
273+
}
274+
}
275+
return ids, sc.Err()
276+
}

vendor/stash.appscode.dev/apimachinery/pkg/restic/unlock.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,102 @@ limitations under the License.
1616

1717
package restic
1818

19+
import (
20+
"bytes"
21+
"context"
22+
"fmt"
23+
"time"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/api/errors"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/client-go/kubernetes"
30+
"k8s.io/klog/v2"
31+
kutil "kmodules.xyz/client-go"
32+
)
33+
1934
func (w *ResticWrapper) UnlockRepository() error {
2035
_, err := w.unlock()
2136
return err
2237
}
38+
39+
// getLockIDs lists every lock ID currently held in the repository.
40+
func (w *ResticWrapper) getLockIDs() ([]string, error) {
41+
w.sh.ShowCMD = true
42+
out, err := w.listLocks()
43+
if err != nil {
44+
return nil, err
45+
}
46+
return extractLockIDs(bytes.NewReader(out))
47+
}
48+
49+
// getLockStats returns the decoded JSON for a single lock.
50+
func (w *ResticWrapper) getLockStats(lockID string) (*LockStats, error) {
51+
w.sh.ShowCMD = true
52+
out, err := w.lockStats(lockID)
53+
if err != nil {
54+
return nil, err
55+
}
56+
return extractLockStats(out)
57+
}
58+
59+
// getPodNameIfAnyExclusiveLock scans every lock and returns the hostname aka (Pod name) of the first exclusive lock it finds, or "" if none exist.
60+
func (w *ResticWrapper) getPodNameIfAnyExclusiveLock() (string, error) {
61+
klog.Infoln("Checking for exclusive locks in the repository...")
62+
ids, err := w.getLockIDs()
63+
if err != nil {
64+
return "", fmt.Errorf("failed to list locks: %w", err)
65+
}
66+
for _, id := range ids {
67+
st, err := w.getLockStats(id)
68+
if err != nil {
69+
return "", fmt.Errorf("failed to inspect lock %s: %w", id, err)
70+
}
71+
if st.Exclusive { // There's no chances to get multiple exclusive locks, so we can return the first one we find.
72+
return st.Hostname, nil
73+
}
74+
}
75+
return "", nil
76+
}
77+
78+
// EnsureNoExclusiveLock blocks until any exclusive lock is released.
79+
// If a lock is held by a Running Pod, it waits; otherwise it unlocks.
80+
func (w *ResticWrapper) EnsureNoExclusiveLock(k8sClient kubernetes.Interface, namespace string) error {
81+
klog.Infoln("Ensuring no exclusive lock is held in the repository...")
82+
podName, err := w.getPodNameIfAnyExclusiveLock()
83+
if err != nil {
84+
return fmt.Errorf("failed to query exclusive lock: %w", err)
85+
}
86+
if podName == "" {
87+
klog.Infoln("No exclusive lock found, nothing to do.")
88+
return nil // nothing to do
89+
}
90+
91+
return wait.PollUntilContextTimeout(
92+
context.Background(),
93+
5*time.Second,
94+
kutil.ReadinessTimeout,
95+
true,
96+
func(ctx context.Context) (bool, error) {
97+
klog.Infoln("Getting Pod:", podName, "to check if it's finished...")
98+
pod, err := k8sClient.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
99+
switch {
100+
case errors.IsNotFound(err): // Pod gone → unlock
101+
klog.Infoln("Pod:", podName, "not found, unlocking repository...")
102+
_, err := w.unlock()
103+
return true, err
104+
case err != nil: // API error → stop
105+
return false, err
106+
case pod.Status.Phase == corev1.PodSucceeded ||
107+
pod.Status.Phase == corev1.PodFailed: // Pod finished → unlock
108+
klog.Infoln("Pod:", podName, "finished with phase", pod.Status.Phase, ", unlocking repository...")
109+
_, err := w.unlock()
110+
return true, err
111+
default: // Not finished yet → keep waiting
112+
klog.Infoln("Pod:", podName, "is in phase", pod.Status.Phase, ", waiting for it to finish...")
113+
return false, nil
114+
}
115+
},
116+
)
117+
}

0 commit comments

Comments
 (0)