Skip to content

Commit

Permalink
add better handling of snapshot restores, better debug logging (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhudson committed Mar 7, 2024
1 parent 9459860 commit b24dbea
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 21 deletions.
2 changes: 1 addition & 1 deletion conductor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions conductor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,11 @@ async fn init_cloud_perms(
// use volume_snapshot_enabled feature flag to only enable for specific org_id's
let volume_snapshot_enabled = is_volume_snapshot_enabled(read_msg, &restore_spec);

info!(
"Volume snapshot restore is {} for instance_id {} in org_id: {}",
volume_snapshot_enabled, read_msg.message.inst_id, read_msg.message.org_id
);

// Ensure a restore spec exists, otherwise return an error
let restore =
coredb_spec
Expand Down Expand Up @@ -732,8 +737,8 @@ fn is_volume_snapshot_enabled(msg: &Message<CRUDevent>, cdb_spec: &CoreDBSpec) -

if orgs.contains(&msg.message.org_id.as_str()) {
info!(
"Volume snapshot restore enabled for org_id: {}",
msg.message.org_id
"Volume snapshot restore enabled for instance_id {} in org_id: {}",
msg.message.inst_id, msg.message.org_id
);
cdb_spec
.backup
Expand All @@ -742,8 +747,8 @@ fn is_volume_snapshot_enabled(msg: &Message<CRUDevent>, cdb_spec: &CoreDBSpec) -
.map_or(false, |vs| vs.enabled)
} else {
info!(
"Volume snapshot restore disabled for org_id: {}",
msg.message.org_id
"Volume snapshot restore disabled for instance_id {} in org_id: {}",
msg.message.inst_id, msg.message.org_id
);
false
}
Expand Down
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tembo-operator/src/cloudnativepg/cnpg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ pub async fn reconcile_cnpg(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Actio
// the VolumeSnapshotContent and VolumeSnapshot so that the Cluster will have
// something to restore from.
if let Some(restore) = &cdb.spec.restore {
if restore.volume_snapshot.is_some() {
if restore.volume_snapshot == Some(true) {
debug!("Reconciling VolumeSnapshotContent and VolumeSnapshot for restore");
reconcile_volume_snapshot_restore(cdb, ctx.clone()).await?;
}
Expand Down
61 changes: 47 additions & 14 deletions tembo-operator/src/snapshots/volumesnapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
Action::requeue(tokio::time::Duration::from_secs(300))
})?;

debug!(
"Looking up VolumeSnapshots for original instance: {}",
og_instance_name
);

// todo: This is a temporary fix to get the VolumeSnapshot from the same namespace as the
// instance you are attempting to restore from. We need to figure out a better way of
// doing this in case someone wants to name a namespace differently than the instance name.
Expand All @@ -420,12 +425,7 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
Api::namespaced(client.clone(), &og_instance_name);

let label_selector = format!("cnpg.io/cluster={}", og_instance_name);
// Look for snapshots that are for the primary instance only, we currently do not
// support restoring from replicas
let field_selector = format!("metadata.annotations.cnpg.io/instanceRole={}", "primary");
let lp = ListParams::default()
.labels(&label_selector)
.fields(&field_selector);
let lp = ListParams::default().labels(&label_selector);
let result = volume_snapshot_api.list(&lp).await.map_err(|e| {
error!(
"Error listing VolumeSnapshots for instance {}: {}",
Expand All @@ -434,6 +434,12 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
Action::requeue(tokio::time::Duration::from_secs(300))
})?;

debug!(
"Found {} VolumeSnapshots for instance {}",
result.items.len(),
og_instance_name
);

// Set recovery_target_time if it's set in the CoreDB spec as DateTime<Utc>
let recovery_target_time: Option<DateTime<Utc>> = cdb
.spec
Expand All @@ -443,23 +449,50 @@ async fn lookup_volume_snapshot(cdb: &CoreDB, client: &Client) -> Result<VolumeS
.and_then(|time_str| DateTime::parse_from_rfc3339(time_str).ok())
.map(|dt| dt.with_timezone(&Utc));

debug!("Recovery target time: {:?}", recovery_target_time);

// Filter snapshots that are ready to use and sort them by creation timestamp in descending order
let snapshots: Vec<VolumeSnapshot> = result
.into_iter()
.filter(|vs| {
vs.status
.as_ref()
.map(|s| s.ready_to_use.unwrap_or(false))
.unwrap_or(false)
.map_or(false, |s| s.ready_to_use.unwrap_or(false))
})
.filter(|vs| {
vs.metadata
.annotations
.as_ref()
.and_then(|ann| ann.get("cnpg.io/instanceRole"))
.map_or(false, |role| role == "primary")
})
.collect();

let closest_snapshot_to_recovery_time = find_closest_snapshot(snapshots, recovery_target_time);

closest_snapshot_to_recovery_time.ok_or_else(|| {
error!("No VolumeSnapshot found for instance {}", og_instance_name);
Action::requeue(tokio::time::Duration::from_secs(300))
})
debug!(
"Filtered {} VolumeSnapshots for primary role and readiness",
snapshots.len()
);

match find_closest_snapshot(snapshots, recovery_target_time) {
Some(snapshot) => {
debug!(
"Selected VolumeSnapshot: {}",
snapshot
.metadata
.name
.as_deref()
.map_or("unknown", |name| name)
);
Ok(snapshot)
}
None => {
error!(
"No suitable VolumeSnapshot found for instance {}",
og_instance_name
);
Err(Action::requeue(tokio::time::Duration::from_secs(300)))
}
}
}

fn find_closest_snapshot(
Expand Down

0 comments on commit b24dbea

Please sign in to comment.