diff --git a/CHANGELOG.md b/CHANGELOG.md index 076279cb..7d181fdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,11 +13,13 @@ - Allow multiple Airflows in the same namespace to use Kubernetes executors. Previously, the operator would always use the same name for the executor Pod template ConfigMap. Thus when deploying multiple Airflow instances in the same namespace, there would be a conflict over the contents of that ConfigMap ([#678]). +- For versions >= 3 custom logging initializes the RemoteLogIO handler to fix remote logging ([#683]). [#667]: https://github.com/stackabletech/airflow-operator/pull/667 [#668]: https://github.com/stackabletech/airflow-operator/pull/668 [#669]: https://github.com/stackabletech/airflow-operator/pull/669 [#678]: https://github.com/stackabletech/airflow-operator/pull/678 +[#683]: https://github.com/stackabletech/airflow-operator/pull/683 ## [25.7.0] - 2025-07-23 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 60a6b9dd..b06e1b6c 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -806,6 +806,7 @@ fn build_rolegroup_config_map( container, &Container::Vector, &mut cm_builder, + resolved_product_image, ) .context(InvalidLoggingConfigSnafu { cm_name: rolegroup.object_name(), diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 67a0fcad..b2431a18 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -3,6 +3,7 @@ use std::fmt::{Display, Write}; use snafu::Snafu; use stackable_operator::{ builder::configmap::ConfigMapBuilder, + commons::product_image_selection::ResolvedProductImage, kube::Resource, product_logging::{ self, @@ -43,6 +44,7 @@ pub fn extend_config_map_with_log_config( main_container: &C, vector_container: &C, cm_builder: &mut ConfigMapBuilder, + resolved_product_image: &ResolvedProductImage, ) -> Result<()> where C: Clone + Ord + Display, @@ -53,7 +55,10 @@ where }) = logging.containers.get(main_container) { let log_dir = format!("{STACKABLE_LOG_DIR}/{main_container}"); - cm_builder.add_data(LOG_CONFIG_FILE, create_airflow_config(log_config, &log_dir)); + cm_builder.add_data( + LOG_CONFIG_FILE, + create_airflow_config(log_config, &log_dir, resolved_product_image), + ); } let vector_log_config = if let Some(ContainerLogConfig { @@ -75,7 +80,11 @@ where Ok(()) } -fn create_airflow_config(log_config: &AutomaticContainerLogConfig, log_dir: &str) -> String { +fn create_airflow_config( + log_config: &AutomaticContainerLogConfig, + log_dir: &str, + resolved_product_image: &ResolvedProductImage, +) -> String { let loggers_config = log_config .loggers .iter() @@ -92,18 +101,28 @@ LOGGING_CONFIG['loggers']['{name}']['level'] = {level} output }); + let remote_task_log = if resolved_product_image.product_version.starts_with("2.") { + "" + } else { + " +# This will cause the relevant RemoteLogIO handler to be initialized +REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG +log = logging.getLogger(__name__) +log.info('Custom logging remote task log %s', REMOTE_TASK_LOG) +" + }; + format!( "\ import logging import os from copy import deepcopy -from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG +from airflow.config_templates import airflow_local_settings os.makedirs('{log_dir}', exist_ok=True) -LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) - -REMOTE_TASK_LOG = None +LOGGING_CONFIG = 
deepcopy(airflow_local_settings.DEFAULT_LOGGING_CONFIG)
+{remote_task_log}
 
 LOGGING_CONFIG.setdefault('loggers', {{}})
 for logger_name, logger_config in LOGGING_CONFIG['loggers'].items():
diff --git a/tests/templates/kuttl/remote-logging/00-patch-ns.yaml.j2 b/tests/templates/kuttl/remote-logging/00-patch-ns.yaml.j2
new file mode 100644
index 00000000..67185acf
--- /dev/null
+++ b/tests/templates/kuttl/remote-logging/00-patch-ns.yaml.j2
@@ -0,0 +1,9 @@
+{% if test_scenario['values']['openshift'] == 'true' %}
+# see https://github.com/stackabletech/issues/issues/566
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+  - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
+    timeout: 120
+{% endif %}
diff --git a/tests/templates/kuttl/remote-logging/00-range-limit.yaml.j2 b/tests/templates/kuttl/remote-logging/00-range-limit.yaml.j2
new file mode 100644
index 00000000..b35b57a4
--- /dev/null
+++ b/tests/templates/kuttl/remote-logging/00-range-limit.yaml.j2
@@ -0,0 +1,13 @@
+{% if test_scenario['values']['executor'] == 'celery' %}
+---
+apiVersion: v1
+kind: LimitRange
+metadata:
+  name: limit-request-ratio
+spec:
+  limits:
+    - type: "Container"
+      maxLimitRequestRatio:
+        cpu: 5
+        memory: 1
+{% endif %}
diff --git a/tests/templates/kuttl/remote-logging/02-s3-secret.yaml b/tests/templates/kuttl/remote-logging/02-s3-secret.yaml
new file mode 100644
index 00000000..2898ff1a
--- /dev/null
+++ b/tests/templates/kuttl/remote-logging/02-s3-secret.yaml
@@ -0,0 +1,24 @@
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: minio-credentials
+  labels:
+    secrets.stackable.tech/class: spark-pi-private-s3-credentials-class
+stringData:
+  accessKey: minioAccessKey
+  secretKey: minioSecretKey
+  # The following two entries are used by the Bitnami chart for MinIO to
+  # set up credentials for accessing buckets managed by the MinIO tenant.
+ root-user: minioAccessKey + root-password: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: spark-pi-private-s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} diff --git a/tests/templates/kuttl/remote-logging/03-assert.yaml b/tests/templates/kuttl/remote-logging/03-assert.yaml new file mode 100644 index 00000000..d192e336 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/03-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: test-minio +status: + readyReplicas: 1 diff --git a/tests/templates/kuttl/remote-logging/03-setup-minio.yaml b/tests/templates/kuttl/remote-logging/03-setup-minio.yaml new file mode 100644 index 00000000..cf036aa6 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/03-setup-minio.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install test-minio + --namespace $NAMESPACE + --version 17.0.19 + -f helm-bitnami-minio-values.yaml + oci://registry-1.docker.io/bitnamicharts/minio + timeout: 240 diff --git a/tests/templates/kuttl/remote-logging/10-assert.yaml b/tests/templates/kuttl/remote-logging/10-assert.yaml new file mode 100644 index 00000000..319e927a --- /dev/null +++ b/tests/templates/kuttl/remote-logging/10-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-postgresql +timeout: 480 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-postgresql +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/remote-logging/10-install-postgresql.yaml b/tests/templates/kuttl/remote-logging/10-install-postgresql.yaml new file mode 100644 index 00000000..9e0529d1 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/10-install-postgresql.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-postgresql + --namespace $NAMESPACE + --version 16.4.2 + -f helm-bitnami-postgresql-values.yaml + oci://registry-1.docker.io/bitnamicharts/postgresql + --wait + timeout: 600 diff --git a/tests/templates/kuttl/remote-logging/20-assert.yaml.j2 b/tests/templates/kuttl/remote-logging/20-assert.yaml.j2 new file mode 100644 index 00000000..8d585401 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/20-assert.yaml.j2 @@ -0,0 +1,24 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-redis +timeout: 360 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-master +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-replicas +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/remote-logging/20-install-redis.yaml.j2 b/tests/templates/kuttl/remote-logging/20-install-redis.yaml.j2 new file mode 100644 index 00000000..3a07199d --- /dev/null +++ b/tests/templates/kuttl/remote-logging/20-install-redis.yaml.j2 @@ -0,0 +1,14 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-redis + --namespace $NAMESPACE + --version 17.11.3 + -f helm-bitnami-redis-values.yaml + --repo https://charts.bitnami.com/bitnami redis + --wait + timeout: 600 +{% endif 
%} diff --git a/tests/templates/kuttl/remote-logging/40-assert.yaml.j2 b/tests/templates/kuttl/remote-logging/40-assert.yaml.j2 new file mode 100644 index 00000000..81337261 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/40-assert.yaml.j2 @@ -0,0 +1,81 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-available-condition +timeout: 600 +commands: + - script: kubectl -n $NAMESPACE wait --for=condition=available airflowclusters.airflow.stackable.tech/airflow --timeout 301s +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-cluster +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-webserver-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +spec: + template: + spec: + terminationGracePeriodSeconds: 300 +status: + readyReplicas: 2 + replicas: 2 +{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-scheduler-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-webserver +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-worker +status: + expectedPods: 2 + currentHealthy: 2 + disruptionsAllowed: 1 +{% endif %} +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-scheduler +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 new file mode 100644 index 00000000..f3004952 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 @@ -0,0 +1,66 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-airflow-db +timeout: 480 +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-airflow-credentials +type: Opaque +stringData: + adminUser.username: airflow + adminUser.firstname: Airflow + adminUser.lastname: Admin + adminUser.email: airflow@airflow.com + adminUser.password: airflow + connections.secretKey: thisISaSECRET_1234 + connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow +{% if test_scenario['values']['executor'] == 'celery' %} + connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow + connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0 +{% endif %} +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + image: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + loadExamples: true + credentialsSecret: test-airflow-credentials + webservers: + roleConfig: + listenerClass: external-unstable + roleGroups: + default: + replicas: 1 + 
envOverrides: &envOverrides + AIRFLOW_CONN_MINIO_CONN: "aws://minioAccessKey:minioSecretKey@/?endpoint_url=http%3A%2F%2Ftest-minio%3A9000" + AIRFLOW__LOGGING__REMOTE_LOGGING: "True" + AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://my-bucket/ + AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio_conn +{% if test_scenario['values']['executor'] == 'celery' %} + celeryExecutors: + roleGroups: + default: + replicas: 2 + envOverrides: *envOverrides +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + envOverrides: *envOverrides +{% endif %} + schedulers: + roleGroups: + default: + replicas: 1 + envOverrides: *envOverrides diff --git a/tests/templates/kuttl/remote-logging/50-assert.yaml b/tests/templates/kuttl/remote-logging/50-assert.yaml new file mode 100644 index 00000000..6edaa3c3 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/50-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-python +timeout: 240 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/remote-logging/50-install-airflow-python.yaml b/tests/templates/kuttl/remote-logging/50-install-airflow-python.yaml new file mode 100644 index 00000000..7be1fbf8 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/50-install-airflow-python.yaml @@ -0,0 +1,30 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python + labels: + app: test-airflow-python +spec: + replicas: 1 + selector: + matchLabels: + app: test-airflow-python + template: + metadata: + labels: + app: test-airflow-python + spec: + containers: + - name: test-airflow-python + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + stdin: true + tty: true + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "128Mi" + cpu: "400m" diff --git a/tests/templates/kuttl/remote-logging/60-assert.yaml.j2 b/tests/templates/kuttl/remote-logging/60-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/remote-logging/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/remote-logging/60-health-check.yaml b/tests/templates/kuttl/remote-logging/60-health-check.yaml new file mode 100644 index 00000000..c72c4222 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/60-health-check.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +timeout: 480 +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/health.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/remote-logging/70-assert.yaml.j2 b/tests/templates/kuttl/remote-logging/70-assert.yaml.j2 new file mode 100644 index 00000000..68f0677b --- /dev/null +++ b/tests/templates/kuttl/remote-logging/70-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: 
kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: remote-logging +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/remote-logging-metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/remote-logging-metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/remote-logging/70-install-remote-logging-script.yaml b/tests/templates/kuttl/remote-logging/70-install-remote-logging-script.yaml new file mode 100644 index 00000000..3b266b7e --- /dev/null +++ b/tests/templates/kuttl/remote-logging/70-install-remote-logging-script.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: remote-logging +commands: + - script: kubectl cp -n $NAMESPACE remote-logging-metrics.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/remote-logging/helm-bitnami-minio-values.yaml b/tests/templates/kuttl/remote-logging/helm-bitnami-minio-values.yaml new file mode 100644 index 00000000..94e90c6e --- /dev/null +++ b/tests/templates/kuttl/remote-logging/helm-bitnami-minio-values.yaml @@ -0,0 +1,70 @@ +--- +global: + security: + allowInsecureImages: true # needed starting with Chart version 14.9.0 if modifying images + +image: + repository: bitnamilegacy/minio +clientImage: + repository: bitnamilegacy/minio-client +defaultInitContainers: + volumePermissions: # volumePermissions moved under defaultInitContainers starting with Chart version 17.0.0 + enabled: false + image: + repository: bitnamilegacy/os-shell +console: + image: + repository: bitnamilegacy/minio-object-browser + +mode: standalone +disableWebUI: false +extraEnvVars: + - name: BITNAMI_DEBUG + value: "true" + - name: MINIO_LOG_LEVEL + value: DEBUG + +provisioning: + enabled: true + buckets: + - name: my-bucket + resources: + requests: + memory: 1Gi + cpu: "512m" + limits: + memory: "1Gi" + cpu: "1" + podSecurityContext: + enabled: false + containerSecurityContext: + enabled: false + +# volumePermissions can be removed starting with Chart version 17.0.0, moved under defaultInitContainers +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + cpu: "1" + limits: + memory: 1Gi + cpu: "2" + +auth: + existingSecret: minio-credentials + +service: + type: NodePort diff --git a/tests/templates/kuttl/remote-logging/helm-bitnami-postgresql-values.yaml.j2 b/tests/templates/kuttl/remote-logging/helm-bitnami-postgresql-values.yaml.j2 new file mode 100644 index 00000000..8067902b --- /dev/null +++ b/tests/templates/kuttl/remote-logging/helm-bitnami-postgresql-values.yaml.j2 @@ -0,0 +1,43 @@ +--- +global: + security: + allowInsecureImages: true + +image: + repository: bitnamilegacy/postgresql + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + securityContext: + runAsUser: auto + +metrics: + image: + repository: bitnamilegacy/postgres-exporter + +primary: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + resources: + requests: + memory: "128Mi" + cpu: 
"100m" + limits: + memory: "128Mi" + cpu: "400m" +shmVolume: + chmod: + enabled: false + +auth: + username: airflow + password: airflow + database: airflow diff --git a/tests/templates/kuttl/remote-logging/helm-bitnami-redis-values.yaml.j2 b/tests/templates/kuttl/remote-logging/helm-bitnami-redis-values.yaml.j2 new file mode 100644 index 00000000..293abaf1 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/helm-bitnami-redis-values.yaml.j2 @@ -0,0 +1,63 @@ +--- +global: + security: + allowInsecureImages: true # needed starting with Chart version 20.5.0 if modifying images +image: + repository: bitnamilegacy/redis +sentinel: + image: + repository: bitnamilegacy/redis-sentinel +metrics: + image: + repository: bitnamilegacy/redis-exporter +kubectl: + image: + repository: bitnamilegacy/kubectl +sysctl: + image: + repository: bitnamilegacy/os-shell + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + containerSecurityContext: + runAsUser: auto + +master: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + resources: + requests: + memory: "128Mi" + cpu: "200m" + limits: + memory: "128Mi" + cpu: "800m" + +replica: + replicaCount: 1 + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + resources: + requests: + memory: "128Mi" + cpu: "100m" + limits: + memory: "128Mi" + cpu: "400m" + +auth: + password: redis diff --git a/tests/templates/kuttl/remote-logging/remote-logging-metrics.py b/tests/templates/kuttl/remote-logging/remote-logging-metrics.py new file mode 100755 index 00000000..96eca5e0 --- /dev/null +++ b/tests/templates/kuttl/remote-logging/remote-logging-metrics.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python + +import requests +import time +import sys +from datetime import datetime, timezone +import argparse +import logging + + +def exception_handler(exception_type, exception, traceback): + print(f"{exception_type.__name__}: {exception.args}") + + +sys.excepthook = exception_handler + + +def collect_all_sources(payload): + # Return a flat list of every source string found in payload['content']. 
+    sources = []
+    for entry in payload.get("content", []):
+        if isinstance(entry, dict) and isinstance(entry.get("sources"), list):
+            sources.extend(entry["sources"])
+    return sources
+
+
+def prefix_is_matched(tasks, dag_run_id_root_url, headers, required_prefix) -> bool:
+    for ti in tasks["task_instances"]:
+        task_id = ti["task_id"]
+        try_number = ti["try_number"]
+        logs = requests.get(
+            f"{dag_run_id_root_url}/{task_id}/logs/{try_number}",
+            headers=headers,
+            params={"full_content": "true"},
+        ).json()
+        print(f"Logs/full-content: {logs}")
+        all_sources = collect_all_sources(logs)
+        if not all_sources:
+            print("No 'sources' arrays were found yet in the payload...")
+        else:
+            # Look for *any* source that starts with the required prefix
+            matching = [s for s in all_sources if s.startswith(required_prefix)]
+            if matching:
+                for matched in matching:
+                    print(
+                        f"Validation passed - at least one source starts with the required prefix: {matched}"
+                    )
+                return True
+            else:
+                print(f"No source found yet beginning with '{required_prefix}'...")
+    return False
+
+
+def remote_logging() -> None:
+    now = datetime.now(timezone.utc)
+    ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z")
+
+    # Trigger a DAG run to create logs
+    dag_id = "example_trigger_target_dag"
+    dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}}
+    required_prefix = "s3://my-bucket/dag_id=example_trigger_target_dag"
+
+    print(f"DAG-Data: {dag_data}")
+
+    # allow a few moments for the DAGs to be registered to all roles
+    time.sleep(10)
+
+    rest_url = "http://airflow-webserver:8080/api/v2"
+    token_url = "http://airflow-webserver:8080/auth/token"
+
+    data = {"username": "airflow", "password": "airflow"}
+
+    headers = {"Content-Type": "application/json"}
+
+    response = requests.post(token_url, headers=headers, json=data)
+
+    if response.status_code == 200 or response.status_code == 201:
+        token_data = response.json()
+        access_token = token_data["access_token"]
+        print(f"Access Token: {access_token}")
+    else:
+        print(
+            f"Failed to obtain access token: {response.status_code} - {response.text}"
+        )
+        sys.exit(1)
+
+    headers = {
+        "Authorization": f"Bearer {access_token}",
+        "Content-Type": "application/json",
+    }
+
+    # activate DAG
+    response = requests.patch(
+        f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False}
+    )
+
+    # trigger DAG
+    response = requests.post(
+        f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data
+    )
+
+    # verify that the DAG run was accepted before polling for its logs
+    if response.status_code not in (200, 201):
+        print(f"Failed to trigger DAG run: {response.status_code} - {response.text}")
+        sys.exit(1)
+
+    iterations = 4
+    loop = 0
+    while True:
+        # get latest logs
+        response = requests.get(
+            f"{rest_url}/dags/{dag_id}/dagRuns", params={"limit": 1}, headers=headers
+        )
+        dagRuns = response.json()
+        if response.status_code == 200:
+            latest_run = dagRuns["dag_runs"][0]
+            dag_run_id = latest_run["dag_run_id"]
+            print(f"Latest run ID: {dag_run_id}")
+            dag_run_id_root_url = (
+                f"{rest_url}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
+            )
+            tasks = requests.get(
+                f"{dag_run_id_root_url}",
+                headers=headers,
+            ).json()
+            if prefix_is_matched(tasks, dag_run_id_root_url, headers, required_prefix):
+                break
+        time.sleep(10)
+        loop += 1
+        if loop == iterations:
+            # force re-try of script
+            sys.exit(1)
+
+
+if __name__ == "__main__":
+    log_level = "DEBUG"
+    logging.basicConfig(
+        level=log_level,
+        format="%(asctime)s %(levelname)s: %(message)s",
+        stream=sys.stdout,
+    )
+
+    parser = argparse.ArgumentParser(description="Airflow remote logging 
script") + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + if opts.airflow_version and not opts.airflow_version.startswith("2."): + remote_logging() + else: + # should not happen as we are using airflow-latest + print("Remote logging is not tested for version < 3.x!") + sys.exit(1) diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index d2860cd8..f092e9a7 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -90,6 +90,11 @@ tests: - airflow - openshift - executor + - name: remote-logging + dimensions: + - airflow-latest + - openshift + - executor suites: - name: nightly # Run nightly with the latest airflow