[Merged by Bors] - Added code to perform full cluster stop when changing the deployed NiFi version #323

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deploy/crd/nificluster.crd.yaml
@@ -1200,6 +1200,10 @@ spec:
              type: object
            status:
              nullable: true
              properties:
                deployed_version:
                  nullable: true
                  type: string
              type: object
          required:
            - spec
4 changes: 4 additions & 0 deletions deploy/helm/nifi-operator/crds/crds.yaml
@@ -1202,6 +1202,10 @@ spec:
              type: object
            status:
              nullable: true
              properties:
                deployed_version:
                  nullable: true
                  type: string
              type: object
          required:
            - spec
4 changes: 4 additions & 0 deletions deploy/manifests/crds.yaml
@@ -1203,6 +1203,10 @@ spec:
              type: object
            status:
              nullable: true
              properties:
                deployed_version:
                  nullable: true
                  type: string
              type: object
          required:
            - spec
42 changes: 41 additions & 1 deletion docs/modules/ROOT/pages/usage.adoc
@@ -117,6 +117,45 @@ With this authorization method, a single user has administrator capabilities.
=== LDAP
The operator uses the https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#fileusergroupprovider[`FileUserGroupProvider`] and https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#fileaccesspolicyprovider[`FileAccessPolicyProvider`] to bind the LDAP user to the NiFi administrator group. This user is then able to create and modify groups and policies in the web interface. These changes are local to the `Pod` running NiFi and are *not* persistent.

== Updating NiFi

Updating (or downgrading) the deployed version of NiFi is as simple as changing the version stated in the cluster definition.
Continuing the example above, to change the deployed version from `1.16.3` to `1.16.2` you would simply deploy the following resource.

[source,yaml]
----
apiVersion: nifi.stackable.tech/v1alpha1
kind: NifiCluster
metadata:
  name: simple-nifi
spec:
  version: 1.16.2-stackable0.1.0 # <1>
  zookeeperConfigMapName: simple-nifi-znode
  config:
    authentication:
      method:
        singleUser:
          adminCredentialsSecret: nifi-admin-credentials-simple
    sensitiveProperties:
      keySecret: nifi-sensitive-property-key
  nodes:
    roleGroups:
      default:
        selector:
          matchLabels:
            kubernetes.io/os: linux
        config:
          log:
            rootLogLevel: INFO
        replicas: 3
----

<1> Change the NiFi version here

IMPORTANT: Due to a limitation in NiFi itself, it is not possible to upgrade or downgrade a NiFi cluster in a rolling fashion.
Any change to the NiFi version in this resource will therefore result in a full cluster restart with a short downtime.
This does not affect the Stackable image version, which can be changed in a rolling fashion as long as the underlying NiFi version remains unchanged.
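
The operator records the currently rolled-out NiFi version in the cluster status under `deployed_version`.
While the cluster is being stopped for the version change this field still shows the previous version; it is only updated once the operator starts bringing the cluster back up on the new version, so it can be used to follow the progress of the change.
A minimal sketch of the status after the change above has completed (the value shown is illustrative):

[source,yaml]
----
status:
  deployed_version: 1.16.2
----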

== Monitoring

The managed NiFi cluster is automatically configured to export Prometheus metrics. See
@@ -126,7 +165,8 @@ xref:home:operators:monitoring.adoc[] for more details.

The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).

IMPORTANT: Do not override port numbers. This will lead to cluster malfunction.
IMPORTANT: Do not override port numbers.
This will lead to cluster malfunction.

=== Configuration Overrides

4 changes: 2 additions & 2 deletions examples/simple-nifi-cluster.yaml
@@ -37,7 +37,7 @@ kind: NifiCluster
metadata:
  name: simple-nifi
spec:
  version: 1.16.3-stackable0.1.0
  version: 1.15.0-stackable0.1.0
  zookeeperConfigMapName: simple-nifi-znode
  config:
    authentication:
@@ -77,7 +77,7 @@ spec:
            rootLogLevel: INFO
          resources:
            memory:
              limit: "1Gi" # Option
              limit: "2Gi" # Option
            cpu:
              min: "2" # Option
              max: "3" # Option
12 changes: 8 additions & 4 deletions rust/crd/src/lib.rs
@@ -1,6 +1,5 @@
pub mod authentication;
use std::collections::BTreeMap;

use crate::authentication::NifiAuthenticationConfig;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, Snafu};
use stackable_operator::commons::resources::{
@@ -15,7 +14,10 @@ use stackable_operator::{
    role_utils::Role,
    schemars::{self, JsonSchema},
};
use std::collections::BTreeMap;

use crate::authentication::NifiAuthenticationConfig;

pub mod authentication;

pub const APP_NAME: &str = "nifi";

@@ -138,7 +140,9 @@ pub enum NifiRole {
}

#[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)]
pub struct NifiStatus {}
pub struct NifiStatus {
    pub deployed_version: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
108 changes: 105 additions & 3 deletions rust/operator-binary/src/controller.rs
@@ -1,4 +1,5 @@
//! Ensures that `Pod`s are configured and running for each [`NifiCluster`]
use std::ops::Deref;
use std::{
    borrow::Cow,
    collections::{BTreeMap, HashMap},
@@ -31,6 +32,7 @@ use stackable_operator::{
        apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
    },
    kube::{runtime::controller::Action, runtime::reflector::ObjectRef, Resource, ResourceExt},
    labels,
    labels::{role_group_selector_labels, role_selector_labels},
    logging::controller::ReconcilerError,
    product_config::{types::PropertyNameKind, ProductConfigManager},
@@ -41,7 +43,7 @@ use tracing::Instrument;

use stackable_nifi_crd::{
    authentication::{get_auth_configs, get_auth_volumes},
    NifiCluster, NifiConfig, NifiLogConfig, NifiRole, NifiStorageConfig, HTTPS_PORT,
    NifiCluster, NifiConfig, NifiLogConfig, NifiRole, NifiStatus, NifiStorageConfig, HTTPS_PORT,
    HTTPS_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, PROTOCOL_PORT, PROTOCOL_PORT_NAME,
};
use stackable_nifi_crd::{APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME};
@@ -89,6 +91,11 @@ pub enum Error {
    ApplyRoleService {
        source: stackable_operator::error::Error,
    },
    #[snafu(display("failed to update status"))]
    StatusUpdate {
        source: stackable_operator::error::Error,
    },

    #[snafu(display("failed to check sensitive property key secret"))]
    SensitiveKeySecret {
        source: stackable_operator::error::Error,
@@ -178,6 +185,13 @@ impl ReconcilerError for Error {
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum VersionChangeState {
    BeginChange,
    Stopped,
    NoChange,
}

pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Action> {
    tracing::info!("Starting reconcile");
    let client = &ctx.client;
@@ -193,6 +207,72 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act
    tracing::info!("Checking for sensitive key configuration");
    check_or_generate_sensitive_key(client, &nifi).await?;

    // Handle full restarts for a version change
    let version_change = if let Some(deployed_version) = nifi
        .status
        .as_ref()
        .and_then(|status| status.deployed_version.as_ref())
    {
        if deployed_version != nifi_product_version {
            // Check if statefulsets are already scaled to zero, if not - requeue
            let selector = LabelSelector {
                match_expressions: None,
                match_labels: Some(labels::role_selector_labels(
                    nifi.deref(),
                    APP_NAME,
                    &NifiRole::Node.to_string(),
                )),
            };

            // Retrieve the deployed statefulsets to check on the current status of the restart
            let deployed_statefulsets = client
                .list_with_label_selector::<StatefulSet>(Some(namespace), &selector)
                .await
                .context(ApplyRoleServiceSnafu)?;

            // Sum target replicas for all statefulsets
            let target_replicas = deployed_statefulsets
                .iter()
                .filter_map(|statefulset| statefulset.spec.clone())
                .filter_map(|spec| spec.replicas)
                .sum::<i32>();

            // Sum current ready replicas for all statefulsets
            let current_replicas = deployed_statefulsets
                .iter()
                .filter_map(|statefulset| statefulset.status.as_ref())
                .map(|status| status.replicas)
                .sum::<i32>();

            // If statefulsets have already been scaled to zero, but have remaining replicas
            // we requeue to wait until a full stop has been performed.
            if target_replicas == 0 && current_replicas > 0 {
                tracing::info!("Cluster is performing a full restart at the moment and still shutting down, remaining replicas: [{}] - requeueing to wait for shutdown to finish", current_replicas);
                return Ok(Action::await_change());
            }

            // Otherwise we either still need to scale the statefulsets to 0 or all replicas have
            // been stopped and we can restart the cluster.
            // Both actions will be taken in the regular reconciliation, so we can simply continue
            // here
            if target_replicas > 0 {
                tracing::info!("Version change detected, we'll need to scale down the cluster for a full restart.");
                VersionChangeState::BeginChange
            } else {
                tracing::info!("Cluster has been stopped for a restart, will scale back up.");
                VersionChangeState::Stopped
            }
        } else {
            // No version change detected, propagate this to the reconciliation
            VersionChangeState::NoChange
        }
    } else {
        // No deployed version set in status, this is probably the first reconciliation ever
        // for this cluster, so just let it progress normally
        tracing::debug!("No deployed version found for this cluster, this is probably the first start, continue reconciliation");
        VersionChangeState::NoChange
    };

    let validated_config = validated_product_config(
        &nifi,
        nifi_product_version,
@@ -269,6 +349,7 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act
            &resource_definition,
            &auth_volumes,
            &additional_auth_args,
            &version_change,
        )
        .await?;

@@ -318,12 +399,29 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act

    // Remove any orphaned resources that still exist in k8s, but have not been added to
    // the cluster resources during the reconciliation
    // TODO: this doesn't cater for a graceful cluster shrink
    // TODO: this doesn't cater for a graceful cluster shrink, for that we'd need to predict
    //       the resources that will be removed and run a disconnect/offload job for those
    //       see https://github.com/stackabletech/nifi-operator/issues/314
    cluster_resources
        .delete_orphaned_resources(client)
        .await
        .context(DeleteOrphanedResourcesSnafu)?;

    // Update the deployed product version in the status after everything has been deployed, unless
    // we are still in the process of updating
    if version_change != VersionChangeState::BeginChange {
        client
            .apply_patch_status(
                CONTROLLER_NAME,
                nifi.deref(),
                &NifiStatus {
                    deployed_version: Some(nifi_product_version.to_string()),
                },
            )
            .await
            .with_context(|_| StatusUpdateSnafu {})?;
    }

    Ok(Action::await_change())
}

@@ -578,6 +676,7 @@ fn resolve_resource_config_for_rolegroup(
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
/// corresponding [`Service`] (from [`build_node_rolegroup_service`]).
#[allow(clippy::too_many_arguments)]
async fn build_node_rolegroup_statefulset(
    nifi: &NifiCluster,
    rolegroup_ref: &RoleGroupRef<NifiCluster>,
@@ -586,6 +685,7 @@ async fn build_node_rolegroup_statefulset(
    resource_definition: &Resources<NifiStorageConfig>,
    auth_volumes: &BTreeMap<String, (String, Volume)>,
    additional_auth_args: &[String],
    version_change_state: &VersionChangeState,
) -> Result<StatefulSet> {
    tracing::debug!("Building statefulset");
    let zookeeper_host = "ZOOKEEPER_HOSTS";
@@ -917,7 +1017,9 @@ async fn build_node_rolegroup_statefulset(
                .build(),
            spec: Some(StatefulSetSpec {
                pod_management_policy: Some("Parallel".to_string()),
                replicas: if nifi.spec.stopped.unwrap_or(false) {
                replicas: if nifi.spec.stopped.unwrap_or(false)
                    || version_change_state == &VersionChangeState::BeginChange
                {
                    Some(0)
                } else {
                    rolegroup.and_then(|rg| rg.replicas).map(i32::from)
2 changes: 1 addition & 1 deletion tests/templates/kuttl/ldap/02-install-test-nifi.yaml
@@ -17,6 +17,6 @@ spec:
    spec:
      containers:
        - name: test-nifi
          image: python:3.10-slim
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          stdin: true
          tty: true
2 changes: 0 additions & 2 deletions tests/templates/kuttl/ldap/20-test-nifi.yaml
@@ -3,5 +3,3 @@ apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl cp -n $NAMESPACE ./test_nifi.py test-nifi-0:/tmp
- script: kubectl cp -n $NAMESPACE ./requirements.txt test-nifi-0:/tmp
- script: kubectl exec -n $NAMESPACE test-nifi-0 -- pip install --user -r /tmp/requirements.txt
1 change: 0 additions & 1 deletion tests/templates/kuttl/ldap/requirements.txt

This file was deleted.

2 changes: 1 addition & 1 deletion tests/templates/kuttl/smoke/03-install-test-nifi.yaml
@@ -17,6 +17,6 @@ spec:
    spec:
      containers:
        - name: test-nifi
          image: python:3.10-slim
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          stdin: true
          tty: true
2 changes: 0 additions & 2 deletions tests/templates/kuttl/smoke/04-prepare-test-nifi.yaml
@@ -5,5 +5,3 @@ commands:
- script: kubectl cp -n $NAMESPACE ./test_nifi_metrics.py test-nifi-0:/tmp
- script: kubectl cp -n $NAMESPACE ./test_nifi.py test-nifi-0:/tmp
- script: kubectl cp -n $NAMESPACE ./cacert.pem test-nifi-0:/tmp
- script: kubectl cp -n $NAMESPACE ./requirements.txt test-nifi-0:/tmp
- script: kubectl exec -n $NAMESPACE test-nifi-0 -- pip install --user -r /tmp/requirements.txt
1 change: 0 additions & 1 deletion tests/templates/kuttl/smoke/requirements.txt

This file was deleted.

12 changes: 12 additions & 0 deletions tests/templates/kuttl/upgrade/00-assert.yaml
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 600
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: test-zk-server-default
status:
  readyReplicas: 1
  replicas: 1
22 changes: 22 additions & 0 deletions tests/templates/kuttl/upgrade/00-install-zk.yaml.j2
@@ -0,0 +1,22 @@
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperCluster
metadata:
  name: test-zk
spec:
  servers:
    roleGroups:
      default:
        replicas: 1
        config:
          myidOffset: 10
  version: {{ test_scenario['values']['zookeeper'] }}
  stopped: false
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: test-nifi-znode
spec:
  clusterRef:
    name: test-zk
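
The later steps of this upgrade test are not shown on this page. A hypothetical kuttl assert that waits for the operator to record the new version in the NifiCluster status could look roughly like the following sketch (the resource name and version value are made up for illustration):

---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 600
---
apiVersion: nifi.stackable.tech/v1alpha1
kind: NifiCluster
metadata:
  name: test-nifi
status:
  deployed_version: "1.16.3"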