Skip to content

Commit

Permalink
[xy] Interpolate global vars in k8s executor namespace (#4792)
Browse files Browse the repository at this point in the history
* [xy] Interpolate global vars in k8s executor namespace

* [xy] Fix var interpolation in k8s pipeline executor.
  • Loading branch information
wangxiaoyou1993 committed Mar 19, 2024
1 parent fe6fccb commit 75c6f84
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
9 changes: 8 additions & 1 deletion mage_ai/data_preparation/executors/k8s_block_executor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import Dict

from jinja2 import Template

from mage_ai.data_preparation.executors.block_executor import BlockExecutor
from mage_ai.data_preparation.shared.utils import get_template_vars
from mage_ai.services.k8s.config import K8sExecutorConfig
from mage_ai.services.k8s.constants import DEFAULT_NAMESPACE
from mage_ai.services.k8s.job_manager import JobManager as K8sJobManager
Expand Down Expand Up @@ -29,9 +32,13 @@ def _execute(
job_name_prefix = self.executor_config.job_name_prefix

if self.executor_config.namespace:
namespace = self.executor_config.namespace
namespace = Template(self.executor_config.namespace).render(
variables=lambda x: global_vars.get(x) if global_vars else None,
**get_template_vars()
)
else:
namespace = DEFAULT_NAMESPACE

job_manager = K8sJobManager(
job_name=f'mage-{job_name_prefix}-block-{block_run_id}',
logger=self.logger,
Expand Down
13 changes: 12 additions & 1 deletion mage_ai/data_preparation/executors/k8s_pipeline_executor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import traceback
from typing import Dict

from jinja2 import Template

from mage_ai.data_preparation.executors.pipeline_executor import PipelineExecutor
from mage_ai.data_preparation.models.pipeline import Pipeline
from mage_ai.data_preparation.shared.utils import get_template_vars
from mage_ai.services.k8s.config import K8sExecutorConfig
from mage_ai.services.k8s.constants import DEFAULT_NAMESPACE
from mage_ai.services.k8s.job_manager import JobManager as K8sJobManager
Expand Down Expand Up @@ -39,6 +42,7 @@ def execute(
) -> None:
job_manager = self.get_job_manager(
pipeline_run_id=pipeline_run_id,
global_vars=global_vars,
**kwargs,
)
cmd = self._run_commands(
Expand All @@ -54,15 +58,22 @@ def execute(
def get_job_manager(
self,
pipeline_run_id: int = None,
global_vars: Dict = None,
**kwargs,
) -> K8sJobManager:
if global_vars is None:
global_vars = dict()
if not self.executor_config.job_name_prefix:
job_name_prefix = 'data-prep'
else:
job_name_prefix = self.executor_config.job_name_prefix

if self.executor_config.namespace:
namespace = self.executor_config.namespace

namespace = Template(self.executor_config.namespace).render(
variables=lambda x: global_vars.get(x) if global_vars else None,
**get_template_vars()
)
else:
namespace = DEFAULT_NAMESPACE

Expand Down
8 changes: 8 additions & 0 deletions mage_ai/services/k8s/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ def __init__(
logger=None,
logging_tags: Dict = None,
):
"""Initialize the kubernetes job manager.
Args:
job_name (str, optional): The name of the job.
namespace (str, optional): The namespace of the executor pod.
logger (None, optional): Logger to log the messages.
logging_tags (Dict, optional): Logging tags to be included in the log messages.
"""
self.job_name = job_name
self.namespace = namespace
self.logger = logger
Expand Down

0 comments on commit 75c6f84

Please sign in to comment.