Skip to content

Commit

Permalink
Add generic pipeline support for Airflow 2.x
Browse files Browse the repository at this point in the history
This is a breaking change since Airflow 2 has no backwards compatible
imports with Airflow 1 for the KubernetesPodOperator

We change to the Kubernetes client SDK since the Airflow abstractions
were all deprecated and removed except for Secret.

Finally, Airflow 2 adds logic that makes config_file mutually exclusive
with in_cluster, so we need to ensure that None is passed as None and
not string "None".
  • Loading branch information
ianonavy committed Jun 21, 2023
1 parent 3c27ada commit f9d1329
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
25 changes: 20 additions & 5 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,24 @@ def render_volumes(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
Volume(name="{v.pvc_name}", configs={{"persistentVolumeClaim": {{"claimName": "{v.pvc_name}"}}}}),"""
k8s.V1Volume(
name="{v.pvc_name}",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
claim_name="{v.pvc_name}",
),
),"""

# set custom shared memory size
shm = elyra_properties.get(pipeline_constants.KUBERNETES_SHARED_MEM_SIZE)
if shm is not None and shm.size:
config = f"""configs={{"emptyDir": {{"medium": "Memory", "sizeLimit": "{shm.size}{shm.units}"}}}}"""
str_to_render += f"""
Volume(name="shm", {config}),"""
k8s.V1Volume(
name="shm",
empty_dir=k8s.V1EmptyDirVolumeSource(
medium="Memory",
size_limit="{shm.size}{shm.units}",
),
),"""
return dedent(str_to_render)

def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
Expand All @@ -613,8 +624,12 @@ def render_mounts(self, elyra_properties: Dict[str, ElyraProperty]) -> str:
str_to_render = ""
for v in elyra_properties.get(pipeline_constants.MOUNTED_VOLUMES, []):
str_to_render += f"""
VolumeMount(name="{v.pvc_name}", mount_path="{v.path}",
sub_path="{v.sub_path}", read_only={v.read_only}),"""
k8s.V1VolumeMount(
name="{v.pvc_name}",
mount_path="{v.path}",
sub_path="{v.sub_path}",
read_only={v.read_only},
),"""
return dedent(str_to_render)

def render_secrets(self, elyra_properties: Dict[str, ElyraProperty], cos_secret: Optional[str]) -> str:
Expand Down
7 changes: 3 additions & 4 deletions elyra/templates/airflow/airflow_template.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ dag = DAG(
{% endfor %}
{% else %}
from airflow.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
{% endif %}

{% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}
Expand Down Expand Up @@ -67,7 +66,7 @@ op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.n
labels={{ processor.render_labels(operation.elyra_props) }},
tolerations=[{{ processor.render_tolerations(operation.elyra_props) }}],
in_cluster={{ in_cluster }},
config_file="{{ kube_config_path }}",
config_file={% if kube_config_path is string %}"{{ kube_config_path }}"{% else %}{{ kube_config_path }}{% endif %},
{% endif %}
dag=dag)
{% if operation.image_pull_policy %}
Expand Down

0 comments on commit f9d1329

Please sign in to comment.