Skip to content

Commit

Permalink
[TEM-1207] move postgres-exporter out of statefulset and into deployment (#117)
Browse files Browse the repository at this point in the history

* move postgres-exporter out of statefulset and into deployment

* fmt

* fix clippy warnings, add correct connection string

* change policy creation to be async

* fix fmt

* set postgres-exporter password as secret, refactor envvars, set role pass

* remove local debug line

* fixing tests

* add more tests to assert deployments and secrets

* adjust PolicyRule for postgres-exporter

* fmt

* fix possible naming collisions

* move test to not have a race condition

* more race condition fixes
  • Loading branch information
nhudson authored Jun 30, 2023
1 parent b3877ed commit aada866
Show file tree
Hide file tree
Showing 13 changed files with 574 additions and 332 deletions.
8 changes: 4 additions & 4 deletions coredb-operator/src/apis/postgres_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ impl ConfigValue {
}

pub fn merge_pg_configs(
vec1: &Vec<PgConfig>,
vec2: &Vec<PgConfig>,
vec1: &[PgConfig],
vec2: &[PgConfig],
name: &str,
) -> Result<Option<PgConfig>, MergeError> {
let config1 = vec1.clone().into_iter().find(|config| config.name == name);
let config2 = vec2.clone().into_iter().find(|config| config.name == name);
let config1 = vec1.iter().cloned().find(|config| config.name == name);
let config2 = vec2.iter().cloned().find(|config| config.name == name);
match (config1, config2) {
(Some(mut c1), Some(c2)) => match c1.value.combine(c2.value) {
Ok(combined_value) => {
Expand Down
129 changes: 104 additions & 25 deletions coredb-operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ use futures::{
};

use crate::{
apis::{
coredb_types::{CoreDB, CoreDBStatus},
postgres_parameters::reconcile_pg_parameters_configmap,
},
config::Config,
cronjob::reconcile_cronjob,
deployment_postgres_exporter::reconcile_prometheus_exporter,
exec::{ExecCommand, ExecOutput},
extensions::{reconcile_extensions, Extension},
ingress::reconcile_postgres_ing_route_tcp,
postgres_exporter::{create_postgres_exporter_role, reconcile_prom_configmap},
psql::{PsqlCommand, PsqlOutput},
rbac::reconcile_rbac,
secret::{reconcile_postgres_exporter_secret, reconcile_secret, PrometheusExporterSecretData},
service::reconcile_svc,
statefulset::{reconcile_sts, stateful_set_from_cdb},
};
Expand All @@ -25,18 +34,11 @@ use kube::{
Resource,
};

use crate::{
apis::{
coredb_types::{CoreDB, CoreDBStatus},
postgres_parameters::reconcile_pg_parameters_configmap,
},
extensions::{reconcile_extensions, Extension},
ingress::reconcile_postgres_ing_route_tcp,
postgres_exporter::{create_postgres_exporter_role, reconcile_prom_configmap},
secret::reconcile_secret,
};
use k8s_openapi::{
api::core::v1::{Namespace, Pod},
api::{
core::v1::{Namespace, Pod},
rbac::v1::PolicyRule,
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::runtime::wait::Condition;
Expand Down Expand Up @@ -107,6 +109,49 @@ fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
Action::requeue(Duration::from_secs(5 * 60))
}

// Build the RBAC PolicyRule set granted to this CoreDB instance's
// service account: read access to the coredb resource, status-update
// access, read access to the connection secret, and read access to
// configmaps in the namespace.
async fn create_policy_rules(cdb: &CoreDB) -> Vec<PolicyRule> {
    let instance = cdb.name_any();
    let mut rules = Vec::with_capacity(4);

    // get/list/watch on this specific coredb resource.
    rules.push(PolicyRule {
        api_groups: Some(vec!["coredb.io".to_string()]),
        resource_names: Some(vec![instance.clone()]),
        resources: Some(vec!["coredbs".to_string()]),
        verbs: ["get", "list", "watch"].iter().map(|v| v.to_string()).collect(),
        ..PolicyRule::default()
    });

    // get/patch/update/watch on this coredb's status subresource.
    rules.push(PolicyRule {
        api_groups: Some(vec!["coredb.io".to_string()]),
        resource_names: Some(vec![instance.clone()]),
        resources: Some(vec!["coredbs/status".to_string()]),
        verbs: ["get", "patch", "update", "watch"]
            .iter()
            .map(|v| v.to_string())
            .collect(),
        ..PolicyRule::default()
    });

    // get/watch on the instance's connection secret only.
    rules.push(PolicyRule {
        api_groups: Some(vec!["".to_string()]),
        resource_names: Some(vec![format!("{}-connection", instance)]),
        resources: Some(vec!["secrets".to_string()]),
        verbs: ["get", "watch"].iter().map(|v| v.to_string()).collect(),
        ..PolicyRule::default()
    });

    // get/watch on all configmaps in the namespace — intentionally broad
    // for now; no configmaps currently exist.
    rules.push(PolicyRule {
        api_groups: Some(vec!["".to_string()]),
        resources: Some(vec!["configmaps".to_string()]),
        verbs: ["get", "watch"].iter().map(|v| v.to_string()).collect(),
        ..PolicyRule::default()
    });

    rules
}

impl CoreDB {
// Reconcile (for non-finalizer related changes)
async fn reconcile(&self, ctx: Arc<Context>, cfg: &Config) -> Result<Action, Action> {
Expand Down Expand Up @@ -152,17 +197,39 @@ impl CoreDB {
}

// reconcile service account, role, and role binding
reconcile_rbac(self, ctx.clone()).await.map_err(|e| {
error!("Error reconciling service account: {:?}", e);
Action::requeue(Duration::from_secs(300))
})?;
reconcile_rbac(self, ctx.clone(), None, create_policy_rules(self).await)
.await
.map_err(|e| {
error!("Error reconciling service account: {:?}", e);
Action::requeue(Duration::from_secs(300))
})?;

// reconcile secret
reconcile_secret(self, ctx.clone()).await.map_err(|e| {
error!("Error reconciling secret: {:?}", e);
Action::requeue(Duration::from_secs(300))
})?;

// reconcile postgres exporter secret
let secret_data: Option<PrometheusExporterSecretData> = if self.spec.postgresExporterEnabled {
let result = reconcile_postgres_exporter_secret(self, ctx.clone())
.await
.map_err(|e| {
error!("Error reconciling postgres exporter secret: {:?}", e);
Action::requeue(Duration::from_secs(300))
})?;

match result {
Some(data) => Some(data),
None => {
warn!("Secret already exists, no new password is generated");
None
}
}
} else {
None
};

// reconcile cronjob for backups
reconcile_cronjob(self, ctx.clone()).await.map_err(|e| {
error!("Error reconciling cronjob: {:?}", e);
Expand All @@ -183,6 +250,16 @@ impl CoreDB {
Action::requeue(Duration::from_secs(300))
})?;

// reconcile prometheus exporter deployment if enabled
if self.spec.postgresExporterEnabled {
reconcile_prometheus_exporter(self, ctx.clone())
.await
.map_err(|e| {
error!("Error reconciling prometheus exporter deployment: {:?}", e);
Action::requeue(Duration::from_secs(300))
})?;
};

// reconcile service
reconcile_svc(self, ctx.clone()).await.map_err(|e| {
error!("Error reconciling service: {:?}", e);
Expand Down Expand Up @@ -210,16 +287,18 @@ impl CoreDB {
}

// creating exporter role is pre-requisite to the postgres pod becoming "ready"
create_postgres_exporter_role(self, ctx.clone())
.await
.map_err(|e| {
error!(
"Error creating postgres_exporter on CoreDB {}, {}",
self.metadata.name.clone().unwrap(),
e
);
Action::requeue(Duration::from_secs(300))
})?;
if self.spec.postgresExporterEnabled {
create_postgres_exporter_role(self, ctx.clone(), secret_data)
.await
.map_err(|e| {
error!(
"Error creating postgres_exporter on CoreDB {}, {}",
self.metadata.name.clone().unwrap(),
e
);
Action::requeue(Duration::from_secs(300))
})?;
}

if !is_pod_ready().matches_object(Some(&primary_pod)) {
info!("Did not pod ready {}, waiting a short period", self.name_any());
Expand Down
137 changes: 5 additions & 132 deletions coredb-operator/src/cronjob.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{apis::coredb_types::CoreDB, Context, Error};
use crate::{apis::coredb_types::CoreDB, rbac::reconcile_rbac, Context, Error};
use k8s_openapi::{
api::{
batch::v1::{CronJob, CronJobSpec, JobSpec, JobTemplateSpec},
core::v1::{Container, PodSpec, PodTemplateSpec, ServiceAccount},
rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
core::v1::{Container, PodSpec, PodTemplateSpec},
rbac::v1::PolicyRule,
},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
Expand All @@ -23,12 +23,7 @@ pub async fn reconcile_cronjob(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Er
labels.insert("app".to_owned(), "coredb".to_string());
labels.insert("coredb.io/name".to_owned(), cdb.name_any());

// create service account for cronjob
let sa = reconcile_service_account(cdb, ctx.clone()).await?;
// create role for cronjob
let role = reconcile_role(cdb, ctx.clone()).await?;
// create role binding for cronjob
reconcile_role_binding(cdb, ctx.clone(), &sa, role).await?;
let rbac = reconcile_rbac(cdb, ctx.clone(), Some("backup"), create_policy_rules().await).await?;

// reconcile cronjob
let cronjob_metadata = ObjectMeta {
Expand All @@ -39,7 +34,7 @@ pub async fn reconcile_cronjob(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Er
..ObjectMeta::default()
};

let sa_name = sa.metadata.name;
let sa_name = rbac.service_account.metadata.name;

// create spec for cronjob
let cj_spec = CronJobSpec {
Expand Down Expand Up @@ -91,128 +86,6 @@ pub async fn reconcile_cronjob(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Er
Ok(())
}

// Reconcile the Kubernetes ServiceAccount used by the backup cronjob.
// Applies (server-side apply) a ServiceAccount named "<coredb>-backup" in the
// CoreDB's namespace and returns the applied object.
async fn reconcile_service_account(cdb: &CoreDB, ctx: Arc<Context>) -> Result<ServiceAccount, Error> {
    let client = ctx.client.clone();
    // NOTE(review): unwrap assumes the CoreDB is namespaced — confirm this
    // cannot be reached for a cluster-scoped object.
    let ns = cdb.namespace().unwrap();
    let name = format!("{}-backup", cdb.name_any());
    let sa_api: Api<ServiceAccount> = Api::namespaced(client.clone(), &ns);

    // Standard labels applied to all objects owned by this operator.
    let mut labels: BTreeMap<String, String> = BTreeMap::new();
    labels.insert("app".to_owned(), "coredb".to_string());
    labels.insert("coredb.io/name".to_owned(), cdb.name_any());

    let mut sa_metadata = ObjectMeta {
        name: Some(name.to_owned()),
        namespace: Some(ns.to_owned()),
        labels: Some(labels.clone()),
        ..ObjectMeta::default()
    };

    // Carry over any annotations from the user-supplied service account
    // template on the CoreDB spec (e.g. for IAM role bindings).
    if let Some(ref template_metadata) = cdb.spec.serviceAccountTemplate.metadata {
        if let Some(ref annotations) = template_metadata.annotations {
            sa_metadata.annotations = Some(annotations.clone());
        }
    }

    let sa = ServiceAccount {
        metadata: sa_metadata,
        ..ServiceAccount::default()
    };

    // Server-side apply with field manager "cntrlr"; force() claims ownership
    // of conflicting fields so repeated reconciles converge.
    let ps = PatchParams::apply("cntrlr").force();
    let _o = sa_api
        .patch(&name, &ps, &Patch::Apply(&sa))
        .await
        .map_err(Error::KubeError)?;

    Ok(sa)
}

// Reconcile the Kubernetes Role that grants the backup cronjob its
// permissions. Applies (server-side apply) a Role named "<coredb>-backup"
// in the CoreDB's namespace and returns the applied object.
async fn reconcile_role(cdb: &CoreDB, ctx: Arc<Context>) -> Result<Role, Error> {
    let ns = cdb.namespace().unwrap();
    let role_name = format!("{}-backup", cdb.name_any());
    let role_api: Api<Role> = Api::namespaced(ctx.client.clone(), &ns);

    // Standard operator-owned labels.
    let mut labels: BTreeMap<String, String> = BTreeMap::new();
    labels.insert("app".to_owned(), "coredb".to_string());
    labels.insert("coredb.io/name".to_owned(), cdb.name_any());

    // Resolve the policy rules before building the Role object.
    let policy_rules = create_policy_rules().await;

    let desired_role = Role {
        metadata: ObjectMeta {
            name: Some(role_name.clone()),
            namespace: Some(ns.clone()),
            labels: Some(labels),
            ..ObjectMeta::default()
        },
        rules: Some(policy_rules),
    };

    // Server-side apply with field manager "cntrlr"; force() claims
    // ownership of conflicting fields so repeated reconciles converge.
    let params = PatchParams::apply("cntrlr").force();
    role_api
        .patch(&role_name, &params, &Patch::Apply(&desired_role))
        .await
        .map_err(Error::KubeError)?;

    Ok(desired_role)
}

// Reconcile the RoleBinding that ties the backup ServiceAccount to its Role.
// Applies (server-side apply) a RoleBinding named "<coredb>-backup" in the
// CoreDB's namespace.
async fn reconcile_role_binding(
    cdb: &CoreDB,
    ctx: Arc<Context>,
    sa: &ServiceAccount,
    role: Role,
) -> Result<(), Error> {
    let ns = cdb.namespace().unwrap();
    let binding_name = format!("{}-backup", cdb.name_any());
    let binding_api: Api<RoleBinding> = Api::namespaced(ctx.client.clone(), &ns);

    // Standard operator-owned labels.
    let mut labels: BTreeMap<String, String> = BTreeMap::new();
    labels.insert("app".to_owned(), "coredb".to_string());
    labels.insert("coredb.io/name".to_owned(), cdb.name_any());

    let binding = RoleBinding {
        metadata: ObjectMeta {
            name: Some(binding_name.clone()),
            namespace: Some(ns.clone()),
            labels: Some(labels),
            ..ObjectMeta::default()
        },
        // Bind to the namespaced Role created for this instance.
        role_ref: RoleRef {
            api_group: "rbac.authorization.k8s.io".to_string(),
            kind: "Role".to_string(),
            name: role.name_any(),
        },
        // The backup cronjob's ServiceAccount is the sole subject.
        subjects: Some(vec![Subject {
            kind: "ServiceAccount".to_string(),
            name: sa.name_any(),
            namespace: Some(ns.clone()),
            ..Subject::default()
        }]),
    };

    // Server-side apply with field manager "cntrlr"; force() claims
    // ownership of conflicting fields so repeated reconciles converge.
    let params = PatchParams::apply("cntrlr").force();
    binding_api
        .patch(&binding_name, &params, &Patch::Apply(&binding))
        .await
        .map_err(Error::KubeError)?;

    Ok(())
}

// Create role policy rulesets
async fn create_policy_rules() -> Vec<PolicyRule> {
vec![
Expand Down
Loading

0 comments on commit aada866

Please sign in to comment.