Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v1beta1/questdbsnapshot_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("QuestDBSnapshot Webhook", func() {
},
Spec: QuestDBSnapshotSpec{
QuestDBName: q.Name,
VolumeSnapshotClassName: pointer.String("csi-hostpath-snapclass"),
VolumeSnapshotClassName: pointer.String("snapclass"),
},
}
})
Expand Down
2 changes: 1 addition & 1 deletion api/v1beta1/questdbsnapshotschedule_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ = Describe("QuestDBSnapshotSchedule Webhook", func() {
Spec: QuestDBSnapshotScheduleSpec{
Snapshot: QuestDBSnapshotSpec{
QuestDBName: q.Name,
VolumeSnapshotClassName: pointer.String("csi-hostpath-snapclass"),
VolumeSnapshotClassName: pointer.String("snapclass"),
},
Schedule: "*/1 * * * *",
},
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/questdbsnapshot_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ func (r *QuestDBSnapshotReconciler) handlePhaseEmpty(ctx context.Context, snap *
if err := r.Status().Update(ctx, snap); err != nil {
return ctrl.Result{}, err
}
r.Recorder.Eventf(snap, v1.EventTypeNormal, "SnapshotPending", "Running 'SNAPSHOT PREPARE;' for snapshot %s", snap.Name)
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -399,6 +398,7 @@ func (r *QuestDBSnapshotReconciler) handlePhasePending(ctx context.Context, snap
}

// Create the pre-snapshot job
r.Recorder.Eventf(snap, v1.EventTypeNormal, "SnapshotPending", "Running 'SNAPSHOT PREPARE;' for snapshot %s", snap.Name)
job, err := r.buildPreSnapshotJob(ctx, snap)
if err != nil {
return ctrl.Result{}, err
Expand Down
173 changes: 102 additions & 71 deletions internal/controller/questdbsnapshotschedule_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,25 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
var (
q *crdv1beta1.QuestDB
sched *crdv1beta1.QuestDBSnapshotSchedule

timeout = time.Second * 2
//consistencyTimeout = time.Millisecond * 600
timeout = time.Second * 2
interval = time.Millisecond * 100

r *QuestDBSnapshotScheduleReconciler
)

Context("golden path case", Ordered, func() {
var (
snapList = &crdv1beta1.QuestDBSnapshotList{}
timeSource *abtime.ManualTime

q *crdv1beta1.QuestDB
sched *crdv1beta1.QuestDBSnapshotSchedule
r *QuestDBSnapshotScheduleReconciler
)

BeforeAll(func() {
Expand All @@ -47,7 +44,7 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
By("Creating a QuestDB")
q = testutils.BuildAndCreateMockQuestDB(ctx, k8sClient)

By("Creating a QuestDBSnapshotSchedule")
By("Creating a QuestDBSnapshotSchedule that triggers every minute")
sched = &crdv1beta1.QuestDBSnapshotSchedule{
ObjectMeta: metav1.ObjectMeta{
Name: q.Name,
Expand All @@ -56,7 +53,7 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
Spec: crdv1beta1.QuestDBSnapshotScheduleSpec{
Snapshot: crdv1beta1.QuestDBSnapshotSpec{
QuestDBName: q.Name,
VolumeSnapshotClassName: pointer.String("csi-hostpath-snapclass"),
VolumeSnapshotClassName: pointer.String(testutils.SnapshotClassName),
},
Schedule: "*/1 * * * *",
},
Expand All @@ -82,8 +79,8 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {

It("should create a snapshot if the cron schedule has triggered", func() {

By("Bumping the clock more than 1 minute")
timeSource.Advance(time.Minute + 5*time.Second)
By("Advancing to the to the next minute to trigger a snapshot")
advanceToTheNextMinute(timeSource)

By("Forcing a reconcile")
_, err := r.Reconcile(ctx, ctrl.Request{
Expand All @@ -94,42 +91,48 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
By("Checking that a snapshot has been created")
Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
Expect(snapList.Items).To(HaveLen(1))
Expect(snapList.Items[0].OwnerReferences).To(HaveLen(1))
Expect(snapList.Items[0].OwnerReferences[0].Name).To(Equal(sched.Name))
Expect(metav1.IsControlledBy(&snapList.Items[0], sched))
})

It("should report the phase of the latest snapshot", func() {
By("Getting the latest snapshot")
snapList := &crdv1beta1.QuestDBSnapshotList{}
Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
Expect(snapList.Items).To(HaveLen(1))

latestSnap := &snapList.Items[0]
snap := &crdv1beta1.QuestDBSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: sched.Name + "-" + timeSource.Now().Format("20060102150405"),
Namespace: sched.Namespace,
},
}

By("Waiting for the snapshot to become pending")
By("Waiting for the snapshot to become pending (since the snapshot controller is running in the background)")
Eventually(func(g Gomega) {
k8sClient.Get(ctx, client.ObjectKeyFromObject(latestSnap), latestSnap)
g.Expect(latestSnap.Status.Phase).To(Equal(crdv1beta1.SnapshotPending))
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(snap), snap)).To(Succeed())
g.Expect(snap.Status.Phase).To(Equal(crdv1beta1.SnapshotPending))
}, timeout, interval).Should(Succeed())

By("Setting the snapshot to succeeded")
latestSnap.Status.Phase = crdv1beta1.SnapshotSucceeded
Expect(k8sClient.Status().Update(ctx, latestSnap)).To(Succeed())
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(snap), snap)).To(Succeed())
snap.Status.Phase = crdv1beta1.SnapshotSucceeded
Expect(k8sClient.Status().Update(ctx, snap)).To(Succeed())
}, timeout, interval).Should(Succeed())

By("Advancing time a few milliseconds")
timeSource.Advance(time.Millisecond * 2)

By("Forcing a reconcile")
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(sched),
})
Expect(err).ToNot(HaveOccurred())

By("Checking that the status has been updated")
By("Checking that the schedule status has been updated")
Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(sched), sched)).To(Succeed())
Expect(sched.Status.SnapshotPhase).To(Equal(crdv1beta1.SnapshotSucceeded))
})

It("should take a second snapshot if the cron schedule has triggered", func() {
By("Bumping the clock more than 1 minute")
timeSource.Advance(time.Minute + 5*time.Second)
By("Advancing the clock to the next minute")
advanceToTheNextMinute(timeSource)

By("Forcing a reconcile")
_, err := r.Reconcile(ctx, ctrl.Request{
Expand All @@ -139,9 +142,13 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {

By("Checking that a snapshot has been created")
Eventually(func(g Gomega) {
snapList := &crdv1beta1.QuestDBSnapshotList{}
g.Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
g.Expect(snapList.Items).To(HaveLen(2))
snap := &crdv1beta1.QuestDBSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: sched.Name + "-" + timeSource.Now().Format("20060102150405"),
Namespace: sched.Namespace,
},
}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(snap), snap)).To(Succeed())
}, timeout, interval).Should(Succeed())
})

