From a5cd62c7ec8a7faf3c695bfe0a8b321db65f20a2 Mon Sep 17 00:00:00 2001
From: urihoenig
Date: Wed, 20 Oct 2021 00:46:00 +0300
Subject: [PATCH] [Runtimes] Support Sparkjob monitoring & add new mounts support (#1427)

---
 mlrun/platforms/__init__.py          |  2 +-
 mlrun/platforms/other.py             | 51 +++++++++++++++++++++++++
 mlrun/runtimes/sparkjob/spark3job.py | 29 ++++++++++++++
 tests/platforms/test_other.py        | 56 ++++++++++++++++++++++++++++
 4 files changed, 137 insertions(+), 1 deletion(-)
 create mode 100644 tests/platforms/test_other.py

diff --git a/mlrun/platforms/__init__.py b/mlrun/platforms/__init__.py
index 23ede3572a2..5bd06862ebe 100644
--- a/mlrun/platforms/__init__.py
+++ b/mlrun/platforms/__init__.py
@@ -26,7 +26,7 @@
     mount_v3io_legacy,
     v3io_cred,
 )
-from .other import auto_mount, mount_pvc
+from .other import auto_mount, mount_configmap, mount_hostpath, mount_pvc


 def watch_stream(
diff --git a/mlrun/platforms/other.py b/mlrun/platforms/other.py
index 51703579a25..f4625c99f76 100644
--- a/mlrun/platforms/other.py
+++ b/mlrun/platforms/other.py
@@ -112,3 +112,54 @@ def _mount_secret(task):
     )

     return _mount_secret
+
+
+def mount_configmap(configmap_name, mount_path, volume_name="configmap", items=None):
+    """Modifier function to mount kubernetes configmap as file(s)
+
+    :param configmap_name: k8s configmap name
+    :param mount_path: path to mount inside the container
+    :param volume_name: unique volume name
+    :param items: If unspecified, each key-value pair in the Data field
+                  of the referenced Configmap will be projected into the
+                  volume as a file whose name is the key and content is
+                  the value.
+                  If specified, the listed keys will be projected into
+                  the specified paths, and unlisted keys will not be
+                  present.
+    """
+
+    def _mount_configmap(task):
+        from kubernetes import client as k8s_client
+
+        vol = k8s_client.V1ConfigMapVolumeSource(name=configmap_name, items=items)
+        return task.add_volume(
+            k8s_client.V1Volume(name=volume_name, config_map=vol)
+        ).add_volume_mount(
+            k8s_client.V1VolumeMount(mount_path=mount_path, name=volume_name)
+        )
+
+    return _mount_configmap
+
+
+def mount_hostpath(host_path, mount_path, volume_name="hostpath"):
+    """Modifier function to mount a host path as file(s) inside the container
+
+    :param host_path: host path
+    :param mount_path: path to mount inside the container
+    :param volume_name: unique volume name
+    """
+
+    def _mount_hostpath(task):
+        from kubernetes import client as k8s_client
+
+        return task.add_volume(
+            k8s_client.V1Volume(
+                name=volume_name,
+                host_path=k8s_client.V1HostPathVolumeSource(path=host_path, type=""),
+            )
+        ).add_volume_mount(
+            k8s_client.V1VolumeMount(mount_path=mount_path, name=volume_name)
+        )
+
+    return _mount_hostpath
diff --git a/mlrun/runtimes/sparkjob/spark3job.py b/mlrun/runtimes/sparkjob/spark3job.py
index 3296e33a319..1b54f4107e5 100644
--- a/mlrun/runtimes/sparkjob/spark3job.py
+++ b/mlrun/runtimes/sparkjob/spark3job.py
@@ -56,6 +56,7 @@ def __init__(
         use_default_image=False,
         priority_class_name=None,
         dynamic_allocation=None,
+        monitoring=None,
     ):

         super().__init__(
@@ -94,6 +95,7 @@
         self.dynamic_allocation = dynamic_allocation or {}
         self.driver_node_selector = driver_node_selector
         self.executor_node_selector = executor_node_selector
+        self.monitoring = monitoring or {}


 class Spark3Runtime(AbstractSparkRuntime):
@@ -154,6 +156,16 @@ def _enrich_job(self, job):
             update_in(
                 job, "spec.executor.nodeSelector", self.spec.executor_node_selector
             )
+        if self.spec.monitoring:
+            if "enabled" in self.spec.monitoring and self.spec.monitoring["enabled"]:
+                update_in(job, "spec.monitoring.exposeDriverMetrics", True)
+                update_in(job, "spec.monitoring.exposeExecutorMetrics", True)
+                if "exporter_jar" in self.spec.monitoring:
+                    update_in(
+                        job,
+                        "spec.monitoring.prometheus.jmxExporterJar",
+                        self.spec.monitoring["exporter_jar"],
+                    )
         return

     def _get_spark_version(self):
@@ -166,6 +178,7 @@ def _get_igz_deps(self):
                 "local:///spark/v3io-libs/v3io-spark3-streaming_2.12.jar",
                 "local:///spark/v3io-libs/v3io-spark3-object-dataframe_2.12.jar",
                 "local:///igz/java/libs/scala-library-2.12.14.jar",
+                "local:///spark/jars/jmx_prometheus_javaagent-0.16.1.jar",
             ],
             "files": ["local:///igz/java/libs/v3io-pyspark.zip"],
         }
@@ -248,3 +261,19 @@ def with_dynamic_allocation(
             self.spec.dynamic_allocation["maxExecutors"] = max_executors
         if initial_executors:
             self.spec.dynamic_allocation["initialExecutors"] = initial_executors
+
+    def disable_monitoring(self):
+        self.spec.monitoring["enabled"] = False
+
+    def _with_monitoring(self, enabled=True, exporter_jar=None):
+        self.spec.monitoring["enabled"] = enabled
+        if enabled:
+            if exporter_jar:
+                self.spec.monitoring["exporter_jar"] = exporter_jar
+
+    def with_igz_spark(self):
+        super().with_igz_spark()
+        if "enabled" not in self.spec.monitoring or self.spec.monitoring["enabled"]:
+            self._with_monitoring(
+                exporter_jar="/spark/jars/jmx_prometheus_javaagent-0.16.1.jar",
+            )
diff --git a/tests/platforms/test_other.py b/tests/platforms/test_other.py
new file mode 100644
index 00000000000..5235eab65b4
--- /dev/null
+++ b/tests/platforms/test_other.py
@@ -0,0 +1,56 @@
+import deepdiff
+
+import mlrun
+import mlrun.errors
+
+
+def test_mount_configmap():
+    expected_volume = {"configMap": {"name": "my-config-map"}, "name": "my-volume"}
+    expected_volume_mount = {"mountPath": "/myConfMapPath", "name": "my-volume"}
+
+    function = mlrun.new_function(
+        "function-name", "function-project", kind=mlrun.runtimes.RuntimeKinds.job
+    )
+    function.apply(
+        mlrun.platforms.mount_configmap(
+            configmap_name="my-config-map",
+            mount_path="/myConfMapPath",
+            volume_name="my-volume",
+        )
+    )
+
+    assert (
+        deepdiff.DeepDiff([expected_volume], function.spec.volumes, ignore_order=True,)
+        == {}
+    )
+    assert (
+        deepdiff.DeepDiff(
+            [expected_volume_mount], function.spec.volume_mounts, ignore_order=True,
+        )
+        == {}
+    )
+
+
+def test_mount_hostpath():
+    expected_volume = {"hostPath": {"path": "/tmp", "type": ""}, "name": "my-volume"}
+    expected_volume_mount = {"mountPath": "/myHostPath", "name": "my-volume"}
+
+    function = mlrun.new_function(
+        "function-name", "function-project", kind=mlrun.runtimes.RuntimeKinds.job
+    )
+    function.apply(
+        mlrun.platforms.mount_hostpath(
+            host_path="/tmp", mount_path="/myHostPath", volume_name="my-volume"
+        )
+    )
+
+    assert (
+        deepdiff.DeepDiff([expected_volume], function.spec.volumes, ignore_order=True,)
+        == {}
+    )
+    assert (
+        deepdiff.DeepDiff(
+            [expected_volume_mount], function.spec.volume_mounts, ignore_order=True,
+        )
+        == {}
+    )
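
Reviewer note (not part of the patch): a minimal usage sketch of the two new mount modifiers. The function name, project, ConfigMap name, and paths below are hypothetical, chosen only for illustration; the calls themselves mirror the new tests in tests/platforms/test_other.py.

    import mlrun

    fn = mlrun.new_function("my-job", "my-project", kind="job")
    # project the ConfigMap "app-config" into the container under /etc/app
    fn.apply(
        mlrun.platforms.mount_configmap(
            configmap_name="app-config", mount_path="/etc/app"
        )
    )
    # expose the host's /mnt/data directory inside the container at /data
    fn.apply(
        mlrun.platforms.mount_hostpath(host_path="/mnt/data", mount_path="/data")
    )

Because the defaults differ (volume_name="configmap" vs. volume_name="hostpath"), both modifiers can be applied to the same function without the volume names colliding; pass an explicit volume_name when mounting the same kind of source more than once.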
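
Reviewer note (not part of the patch): a sketch of the new Spark monitoring flow, assuming fn is a freshly created Spark3Runtime function whose spec.monitoring has not yet been touched (how fn is created is outside the scope of this patch).

    # with_igz_spark() now enables monitoring by default and points the
    # Prometheus JMX exporter at the agent jar added to _get_igz_deps()
    fn.with_igz_spark()
    assert fn.spec.monitoring == {
        "enabled": True,
        "exporter_jar": "/spark/jars/jmx_prometheus_javaagent-0.16.1.jar",
    }
    # _enrich_job() then sets spec.monitoring.exposeDriverMetrics,
    # spec.monitoring.exposeExecutorMetrics and
    # spec.monitoring.prometheus.jmxExporterJar on the SparkApplication CR

    # opting out: disable_monitoring() sets monitoring["enabled"] = False;
    # since with_igz_spark() only enables monitoring when the user has not
    # opted out, calling it before with_igz_spark() also keeps monitoring off
    fn.disable_monitoring()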