If issuing a restart or updating an image, do that first before applying anything else. #698

Merged
merged 5 commits on Apr 9, 2024
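
As a rough illustration of the ordering this PR enforces (not the operator's actual code), the standalone Rust sketch below models the new reconcile behavior: a pending restart is handled first, then an image update, and only then is the rest of the cluster spec applied. The `Step` enum and `plan_steps` function are hypothetical names for this sketch only; in the real diff below the work is done by the cnpg_utils helpers `restart_and_wait_for_restart`, `is_image_updated`, and `patch_cluster`.

// Standalone toy model of the reconcile ordering described in this PR.
// It is not the operator's real code; see the cnpg.rs diff below for that.

#[derive(Debug, PartialEq)]
enum Step {
    RestartAndWait,   // corresponds to restart_and_wait_for_restart
    UpdateImage,      // corresponds to is_image_updated
    ApplyClusterSpec, // corresponds to patch_cluster and the rest of reconcile_cnpg
}

// Order the steps for one reconcile pass: restart first, then image update,
// then everything else.
fn plan_steps(restart_pending: bool, image_changed: bool) -> Vec<Step> {
    let mut steps = Vec::new();
    if restart_pending {
        steps.push(Step::RestartAndWait);
    }
    if image_changed {
        steps.push(Step::UpdateImage);
    }
    steps.push(Step::ApplyClusterSpec);
    steps
}

fn main() {
    assert_eq!(
        plan_steps(true, true),
        vec![Step::RestartAndWait, Step::UpdateImage, Step::ApplyClusterSpec]
    );
    assert_eq!(plan_steps(false, false), vec![Step::ApplyClusterSpec]);
}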
2 changes: 1 addition & 1 deletion tembo-operator/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion tembo-operator/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "controller"
description = "Tembo Operator for Postgres"
version = "0.44.0"
version = "0.44.1"
edition = "2021"
default-run = "controller"
license = "Apache-2.0"
134 changes: 45 additions & 89 deletions tembo-operator/src/cloudnativepg/cnpg.rs
@@ -42,6 +42,7 @@ use crate::{
ClusterServiceAccountTemplate, ClusterServiceAccountTemplateMetadata, ClusterSpec,
ClusterStorage, ClusterSuperuserSecret,
},
cnpg_utils::{is_image_updated, patch_cluster, restart_and_wait_for_restart},
poolers::{
Pooler, PoolerCluster, PoolerPgbouncer, PoolerSpec, PoolerTemplate, PoolerTemplateSpec,
PoolerTemplateSpecContainers, PoolerType,
@@ -55,13 +56,11 @@ use crate::{
configmap::custom_metrics_configmap_settings,
errors::ValueError,
is_postgres_ready,
patch_cdb_status_merge,
postgres_exporter::EXPORTER_CONFIGMAP_PREFIX,
psql::PsqlOutput,
// snapshots::volumesnapshots::reconcile_volume_snapshot_restore,
trunk::extensions_that_require_load,
Context,
RESTARTED_AT,
};
use chrono::{DateTime, NaiveDateTime, Offset};
use k8s_openapi::api::core::v1::Service;
@@ -73,7 +72,6 @@ use kube::{
runtime::{controller::Action, wait::Condition},
Api, Resource, ResourceExt,
};
use serde_json::json;
use std::{collections::BTreeMap, sync::Arc};
use tokio::time::Duration;
use tracing::{debug, error, info, instrument, warn};
@@ -634,6 +632,7 @@ fn default_cluster_annotations(cdb: &CoreDB) -> BTreeMap<String, String> {
annotations
}

#[instrument(skip(cdb), fields(trace_id, instance_name = %cdb.name_any()))]
pub fn cnpg_cluster_from_cdb(
cdb: &CoreDB,
fenced_pods: Option<Vec<String>>,
@@ -967,49 +966,19 @@ async fn pods_to_fence(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Vec<String>, A
}
}

// cdb: the CoreDB object
// maybe_cluster, Option<Cluster> of the current CNPG cluster, if it exists
// new_spec: the new Cluster spec to be applied
fn update_restarted_at(
cdb: &CoreDB,
maybe_cluster: Option<&Cluster>,
new_spec: &mut Cluster,
) -> bool {
let Some(cdb_restarted_at) = cdb.annotations().get(RESTARTED_AT) else {
// No need to update the annotation if it's not present in the CoreDB
return false;
};

// Remember the previous value of the annotation, if any
let previous_restarted_at =
maybe_cluster.and_then(|cluster| cluster.annotations().get(RESTARTED_AT));

// Forward the `restartedAt` annotation from CoreDB over to the CNPG cluster,
// does not matter if changed or not.
new_spec
.metadata
.annotations
.as_mut()
.map(|cluster_annotations| {
cluster_annotations.insert(RESTARTED_AT.into(), cdb_restarted_at.to_owned())
});

let restart_annotation_updated = previous_restarted_at != Some(cdb_restarted_at);

if restart_annotation_updated {
let name = new_spec.metadata.name.as_deref().unwrap_or("unknown");
info!("restartAt changed for cluster {name}, setting to {cdb_restarted_at}.");
}

restart_annotation_updated
}

#[instrument(skip(cdb, ctx), fields(trace_id, instance_name = %cdb.name_any()))]
pub async fn reconcile_cnpg(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Action> {
debug!("Getting name of cluster");
let name = cdb.name_any();

debug!("Getting namespace of cluster");
let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
error!("Namespace is empty for instance: {}.", cdb.name_any());
Action::requeue(tokio::time::Duration::from_secs(300))
})?;

let pods_to_fence = pods_to_fence(cdb, ctx.clone()).await?;
let requires_load =
extensions_that_require_load(ctx.client.clone(), &cdb.metadata.namespace.clone().unwrap())
.await?;
let requires_load = extensions_that_require_load(ctx.client.clone(), namespace).await?;

// TODO: reenable this once we have a work around for snapshots
// If we are restoring and have volume snapshots enabled, make sure we setup
@@ -1025,24 +994,41 @@ pub async fn reconcile_cnpg(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Actio
debug!("Generating CNPG spec");
let mut cluster = cnpg_cluster_from_cdb(cdb, Some(pods_to_fence), requires_load);

debug!("Getting namespace of cluster");
let namespace = cluster
.metadata
.namespace
.clone()
.expect("CNPG Cluster should always have a namespace");
debug!("Getting name of cluster");
let name = cluster
.metadata
.name
.clone()
.expect("CNPG Cluster should always have a name");

let cluster_api: Api<Cluster> = Api::namespaced(ctx.client.clone(), namespace.as_str());
let maybe_cluster = cluster_api.get(&name).await;

let restart_annotation_updated =
update_restarted_at(cdb, maybe_cluster.as_ref().ok(), &mut cluster);
// Check whether the cluster is being rebooted/restarted; if so, handle that first before
// updating the cluster spec. Also check whether the image is being updated; if so,
// update the image first before updating the cluster spec.
if let Ok(ref cluster) = maybe_cluster {
warn!("Cluster exists, checking if restart is required");
restart_and_wait_for_restart(cdb, ctx.clone(), Some(cluster)).await?;
is_image_updated(cdb, ctx.clone(), Some(cluster)).await?;
}

// Check the CoreDB status; if status.running is false, return a requeue
let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
let update_coredb = coredb_api.get(&name).await.map_err(|e| {
error!("Error getting CoreDB: {}", e);
Action::requeue(Duration::from_secs(300))
})?;

// Extract the current status from update_coredb, warning if it is empty
let current_status = match update_coredb.status {
Some(status) => Some(status),
None => {
warn!("CoreDB status is empty for instance: {}", &name);
None
}
};

// If the CoreDB status reports running: false, return a requeue
if let Some(status) = current_status {
if !status.running {
info!("CoreDB status.running is false, requeuing 10 seconds");
return Err(Action::requeue(Duration::from_secs(10)));
}
}

let mut _restart_required = false;

@@ -1151,37 +1137,7 @@ pub async fn reconcile_cnpg(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Actio
// Writes to objects with managed fields can be forced, in which case the value of any conflicted field will be overridden,
// and the ownership will be transferred.
// https://kubernetes.io/docs/reference/using-api/server-side-apply/
let ps = PatchParams::apply("cntrlr").force();

let _o = cluster_api
.patch(&name, &ps, &Patch::Apply(&cluster))
.await
.map_err(|e| {
error!("Error patching cluster: {}", e);
Action::requeue(Duration::from_secs(300))
})?;

// If we updated the restartedAt annotation, set `status.running` in CoreDB to false
if restart_annotation_updated {
let cdb_cluster: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &namespace);
let cluster_name = &name;

patch_cdb_status_merge(
&cdb_cluster,
cluster_name,
json!({
"status": {
"running": false
}
}),
)
.await?;
info!(
"Updated status.running to false in {}, requeuing 10 seconds",
&name
);
return Err(Action::requeue(Duration::from_secs(10)));
}
patch_cluster(&cluster, ctx.clone(), cdb).await?;

reconcile_metrics_service(cdb, ctx.clone()).await?;
reconcile_metrics_ingress_route(cdb, ctx.clone()).await?;