Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
- Allow multiple Airflows in the same namespace to use Kubernetes executors.
Previously, the operator would always use the same name for the executor Pod template ConfigMap.
Thus when deploying multiple Airflow instances in the same namespace, there would be a conflict over the contents of that ConfigMap ([#678]).
- For versions >= 3 custom logging initializes the RemoteLogIO handler to fix remote logging ([#683]).

[#667]: https://github.com/stackabletech/airflow-operator/pull/667
[#668]: https://github.com/stackabletech/airflow-operator/pull/668
[#669]: https://github.com/stackabletech/airflow-operator/pull/669
[#678]: https://github.com/stackabletech/airflow-operator/pull/678
[#683]: https://github.com/stackabletech/airflow-operator/pull/683

## [25.7.0] - 2025-07-23

Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ fn build_rolegroup_config_map(
container,
&Container::Vector,
&mut cm_builder,
resolved_product_image,
)
.context(InvalidLoggingConfigSnafu {
cm_name: rolegroup.object_name(),
Expand Down
31 changes: 25 additions & 6 deletions rust/operator-binary/src/product_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::fmt::{Display, Write};
use snafu::Snafu;
use stackable_operator::{
builder::configmap::ConfigMapBuilder,
commons::product_image_selection::ResolvedProductImage,
kube::Resource,
product_logging::{
self,
Expand Down Expand Up @@ -43,6 +44,7 @@ pub fn extend_config_map_with_log_config<C, K>(
main_container: &C,
vector_container: &C,
cm_builder: &mut ConfigMapBuilder,
resolved_product_image: &ResolvedProductImage,
) -> Result<()>
where
C: Clone + Ord + Display,
Expand All @@ -53,7 +55,10 @@ where
}) = logging.containers.get(main_container)
{
let log_dir = format!("{STACKABLE_LOG_DIR}/{main_container}");
cm_builder.add_data(LOG_CONFIG_FILE, create_airflow_config(log_config, &log_dir));
cm_builder.add_data(
LOG_CONFIG_FILE,
create_airflow_config(log_config, &log_dir, resolved_product_image),
);
}

let vector_log_config = if let Some(ContainerLogConfig {
Expand All @@ -75,7 +80,11 @@ where
Ok(())
}

fn create_airflow_config(log_config: &AutomaticContainerLogConfig, log_dir: &str) -> String {
fn create_airflow_config(
log_config: &AutomaticContainerLogConfig,
log_dir: &str,
resolved_product_image: &ResolvedProductImage,
) -> String {
let loggers_config = log_config
.loggers
.iter()
Expand All @@ -92,18 +101,28 @@ LOGGING_CONFIG['loggers']['{name}']['level'] = {level}
output
});

let remote_task_log = if resolved_product_image.product_version.starts_with("2.") {
""
} else {
"
# This will cause the relevant RemoteLogIO handler to be initialized
REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG
log = logging.getLogger(__name__)
log.info('Custom logging remote task log %s', REMOTE_TASK_LOG)
"
};

format!(
"\
import logging
import os
from copy import deepcopy
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.config_templates import airflow_local_settings

os.makedirs('{log_dir}', exist_ok=True)

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

REMOTE_TASK_LOG = None
LOGGING_CONFIG = deepcopy(airflow_local_settings.DEFAULT_LOGGING_CONFIG)
{remote_task_log}

LOGGING_CONFIG.setdefault('loggers', {{}})
for logger_name, logger_config in LOGGING_CONFIG['loggers'].items():
Expand Down
9 changes: 9 additions & 0 deletions tests/templates/kuttl/remote-logging/00-patch-ns.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% if test_scenario['values']['openshift'] == 'true' %}
# see https://github.com/stackabletech/issues/issues/566
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
timeout: 120
{% endif %}
13 changes: 13 additions & 0 deletions tests/templates/kuttl/remote-logging/00-range-limit.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{% if test_scenario['values']['executor'] == 'celery' %}
---
apiVersion: v1
kind: LimitRange
metadata:
name: limit-request-ratio
spec:
limits:
- type: "Container"
maxLimitRequestRatio:
cpu: 5
memory: 1
{% endif %}
25 changes: 25 additions & 0 deletions tests/templates/kuttl/remote-logging/02-s3-secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
apiVersion: v1
kind: Secret
metadata:
name: minio-credentials
labels:
secrets.stackable.tech/class: spark-pi-private-s3-credentials-class
timeout: 240
stringData:
accessKey: minioAccessKey
secretKey: minioSecretKey
# The following two entries are used by the Bitnami chart for MinIO to
# set up credentials for accessing buckets managed by the MinIO tenant.
root-user: minioAccessKey
root-password: minioSecretKey
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: spark-pi-private-s3-credentials-class
spec:
backend:
k8sSearch:
searchNamespace:
pod: {}
11 changes: 11 additions & 0 deletions tests/templates/kuttl/remote-logging/03-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 900
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-minio
status:
readyReplicas: 1
11 changes: 11 additions & 0 deletions tests/templates/kuttl/remote-logging/03-setup-minio.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: >-
helm install test-minio
--namespace $NAMESPACE
--version 17.0.19
-f helm-bitnami-minio-values.yaml
oci://registry-1.docker.io/bitnamicharts/minio
timeout: 240
14 changes: 14 additions & 0 deletions tests/templates/kuttl/remote-logging/10-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
metadata:
name: test-airflow-postgresql
timeout: 480
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-postgresql
status:
readyReplicas: 1
replicas: 1
12 changes: 12 additions & 0 deletions tests/templates/kuttl/remote-logging/10-install-postgresql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: >-
helm install airflow-postgresql
--namespace $NAMESPACE
--version 16.4.2
-f helm-bitnami-postgresql-values.yaml
oci://registry-1.docker.io/bitnamicharts/postgresql
--wait
timeout: 600
24 changes: 24 additions & 0 deletions tests/templates/kuttl/remote-logging/20-assert.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{% if test_scenario['values']['executor'] == 'celery' %}
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
metadata:
name: test-airflow-redis
timeout: 360
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-redis-master
status:
readyReplicas: 1
replicas: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-redis-replicas
status:
readyReplicas: 1
replicas: 1
{% endif %}
14 changes: 14 additions & 0 deletions tests/templates/kuttl/remote-logging/20-install-redis.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% if test_scenario['values']['executor'] == 'celery' %}
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: >-
helm install airflow-redis
--namespace $NAMESPACE
--version 17.11.3
-f helm-bitnami-redis-values.yaml
--repo https://charts.bitnami.com/bitnami redis
--wait
timeout: 600
{% endif %}
81 changes: 81 additions & 0 deletions tests/templates/kuttl/remote-logging/40-assert.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
metadata:
name: test-available-condition
timeout: 600
commands:
- script: kubectl -n $NAMESPACE wait --for=condition=available airflowclusters.airflow.stackable.tech/airflow --timeout 301s
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
metadata:
name: test-airflow-cluster
timeout: 1200
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-webserver-default
spec:
template:
spec:
terminationGracePeriodSeconds: 120
status:
readyReplicas: 1
replicas: 1
{% if test_scenario['values']['executor'] == 'celery' %}
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-worker-default
spec:
template:
spec:
terminationGracePeriodSeconds: 300
status:
readyReplicas: 2
replicas: 2
{% endif %}
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-scheduler-default
spec:
template:
spec:
terminationGracePeriodSeconds: 120
status:
readyReplicas: 1
replicas: 1
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-webserver
status:
expectedPods: 1
currentHealthy: 1
disruptionsAllowed: 1
{% if test_scenario['values']['executor'] == 'celery' %}
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-worker
status:
expectedPods: 2
currentHealthy: 2
disruptionsAllowed: 1
{% endif %}
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-scheduler
status:
expectedPods: 1
currentHealthy: 1
disruptionsAllowed: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
apiVersion: kuttl.dev/v1beta1
kind: TestStep
metadata:
name: install-airflow-db
timeout: 480
---
apiVersion: v1
kind: Secret
metadata:
name: test-airflow-credentials
type: Opaque
stringData:
adminUser.username: airflow
adminUser.firstname: Airflow
adminUser.lastname: Admin
adminUser.email: airflow@airflow.com
adminUser.password: airflow
connections.secretKey: thisISaSECRET_1234
connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow
{% if test_scenario['values']['executor'] == 'celery' %}
connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow
connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0
{% endif %}
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
name: airflow
spec:
image:
{% if test_scenario['values']['airflow-latest'].find(",") > 0 %}
custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}"
{% else %}
productVersion: "{{ test_scenario['values']['airflow-latest'] }}"
{% endif %}
pullPolicy: IfNotPresent
clusterConfig:
loadExamples: true
credentialsSecret: test-airflow-credentials
webservers:
roleConfig:
listenerClass: external-unstable
roleGroups:
default:
replicas: 1
envOverrides: &envOverrides
AIRFLOW_CONN_MINIO_CONN: "aws://minioAccessKey:minioSecretKey@/?endpoint_url=http%3A%2F%2Ftest-minio%3A9000"
AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://my-bucket/
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio_conn
{% if test_scenario['values']['executor'] == 'celery' %}
celeryExecutors:
roleGroups:
default:
replicas: 2
envOverrides: *envOverrides
{% elif test_scenario['values']['executor'] == 'kubernetes' %}
kubernetesExecutors:
envOverrides: *envOverrides
{% endif %}
schedulers:
roleGroups:
default:
replicas: 1
envOverrides: *envOverrides
Loading