Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add rolling upgrade support for upgrades between NiFi 2 versions ([#771]).

### Changed

- BREAKING: Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#767], [#776]).
Expand All @@ -21,6 +25,7 @@ All notable changes to this project will be documented in this file.
- Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#772]).

[#767]: https://github.com/stackabletech/nifi-operator/pull/767
[#771]: https://github.com/stackabletech/nifi-operator/pull/771
[#772]: https://github.com/stackabletech/nifi-operator/pull/772
[#774]: https://github.com/stackabletech/nifi-operator/pull/774
[#776]: https://github.com/stackabletech/nifi-operator/pull/776
Expand Down
10 changes: 8 additions & 2 deletions docs/modules/nifi/pages/usage_guide/updating.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ spec:

<1> Change the NiFi version here

WARNING: NiFi clusters cannot be upgraded or downgraded in a rolling fashion due to a limitation in NiFi.
Any change to the NiFi version in the CRD triggers a full cluster restart with brief downtime.
[WARNING]
====
NiFi clusters cannot be upgraded or downgraded in a rolling fashion due to a limitation in NiFi prior to version 2.

When upgrading between NiFi 1 versions or from NiFi 1 to NiFi 2, any change to the NiFi version in the CRD triggers a full cluster restart with brief downtime.
However, the Stackable image version can be updated in a rolling manner, provided the NiFi version remains unchanged.

For upgrades between NiFi 2 versions (e.g. from `2.0.0` to `2.2.0`), rolling upgrades are supported.
====

== NiFi 2.0.0

Before you can upgrade to `2.0.0` you https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance[need to update] to at least version 1.27.x!
133 changes: 49 additions & 84 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ use crate::{
STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, authentication::AuthenticationClassResolved,
v1alpha1,
},
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
operations::{
graceful_shutdown::add_graceful_shutdown_config,
pdb::add_pdbs,
upgrade::{self, ClusterVersionUpdateState},
},
product_logging::extend_role_group_config_map,
reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
security::{
Expand Down Expand Up @@ -346,6 +350,9 @@ pub enum Error {
AddVolumeMount {
source: builder::pod::container::Error,
},

#[snafu(display("Failed to determine the state of the version upgrade procedure"))]
ClusterVersionUpdateState { source: upgrade::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -356,13 +363,6 @@ impl ReconcilerError for Error {
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum VersionChangeState {
BeginChange,
Stopped,
NoChange,
}

pub async fn reconcile_nifi(
nifi: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
ctx: Arc<Ctx>,
Expand Down Expand Up @@ -391,78 +391,33 @@ pub async fn reconcile_nifi(
.await
.context(SecuritySnafu)?;

// Handle full restarts for a version change
let version_change = if let Some(deployed_version) = nifi
// If rolling upgrade is supported, kubernetes takes care of the cluster scaling automatically
// otherwise the operator handles it
// manage our own flow for upgrade from 1.x.x to 1.x.x/2.x.x
// TODO: this can be removed once 1.x.x is longer supported
let mut cluster_version_update_state = ClusterVersionUpdateState::NoVersionChange;
let deployed_version = nifi
.status
.as_ref()
.and_then(|status| status.deployed_version.as_ref())
{
if deployed_version != &resolved_product_image.product_version {
// Check if statefulsets are already scaled to zero, if not - requeue
let selector = LabelSelector {
match_expressions: None,
match_labels: Some(
Labels::role_selector(nifi, APP_NAME, &NifiRole::Node.to_string())
.context(LabelBuildSnafu)?
.into(),
),
};
.and_then(|status| status.deployed_version.as_ref());
let rolling_upgrade_supported = resolved_product_image.product_version.starts_with("2.")
&& deployed_version.is_some_and(|v| v.starts_with("2."));

// Retrieve the deployed statefulsets to check on the current status of the restart
let deployed_statefulsets = client
.list_with_label_selector::<StatefulSet>(namespace, &selector)
.await
.context(FetchStatefulsetsSnafu)?;

// Sum target replicas for all statefulsets
let target_replicas = deployed_statefulsets
.iter()
.filter_map(|statefulset| statefulset.spec.as_ref())
.filter_map(|spec| spec.replicas)
.sum::<i32>();

// Sum current ready replicas for all statefulsets
let current_replicas = deployed_statefulsets
.iter()
.filter_map(|statefulset| statefulset.status.as_ref())
.map(|status| status.replicas)
.sum::<i32>();

// If statefulsets have already been scaled to zero, but have remaining replicas
// we requeue to wait until a full stop has been performed.
if target_replicas == 0 && current_replicas > 0 {
tracing::info!(
"Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish",
current_replicas
);
return Ok(Action::await_change());
}
if !rolling_upgrade_supported {
cluster_version_update_state = upgrade::cluster_version_update_state(
nifi,
client,
&resolved_product_image.product_version,
deployed_version,
)
.await
.context(ClusterVersionUpdateStateSnafu)?;

// Otherwise we either still need to scale the statefulsets to 0 or all replicas have
// been stopped and we can restart the cluster.
// Both actions will be taken in the regular reconciliation, so we can simply continue
// here
if target_replicas > 0 {
tracing::info!(
"Version change detected, we'll need to scale down the cluster for a full restart."
);
VersionChangeState::BeginChange
} else {
tracing::info!("Cluster has been stopped for a restart, will scale back up.");
VersionChangeState::Stopped
}
} else {
// No version change detected, propagate this to the reconciliation
VersionChangeState::NoChange
if cluster_version_update_state == ClusterVersionUpdateState::UpdateInProgress {
return Ok(Action::await_change());
}
} else {
// No deployed version set in status, this is probably the first reconciliation ever
// for this cluster, so just let it progress normally
tracing::debug!(
"No deployed version found for this cluster, this is probably the first start, continue reconciliation"
);
VersionChangeState::NoChange
};
}
// end todo

let validated_config = validated_product_config(
nifi,
Expand Down Expand Up @@ -570,6 +525,14 @@ pub async fn reconcile_nifi(
)
.await?;

let role_group = role.role_groups.get(&rolegroup.role_group);
let replicas =
if cluster_version_update_state == ClusterVersionUpdateState::UpdateRequested {
Some(0)
} else {
role_group.and_then(|rg| rg.replicas).map(i32::from)
};

let rg_statefulset = build_node_rolegroup_statefulset(
nifi,
&resolved_product_image,
Expand All @@ -579,7 +542,8 @@ pub async fn reconcile_nifi(
rolegroup_config,
&merged_config,
&nifi_authentication_config,
&version_change,
rolling_upgrade_supported,
replicas,
&rbac_sa.name_any(),
)
.await?;
Expand Down Expand Up @@ -661,7 +625,7 @@ pub async fn reconcile_nifi(

// Update the deployed product version in the status after everything has been deployed, unless
// we are still in the process of updating
let status = if version_change != VersionChangeState::BeginChange {
let status = if cluster_version_update_state != ClusterVersionUpdateState::UpdateRequested {
NifiStatus {
deployed_version: Some(resolved_product_image.product_version),
conditions,
Expand Down Expand Up @@ -907,7 +871,8 @@ async fn build_node_rolegroup_statefulset(
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &NifiConfig,
nifi_auth_config: &NifiAuthenticationConfig,
version_change_state: &VersionChangeState,
rolling_update_supported: bool,
replicas: Option<i32>,
sa_name: &str,
) -> Result<StatefulSet> {
tracing::debug!("Building statefulset");
Expand Down Expand Up @@ -1391,11 +1356,7 @@ async fn build_node_rolegroup_statefulset(
.build(),
spec: Some(StatefulSetSpec {
pod_management_policy: Some("Parallel".to_string()),
replicas: if version_change_state == &VersionChangeState::BeginChange {
Some(0)
} else {
role_group.and_then(|rg| rg.replicas).map(i32::from)
},
replicas,
selector: LabelSelector {
match_labels: Some(
Labels::role_group_selector(
Expand All @@ -1412,7 +1373,11 @@ async fn build_node_rolegroup_statefulset(
service_name: rolegroup_ref.object_name(),
template: pod_template,
update_strategy: Some(StatefulSetUpdateStrategy {
type_: Some("OnDelete".to_string()),
type_: if rolling_update_supported {
Some("RollingUpdate".to_string())
} else {
Some("OnDelete".to_string())
},
..StatefulSetUpdateStrategy::default()
}),
volume_claim_templates: Some(vec![
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod graceful_shutdown;
pub mod pdb;
pub mod upgrade;
124 changes: 124 additions & 0 deletions rust/operator-binary/src/operations/upgrade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// TODO: This module can be removed once we don't support NiFi 1.x versions anymore
// It manages the version upgrade procedure for NiFi versions prior to NiFi 2, since rolling upgrade is not supported there yet

use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::{
client::Client,
k8s_openapi::{api::apps::v1::StatefulSet, apimachinery::pkg::apis::meta::v1::LabelSelector},
kvp::Labels,
};

use crate::crd::{APP_NAME, NifiRole, v1alpha1};

/// Errors that can occur while determining the state of the (non-rolling)
/// cluster version update procedure.
#[derive(Snafu, Debug)]
pub enum Error {
    /// The NifiCluster object has no namespace set in its metadata.
    #[snafu(display("object defines no namespace"))]
    ObjectHasNoNamespace,

    /// Listing the node StatefulSets via the Kubernetes API failed.
    #[snafu(display("failed to fetch deployed StatefulSets"))]
    FetchStatefulsets {
        source: stackable_operator::client::Error,
    },

    /// Building the role selector labels for the node role failed.
    #[snafu(display("failed to build labels"))]
    LabelBuild {
        source: stackable_operator::kvp::LabelError,
    },
}

// Module-local Result alias defaulting to this module's Error type.
type Result<T, E = Error> = std::result::Result<T, E>;

/// State of the full-restart version update procedure.
///
/// This is used for NiFi versions not supporting rolling upgrades, since in
/// that case the operator has to manage the restart process itself and needs
/// to track its progress across reconciliations.
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterVersionUpdateState {
    /// A version change was detected; the StatefulSets still need to be
    /// scaled down to zero.
    UpdateRequested,
    /// The StatefulSets are scaled to zero but replicas are still
    /// terminating; the reconciliation should requeue and wait.
    UpdateInProgress,
    /// All replicas are gone; the cluster can be restarted with the new
    /// version.
    ClusterStopped,
    /// The deployed version matches the requested version (or nothing has
    /// been deployed yet); proceed with normal reconciliation.
    NoVersionChange,
}

pub async fn cluster_version_update_state(
nifi: &v1alpha1::NifiCluster,
client: &Client,
resolved_version: &String,
deployed_version: Option<&String>,
) -> Result<ClusterVersionUpdateState> {
let namespace = &nifi
.metadata
.namespace
.clone()
.with_context(|| ObjectHasNoNamespaceSnafu {})?;

// Handle full restarts for a version change
match deployed_version {
Some(deployed_version) => {
if deployed_version != resolved_version {
// Check if statefulsets are already scaled to zero, if not - requeue
let selector = LabelSelector {
match_expressions: None,
match_labels: Some(
Labels::role_selector(nifi, APP_NAME, &NifiRole::Node.to_string())
.context(LabelBuildSnafu)?
.into(),
),
};

// Retrieve the deployed statefulsets to check on the current status of the restart
let deployed_statefulsets = client
.list_with_label_selector::<StatefulSet>(namespace, &selector)
.await
.context(FetchStatefulsetsSnafu)?;

// Sum target replicas for all statefulsets
let target_replicas = deployed_statefulsets
.iter()
.filter_map(|statefulset| statefulset.spec.as_ref())
.filter_map(|spec| spec.replicas)
.sum::<i32>();

// Sum current ready replicas for all statefulsets
let current_replicas = deployed_statefulsets
.iter()
.filter_map(|statefulset| statefulset.status.as_ref())
.map(|status| status.replicas)
.sum::<i32>();

// If statefulsets have already been scaled to zero, but have remaining replicas
// we requeue to wait until a full stop has been performed.
if target_replicas == 0 && current_replicas > 0 {
tracing::info!(
"Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish",
current_replicas
);
return Ok(ClusterVersionUpdateState::UpdateInProgress);
}

// Otherwise we either still need to scale the statefulsets to 0 or all replicas have
// been stopped and we can restart the cluster.
// Both actions will be taken in the regular reconciliation, so we can simply continue
// here
if target_replicas > 0 {
tracing::info!(
"Version change detected, we'll need to scale down the cluster for a full restart."
);
Ok(ClusterVersionUpdateState::UpdateRequested)
} else {
tracing::info!("Cluster has been stopped for a restart, will scale back up.");
Ok(ClusterVersionUpdateState::ClusterStopped)
}
} else {
// No version change detected, propagate this to the reconciliation
Ok(ClusterVersionUpdateState::NoVersionChange)
}
}
None => {
// No deployed version set in status, this is probably the first reconciliation ever
// for this cluster, so just let it progress normally
tracing::debug!(
"No deployed version found for this cluster, this is probably the first start, continue reconciliation"
);
Ok(ClusterVersionUpdateState::NoVersionChange)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ spec:
custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}"
{% else %}
custom: null
productVersion: "{{ test_scenario['values']['nifi-latest'] }}"
{% endif %}
pullPolicy: IfNotPresent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ spec:
custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}"
{% else %}
custom: null
productVersion: "{{ test_scenario['values']['nifi-latest'] }}"
{% endif %}
pullPolicy: IfNotPresent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ spec:
custom: "{{ test_scenario['values']['nifi-latest'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['nifi-latest'].split(',')[0] }}"
{% else %}
custom: null
productVersion: "{{ test_scenario['values']['nifi-latest'] }}"
{% endif %}
pullPolicy: IfNotPresent
Expand Down
Loading