Expand Down Expand Up @@ -169,8 +176,8 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
latestSnap.Status.Phase = crdv1beta1.SnapshotSucceeded
Expect(k8sClient.Status().Update(ctx, latestSnap)).To(Succeed())

By("Advancing time a few seconds")
timeSource.Advance(5 * time.Second)
By("Advancing time a few milliseconds to avoid creating a new snapshot")
timeSource.Advance(5 * time.Millisecond)

By("Forcing a reconcile")
_, err := r.Reconcile(ctx, ctrl.Request{
Expand All @@ -188,6 +195,12 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
})

It("should only report the status of snapshots owned by it", func() {
var (
sched *crdv1beta1.QuestDBSnapshotSchedule
r *QuestDBSnapshotScheduleReconciler
q *crdv1beta1.QuestDB
)

r = &QuestDBSnapshotScheduleReconciler{
Client: k8sClient,
Scheme: scheme.Scheme,
Expand All @@ -207,7 +220,7 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
Spec: crdv1beta1.QuestDBSnapshotScheduleSpec{
Snapshot: crdv1beta1.QuestDBSnapshotSpec{
QuestDBName: q.Name,
VolumeSnapshotClassName: pointer.String("csi-hostpath-snapclass"),
VolumeSnapshotClassName: pointer.String(testutils.SnapshotClassName),
},
Schedule: "*/1 * * * *",
},
Expand All @@ -226,13 +239,16 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
}
Expect(k8sClient.Create(ctx, snap)).To(Succeed())

By("Advancing time at least a minute to trigger a reconcile")
r.TimeSource.(*abtime.ManualTime).Advance(time.Minute + 5*time.Second)

By("Forcing a reconcile")
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(sched),
})
Expect(err).ToNot(HaveOccurred())

By("Ensuring that the status of the one-off snapshot is not reported")
By("Ensuring that the status of the one-off snapshot status is not reported")
Eventually(func(g Gomega) {
// Ensure that the one-off snapshot is pending
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(snap), snap)).To(Succeed())
Expand All @@ -245,7 +261,13 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
})

