Skip to content

Commit

Permalink
[Runtimes] Support Sparkjob monitoring & add new mounts support (#1427)
Browse files Browse the repository at this point in the history
  • Loading branch information
urihoenig committed Oct 19, 2021
1 parent 4e84467 commit a5cd62c
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 1 deletion.
2 changes: 1 addition & 1 deletion mlrun/platforms/__init__.py
Expand Up @@ -26,7 +26,7 @@
mount_v3io_legacy,
v3io_cred,
)
from .other import auto_mount, mount_pvc
from .other import auto_mount, mount_configmap, mount_hostpath, mount_pvc


def watch_stream(
Expand Down
51 changes: 51 additions & 0 deletions mlrun/platforms/other.py
Expand Up @@ -112,3 +112,54 @@ def _mount_secret(task):
)

return _mount_secret


def mount_configmap(configmap_name, mount_path, volume_name="configmap", items=None):
    """Modifier function to mount a kubernetes configmap as file(s)

    :param configmap_name: k8s configmap name
    :param mount_path:     path to mount inside the container
    :param volume_name:    unique volume name
    :param items:          If unspecified, each key-value pair in the Data field
                           of the referenced Configmap will be projected into the
                           volume as a file whose name is the key and content is
                           the value.
                           If specified, the listed keys will be projected into
                           the specified paths, and unlisted keys will not be
                           present.
    """

    def _mount_configmap(task):
        # imported lazily so this module stays importable in environments
        # without the kubernetes client package installed
        from kubernetes import client as k8s_client

        vol = k8s_client.V1ConfigMapVolumeSource(name=configmap_name, items=items)
        return task.add_volume(
            k8s_client.V1Volume(name=volume_name, config_map=vol)
        ).add_volume_mount(
            k8s_client.V1VolumeMount(mount_path=mount_path, name=volume_name)
        )

    return _mount_configmap


def mount_hostpath(host_path, mount_path, volume_name="hostpath"):
    """Modifier function to mount a host (node) path as file(s) in the container

    :param host_path:   path on the host node to mount
    :param mount_path:  path to mount inside the container
    :param volume_name: unique volume name
    """

    def _mount_hostpath(task):
        # imported lazily so this module stays importable in environments
        # without the kubernetes client package installed
        from kubernetes import client as k8s_client

        return task.add_volume(
            k8s_client.V1Volume(
                name=volume_name,
                # type="" means kubernetes performs no checks on the host path
                host_path=k8s_client.V1HostPathVolumeSource(path=host_path, type=""),
            )
        ).add_volume_mount(
            k8s_client.V1VolumeMount(mount_path=mount_path, name=volume_name)
        )

    return _mount_hostpath
29 changes: 29 additions & 0 deletions mlrun/runtimes/sparkjob/spark3job.py
Expand Up @@ -56,6 +56,7 @@ def __init__(
use_default_image=False,
priority_class_name=None,
dynamic_allocation=None,
monitoring=None,
):

super().__init__(
Expand Down Expand Up @@ -94,6 +95,7 @@ def __init__(
self.dynamic_allocation = dynamic_allocation or {}
self.driver_node_selector = driver_node_selector
self.executor_node_selector = executor_node_selector
self.monitoring = monitoring or {}


class Spark3Runtime(AbstractSparkRuntime):
Expand Down Expand Up @@ -154,6 +156,16 @@ def _enrich_job(self, job):
update_in(
job, "spec.executor.nodeSelector", self.spec.executor_node_selector
)
if self.spec.monitoring:
if "enabled" in self.spec.monitoring and self.spec.monitoring["enabled"]:
update_in(job, "spec.monitoring.exposeDriverMetrics", True)
update_in(job, "spec.monitoring.exposeExecutorMetrics", True)
if "exporter_jar" in self.spec.monitoring:
update_in(
job,
"spec.monitoring.prometheus.jmxExporterJar",
self.spec.monitoring["exporter_jar"],
)
return

def _get_spark_version(self):
Expand All @@ -166,6 +178,7 @@ def _get_igz_deps(self):
"local:///spark/v3io-libs/v3io-spark3-streaming_2.12.jar",
"local:///spark/v3io-libs/v3io-spark3-object-dataframe_2.12.jar",
"local:///igz/java/libs/scala-library-2.12.14.jar",
"local:///spark/jars/jmx_prometheus_javaagent-0.16.1.jar",
],
"files": ["local:///igz/java/libs/v3io-pyspark.zip"],
}
Expand Down Expand Up @@ -248,3 +261,19 @@ def with_dynamic_allocation(
self.spec.dynamic_allocation["maxExecutors"] = max_executors
if initial_executors:
self.spec.dynamic_allocation["initialExecutors"] = initial_executors

def disable_monitoring(self):
    """Explicitly turn off Spark job monitoring for this runtime."""
    self.spec.monitoring.update(enabled=False)

def _with_monitoring(self, enabled=True, exporter_jar=None):
self.spec.monitoring["enabled"] = enabled
if enabled:
if exporter_jar:
self.spec.monitoring["exporter_jar"] = exporter_jar

def with_igz_spark(self):
    """Apply the Iguazio Spark dependencies and, unless monitoring was
    explicitly disabled, enable it with the bundled JMX Prometheus
    exporter jar (monitoring defaults to on when never configured)."""
    super().with_igz_spark()
    monitoring_on = self.spec.monitoring.get("enabled", True)
    if monitoring_on:
        self._with_monitoring(
            exporter_jar="/spark/jars/jmx_prometheus_javaagent-0.16.1.jar",
        )
56 changes: 56 additions & 0 deletions tests/platforms/test_other.py
@@ -0,0 +1,56 @@
import deepdiff

import mlrun
import mlrun.errors


def test_mount_configmap():
    volume = {"configMap": {"name": "my-config-map"}, "name": "my-volume"}
    volume_mount = {"mountPath": "/myConfMapPath", "name": "my-volume"}

    fn = mlrun.new_function(
        "function-name", "function-project", kind=mlrun.runtimes.RuntimeKinds.job
    )
    fn.apply(
        mlrun.platforms.mount_configmap(
            configmap_name="my-config-map",
            mount_path="/myConfMapPath",
            volume_name="my-volume",
        )
    )

    # an empty DeepDiff is falsy, i.e. the structures are identical
    assert not deepdiff.DeepDiff([volume], fn.spec.volumes, ignore_order=True)
    assert not deepdiff.DeepDiff(
        [volume_mount], fn.spec.volume_mounts, ignore_order=True
    )


def test_mount_hostpath():
    volume = {"hostPath": {"path": "/tmp", "type": ""}, "name": "my-volume"}
    volume_mount = {"mountPath": "/myHostPath", "name": "my-volume"}

    fn = mlrun.new_function(
        "function-name", "function-project", kind=mlrun.runtimes.RuntimeKinds.job
    )
    fn.apply(
        mlrun.platforms.mount_hostpath(
            host_path="/tmp", mount_path="/myHostPath", volume_name="my-volume"
        )
    )

    # an empty DeepDiff is falsy, i.e. the structures are identical
    assert not deepdiff.DeepDiff([volume], fn.spec.volumes, ignore_order=True)
    assert not deepdiff.DeepDiff(
        [volume_mount], fn.spec.volume_mounts, ignore_order=True
    )

0 comments on commit a5cd62c

Please sign in to comment.