Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- Deploy default and support custom affinities ([#436])
- Added the ability to mount extra volumes for files that may be needed for NiFi processors to work ([#434])
- Extend cluster resources for status and cluster operation (paused, stopped) ([#447])
- Cluster status conditions ([#448])

### Changed

Expand All @@ -22,6 +23,7 @@ All notable changes to this project will be documented in this file.
[#434]: https://github.com/stackabletech/nifi-operator/pull/434
[#436]: https://github.com/stackabletech/nifi-operator/pull/436
[#447]: https://github.com/stackabletech/nifi-operator/pull/447
[#448]: https://github.com/stackabletech/nifi-operator/pull/448

## [23.1.0] - 2023-01-23

Expand Down
44 changes: 44 additions & 0 deletions deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2956,9 +2956,53 @@ spec:
status:
nullable: true
properties:
conditions:
items:
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status to another.
format: date-time
nullable: true
type: string
lastUpdateTime:
description: The last time this condition was updated.
format: date-time
nullable: true
type: string
message:
description: A human readable message indicating details about the transition.
nullable: true
type: string
reason:
description: The reason for the condition's last transition.
nullable: true
type: string
status:
description: Status of the condition, one of True, False, Unknown.
enum:
- 'True'
- 'False'
- Unknown
type: string
type:
description: Type of deployment condition.
enum:
- Available
- Degraded
- Progressing
- ReconciliationPaused
- Stopped
type: string
required:
- status
- type
type: object
type: array
deployed_version:
nullable: true
type: string
required:
- conditions
type: object
required:
- spec
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/nifi-operator/templates/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ rules:
resources:
- statefulsets
verbs:
- get
- create
- delete
- list
Expand Down
14 changes: 12 additions & 2 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::collections::BTreeMap;
use affinity::get_affinity;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::k8s_openapi::api::core::v1::Volume;
use stackable_operator::{
commons::{
affinity::StackableAffinity,
Expand All @@ -23,12 +22,13 @@ use stackable_operator::{
fragment::{self, ValidationError},
merge::Merge,
},
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
k8s_openapi::{api::core::v1::Volume, apimachinery::pkg::api::resource::Quantity},
kube::{runtime::reflector::ObjectRef, CustomResource, ResourceExt},
product_config_utils::{ConfigError, Configuration},
product_logging::{self, spec::Logging},
role_utils::{Role, RoleGroup, RoleGroupRef},
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
};

pub const APP_NAME: &str = "nifi";
Expand Down Expand Up @@ -173,6 +173,16 @@ pub enum NifiRole {
#[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)]
pub struct NifiStatus {
pub deployed_version: Option<String>,
pub conditions: Vec<ClusterCondition>,
}

impl HasStatusCondition for NifiCluster {
fn conditions(&self) -> Vec<ClusterCondition> {
match &self.status {
Some(status) => status.conditions.clone(),
None => vec![],
}
}
}

#[derive(
Expand Down
59 changes: 41 additions & 18 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ use stackable_operator::{
},
},
role_utils::{Role, RoleGroupRef},
status::condition::{
compute_conditions, operations::ClusterOperationsConditionBuilder,
statefulset::StatefulSetConditionBuilder,
},
};
use strum::{EnumDiscriminants, IntoStaticStr};
use tracing::Instrument;
Expand Down Expand Up @@ -346,6 +350,8 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

for (rolegroup_name, rolegroup_config) in nifi_node_config.iter() {
let rg_span = tracing::info_span!("rolegroup_span", rolegroup = rolegroup_name.as_str());
async {
Expand Down Expand Up @@ -412,12 +418,14 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act
.with_context(|_| ApplyRoleGroupConfigSnafu {
rolegroup: rolegroup.clone(),
})?;
cluster_resources
.add(client, rg_statefulset)
.await
.with_context(|_| ApplyRoleGroupStatefulSetSnafu {
rolegroup: rolegroup.clone(),
})?;
ss_cond_builder.add(
cluster_resources
.add(client, rg_statefulset)
.await
.with_context(|_| ApplyRoleGroupStatefulSetSnafu {
rolegroup: rolegroup.clone(),
})?,
);

client
.apply_patch(CONTROLLER_NAME, &reporting_task_job, &reporting_task_job)
Expand All @@ -439,20 +447,35 @@ pub async fn reconcile_nifi(nifi: Arc<NifiCluster>, ctx: Arc<Ctx>) -> Result<Act
.await
.context(DeleteOrphanedResourcesSnafu)?;

let cluster_operation_cond_builder =
ClusterOperationsConditionBuilder::new(&nifi.spec.cluster_operation);

let conditions = compute_conditions(
nifi.as_ref(),
&[&ss_cond_builder, &cluster_operation_cond_builder],
);

// Update the deployed product version in the status after everything has been deployed, unless
// we are still in the process of updating
if version_change != VersionChangeState::BeginChange {
client
.apply_patch_status(
CONTROLLER_NAME,
nifi.deref(),
&NifiStatus {
deployed_version: Some(resolved_product_image.product_version),
},
)
.await
.with_context(|_| StatusUpdateSnafu {})?;
}
let status = if version_change != VersionChangeState::BeginChange {
NifiStatus {
deployed_version: Some(resolved_product_image.product_version),
conditions,
}
} else {
NifiStatus {
deployed_version: nifi
.status
.as_ref()
.and_then(|status| status.deployed_version.clone()),
conditions,
}
};

client
.apply_patch_status(OPERATOR_NAME, &*nifi, &status)
.await
.context(StatusUpdateSnafu)?;

Ok(Action::await_change())
}
Expand Down
2 changes: 2 additions & 0 deletions tests/templates/kuttl/cluster_operation/20-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 1200
commands:
- script: kubectl -n $NAMESPACE wait --for=condition=available nificlusters.nifi.stackable.tech/test-nifi --timeout 1201s
---
apiVersion: apps/v1
kind: StatefulSet
Expand Down
4 changes: 3 additions & 1 deletion tests/templates/kuttl/cluster_operation/30-assert.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 1200
timeout: 300
commands:
- script: kubectl -n $NAMESPACE wait --for=condition=stopped nificlusters.nifi.stackable.tech/test-nifi --timeout 301s
---
apiVersion: apps/v1
kind: StatefulSet
Expand Down
4 changes: 3 additions & 1 deletion tests/templates/kuttl/cluster_operation/40-assert.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 1200
timeout: 300
commands:
- script: kubectl -n $NAMESPACE wait --for=condition=reconciliationPaused nificlusters.nifi.stackable.tech/test-nifi --timeout 301s
---
apiVersion: apps/v1
kind: StatefulSet
Expand Down
4 changes: 3 additions & 1 deletion tests/templates/kuttl/cluster_operation/50-assert.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 1200
timeout: 600
commands:
- script: kubectl -n $NAMESPACE wait --for=condition=available nificlusters.nifi.stackable.tech/test-nifi --timeout 601s
---
apiVersion: apps/v1
kind: StatefulSet
Expand Down