Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ All notable changes to this project will be documented in this file.

- [BREAKING]: Renamed global `config` to `clusterConfig` ([#417])
- [BREAKING]: Moved `zookeeper_configmap_name` to `clusterConfig` ([#417])
- [BREAKING] Support specifying Service type.
This enables us to later switch non-breaking to using `ListenerClasses` for the exposure of Services.
This change is breaking, because - for security reasons - we default to the `cluster-internal` `ListenerClass`.
If you need your cluster to be accessible from outside of Kubernetes you need to set `clusterConfig.listenerClass`
to `external-unstable` ([#449]).
- `operator-rs` `0.33.0` -> `0.39.0` ([#418], [#447])

[#417]: https://github.com/stackabletech/nifi-operator/pull/417
Expand All @@ -24,6 +29,8 @@ All notable changes to this project will be documented in this file.
[#436]: https://github.com/stackabletech/nifi-operator/pull/436
[#447]: https://github.com/stackabletech/nifi-operator/pull/447
[#448]: https://github.com/stackabletech/nifi-operator/pull/448
[#449]: https://github.com/stackabletech/nifi-operator/pull/449
[#451]: https://github.com/stackabletech/nifi-operator/pull/451

## [23.1.0] - 2023-01-23

Expand Down
12 changes: 12 additions & 0 deletions deploy/helm/nifi-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,18 @@ spec:
- name
type: object
type: array
listenerClass:
default: cluster-internal
description: |-
In the future this setting will control, which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service. Currently only a subset of the ListenerClasses are supported by choosing the type of the created Services by looking at the ListenerClass name specified, In a future release support for custom ListenerClasses will be introduced without a breaking change:

* cluster-internal: Use a ClusterIP service

* external-unstable: Use a NodePort service
enum:
- cluster-internal
- external-unstable
type: string
sensitiveProperties:
description: Configuration options for how NiFi encrypts sensitive properties on disk
properties:
Expand Down
32 changes: 32 additions & 0 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use stackable_operator::{
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
};
use strum::Display;

pub const APP_NAME: &str = "nifi";

Expand Down Expand Up @@ -110,6 +111,37 @@ pub struct NifiClusterConfig {
/// These volumes will be mounted below `/stackable/userdata/{volumename}`
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub extra_volumes: Vec<Volume>,
/// In the future this setting will control, which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
/// will be used to expose the service.
/// Currently only a subset of the ListenerClasses are supported by choosing the type of the created Services
/// by looking at the ListenerClass name specified,
/// In a future release support for custom ListenerClasses will be introduced without a breaking change:
///
/// * cluster-internal: Use a ClusterIP service
///
/// * external-unstable: Use a NodePort service
#[serde(default)]
pub listener_class: CurrentlySupportedListenerClasses,
}

// TODO: Temporary solution until listener-operator is finished
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum CurrentlySupportedListenerClasses {
#[default]
#[serde(rename = "cluster-internal")]
ClusterInternal,
#[serde(rename = "external-unstable")]
ExternalUnstable,
}

impl CurrentlySupportedListenerClasses {
pub fn k8s_service_type(&self) -> String {
match self {
CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
}
}
}

#[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
Expand Down
100 changes: 54 additions & 46 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use stackable_operator::{
batch::v1::{Job, JobSpec},
core::v1::{
CSIVolumeSource, ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource,
EmptyDirVolumeSource, EnvVar, EnvVarSource, Node, NodeAddress, ObjectFieldSelector,
EmptyDirVolumeSource, EnvVar, EnvVarSource, Node, ObjectFieldSelector,
PodSecurityContext, Probe, Secret, SecretVolumeSource, Service, ServicePort,
ServiceSpec, TCPSocketAction, Volume,
},
Expand Down Expand Up @@ -54,10 +54,11 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use tracing::Instrument;

use stackable_nifi_crd::{
authentication::ResolvedAuthenticationMethod, Container, NifiCluster, NifiConfig,
NifiConfigFragment, NifiRole, NifiStatus, APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME,
HTTPS_PORT, HTTPS_PORT_NAME, LOG_VOLUME_SIZE_IN_MIB, METRICS_PORT, METRICS_PORT_NAME,
PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
authentication::ResolvedAuthenticationMethod, Container, CurrentlySupportedListenerClasses,
NifiCluster, NifiConfig, NifiConfigFragment, NifiRole, NifiStatus, APP_NAME, BALANCE_PORT,
BALANCE_PORT_NAME, HTTPS_PORT, HTTPS_PORT_NAME, LOG_VOLUME_SIZE_IN_MIB, METRICS_PORT,
METRICS_PORT_NAME, PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR,
STACKABLE_LOG_DIR,
};

use crate::config::{
Expand Down Expand Up @@ -505,15 +506,18 @@ pub fn build_node_role_service(
))
.build(),
spec: Some(ServiceSpec {
type_: Some(nifi.spec.cluster_config.listener_class.k8s_service_type()),
ports: Some(vec![ServicePort {
name: Some(HTTPS_PORT_NAME.to_string()),
port: HTTPS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]),
selector: Some(role_selector_labels(nifi, APP_NAME, &role_name)),
type_: Some("NodePort".to_string()),
external_traffic_policy: Some("Local".to_string()),
external_traffic_policy: match nifi.spec.cluster_config.listener_class {
CurrentlySupportedListenerClasses::ClusterInternal => None,
CurrentlySupportedListenerClasses::ExternalUnstable => Some("Local".to_string()),
},
..ServiceSpec::default()
}),
status: None,
Expand Down Expand Up @@ -627,6 +631,8 @@ fn build_node_rolegroup_service(
.with_label("prometheus.io/scrape", "true")
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(vec![
ServicePort {
Expand Down Expand Up @@ -1302,46 +1308,48 @@ async fn get_proxy_hosts(
nifi: &NifiCluster,
nifi_service: &Service,
) -> Result<String> {
let selector = LabelSelector {
match_labels: {
let mut labels = BTreeMap::new();
labels.insert("kubernetes.io/os".to_string(), "linux".to_string());
Some(labels)
},
..LabelSelector::default()
};

let external_port = external_node_port(nifi_service)?;

let cluster_nodes = client
.list_with_label_selector::<Node>(&(), &selector)
.await
.with_context(|_| MissingNodesSnafu {
obj_ref: ObjectRef::from_obj(nifi),
selector,
})?;

// We need the addresses of all nodes to add these to the NiFi proxy setting
// Since there is no real convention about how to label these addresses we will simply
// take all published addresses for now to be on the safe side.
let mut proxy_setting = cluster_nodes
.into_iter()
.flat_map(|node| {
node.status
.unwrap_or_default()
.addresses
.unwrap_or_default()
})
.collect::<Vec<NodeAddress>>()
.iter()
.map(|node_address| format!("{}:{}", node_address.address, external_port))
.collect::<Vec<_>>();
let mut proxy_setting = vec![nifi
.node_role_service_fqdn()
.context(NoRoleServiceFqdnSnafu)?];

// Also add the loadbalancer service
proxy_setting.push(
nifi.node_role_service_fqdn()
.context(NoRoleServiceFqdnSnafu)?,
);
// In case NodePort is used add them as well
if nifi.spec.cluster_config.listener_class
== CurrentlySupportedListenerClasses::ExternalUnstable
{
let selector = LabelSelector {
match_labels: {
let mut labels = BTreeMap::new();
labels.insert("kubernetes.io/os".to_string(), "linux".to_string());
Some(labels)
},
..LabelSelector::default()
};

let external_port = external_node_port(nifi_service)?;

let cluster_nodes = client
.list_with_label_selector::<Node>(&(), &selector)
.await
.with_context(|_| MissingNodesSnafu {
obj_ref: ObjectRef::from_obj(nifi),
selector,
})?;

// We need the addresses of all nodes to add these to the NiFi proxy setting
// Since there is no real convention about how to label these addresses we will simply
// take all published addresses for now to be on the safe side.
proxy_setting.extend(
cluster_nodes
.into_iter()
.flat_map(|node| {
node.status
.unwrap_or_default()
.addresses
.unwrap_or_default()
})
.map(|node_address| format!("{}:{}", node_address.address, external_port)),
);
}

Ok(proxy_setting.join(","))
}
Expand Down
3 changes: 2 additions & 1 deletion tests/templates/kuttl/smoke/01-install-zk.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ spec:
image:
productVersion: "{{ test_scenario['values']['zookeeper'].split('-stackable')[0] }}"
stackableVersion: "{{ test_scenario['values']['zookeeper'].split('-stackable')[1] }}"
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
clusterConfig:
listenerClass: {{ test_scenario['values']['listener-class'] }}
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
logging:
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
Expand Down
4 changes: 2 additions & 2 deletions tests/templates/kuttl/smoke/02-install-nifi.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ spec:
productVersion: "{{ test_scenario['values']['nifi'].split('-stackable')[0] }}"
stackableVersion: "{{ test_scenario['values']['nifi'].split('-stackable')[1] }}"
clusterConfig:
zookeeperConfigMapName: test-nifi-znode
listenerClass: {{ test_scenario['values']['listener-class'] }}
authentication:
method:
singleUser:
Expand All @@ -32,12 +34,10 @@ spec:
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
zookeeperConfigMapName: test-nifi-znode
nodes:
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
roleGroups:
default:
config: {}
replicas: 2
6 changes: 6 additions & 0 deletions tests/test-definition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ dimensions:
values:
- "false"
- "true"
# Used for both, zookeeper and nifi
- name: listener-class
values:
- "cluster-internal"
- "external-unstable"
tests:
- name: upgrade
dimensions:
Expand All @@ -42,6 +47,7 @@ tests:
dimensions:
- nifi
- zookeeper
- listener-class
- name: resources
dimensions:
- nifi
Expand Down