From dbcfb2f00f8639f28a7d3c6f24b89d85928b63a0 Mon Sep 17 00:00:00 2001 From: dervoeti Date: Wed, 20 Sep 2023 14:16:58 +0200 Subject: [PATCH] Removed AirflowDB (#322) * Removed AirflowDB * Formatting fixes * Updated changelog * Removed "databaseInitialization" option * Update rust/operator-binary/src/airflow_controller.rs Co-authored-by: Malte Sander --------- Co-authored-by: Malte Sander --- CHANGELOG.md | 1 + deploy/helm/airflow-operator/crds/crds.yaml | 263 ----------- .../getting_started/code/getting_started.sh | 17 - .../code/getting_started.sh.j2 | 17 - .../pages/getting_started/first_steps.adoc | 29 +- .../airflow/pages/usage-guide/logging.adoc | 3 - rust/crd/src/airflowdb.rs | 202 --------- rust/crd/src/lib.rs | 26 +- .../operator-binary/src/airflow_controller.rs | 164 +------ .../src/airflow_db_controller.rs | 416 ------------------ rust/operator-binary/src/main.rs | 89 +--- rust/operator-binary/src/rbac.rs | 44 -- rust/operator-binary/src/util.rs | 33 +- .../04-install-airflow-cluster.yaml.j2 | 20 - 14 files changed, 45 insertions(+), 1279 deletions(-) delete mode 100644 rust/crd/src/airflowdb.rs delete mode 100644 rust/operator-binary/src/airflow_db_controller.rs delete mode 100644 rust/operator-binary/src/rbac.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 36951e5d..dc9317a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [BREAKING] Consolidated `spec.clusterConfig.authenticationConfig` to `spec.clusterConfig.authentication` which now takes a vector of AuthenticationClass references ([#303]). - `vector` `0.26.0` -> `0.31.0` ([#308]). - `operator-rs` `0.44.0` -> `0.45.1` ([#308]). +- [BREAKING] Removed AirflowDB object, since it created some problems when reinstalling or upgrading an Airflow cluster. Instead, the initialization of the database was moved to the startup phase of each scheduler pod. To make sure the initialization does not run in parallel, the `PodManagementPolicy` of the scheduler StatefulSet was set to `OrderedReady`. The `.spec.clusterConfig.databaseInitialization` option was removed from the CRD, since it was just there to enable logging for the database initialization Job, which doesn't exist anymore ([#322]). ### Fixed diff --git a/deploy/helm/airflow-operator/crds/crds.yaml b/deploy/helm/airflow-operator/crds/crds.yaml index 41b08f23..59bbe3be 100644 --- a/deploy/helm/airflow-operator/crds/crds.yaml +++ b/deploy/helm/airflow-operator/crds/crds.yaml @@ -6937,86 +6937,6 @@ spec: - repo type: object type: array - databaseInitialization: - nullable: true - properties: - logging: - default: - enableVectorAgent: null - containers: {} - properties: - containers: - additionalProperties: - anyOf: - - required: - - custom - - {} - description: Fragment derived from `ContainerLogConfigChoice` - properties: - console: - nullable: true - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - custom: - description: Custom log configuration provided in a ConfigMap - properties: - configMap: - nullable: true - type: string - type: object - file: - nullable: true - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - loggers: - additionalProperties: - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - default: {} - type: object - type: object - type: object - enableVectorAgent: - nullable: true - type: boolean - type: object - type: object exposeConfig: nullable: true type: boolean @@ -24873,186 +24793,3 @@ spec: storage: true subresources: status: {} ---- -apiVersion: apiextensions.k8s.io/v1 -kind: CustomResourceDefinition -metadata: - name: airflowdbs.airflow.stackable.tech - annotations: - helm.sh/resource-policy: keep -spec: - group: airflow.stackable.tech - names: - categories: [] - kind: AirflowDB - plural: airflowdbs - shortNames: [] - singular: airflowdb - scope: Namespaced - versions: - - additionalPrinterColumns: [] - name: v1alpha1 - schema: - openAPIV3Schema: - description: Auto-generated derived type for AirflowDBSpec via `CustomResource` - properties: - spec: - properties: - config: - properties: - logging: - default: - enableVectorAgent: null - containers: {} - properties: - containers: - additionalProperties: - anyOf: - - required: - - custom - - {} - description: Fragment derived from `ContainerLogConfigChoice` - properties: - console: - nullable: true - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - custom: - description: Custom log configuration provided in a ConfigMap - properties: - configMap: - nullable: true - type: string - type: object - file: - nullable: true - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - loggers: - additionalProperties: - properties: - level: - description: Log levels - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - default: {} - type: object - type: object - type: object - enableVectorAgent: - nullable: true - type: boolean - type: object - type: object - credentialsSecret: - type: string - image: - anyOf: - - required: - - custom - - productVersion - - required: - - productVersion - description: The Airflow image to use - properties: - custom: - description: Overwrite the docker image. Specify the full docker image name, e.g. `docker.stackable.tech/stackable/superset:1.4.1-stackable2.1.0` - type: string - productVersion: - description: Version of the product, e.g. `1.4.1`. - type: string - pullPolicy: - default: Always - description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the Images' - enum: - - IfNotPresent - - Always - - Never - type: string - pullSecrets: - description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry' - items: - description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. - properties: - name: - description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' - type: string - type: object - nullable: true - type: array - repo: - description: Name of the docker repo, e.g. `docker.stackable.tech/stackable` - nullable: true - type: string - stackableVersion: - description: Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. - nullable: true - type: string - type: object - vectorAggregatorConfigMapName: - nullable: true - type: string - required: - - config - - credentialsSecret - - image - type: object - status: - nullable: true - properties: - condition: - enum: - - Pending - - Initializing - - Ready - - Failed - type: string - startedAt: - description: Time is a wrapper around time.Time which supports correct marshaling to YAML and JSON. Wrappers are provided for many of the factory methods that the time package offers. - format: date-time - nullable: true - type: string - required: - - condition - type: object - required: - - spec - title: AirflowDB - type: object - served: true - storage: true - subresources: - status: {} diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh b/docs/modules/airflow/examples/getting_started/code/getting_started.sh index bb20c9b1..a85df754 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh @@ -71,23 +71,6 @@ echo "Creating Airflow cluster" kubectl apply -f airflow.yaml # end::install-airflow[] -for (( i=1; i<=15; i++ )) -do - echo "Waiting for AirflowDB to appear ..." - if eval kubectl get airflowdb airflow; then - break - fi - - sleep 1 -done - -echo "Waiting on AirflowDB to become ready ..." -# tag::wait-airflowdb[] -kubectl wait airflowdb/airflow \ - --for jsonpath='{.status.condition}'=Ready \ - --timeout 300s -# end::wait-airflowdb[] - sleep 5 echo "Awaiting Airflow rollout finish ..." diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 index 1dec4fb6..a994515f 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 @@ -71,23 +71,6 @@ echo "Creating Airflow cluster" kubectl apply -f airflow.yaml # end::install-airflow[] -for (( i=1; i<=15; i++ )) -do - echo "Waiting for AirflowDB to appear ..." - if eval kubectl get airflowdb airflow; then - break - fi - - sleep 1 -done - -echo "Waiting on AirflowDB to become ready ..." -# tag::wait-airflowdb[] -kubectl wait airflowdb/airflow \ - --for jsonpath='{.status.condition}'=Ready \ - --timeout 300s -# end::wait-airflowdb[] - sleep 5 echo "Awaiting Airflow rollout finish ..." diff --git a/docs/modules/airflow/pages/getting_started/first_steps.adoc b/docs/modules/airflow/pages/getting_started/first_steps.adoc index 6ffebbb3..0ae27ddf 100644 --- a/docs/modules/airflow/pages/getting_started/first_steps.adoc +++ b/docs/modules/airflow/pages/getting_started/first_steps.adoc @@ -67,30 +67,7 @@ It should generally be safe to simply use the latest image version that is avail This will create the actual Airflow cluster. -== Initialization of the Airflow database - -When creating an Airflow cluster, a database-initialization job is first started to ensure that the database schema is present and correct (i.e. populated with an admin user). A Kubernetes job is created which starts a pod to initialize the database. This can take a while. - -You can use kubectl to wait on the resource, although the cluster itself will not be created until this step is complete.: - -[source,bash] -include::example$getting_started/code/getting_started.sh[tag=wait-airflowdb] - -The job status can be inspected and verified like this: - -[source,bash] ----- -kubectl get jobs ----- - -which will show something like this: - ----- -NAME COMPLETIONS DURATION AGE -airflow 1/1 85s 11m ----- - -Then, make sure that all the Pods in the StatefulSets are ready: +After a while, all the Pods in the StatefulSets should be ready: [source,bash] ---- @@ -109,10 +86,6 @@ airflow-webserver-default 1/1 11m airflow-worker-default 2/2 11m ---- -The completed set of pods for the Airflow cluster will look something like this: - -image::getting_started/airflow_pods.png[Airflow pods] - When the Airflow cluster has been created and the database is initialized, Airflow can be opened in the browser: the webserver UI port defaults to `8080` can be forwarded to the local host: diff --git a/docs/modules/airflow/pages/usage-guide/logging.adoc b/docs/modules/airflow/pages/usage-guide/logging.adoc index e3875626..c1a1150c 100644 --- a/docs/modules/airflow/pages/usage-guide/logging.adoc +++ b/docs/modules/airflow/pages/usage-guide/logging.adoc @@ -8,9 +8,6 @@ ConfigMap for the aggregator and by enabling the log agent: spec: clusterConfig: vectorAggregatorConfigMapName: vector-aggregator-discovery - databaseInitialization: - logging: - enableVectorAgent: true webservers: config: logging: diff --git a/rust/crd/src/airflowdb.rs b/rust/crd/src/airflowdb.rs deleted file mode 100644 index 972d05f8..00000000 --- a/rust/crd/src/airflowdb.rs +++ /dev/null @@ -1,202 +0,0 @@ -use crate::{build_recommended_labels, AirflowCluster}; - -use serde::{Deserialize, Serialize}; -use snafu::{ResultExt, Snafu}; -use stackable_operator::{ - builder::ObjectMetaBuilder, - commons::product_image_selection::{ProductImage, ResolvedProductImage}, - config::{ - fragment::{self, Fragment, ValidationError}, - merge::Merge, - }, - k8s_openapi::{apimachinery::pkg::apis::meta::v1::Time, chrono::Utc}, - kube::{CustomResource, ResourceExt}, - product_logging::{self, spec::Logging}, - schemars::{self, JsonSchema}, -}; -use strum::{Display, EnumIter}; - -pub const AIRFLOW_DB_CONTROLLER_NAME: &str = "airflow-db"; - -#[derive(Snafu, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum Error { - #[snafu(display("fragment validation failure"))] - FragmentValidationFailure { source: ValidationError }, -} - -type Result = std::result::Result; - -#[derive( - Clone, - Debug, - Deserialize, - Display, - Eq, - EnumIter, - JsonSchema, - Ord, - PartialEq, - PartialOrd, - Serialize, -)] -#[serde(rename_all = "kebab-case")] -#[strum(serialize_all = "kebab-case")] -pub enum Container { - AirflowInitDb, - Vector, -} - -#[derive(Clone, Debug, Default, Eq, Fragment, JsonSchema, PartialEq)] -#[fragment_attrs( - derive( - Clone, - Debug, - Default, - Deserialize, - Merge, - JsonSchema, - PartialEq, - Serialize - ), - serde(rename_all = "camelCase") -)] -pub struct AirflowDbConfig { - #[fragment_attrs(serde(default))] - pub logging: Logging, -} - -impl AirflowDbConfig { - fn default_config() -> AirflowDbConfigFragment { - AirflowDbConfigFragment { - logging: product_logging::spec::default_logging(), - } - } -} - -#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] -#[kube( - group = "airflow.stackable.tech", - version = "v1alpha1", - kind = "AirflowDB", - plural = "airflowdbs", - status = "AirflowDBStatus", - namespaced, - crates( - kube_core = "stackable_operator::kube::core", - k8s_openapi = "stackable_operator::k8s_openapi", - schemars = "stackable_operator::schemars" - ) -)] -#[serde(rename_all = "camelCase")] -pub struct AirflowDBSpec { - /// The Airflow image to use - pub image: ProductImage, - pub credentials_secret: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub vector_aggregator_config_map_name: Option, - pub config: AirflowDbConfigFragment, -} - -impl AirflowDB { - /// Returns an AirflowDB resource with the same name, namespace and Airflow version as the cluster. - pub fn for_airflow( - airflow: &AirflowCluster, - resolved_product_image: &ResolvedProductImage, - ) -> Result { - Ok(Self { - // The db is deliberately not owned by the cluster so it doesn't get deleted when the - // cluster gets deleted. The schema etc. still exists in the database and can be reused - // when the cluster is created again. - metadata: ObjectMetaBuilder::new() - .name_and_namespace(airflow) - .with_recommended_labels(build_recommended_labels( - airflow, - AIRFLOW_DB_CONTROLLER_NAME, - &resolved_product_image.product_version, - "db-initializer", - "global", - )) - .build(), - spec: AirflowDBSpec { - image: airflow.spec.image.clone(), - credentials_secret: airflow.spec.cluster_config.credentials_secret.clone(), - vector_aggregator_config_map_name: airflow - .spec - .cluster_config - .vector_aggregator_config_map_name - .clone(), - config: AirflowDbConfigFragment { - logging: airflow - .spec - .cluster_config - .database_initialization - .clone() - .unwrap_or_default() - .logging, - }, - }, - status: None, - }) - } - - pub fn job_name(&self) -> String { - self.name_unchecked() - } - - pub fn merged_config(&self) -> Result { - let defaults = AirflowDbConfig::default_config(); - let mut config = self.spec.config.to_owned(); - config.merge(&defaults); - fragment::validate(config).context(FragmentValidationFailureSnafu) - } -} - -#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct AirflowDBStatus { - #[serde(skip_serializing_if = "Option::is_none")] - pub started_at: Option