Skip to content

Commit

Permalink
make sure we find the closest snapshot available when pitr is request…
Browse files Browse the repository at this point in the history
…ed (#622)
  • Loading branch information
nhudson committed Mar 7, 2024
1 parent 6972baa commit 9272a99
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.37.13"
version = "0.38.0"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
Expand Down
156 changes: 134 additions & 22 deletions tembo-operator/src/snapshots/volumesnapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
},
Context,
};
use chrono::{DateTime, Utc};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
api::{ListParams, Patch, PatchParams},
Expand Down Expand Up @@ -398,7 +399,7 @@ fn generate_volume_snapshot(
// original instance you are restoring from
async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeSnapshot, Action> {
// name will be the name of the original instance
let name = cdb
let og_instance_name = cdb
.spec
.restore
.as_ref()
Expand All @@ -414,18 +415,36 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
// todo: This is a temporary fix to get the VolumeSnapshot from the same namespace as the
// instance you are attempting to restore from. We need to figure out a better way of
// doing this in case someone wants to name a namespace differently than the instance name.
let volume_snapshot_api: Api<VolumeSnapshot> = Api::namespaced(client.clone(), &name);

let label_selector = format!("cnpg.io/cluster={}", name);
let lp = ListParams::default().labels(&label_selector);
// At Tembo we assume that the namespace and the name of the instance name are the same.
let volume_snapshot_api: Api<VolumeSnapshot> =
Api::namespaced(client.clone(), &og_instance_name);

let label_selector = format!("cnpg.io/cluster={}", og_instance_name);
// Look for snapshots that are for the primary instance only, we currently do not
// support restoring from replicas
let field_selector = format!("metadata.annotations.cnpg.io/instanceRole={}", "primary");
let lp = ListParams::default()
.labels(&label_selector)
.fields(&field_selector);
let result = volume_snapshot_api.list(&lp).await.map_err(|e| {
error!("Error listing VolumeSnapshots for instance {}: {}", name, e);
error!(
"Error listing VolumeSnapshots for instance {}: {}",
og_instance_name, e
);
Action::requeue(tokio::time::Duration::from_secs(300))
})?;

// Set recovery_target_time if it's set in the CoreDB spec as DateTime<Utc>
let recovery_target_time: Option<DateTime<Utc>> = cdb
.spec
.restore
.as_ref()
.and_then(|r| r.recovery_target_time.as_deref())
.and_then(|time_str| DateTime::parse_from_rfc3339(time_str).ok())
.map(|dt| dt.with_timezone(&Utc));

// Filter snapshots that are ready to use and sort them by creation timestamp in descending order
let mut snapshots: Vec<VolumeSnapshot> = result
.items
let snapshots: Vec<VolumeSnapshot> = result
.into_iter()
.filter(|vs| {
vs.status
Expand All @@ -435,25 +454,60 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
})
.collect();

debug!("Found {} VolumeSnapshots for {}", snapshots.len(), name);

if snapshots.is_empty() {
error!("No ready VolumeSnapshots found for {}", name);
return Err(Action::requeue(tokio::time::Duration::from_secs(300)));
}

snapshots.sort_by(|a, b| {
b.metadata
.creation_timestamp
.cmp(&a.metadata.creation_timestamp)
});
let closest_snapshot_to_recovery_time = find_closest_snapshot(snapshots, recovery_target_time);

snapshots.first().cloned().ok_or_else(|| {
error!("Error getting first snapshot for instance {}", name);
closest_snapshot_to_recovery_time.ok_or_else(|| {
error!("No VolumeSnapshot found for instance {}", og_instance_name);
Action::requeue(tokio::time::Duration::from_secs(300))
})
}

fn find_closest_snapshot(
snapshots: Vec<VolumeSnapshot>,
recovery_target_time: Option<DateTime<Utc>>,
) -> Option<VolumeSnapshot> {
let transformed_snapshots: Vec<(Option<DateTime<Utc>>, VolumeSnapshot)> = snapshots
.into_iter()
.map(|snapshot| {
let end_time = snapshot
.metadata
.annotations
.as_ref()
.and_then(|ann| ann.get("cnpg.io/backupEndTime"))
.and_then(|end_time_str| DateTime::parse_from_rfc3339(end_time_str).ok())
.map(|dt| dt.with_timezone(&Utc));
(end_time, snapshot)
})
.collect();

match recovery_target_time {
Some(target_time) => {
// Filter snapshots to only include those before the target time
transformed_snapshots
.into_iter()
.filter_map(|(end_time, snapshot)| {
end_time.and_then(|end_time| {
if end_time <= target_time {
let duration = (target_time - end_time).num_seconds().abs();
Some((duration, snapshot))
} else {
None
}
})
})
.min_by_key(|(duration, _)| *duration)
.map(|(_, snapshot)| snapshot)
}
None => {
// When no recovery target time is specified, find the latest snapshot
transformed_snapshots
.into_iter()
.filter_map(|(end_time, snapshot)| end_time.map(|_| snapshot))
.max_by_key(|snapshot| snapshot.metadata.creation_timestamp.clone())
}
}
}

async fn lookup_volume_snapshot_content(
cdb: &CoreDB,
client: &Client,
Expand Down Expand Up @@ -496,7 +550,9 @@ mod tests {
VolumeSnapshotContentStatus,
},
};
use chrono::DateTime;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use std::collections::BTreeMap;

#[test]
fn test_generate_volume_snapshot_content() {
Expand Down Expand Up @@ -646,4 +702,60 @@ mod tests {
// The namespace of the generated VolumeSnapshot should match the namespace of the CoreDB
assert_eq!(result.metadata.namespace.unwrap(), "default");
}

fn create_volume_snapshot(name: &str, backup_end_time: &str) -> VolumeSnapshot {
let mut annotations = BTreeMap::new();
annotations.insert(
"cnpg.io/backupEndTime".to_string(),
backup_end_time.to_string(),
);

VolumeSnapshot {
metadata: ObjectMeta {
name: Some(name.to_string()),
annotations: Some(annotations),
..ObjectMeta::default()
},
spec: VolumeSnapshotSpec {
..VolumeSnapshotSpec::default()
},
status: None,
}
}

#[test]
fn test_find_closest_snapshot_pitr() {
let recovery_target_time_str = "2024-03-06T00:00:00Z";
let recovery_target_time = DateTime::parse_from_rfc3339(recovery_target_time_str).unwrap();

let snapshots = vec![
create_volume_snapshot("snapshot1", "2024-03-05T23:50:00Z"),
create_volume_snapshot("snapshot2", "2024-03-05T22:00:00Z"),
// (snapshot3) closest to target time
create_volume_snapshot("snapshot3", "2024-03-05T23:55:00Z"),
create_volume_snapshot("snapshot4", "2024-03-05T21:00:00Z"),
create_volume_snapshot("snapshot5", "2024-03-06T00:01:00Z"),
];

let closest_snapshot =
find_closest_snapshot(snapshots, Some(recovery_target_time.into())).unwrap();
assert_eq!(closest_snapshot.metadata.name.unwrap(), "snapshot3");
}

#[test]
fn test_find_latest_snapshot_when_target_time_empty() {
let recovery_target_time: Option<DateTime<Utc>> = None;
let snapshots = vec![
create_volume_snapshot("snapshot1", "2024-03-05T20:00:00Z"),
create_volume_snapshot("snapshot2", "2024-03-05T22:00:00Z"),
create_volume_snapshot("snapshot3", "2024-03-05T23:00:00Z"),
create_volume_snapshot("snapshot4", "2024-03-05T21:00:00Z"),
// (snapshot5) closest to target time/latest
create_volume_snapshot("snapshot5", "2024-03-06T00:01:00Z"),
];

// No recovery_target_time specified (None)
let closest_snapshot = find_closest_snapshot(snapshots, recovery_target_time).unwrap();
assert_eq!(closest_snapshot.metadata.name.unwrap(), "snapshot5");
}
}

0 comments on commit 9272a99

Please sign in to comment.