It("Should only garbage collect succeeded snapshots", func() {
var retention int32 = 5
var (
retention int32 = 5
sched *crdv1beta1.QuestDBSnapshotSchedule
r *QuestDBSnapshotScheduleReconciler
q *crdv1beta1.QuestDB
timeSource *abtime.ManualTime
)

r = &QuestDBSnapshotScheduleReconciler{
Client: k8sClient,
Expand All @@ -257,7 +279,7 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
By("Creating a QuestDB")
q = testutils.BuildAndCreateMockQuestDB(ctx, k8sClient)

By("Creating a QuestDBSnapshotSchedule")
By("Creating a QuestDBSnapshotSchedule with a nonexistant snapshot class to fail all snapshots")
sched = &crdv1beta1.QuestDBSnapshotSchedule{
ObjectMeta: metav1.ObjectMeta{
Name: q.Name,
Expand All @@ -266,66 +288,69 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
Spec: crdv1beta1.QuestDBSnapshotScheduleSpec{
Snapshot: crdv1beta1.QuestDBSnapshotSpec{
QuestDBName: q.Name,
VolumeSnapshotClassName: pointer.String("csi-hostpath-snapclass"),
VolumeSnapshotClassName: pointer.String("this-snapshot-class-does-not-exist"),
},
Schedule: "*/1 * * * *",
Retention: retention,
},
}
Expect(k8sClient.Create(ctx, sched)).To(Succeed())
r.TimeSource = abtime.NewManualAtTime(sched.CreationTimestamp.Time)
timeSource = r.TimeSource.(*abtime.ManualTime)

By("Advancing time enough to create retention * 2 snapshots and failing them all")
By("Advancing to the next minute for determinism")
advanceToTheNextMinute(timeSource)

By("Advancing time by minute enough to create retention * 2 snapshots")
for i := int32(0); i < retention*2; i++ {
r.TimeSource.(*abtime.ManualTime).Advance(time.Minute)

timeSource.Advance(time.Minute)
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(sched),
})
Expect(err).ToNot(HaveOccurred())

// Wait for the snapshot status to hit pending, then fail it
expectedSnapshotName := sched.Name + "-" + r.TimeSource.Now().Format("20060102150405")
// Wait for the snapshot to transition to Failed before advancing time again
snap := &crdv1beta1.QuestDBSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: sched.Name + "-" + r.TimeSource.Now().Format("20060102150405"),
Namespace: sched.Namespace,
},
}
Eventually(func(g Gomega) {
snap := &crdv1beta1.QuestDBSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: expectedSnapshotName,
Namespace: sched.Namespace,
},
}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(snap), snap)).To(Succeed())
g.Expect(snap.Status.Phase).To(Equal(crdv1beta1.SnapshotPending))

snap.Status.Phase = crdv1beta1.SnapshotFailed
g.Expect(k8sClient.Status().Update(ctx, snap)).To(Succeed())
g.Expect(snap.Status.Phase).To(Equal(crdv1beta1.SnapshotFailed))
}, timeout, interval).Should(Succeed())

}

By("Check that all snapshots are still there")
By("Getting all snapshots and ensuring that they are all failed")
snapList := &crdv1beta1.QuestDBSnapshotList{}
Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
Expect(snapList.Items).To(HaveLen(int(retention * 2)))
Eventually(func(g Gomega) {
g.Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
g.Expect(snapList.Items).To(HaveLen(int(retention * 2)))
for _, snap := range snapList.Items {
g.Expect(snap.Status.Phase).To(Equal(crdv1beta1.SnapshotFailed))
}
}, timeout, interval).Should(Succeed())

By("Set retention + 1 snapshots to succeeded and see the list shrink by 1")
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var err error
Expect(k8sClient.List(ctx, snapList, client.InNamespace(sched.Namespace))).Should(Succeed())
for i, snap := range snapList.Items {
if i < int(retention+1) {
snap.Status.Phase = crdv1beta1.SnapshotSucceeded
err = k8sClient.Status().Update(ctx, &snap)
if err != nil {
return err
}
}
By("Set retention + 1 snapshots to succeeded and delete their finalizers")
for idx := range snapList.Items {
if idx < int(retention+1) {
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(&snapList.Items[idx]), &snapList.Items[idx])).To(Succeed())
snapList.Items[idx].Status.Phase = crdv1beta1.SnapshotSucceeded
snapList.Items[idx].Finalizers = []string{}
g.Expect(k8sClient.Status().Update(ctx, &snapList.Items[idx])).To(Succeed())
}, timeout, interval).Should(Succeed())
}
return nil
})
Expect(err).ToNot(HaveOccurred())
}

By("Advancing time a small amount to not trigger another snapshot creation")
r.TimeSource.(*abtime.ManualTime).Advance(time.Second)
r.TimeSource.(*abtime.ManualTime).Advance(5 * time.Millisecond)

By("Forcing a reconcile")
_, err = r.Reconcile(ctx, ctrl.Request{
_, err := r.Reconcile(ctx, ctrl.Request{
NamespacedName: client.ObjectKeyFromObject(sched),
})
Expect(err).ToNot(HaveOccurred())
Expand All @@ -337,3 +362,9 @@ var _ = Describe("QuestDBSnapshotSchedule Controller", func() {
})

})

func advanceToTheNextMinute(timeSource *abtime.ManualTime) {
nextMinute := timeSource.Now().Add(time.Minute).Truncate(time.Minute)
timeToNextMinute := nextMinute.Sub(timeSource.Now())
timeSource.Advance(timeToNextMinute)
}