From 6cad4b05c555c926662a83d2924c2f641775f504 Mon Sep 17 00:00:00 2001 From: Chengjie Li <109656400+ChengjieLi28@users.noreply.github.com> Date: Mon, 8 May 2023 15:10:04 +0800 Subject: [PATCH] CLN: Remove k8s deploy in _mars (#431) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../_mars/deploy/kubernetes/__init__.py | 16 - .../xorbits/_mars/deploy/kubernetes/client.py | 481 ------------- .../xorbits/_mars/deploy/kubernetes/config.py | 674 ------------------ .../_mars/deploy/kubernetes/config.yml | 7 - .../xorbits/_mars/deploy/kubernetes/core.py | 223 ------ .../_mars/deploy/kubernetes/docker/Dockerfile | 23 - .../deploy/kubernetes/docker/Dockerfile.base | 32 - .../deploy/kubernetes/docker/entrypoint.sh | 8 - .../_mars/deploy/kubernetes/docker/retry.sh | 13 - .../_mars/deploy/kubernetes/supervisor.py | 33 - .../deploy/kubernetes/tests/Dockerfile.test | 18 - .../_mars/deploy/kubernetes/tests/__init__.py | 14 - .../deploy/kubernetes/tests/build_ext.sh | 4 - .../kubernetes/tests/docker-logging.conf | 50 -- .../deploy/kubernetes/tests/entrypoint.sh | 20 - .../deploy/kubernetes/tests/graceful_stop.sh | 16 - .../deploy/kubernetes/tests/test_config.py | 130 ---- .../kubernetes/tests/test_kubernetes.py | 285 -------- .../xorbits/_mars/deploy/kubernetes/worker.py | 56 -- 19 files changed, 2103 deletions(-) delete mode 100644 python/xorbits/_mars/deploy/kubernetes/__init__.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/client.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/config.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/config.yml delete mode 100644 python/xorbits/_mars/deploy/kubernetes/core.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile delete mode 100644 python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile.base delete mode 100755 python/xorbits/_mars/deploy/kubernetes/docker/entrypoint.sh delete mode 100755 python/xorbits/_mars/deploy/kubernetes/docker/retry.sh delete mode 100644 python/xorbits/_mars/deploy/kubernetes/supervisor.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/Dockerfile.test delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/__init__.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/build_ext.sh delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/docker-logging.conf delete mode 100755 python/xorbits/_mars/deploy/kubernetes/tests/entrypoint.sh delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/graceful_stop.sh delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/test_config.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/tests/test_kubernetes.py delete mode 100644 python/xorbits/_mars/deploy/kubernetes/worker.py diff --git a/python/xorbits/_mars/deploy/kubernetes/__init__.py b/python/xorbits/_mars/deploy/kubernetes/__init__.py deleted file mode 100644 index 9bbbe4a3c..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .client import KubernetesClusterClient, new_cluster diff --git a/python/xorbits/_mars/deploy/kubernetes/client.py b/python/xorbits/_mars/deploy/kubernetes/client.py deleted file mode 100644 index d8c4d8174..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/client.py +++ /dev/null @@ -1,481 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import functools -import logging -import random -import time -import uuid -from urllib.parse import urlparse - -from ...lib.aio import new_isolation, stop_isolation -from ...services.cluster.api import WebClusterAPI -from ...session import new_session -from ...utils import calc_size_by_str -from ..utils import wait_services_ready -from .config import ( - MarsSupervisorsConfig, - MarsWorkersConfig, - NamespaceConfig, - RoleBindingConfig, - RoleConfig, - ServiceConfig, -) - -try: - from kubernetes.client.rest import ApiException as K8SApiException -except ImportError: # pragma: no cover - K8SApiException = None - -logger = logging.getLogger(__name__) - - -class KubernetesClusterClient: - def __init__(self, cluster): - self._cluster = cluster - self._endpoint = None - self._session = None - - @property - def endpoint(self): - return self._endpoint - - @property - def namespace(self): - return self._cluster.namespace - - @property - def session(self): - return self._session - - def start(self): - try: - self._endpoint = self._cluster.start() - self._session = new_session(self._endpoint) - except: # noqa: E722 # nosec # pylint: disable=bare-except - self.stop() - raise - - def stop(self, wait=False, timeout=0): - self._cluster.stop(wait=wait, timeout=timeout) - - -class KubernetesCluster: - _supervisor_config_cls = MarsSupervisorsConfig - _worker_config_cls = MarsWorkersConfig - _default_service_port = 7103 - _default_web_port = 7104 - - def __init__( - self, - kube_api_client=None, - image=None, - namespace=None, - supervisor_num=1, - supervisor_cpu=1, - supervisor_mem="4G", - supervisor_mem_limit_ratio=None, - worker_num=1, - worker_cpu=None, - worker_mem=None, - worker_spill_paths=None, - worker_cache_mem=None, - min_worker_num=None, - worker_min_cache_mem=None, - worker_mem_limit_ratio=None, - web_port=None, - service_name=None, - service_type=None, - timeout=None, - **kwargs, - ): - from kubernetes import client as kube_client - - if worker_cpu is None or worker_mem is None: # pragma: no cover - raise TypeError("`worker_cpu` and `worker_mem` must be specified") - - self._api_client = kube_api_client - self._core_api = kube_client.CoreV1Api(kube_api_client) - - self._namespace = namespace - self._image = image - self._timeout = timeout - self._service_name = service_name or "marsservice" - self._service_type = service_type or "NodePort" - self._extra_volumes = kwargs.pop("extra_volumes", ()) - self._pre_stop_command = kwargs.pop("pre_stop_command", None) - self._log_when_fail = kwargs.pop("log_when_fail", False) - - extra_modules = kwargs.pop("extra_modules", None) or [] - extra_modules = ( - extra_modules.split(",") - if isinstance(extra_modules, str) - else extra_modules - ) - extra_envs = kwargs.pop("extra_env", None) or dict() - extra_labels = kwargs.pop("extra_labels", None) or dict() - service_port = kwargs.pop("service_port", None) or self._default_service_port - - def _override_modules(updates): - modules = set(extra_modules) - updates = updates.split(",") if isinstance(updates, str) else updates - modules.update(updates) - return sorted(modules) - - def _override_dict(d, updates): - updates = updates or dict() - ret = d.copy() - ret.update(updates) - return ret - - _override_envs = functools.partial(_override_dict, extra_envs) - _override_labels = functools.partial(_override_dict, extra_labels) - - self._supervisor_num = supervisor_num - self._supervisor_cpu = supervisor_cpu - self._supervisor_mem = calc_size_by_str(supervisor_mem, None) - self._supervisor_mem_limit_ratio = supervisor_mem_limit_ratio - self._supervisor_extra_modules = _override_modules( - kwargs.pop("supervisor_extra_modules", []) - ) - self._supervisor_extra_env = _override_envs( - kwargs.pop("supervisor_extra_env", None) - ) - self._supervisor_extra_labels = _override_labels( - kwargs.pop("supervisor_extra_labels", None) - ) - self._supervisor_service_port = ( - kwargs.pop("supervisor_service_port", None) or service_port - ) - self._web_port = web_port or self._default_web_port - self._external_web_endpoint = None - - self._worker_num = worker_num - self._worker_cpu = worker_cpu - self._worker_mem = calc_size_by_str(worker_mem, None) - self._worker_mem_limit_ratio = worker_mem_limit_ratio - self._worker_spill_paths = worker_spill_paths - self._worker_cache_mem = worker_cache_mem - self._worker_min_cache_men = worker_min_cache_mem - self._min_worker_num = min_worker_num - self._worker_extra_modules = _override_modules( - kwargs.pop("worker_extra_modules", []) - ) - self._worker_extra_env = _override_envs(kwargs.pop("worker_extra_env", None)) - self._worker_extra_labels = _override_labels( - kwargs.pop("worker_extra_labels", None) - ) - self._worker_service_port = ( - kwargs.pop("worker_service_port", None) or service_port - ) - - @property - def namespace(self): - return self._namespace - - def _get_free_namespace(self): - while True: - namespace = "mars-ns-" + str(uuid.uuid4().hex) - try: - self._core_api.read_namespace(namespace) - except K8SApiException as ex: - if ex.status != 404: # pragma: no cover - raise - return namespace - - def _create_kube_service(self): - if self._service_type != "NodePort": # pragma: no cover - raise NotImplementedError( - f"Service type {self._service_type} not supported" - ) - - service_config = ServiceConfig( - self._service_name, - service_type="NodePort", - port=self._web_port, - selector={"mars/service-type": MarsSupervisorsConfig.rc_name}, - ) - self._core_api.create_namespaced_service( - self._namespace, service_config.build() - ) - - def _get_ready_pod_count(self, label_selector): - query = self._core_api.list_namespaced_pod( - namespace=self._namespace, label_selector=label_selector - ).to_dict() - cnt = 0 - for el in query["items"]: - if el["status"]["phase"] in ("Error", "Failed"): - logger.warning( - "Error in starting pod, message: %s", el["status"]["message"] - ) - continue - if "status" not in el or "conditions" not in el["status"]: - cnt += 1 - elif any( - cond["type"] == "Ready" and cond["status"] == "True" - for cond in el["status"].get("conditions") or () - ): - cnt += 1 - return cnt - - def _create_namespace(self): - if self._namespace is None: - namespace = self._namespace = self._get_free_namespace() - else: - namespace = self._namespace - - self._core_api.create_namespace(NamespaceConfig(namespace).build()) - - def _create_roles_and_bindings(self): - # create role and binding - role_config = RoleConfig( - "mars-pod-operator", - self._namespace, - api_groups="", - resources="pods,endpoints,services", - verbs="get,watch,list,patch", - ) - role_config.create_namespaced(self._api_client, self._namespace) - role_binding_config = RoleBindingConfig( - "mars-pod-operator-binding", self._namespace, "mars-pod-operator", "default" - ) - role_binding_config.create_namespaced(self._api_client, self._namespace) - - def _create_supervisors(self): - supervisors_config = self._supervisor_config_cls( - self._supervisor_num, - image=self._image, - cpu=self._supervisor_cpu, - memory=self._supervisor_mem, - memory_limit_ratio=self._supervisor_mem_limit_ratio, - modules=self._supervisor_extra_modules, - volumes=self._extra_volumes, - service_name=self._service_name, - service_port=self._supervisor_service_port, - web_port=self._web_port, - pre_stop_command=self._pre_stop_command, - ) - supervisors_config.add_simple_envs(self._supervisor_extra_env) - supervisors_config.add_labels(self._supervisor_extra_labels) - supervisors_config.create_namespaced(self._api_client, self._namespace) - - def _create_workers(self): - workers_config = self._worker_config_cls( - self._worker_num, - image=self._image, - cpu=self._worker_cpu, - memory=self._worker_mem, - memory_limit_ratio=self._worker_mem_limit_ratio, - spill_volumes=self._worker_spill_paths, - modules=self._worker_extra_modules, - volumes=self._extra_volumes, - worker_cache_mem=self._worker_cache_mem, - min_cache_mem=self._worker_min_cache_men, - service_name=self._service_name, - service_port=self._worker_service_port, - pre_stop_command=self._pre_stop_command, - supervisor_web_port=self._web_port, - ) - workers_config.add_simple_envs(self._worker_extra_env) - workers_config.add_labels(self._worker_extra_labels) - workers_config.create_namespaced(self._api_client, self._namespace) - - def _create_services(self): - self._create_supervisors() - self._create_workers() - - def _wait_services_ready(self): - min_worker_num = int(self._min_worker_num or self._worker_num) - limits = [self._supervisor_num, min_worker_num] - selectors = [ - "mars/service-type=" + MarsSupervisorsConfig.rc_name, - "mars/service-type=" + MarsWorkersConfig.rc_name, - ] - start_time = time.time() - logger.debug("Start waiting pods to be ready") - wait_services_ready( - selectors, - limits, - lambda sel: self._get_ready_pod_count(sel), - timeout=self._timeout, - ) - logger.info("All service pods ready.") - if self._timeout is not None: # pragma: no branch - self._timeout -= time.time() - start_time - - def _get_web_address(self): - svc_data = self._core_api.read_namespaced_service( - "marsservice", self._namespace - ).to_dict() - node_port = svc_data["spec"]["ports"][0]["node_port"] - - # docker desktop use a VM to hold docker processes, hence - # we need to use API address instead - desktop_nodes = self._core_api.list_node( - field_selector="metadata.name=docker-desktop" - ).to_dict() - if desktop_nodes["items"]: # pragma: no cover - host_ip = urlparse( - self._core_api.api_client.configuration.host - ).netloc.split(":", 1)[0] - else: - web_pods = self._core_api.list_namespaced_pod( - self._namespace, - label_selector="mars/service-type=" + MarsSupervisorsConfig.rc_name, - ).to_dict() - host_ip = random.choice(web_pods["items"])["status"]["host_ip"] - return f"http://{host_ip}:{node_port}" - - def _wait_web_ready(self): - loop = new_isolation().loop - - async def get_supervisors(): - start_time = time.time() - while True: - try: - cluster_api = WebClusterAPI(self._external_web_endpoint) - supervisors = await cluster_api.get_supervisors() - - if len(supervisors) == self._supervisor_num: - break - except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover - if ( - self._timeout is not None - and time.time() - start_time > self._timeout - ): - logger.exception("Error when fetching supervisors") - raise TimeoutError( - "Wait for kubernetes cluster timed out" - ) from None - - asyncio.run_coroutine_threadsafe(get_supervisors(), loop).result() - - def _load_cluster_logs(self): - log_dict = dict() - pod_items = self._core_api.list_namespaced_pod(self._namespace).to_dict() - for item in pod_items["items"]: - log_dict[item["metadata"]["name"]] = self._core_api.read_namespaced_pod_log( - name=item["metadata"]["name"], namespace=self._namespace - ) - return log_dict - - def start(self): - try: - self._create_namespace() - self._create_roles_and_bindings() - - self._create_services() - self._create_kube_service() - - self._wait_services_ready() - - self._external_web_endpoint = self._get_web_address() - self._wait_web_ready() - return self._external_web_endpoint - except: # noqa: E722 - if self._log_when_fail: # pargma: no cover - logger.error("Error when creating cluster") - for name, log in self._load_cluster_logs().items(): - logger.error("Error logs for %s:\n%s", name, log) - self.stop() - raise - - def stop(self, wait=False, timeout=0): - # stop isolation - stop_isolation() - - from kubernetes.client import CoreV1Api - - api = CoreV1Api(self._api_client) - api.delete_namespace(self._namespace) - if wait: - start_time = time.time() - while True: - try: - api.read_namespace(self._namespace) - except K8SApiException as ex: - if ex.status != 404: # pragma: no cover - raise - break - else: - time.sleep(1) - if ( - timeout and time.time() - start_time > timeout - ): # pragma: no cover - raise TimeoutError - - -def new_cluster( - kube_api_client=None, - image=None, - supervisor_num=1, - supervisor_cpu=None, - supervisor_mem=None, - worker_num=1, - worker_cpu=None, - worker_mem=None, - worker_spill_paths=None, - worker_cache_mem=None, - min_worker_num=None, - web_num=1, - web_cpu=None, - web_mem=None, - service_type=None, - timeout=None, - **kwargs, -): - """ - :param kube_api_client: Kubernetes API client, can be created with ``new_client_from_config`` - :param image: Docker image to use, ``marsproject/mars:`` by default - :param supervisor_num: Number of supervisors in the cluster, 1 by default - :param supervisor_cpu: Number of CPUs for every supervisor - :param supervisor_mem: Memory size for every supervisor - :param worker_num: Number of workers in the cluster, 1 by default - :param worker_cpu: Number of CPUs for every worker - :param worker_mem: Memory size for every worker - :param worker_spill_paths: Spill paths for worker pods on hosts - :param worker_cache_mem: Size or ratio of cache memory for every worker - :param min_worker_num: Minimal ready workers - :param web_num: Number of web services in the cluster, 1 by default - :param web_cpu: Number of CPUs for every web service - :param web_mem: Memory size for every web service - :param service_type: Type of Kubernetes Service, currently only ``NodePort`` supported - :param timeout: Timeout when creating clusters - """ - cluster_cls = kwargs.pop("cluster_cls", KubernetesCluster) - cluster = cluster_cls( - kube_api_client, - image=image, - supervisor_num=supervisor_num, - supervisor_cpu=supervisor_cpu, - supervisor_mem=supervisor_mem, - worker_num=worker_num, - worker_cpu=worker_cpu, - worker_mem=worker_mem, - worker_spill_paths=worker_spill_paths, - worker_cache_mem=worker_cache_mem, - min_worker_num=min_worker_num, - web_num=web_num, - web_cpu=web_cpu, - web_mem=web_mem, - service_type=service_type, - timeout=timeout, - **kwargs, - ) - client = KubernetesClusterClient(cluster) - client.start() - return client diff --git a/python/xorbits/_mars/deploy/kubernetes/config.py b/python/xorbits/_mars/deploy/kubernetes/config.py deleted file mode 100644 index 09e0e65f7..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/config.py +++ /dev/null @@ -1,674 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import abc -import functools -import math -import re - -from ... import __version__ as mars_version -from ...utils import calc_size_by_str, parse_readable_size - -DEFAULT_IMAGE = "marsproject/mars:v" + mars_version -DEFAULT_WORKER_CACHE_MEM = "40%" - - -def _remove_nones(cfg): - return dict((k, v) for k, v in cfg.items() if v is not None) - - -_kube_api_mapping = { - "v1": "CoreV1Api", - "apps/v1": "AppsV1Api", - "rbac.authorization.k8s.io/v1": "RbacAuthorizationV1Api", -} - - -@functools.lru_cache(10) -def _get_k8s_api(api_version, k8s_api_client): - from kubernetes import client as kube_client - - return getattr(kube_client, _kube_api_mapping[api_version])(k8s_api_client) - - -@functools.lru_cache(10) -def _camel_to_underline(name): - s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) - return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower() - - -class KubeConfig(abc.ABC): - api_version = "v1" - - def create_namespaced(self, k8s_api_client, namespace): - api = _get_k8s_api(self.api_version, k8s_api_client) - config = self.build() - method_name = f'create_namespaced_{_camel_to_underline(config["kind"])}' - return getattr(api, method_name)(namespace, config) - - @abc.abstractmethod - def build(self): - """Build config dict of the object""" - - -class RoleConfig(KubeConfig): - """ - Configuration builder for Kubernetes RBAC roles - """ - - api_version = "rbac.authorization.k8s.io/v1" - - def __init__(self, name, namespace, api_groups, resources, verbs): - self._name = name - self._namespace = namespace - self._api_groups = api_groups.split(",") - self._resources = resources.split(",") - self._verbs = verbs.split(",") - - def build(self): - return { - "kind": "Role", - "metadata": {"name": self._name, "namespace": self._namespace}, - "rules": [ - { - "apiGroups": self._api_groups, - "resources": self._resources, - "verbs": self._verbs, - } - ], - } - - -class RoleBindingConfig(KubeConfig): - """ - Configuration builder for Kubernetes RBAC role bindings - """ - - api_version = "rbac.authorization.k8s.io/v1" - - def __init__(self, name, namespace, role_name, service_account_name): - self._name = name - self._namespace = namespace - self._role_name = role_name - self._service_account_name = service_account_name - - def build(self): - return { - "kind": "RoleBinding", - "metadata": {"name": self._name, "namespace": self._namespace}, - "roleRef": { - "apiGroup": "rbac.authorization.k8s.io", - "kind": "Role", - "name": self._role_name, - }, - "subjects": [ - { - "kind": "ServiceAccount", - "name": self._service_account_name, - "namespace": self._namespace, - } - ], - } - - -class NamespaceConfig(KubeConfig): - """ - Configuration builder for Kubernetes namespaces - """ - - def __init__(self, name): - self._name = name - - def build(self): - return { - "kind": "Namespace", - "metadata": { - "name": self._name, - "labels": { - "name": self._name, - }, - }, - } - - -class ServiceConfig(KubeConfig): - """ - Configuration builder for Kubernetes services - """ - - def __init__( - self, name, service_type, selector, port, target_port=None, protocol=None - ): - self._name = name - self._type = service_type - self._protocol = protocol or "TCP" - self._selector = selector - self._port = port - self._target_port = target_port - - def build(self): - return { - "kind": "Service", - "metadata": { - "name": self._name, - "labels": { - "mars/service-name": self._name, - }, - }, - "spec": _remove_nones( - { - "type": self._type, - "selector": self._selector, - "ports": [ - _remove_nones( - { - "protocol": self._protocol, - "port": self._port, - "targetPort": self._target_port, - } - ), - ], - } - ), - } - - -class ResourceConfig: - """ - Configuration builder for Kubernetes computation resources - """ - - def __init__(self, cpu, memory): - self._cpu = cpu - self._memory, ratio = ( - parse_readable_size(memory) if memory is not None else (None, False) - ) - assert not ratio - - @property - def cpu(self): - return self._cpu - - @property - def memory(self): - return self._memory - - def build(self): - return _remove_nones( - { - "cpu": f"{int(self._cpu * 1000)}m" if self._cpu else None, - "memory": str(int(self._memory)) if self._memory else None, - } - ) - - -class PortConfig: - """ - Configuration builder for Kubernetes ports definition for containers - """ - - def __init__(self, container_port): - self._container_port = int(container_port) - - def build(self): - return { - "containerPort": self._container_port, - } - - -class VolumeConfig(abc.ABC): - """ - Base configuration builder for Kubernetes volumes - """ - - def __init__(self, name, mount_path): - self.name = name - self.mount_path = mount_path - - @abc.abstractmethod - def build(self): - """Build volume config""" - - def build_mount(self): - return { - "name": self.name, - "mountPath": self.mount_path, - } - - -class HostPathVolumeConfig(VolumeConfig): - """ - Configuration builder for Kubernetes host volumes - """ - - def __init__(self, name, mount_path, host_path, volume_type=None): - super().__init__(name, mount_path) - self._host_path = host_path - self._volume_type = volume_type or "DirectoryOrCreate" - - def build(self): - return { - "name": self.name, - "hostPath": {"path": self._host_path, "type": self._volume_type}, - } - - -class EmptyDirVolumeConfig(VolumeConfig): - """ - Configuration builder for Kubernetes empty-dir volumes - """ - - def __init__(self, name, mount_path, use_memory=True, size_limit=None): - super().__init__(name, mount_path) - self._medium = "Memory" if use_memory else None - self._size_limit = size_limit - - def build(self): - result = {"name": self.name, "emptyDir": {}} - if self._medium: - result["emptyDir"]["medium"] = self._medium - if self._size_limit: - result["emptyDir"]["sizeLimit"] = str(int(self._size_limit)) - return result - - -class ContainerEnvConfig: - """ - Configuration builder for Kubernetes container environments - """ - - def __init__(self, name, value=None, field_path=None): - self._name = name - self._value = value - self._field_path = field_path - - def build(self): - result = dict(name=self._name) - if self._value is not None: - result["value"] = str(self._value) - elif self._field_path is not None: # pragma: no branch - result["valueFrom"] = {"fieldRef": {"fieldPath": self._field_path}} - return result - - -class ProbeConfig: - """ - Base configuration builder for Kubernetes liveness and readiness probes - """ - - def __init__( - self, - initial_delay=5, - period=1, - timeout=None, - success_thresh=None, - failure_thresh=None, - ): - self._initial_delay = initial_delay - self._period = period - self._timeout = timeout - self._success_thresh = success_thresh - self._failure_thresh = failure_thresh - - def build(self): - return _remove_nones( - { - "initialDelaySeconds": self._initial_delay, - "periodSeconds": self._period, - "timeoutSeconds": self._timeout, - "successThreshold": self._success_thresh, - "failureThreshold": self._failure_thresh, - } - ) - - -class TcpSocketProbeConfig(ProbeConfig): - """ - Configuration builder for TCP liveness and readiness probes - """ - - def __init__(self, port: int, **kwargs): - super().__init__(**kwargs) - self._port = port - - def build(self): - ret = super().build() - ret["tcpSocket"] = {"port": self._port} - return ret - - -class ReplicationConfig(KubeConfig): - """ - Base configuration builder for Kubernetes replication controllers - """ - - _default_kind = "Deployment" - - def __init__( - self, - name, - image, - replicas, - resource_request=None, - resource_limit=None, - liveness_probe=None, - readiness_probe=None, - pre_stop_command=None, - kind=None, - ): - self._name = name - self._kind = kind or self._default_kind - self._image = image - self._replicas = replicas - self._ports = [] - self._volumes = [] - self._envs = dict() - self._labels = dict() - - self.add_default_envs() - - self._resource_request = resource_request - self._resource_limit = resource_limit - - self._liveness_probe = liveness_probe - self._readiness_probe = readiness_probe - - self._pre_stop_command = pre_stop_command - - @property - def api_version(self): - return "apps/v1" if self._kind in ("Deployment", "ReplicaSet") else "v1" - - def add_env(self, name, value=None, field_path=None): - self._envs[name] = ContainerEnvConfig(name, value=value, field_path=field_path) - - def remove_env(self, name): # pragma: no cover - self._envs.pop(name, None) - - def add_simple_envs(self, envs): - for k, v in envs.items() or (): - self.add_env(k, v) - - def add_labels(self, labels): - self._labels.update(labels) - - def add_port(self, container_port): - self._ports.append(PortConfig(container_port)) - - def add_default_envs(self): - pass # pragma: no cover - - def add_volume(self, vol): - self._volumes.append(vol) - - @abc.abstractmethod - def build_container_command(self): - """Output container command""" - - def build_container(self): - resources_dict = { - "requests": self._resource_request.build() - if self._resource_request - else None, - "limits": self._resource_limit.build() if self._resource_limit else None, - } - lifecycle_dict = _remove_nones( - { - "preStop": { - "exec": {"command": self._pre_stop_command}, - } - if self._pre_stop_command - else None, - } - ) - return _remove_nones( - { - "command": self.build_container_command(), - "env": [env.build() for env in self._envs.values()] or None, - "image": self._image, - "name": self._name, - "resources": dict((k, v) for k, v in resources_dict.items() if v) - or None, - "ports": [p.build() for p in self._ports] or None, - "volumeMounts": [vol.build_mount() for vol in self._volumes] or None, - "livenessProbe": self._liveness_probe.build() - if self._liveness_probe - else None, - "readinessProbe": self._readiness_probe.build() - if self._readiness_probe - else None, - "lifecycle": lifecycle_dict or None, - } - ) - - def build_template_spec(self): - result = { - "containers": [self.build_container()], - "volumes": [vol.build() for vol in self._volumes], - } - return dict((k, v) for k, v in result.items() if v) - - def build(self): - return { - "kind": self._kind, - "metadata": { - "name": self._name, - }, - "spec": { - "replicas": int(self._replicas), - "template": { - "metadata": { - "labels": _remove_nones(self._labels) or None, - }, - "spec": self.build_template_spec(), - }, - }, - } - - -class MarsReplicationConfig(ReplicationConfig, abc.ABC): - """ - Base configuration builder for replication controllers for Mars - """ - - rc_name = None - default_readiness_port = 15031 - - def __init__( - self, - replicas, - cpu=None, - memory=None, - limit_resources=False, - memory_limit_ratio=None, - image=None, - modules=None, - volumes=None, - service_name=None, - service_port=None, - **kwargs, - ): - self._cpu = cpu - self._memory, ratio = ( - parse_readable_size(memory) if memory is not None else (None, False) - ) - assert not ratio - - if isinstance(modules, str): - self._modules = modules.split(",") - else: - self._modules = modules - - req_res = ResourceConfig(cpu, memory) if cpu or memory else None - limit_res = ( - ResourceConfig(req_res.cpu, req_res.memory * (memory_limit_ratio or 1)) - if req_res and memory - else None - ) - - self._service_name = service_name - self._service_port = service_port - - super().__init__( - self.rc_name, - image or DEFAULT_IMAGE, - replicas, - resource_request=req_res, - resource_limit=limit_res if limit_resources else None, - readiness_probe=self.config_readiness_probe(), - **kwargs, - ) - if service_port: - self.add_port(service_port) - - for vol in volumes or (): - self.add_volume(vol) - - self.add_labels({"mars/service-type": self.rc_name}) - - def add_default_envs(self): - self.add_env("MARS_K8S_POD_NAME", field_path="metadata.name") - self.add_env("MARS_K8S_POD_NAMESPACE", field_path="metadata.namespace") - self.add_env("MARS_K8S_POD_IP", field_path="status.podIP") - - if self._service_name: - self.add_env("MARS_K8S_SERVICE_NAME", str(self._service_name)) - if self._service_port: - self.add_env("MARS_K8S_SERVICE_PORT", str(self._service_port)) - - self.add_env("MARS_CONTAINER_IP", field_path="status.podIP") - - if self._cpu: - self.add_env("MKL_NUM_THREADS", str(self._cpu)) - self.add_env("MARS_CPU_TOTAL", str(self._cpu)) - if getattr(self, "stat_type", "cgroup") == "cgroup": - self.add_env("MARS_USE_CGROUP_STAT", "1") - - if self._memory: - self.add_env("MARS_MEMORY_TOTAL", str(int(self._memory))) - - if self._modules: - self.add_env("MARS_LOAD_MODULES", ",".join(self._modules)) - - def config_readiness_probe(self): - raise NotImplementedError - - @staticmethod - def get_local_app_module(mod_name): - return __name__.rsplit(".", 1)[0] + "." + mod_name - - def build(self): - result = super().build() - if self._kind in ("Deployment", "ReplicaSet"): - result["spec"]["selector"] = { - "matchLabels": {"mars/service-type": self.rc_name} - } - else: - result["spec"]["selector"] = {"mars/service-type": self.rc_name} - return result - - -class MarsSupervisorsConfig(MarsReplicationConfig): - """ - Configuration builder for Mars supervisor service - """ - - rc_name = "marssupervisor" - - def __init__(self, *args, **kwargs): - self._web_port = kwargs.pop("web_port", None) - self._readiness_port = kwargs.pop("readiness_port", self.default_readiness_port) - super().__init__(*args, **kwargs) - if self._web_port: - self.add_port(self._web_port) - - def config_readiness_probe(self): - return TcpSocketProbeConfig(self._readiness_port, timeout=60, failure_thresh=10) - - def build_container_command(self): - cmd = [ - "/srv/entrypoint.sh", - self.get_local_app_module("supervisor"), - ] - if self._service_port: - cmd += ["-p", str(self._service_port)] - if self._web_port: - cmd += ["-w", str(self._web_port)] - if self._cpu: - cmd += ["--n-process", str(int(math.ceil(self._cpu)))] - return cmd - - -class MarsWorkersConfig(MarsReplicationConfig): - """ - Configuration builder for Mars worker service - """ - - rc_name = "marsworker" - - def __init__(self, *args, **kwargs): - spill_volumes = kwargs.pop("spill_volumes", None) or () - mount_shm = kwargs.pop("mount_shm", True) - self._limit_resources = kwargs["limit_resources"] = kwargs.get( - "limit_resources", True - ) - worker_cache_mem = ( - kwargs.pop("worker_cache_mem", None) or DEFAULT_WORKER_CACHE_MEM - ) - min_cache_mem = kwargs.pop("min_cache_mem", None) - self._readiness_port = kwargs.pop("readiness_port", self.default_readiness_port) - supervisor_web_port = kwargs.pop("supervisor_web_port", None) - - super().__init__(*args, **kwargs) - - self._spill_volumes = [] - for idx, vol in enumerate(spill_volumes): - if isinstance(vol, str): - path = f"/mnt/hostpath{idx}" - self.add_volume(HostPathVolumeConfig(f"host-path-vol-{idx}", path, vol)) - self._spill_volumes.append(path) - else: - self.add_volume(vol) - self._spill_volumes.append(vol.mount_path) - if self._spill_volumes: - self.add_env("MARS_SPILL_DIRS", ":".join(self._spill_volumes)) - - if self._memory: - size_limit = calc_size_by_str(worker_cache_mem, self._memory) - self.add_env("MARS_CACHE_MEM_SIZE", worker_cache_mem) - else: - size_limit = None - - if mount_shm: - self.add_volume( - EmptyDirVolumeConfig("mars-shared", "/dev/shm", size_limit=size_limit) - ) - - if min_cache_mem: - self.add_env("MARS_MIN_CACHE_MEM_SIZE", min_cache_mem) - if supervisor_web_port: - self.add_env("MARS_K8S_SUPERVISOR_WEB_PORT", supervisor_web_port) - - def config_readiness_probe(self): - return TcpSocketProbeConfig(self._readiness_port, timeout=60, failure_thresh=10) - - def build_container_command(self): - cmd = [ - "/srv/entrypoint.sh", - self.get_local_app_module("worker"), - ] - if self._service_port: - cmd += ["-p", str(self._service_port)] - return cmd diff --git a/python/xorbits/_mars/deploy/kubernetes/config.yml b/python/xorbits/_mars/deploy/kubernetes/config.yml deleted file mode 100644 index 127ff0ec7..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/config.yml +++ /dev/null @@ -1,7 +0,0 @@ -"@inherits": ../oscar/base_config.yml -cluster: - backend: k8s -storage: - backends: [plasma] - plasma: - store_memory: 20% diff --git a/python/xorbits/_mars/deploy/kubernetes/core.py b/python/xorbits/_mars/deploy/kubernetes/core.py deleted file mode 100644 index 617696d19..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/core.py +++ /dev/null @@ -1,223 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import logging -import os -from typing import AsyncGenerator, Dict, List, Optional, TypeVar - -from ...services.cluster import WebClusterAPI -from ...services.cluster.backends import ( - AbstractClusterBackend, - register_cluster_backend, -) -from ...services.cluster.core import NodeRole -from ..utils import next_in_thread, wait_all_supervisors_ready -from .config import MarsReplicationConfig - -logger = logging.getLogger(__name__) -RetType = TypeVar("RetType") - - -@register_cluster_backend -class K8SClusterBackend(AbstractClusterBackend): - name = "k8s" - - def __init__( - self, node_role=None, pool_address=None, k8s_config=None, k8s_namespace=None - ): - from kubernetes import client - - self._node_role = node_role - self._pool_address = pool_address - self._k8s_config = k8s_config - - verify_ssl = bool(int(os.environ.get("KUBE_VERIFY_SSL", "1"))) - if not verify_ssl: - c = client.Configuration() - c.verify_ssl = False - client.Configuration.set_default(c) - - self._k8s_namespace = ( - k8s_namespace or os.environ.get("MARS_K8S_POD_NAMESPACE") or "default" - ) - self._service_name = os.environ.get("MARS_K8S_SERVICE_NAME") - self._full_label_selector = None - self._client = client.CoreV1Api(client.ApiClient(self._k8s_config)) - - self._pod_to_ep = dict() - - @classmethod - async def create( - cls, node_role: NodeRole, lookup_address: Optional[str], pool_address: str - ) -> "AbstractClusterBackend": - from kubernetes import client, config - - if lookup_address is None: - k8s_namespace = None - k8s_config = config.load_incluster_config() - else: - address_parts = lookup_address.rsplit("?", 1) - k8s_namespace = None if len(address_parts) == 1 else address_parts[1] - - k8s_config = client.Configuration() - if "://" in address_parts[0]: - k8s_config.host = address_parts[0] - else: - config.load_kube_config( - address_parts[0], client_configuration=k8s_config - ) - return cls(node_role, pool_address, k8s_config, k8s_namespace) - - def __reduce__(self): - return ( - type(self), - ( - self._node_role, - self._pool_address, - self._k8s_config, - self._k8s_namespace, - ), - ) - - @staticmethod - def _format_endpoint_query_result(result: Dict, filter_ready: bool = True): - port = os.environ["MARS_K8S_SERVICE_PORT"] - endpoints = [ - f"{addr['ip']}:{port}" for addr in result["subsets"][0]["addresses"] or [] - ] - if not filter_ready: - endpoints = [ - f"{addr['ip']}:{port}" - for addr in result["subsets"][0]["not_ready_addresses"] or [] - ] - return endpoints - - def _get_web_cluster_api(self): - supervisor_web_port = os.environ["MARS_K8S_SUPERVISOR_WEB_PORT"] - web_url = ( - f"http://{self._service_name}.{self._k8s_namespace}:{supervisor_web_port}" - ) - api = WebClusterAPI(web_url) - return api - - async def _watch_supervisors_by_service_api( - self, - ) -> AsyncGenerator[List[str], None]: - from kubernetes.watch import Watch as K8SWatch - from urllib3.exceptions import ReadTimeoutError - - w = K8SWatch() - - while True: - streamer = w.stream( - self._client.list_namespaced_endpoints, - namespace=self._k8s_namespace, - label_selector=f"mars/service-name={self._service_name}", - timeout_seconds=60, - ) - while True: - try: - event = await next_in_thread(streamer) - obj_dict = event["object"].to_dict() - yield self._format_endpoint_query_result(obj_dict) - except (ReadTimeoutError, StopAsyncIteration): - break - except: # noqa: E722 # pragma: no cover # pylint: disable=bare-except - logger.exception("Unexpected error when watching on kubernetes") - break - - async def _watch_supervisors_by_cluster_web_api(self): - while True: - try: - api = self._get_web_cluster_api() - async for supervisors in api.watch_supervisors(): - yield supervisors - except (OSError, asyncio.TimeoutError): - pass - - async def _get_supervisors_by_service_api( - self, filter_ready: bool = True - ) -> List[str]: - result = ( - await asyncio.to_thread( - self._client.read_namespaced_endpoints, - name=self._service_name, - namespace=self._k8s_namespace, - ) - ).to_dict() - return self._format_endpoint_query_result(result, filter_ready=filter_ready) - - async def _get_supervisors_by_cluster_web_api(self, filter_ready: bool = True): - api = self._get_web_cluster_api() - try: - supervisors = await api.get_supervisors(filter_ready=filter_ready) - return supervisors - except (OSError, asyncio.TimeoutError): # pragma: no cover - return [] - - async def get_supervisors(self, filter_ready: bool = True) -> List[str]: - if self._node_role == NodeRole.SUPERVISOR: - return await self._get_supervisors_by_service_api(filter_ready) - else: - return await self._get_supervisors_by_cluster_web_api(filter_ready) - - async def watch_supervisors(self) -> AsyncGenerator[List[str], None]: - if self._node_role == NodeRole.SUPERVISOR: - watch_fun = self._watch_supervisors_by_service_api - else: - watch_fun = self._watch_supervisors_by_cluster_web_api - - try: - async for supervisors in watch_fun(): - yield supervisors - except asyncio.CancelledError: - pass - - async def request_worker( - self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None - ) -> str: - raise NotImplementedError - - async def release_worker(self, address: str): - raise NotImplementedError - - async def reconstruct_worker(self, address: str): - raise NotImplementedError - - -class K8SServiceMixin: - @staticmethod - def write_pid_file(): - with open("/tmp/mars-service.pid", "w") as pid_file: - pid_file.write(str(os.getpid())) - - async def wait_all_supervisors_ready(self): - """ - Wait till all containers are ready - """ - await wait_all_supervisors_ready(self.args.endpoint) - - async def start_readiness_server(self): - readiness_port = os.environ.get( - "MARS_K8S_READINESS_PORT", MarsReplicationConfig.default_readiness_port - ) - self._readiness_server = await asyncio.start_server( - lambda r, w: None, port=readiness_port - ) - - async def stop_readiness_server(self): - self._readiness_server.close() - await self._readiness_server.wait_closed() diff --git a/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile b/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile deleted file mode 100644 index 2675538ab..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -ARG BASE_CONTAINER=marsproject/mars-base -FROM ${BASE_CONTAINER} - -COPY . /opt/mars/ - -RUN apt-get -yq update --allow-releaseinfo-change \ - && apt-get -yq install gcc g++ \ - && curl -fsSL https://deb.nodesource.com/setup_14.x | sudo -E bash - \ - && sudo apt-get install -y nodejs \ - && /opt/conda/bin/pip install -e /opt/mars \ - && apt-get -yq remove gcc g++ nodejs \ - && apt-get -yq autoremove \ - && apt-get -yq clean \ - && rm -rf /var/lib/apt/lists/* \ - && rm -rf /usr/local/lib/node_modules -RUN mkdir -p /srv -WORKDIR /srv - -RUN cp /opt/mars/mars/deploy/oscar/file-logging.conf /srv/logging.conf \ - && cp /opt/mars/mars/deploy/kubernetes/docker/entrypoint.sh /srv/entrypoint.sh \ - && chmod a+x /srv/*.sh - -ENTRYPOINT [ "/srv/entrypoint.sh" ] diff --git a/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile.base b/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile.base deleted file mode 100644 index 2e43d968d..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/docker/Dockerfile.base +++ /dev/null @@ -1,32 +0,0 @@ -ARG BASE_CONTAINER=continuumio/miniconda3:4.9.2 -FROM ${BASE_CONTAINER} - -COPY retry.sh /srv/retry.sh - -RUN /srv/retry.sh 3 /opt/conda/bin/conda install \ - cloudpickle \ - cython \ - greenlet \ - mkl \ - numba \ - numexpr \ - numpy\>=1.14.0 \ - pandas\>=1.0.0 \ - psutil \ - scikit-learn \ - scipy \ - sqlalchemy \ - tornado \ - lz4 \ - && /srv/retry.sh 3 /opt/conda/bin/conda install -c conda-forge \ - libiconv \ - pyarrow\>=1.0 \ - tiledb-py \ - python-kubernetes \ - uvloop \ - && /opt/conda/bin/conda clean --all -f -y - -RUN apt-get -yq update --allow-releaseinfo-change \ - && apt-get -yq install curl sudo procps \ - && apt-get -yq clean \ - && rm -rf /var/lib/apt/lists/* \ diff --git a/python/xorbits/_mars/deploy/kubernetes/docker/entrypoint.sh b/python/xorbits/_mars/deploy/kubernetes/docker/entrypoint.sh deleted file mode 100755 index 97eb6b0cb..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/docker/entrypoint.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash -set -e - -if [[ "$1" == *"/"* ]]; then - $@ -else - /opt/conda/bin/python -m "$1" ${@:2} -fi diff --git a/python/xorbits/_mars/deploy/kubernetes/docker/retry.sh b/python/xorbits/_mars/deploy/kubernetes/docker/retry.sh deleted file mode 100755 index ca87a335e..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/docker/retry.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash -RETRIES=$1 -shift -for (( RETRY=1; RETRY <= $RETRIES ; RETRY++ )); do - "$@" - EXIT=$? - if [[ $EXIT != 0 ]]; then - echo "Command attempt $RETRY failed" - else - exit 0 - fi -done -exit $EXIT diff --git a/python/xorbits/_mars/deploy/kubernetes/supervisor.py b/python/xorbits/_mars/deploy/kubernetes/supervisor.py deleted file mode 100644 index 0fc6305a0..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/supervisor.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..oscar.supervisor import SupervisorCommandRunner -from .core import K8SServiceMixin - - -class K8SSupervisorCommandRunner(K8SServiceMixin, SupervisorCommandRunner): - async def start_services(self): - await super().start_services() - await self.start_readiness_server() - - async def stop_services(self): - await self.stop_readiness_server() - await super().stop_services() - - -main = K8SSupervisorCommandRunner() - -if __name__ == "__main__": # pragma: no branch - main() diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/Dockerfile.test b/python/xorbits/_mars/deploy/kubernetes/tests/Dockerfile.test deleted file mode 100644 index 2634fd3a7..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/Dockerfile.test +++ /dev/null @@ -1,18 +0,0 @@ -ARG BASE_CONTAINER=marsproject/mars-base -FROM ${BASE_CONTAINER} - -RUN /srv/retry.sh 3 /opt/conda/bin/conda install -c pkgs/main \ - coverage\>=5.0 cloudpickle \ - && conda clean --all -f -y - -RUN apt-get -yq update --allow-releaseinfo-change -RUN apt-get -yq install git gcc g++ - -COPY docker-logging.conf /srv/logging.conf -COPY build_ext.sh /srv/build_ext.sh -COPY entrypoint.sh /srv/entrypoint.sh -COPY graceful_stop.sh /srv/graceful_stop.sh - -RUN echo "import coverage; coverage.process_startup()" > \ - $(/opt/conda/bin/python -c "import site; print(site.getsitepackages()[-1])")/coverage.pth -RUN chmod a+x /srv/*.sh diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/__init__.py b/python/xorbits/_mars/deploy/kubernetes/tests/__init__.py deleted file mode 100644 index 9b7110cd3..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/build_ext.sh b/python/xorbits/_mars/deploy/kubernetes/tests/build_ext.sh deleted file mode 100644 index 073b2b3ad..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/build_ext.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -set -e -cd /mnt/mars -/opt/conda/bin/python setup.py build_ext -i diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/docker-logging.conf b/python/xorbits/_mars/deploy/kubernetes/tests/docker-logging.conf deleted file mode 100644 index 320ca6cb3..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/docker-logging.conf +++ /dev/null @@ -1,50 +0,0 @@ -[loggers] -keys=root,main,deploy,services,oscar,tornado - -[handlers] -keys=stream_handler - -[formatters] -keys=formatter - -[logger_root] -level=WARN -handlers=stream_handler - -[logger_main] -level=DEBUG -handlers=stream_handler -qualname=__main__ -propagate=0 - -[logger_deploy] -level=DEBUG -handlers=stream_handler -qualname=mars.deploy -propagate=0 - -[logger_oscar] -level=DEBUG -handlers=stream_handler -qualname=mars.oscar -propagate=0 - -[logger_services] -level=DEBUG -handlers=stream_handler -qualname=mars.services -propagate=0 - -[logger_tornado] -level=WARN -handlers=stream_handler -qualname=tornado -propagate=0 - -[handler_stream_handler] -class=StreamHandler -formatter=formatter -args=(sys.stderr,) - -[formatter_formatter] -format=%(asctime)s %(name)-12s %(process)d %(levelname)-8s %(message)s diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/entrypoint.sh b/python/xorbits/_mars/deploy/kubernetes/tests/entrypoint.sh deleted file mode 100755 index 918d87bfd..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/entrypoint.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/bash -set -e -cd /mnt/mars -/opt/conda/bin/pip install -e ".[dev,extra]" - -mkdir -p .dist-coverage -export COVERAGE_FILE=.dist-coverage/.coverage - -COV_RUNNER="/opt/conda/bin/coverage run" - -if [[ $1 == *"supervisor"* ]]; then - $COV_RUNNER -m "$1" --log-conf /srv/logging.conf ${@:2} -elif [[ $1 == *"worker"* ]]; then - $COV_RUNNER -m "$1" --log-conf /srv/logging.conf ${@:2} -else - $COV_RUNNER -m "$1" --log-conf /srv/logging.conf ${@:2} -fi -while [[ -f /tmp/stopping.tmp ]]; do - sleep 1 -done diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/graceful_stop.sh b/python/xorbits/_mars/deploy/kubernetes/tests/graceful_stop.sh deleted file mode 100644 index 5153fc3c2..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/graceful_stop.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -set -e -touch /tmp/stopping.tmp -if [[ -f /tmp/mars-service.pid ]]; then - SERVICE_PID="$(cat /tmp/mars-service.pid)" - kill -INT "$SERVICE_PID" || true - CNT=0 - while kill -0 "$SERVICE_PID"; do - sleep 0.5 - CNT=$((CNT+1)) - if [[ $CNT -gt 10 ]]; then - break - fi - done - kill -INT "$SERVICE_PID" || true -fi diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/test_config.py b/python/xorbits/_mars/deploy/kubernetes/tests/test_config.py deleted file mode 100644 index 6fc203271..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/test_config.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..config import ( - EmptyDirVolumeConfig, - MarsSupervisorsConfig, - MarsWorkersConfig, - NamespaceConfig, - RoleBindingConfig, - RoleConfig, - ServiceConfig, -) - - -def test_simple_objects(): - ns_config_dict = NamespaceConfig("ns_name").build() - assert ns_config_dict["metadata"]["name"] == "ns_name" - - role_config_dict = RoleConfig( - "mars-pod-reader", "ns_name", "", "pods", "get,watch,list" - ).build() - assert role_config_dict["metadata"]["name"] == "mars-pod-reader" - assert "get" in role_config_dict["rules"][0]["verbs"] - - role_binding_config_dict = RoleBindingConfig( - "mars-pod-reader-binding", "ns_name", "mars-pod-reader", "default" - ).build() - assert role_binding_config_dict["metadata"]["name"] == "mars-pod-reader-binding" - - service_config_dict = ServiceConfig( - "mars-test-service", "NodePort", "mars/service-type=marssupervisor", 7103, 7103 - ).build() - assert service_config_dict["metadata"]["name"] == "mars-test-service" - - -def test_supervisor_object(): - supervisor_config = MarsSupervisorsConfig( - 1, cpu=2, memory="10g", limit_resources=False, modules=["mars.test_mod"] - ) - supervisor_config.add_simple_envs(dict(TEST_ENV="test_val")) - - supervisor_config_dict = supervisor_config.build() - assert supervisor_config_dict["metadata"]["name"] == "marssupervisor" - assert supervisor_config_dict["spec"]["replicas"] == 1 - - container_dict = supervisor_config_dict["spec"]["template"]["spec"]["containers"][0] - assert int(container_dict["resources"]["requests"]["memory"]) == 10 * 1024**3 - - container_envs = dict((p["name"], p) for p in container_dict["env"]) - assert container_envs["TEST_ENV"]["value"] == "test_val" - assert container_envs["MKL_NUM_THREADS"]["value"] == "2" - assert container_envs["MARS_CPU_TOTAL"]["value"] == "2" - assert int(container_envs["MARS_MEMORY_TOTAL"]["value"]) == 10 * 1024**3 - assert container_envs["MARS_LOAD_MODULES"]["value"] == "mars.test_mod" - - -def test_worker_object(): - worker_config_dict = MarsWorkersConfig( - 4, - cpu=2, - memory=10 * 1024**3, - limit_resources=True, - memory_limit_ratio=2, - spill_volumes=[ - "/tmp/spill_vol", - EmptyDirVolumeConfig("empty-dir", "/tmp/empty"), - ], - worker_cache_mem="20%", - min_cache_mem="10%", - modules="mars.test_mod", - mount_shm=True, - ).build() - assert worker_config_dict["metadata"]["name"] == "marsworker" - assert worker_config_dict["spec"]["replicas"] == 4 - - container_dict = worker_config_dict["spec"]["template"]["spec"]["containers"][0] - assert int(container_dict["resources"]["requests"]["memory"]) == 10 * 1024**3 - assert int(container_dict["resources"]["limits"]["memory"]) == 20 * 1024**3 - - container_envs = dict((p["name"], p) for p in container_dict["env"]) - assert container_envs["MKL_NUM_THREADS"]["value"] == "2" - assert container_envs["MARS_CPU_TOTAL"]["value"] == "2" - assert int(container_envs["MARS_MEMORY_TOTAL"]["value"]) == 10 * 1024**3 - assert container_envs["MARS_LOAD_MODULES"]["value"] == "mars.test_mod" - assert set(container_envs["MARS_SPILL_DIRS"]["value"].split(":")) == { - "/tmp/empty", - "/mnt/hostpath0", - } - assert container_envs["MARS_CACHE_MEM_SIZE"]["value"] == "20%" - - volume_list = worker_config_dict["spec"]["template"]["spec"]["volumes"] - volume_envs = dict((v["name"], v) for v in volume_list) - assert "empty-dir" in volume_envs - assert volume_envs["host-path-vol-0"]["hostPath"]["path"] == "/tmp/spill_vol" - - volume_mounts = dict((v["name"], v) for v in container_dict["volumeMounts"]) - assert volume_mounts["empty-dir"]["mountPath"] == "/tmp/empty" - assert volume_mounts["host-path-vol-0"]["mountPath"] == "/mnt/hostpath0" - - worker_config_dict = MarsWorkersConfig( - 4, - cpu=2, - memory=10 * 1024**3, - limit_resources=False, - spill_volumes=[ - "/tmp/spill_vol", - EmptyDirVolumeConfig("empty-dir", "/tmp/empty"), - ], - modules="mars.test_mod", - mount_shm=False, - ).build() - - volume_list = worker_config_dict["spec"]["template"]["spec"]["volumes"] - assert "shm-volume" not in volume_list - - container_dict = worker_config_dict["spec"]["template"]["spec"]["containers"][0] - volume_mounts = dict((v["name"], v) for v in container_dict["volumeMounts"]) - assert "shm-volume" not in volume_mounts diff --git a/python/xorbits/_mars/deploy/kubernetes/tests/test_kubernetes.py b/python/xorbits/_mars/deploy/kubernetes/tests/test_kubernetes.py deleted file mode 100644 index e6941fbd5..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/tests/test_kubernetes.py +++ /dev/null @@ -1,285 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import glob -import os -import shutil -import subprocess -import tempfile -import uuid -from contextlib import contextmanager -from distutils.spawn import find_executable - -import numpy as np -import pytest - -from .... import tensor as mt -from ....tests.core import mock -from .. import new_cluster -from ..config import HostPathVolumeConfig - -try: - from kubernetes import client as k8s_client - from kubernetes import config as k8s_config -except ImportError: - k8s_client = k8s_config = None - -MARS_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(mt.__file__))) -TEST_ROOT = os.path.dirname(os.path.abspath(__file__)) -DOCKER_ROOT = os.path.join(os.path.dirname(TEST_ROOT), "docker") - -kube_available = ( - find_executable("kubectl") is not None - and find_executable("docker") is not None - and k8s_config is not None -) - - -def _collect_coverage(): - dist_coverage_path = os.path.join(MARS_ROOT, ".dist-coverage") - if os.path.exists(dist_coverage_path): - # change ownership of coverage files - if find_executable("sudo"): - proc = subprocess.Popen( - [ - "sudo", - "-n", - "chown", - "-R", - f"{os.geteuid()}:{os.getegid()}", - dist_coverage_path, - ], - shell=False, - ) - proc.wait() - - # rewrite paths in coverage result files - for fn in glob.glob(os.path.join(dist_coverage_path, ".coverage.*")): - if "COVERAGE_FILE" in os.environ: - new_cov_file = os.environ["COVERAGE_FILE"] + os.path.basename( - fn - ).replace(".coverage", "") - else: - new_cov_file = fn.replace(".dist-coverage" + os.sep, "") - shutil.copyfile(fn, new_cov_file) - shutil.rmtree(dist_coverage_path) - - -def _build_docker_images(use_test_docker_file=True): - image_name = "mars-test-image:" + uuid.uuid1().hex - try: - if use_test_docker_file: - proc = subprocess.Popen( - ["docker", "build", "-f", "Dockerfile.test", "-t", image_name, "."], - cwd=TEST_ROOT, - ) - else: - proc = subprocess.Popen( - [ - "docker", - "build", - "-f", - os.path.join(DOCKER_ROOT, "Dockerfile"), - "-t", - image_name, - ".", - ], - cwd=MARS_ROOT, - ) - if proc.wait() != 0: - raise SystemError("Executing docker build failed.") - - if use_test_docker_file: - proc = subprocess.Popen( - [ - "docker", - "run", - "-v", - MARS_ROOT + ":/mnt/mars", - image_name, - "/srv/build_ext.sh", - ] - ) - if proc.wait() != 0: - raise SystemError("Executing docker run failed.") - except: # noqa: E722 - _remove_docker_image(image_name) - raise - return image_name - - -def _remove_docker_image(image_name, raises=True): - if "CI" not in os.environ: - # delete image iff in CI environment - return - proc = subprocess.Popen(["docker", "rmi", "-f", image_name]) - if proc.wait() != 0 and raises: - raise SystemError("Executing docker rmi failed.") - - -def _load_docker_env(): - if os.path.exists("/var/run/docker.sock") or not shutil.which("minikube"): - return - - proc = subprocess.Popen(["minikube", "docker-env"], stdout=subprocess.PIPE) - proc.wait(30) - for line in proc.stdout: - line = line.decode().split("#", 1)[0] - line = line.strip() # type: str | bytes - export_pos = line.find("export") - if export_pos < 0: - continue - line = line[export_pos + 6 :].strip() - var, value = line.split("=", 1) - os.environ[var] = value.strip('"') - - -@contextmanager -def _start_kube_cluster(use_test_docker_file=True, **kwargs): - _load_docker_env() - image_name = _build_docker_images(use_test_docker_file=use_test_docker_file) - - temp_spill_dir = tempfile.mkdtemp(prefix="test-mars-k8s-") - api_client = k8s_config.new_client_from_config() - kube_api = k8s_client.CoreV1Api(api_client) - - cluster_client = None - try: - if use_test_docker_file: - extra_volumes = [ - HostPathVolumeConfig("mars-src-path", "/mnt/mars", MARS_ROOT) - ] - pre_stop_command = ["rm", "/tmp/stopping.tmp"] - else: - extra_volumes = [] - pre_stop_command = None - - cluster_client = new_cluster( - api_client, - image=image_name, - worker_spill_paths=[temp_spill_dir], - extra_volumes=extra_volumes, - pre_stop_command=pre_stop_command, - timeout=600, - log_when_fail=True, - **kwargs, - ) - - assert cluster_client.endpoint is not None - - pod_items = kube_api.list_namespaced_pod(cluster_client.namespace).to_dict() - - log_processes = [] - for item in pod_items["items"]: - log_processes.append( - subprocess.Popen( - [ - "kubectl", - "logs", - "-f", - "-n", - cluster_client.namespace, - item["metadata"]["name"], - ] - ) - ) - - yield - - if use_test_docker_file: - # turn off service processes with grace to get coverage data - procs = [] - pod_items = kube_api.list_namespaced_pod(cluster_client.namespace).to_dict() - for item in pod_items["items"]: - p = subprocess.Popen( - [ - "kubectl", - "exec", - "-n", - cluster_client.namespace, - item["metadata"]["name"], - "--", - "/srv/graceful_stop.sh", - ] - ) - procs.append(p) - for p in procs: - p.wait() - - [p.terminate() for p in log_processes] - finally: - shutil.rmtree(temp_spill_dir) - if cluster_client: - try: - cluster_client.stop(wait=True, timeout=20) - except TimeoutError: - pass - _collect_coverage() - _remove_docker_image(image_name, False) - - -@pytest.mark.parametrize("use_test_docker_file", [True, False]) -@pytest.mark.skipif(not kube_available, reason="Cannot run without kubernetes") -def test_run_in_kubernetes(use_test_docker_file): - with _start_kube_cluster( - supervisor_cpu=0.5, - supervisor_mem="1G", - worker_cpu=0.5, - worker_mem="1G", - worker_cache_mem="64m", - extra_labels={"mars-test/group": "test-label-name"}, - extra_env={"MARS_K8S_GROUP_LABELS": "mars-test/group"}, - use_test_docker_file=use_test_docker_file, - ): - a = mt.ones((100, 100), chunk_size=30) * 2 * 1 + 1 - b = mt.ones((100, 100), chunk_size=20) * 2 * 1 + 1 - c = (a * b * 2 + 1).sum() - r = c.execute().fetch() - - expected = (np.ones(a.shape) * 2 * 1 + 1) ** 2 * 2 + 1 - np.testing.assert_array_equal(r, expected.sum()) - - -@pytest.mark.skipif(not kube_available, reason="Cannot run without kubernetes") -@mock.patch( - "kubernetes.client.CoreV1Api.create_namespaced_replication_controller", - new=lambda *_, **__: None, -) -@mock.patch( - "kubernetes.client.AppsV1Api.create_namespaced_deployment", - new=lambda *_, **__: None, -) -def test_create_timeout(): - _load_docker_env() - api_client = k8s_config.new_client_from_config() - - cluster = None - try: - extra_vol_config = HostPathVolumeConfig("mars-src-path", "/mnt/mars", MARS_ROOT) - with pytest.raises(TimeoutError): - cluster = new_cluster( - api_client, - image="pseudo_image", - supervisor_cpu=0.5, - supervisor_mem="1G", - worker_cpu=0.5, - worker_mem="1G", - extra_volumes=[extra_vol_config], - timeout=1, - ) - finally: - if cluster: - cluster.stop(wait=True) - _collect_coverage() diff --git a/python/xorbits/_mars/deploy/kubernetes/worker.py b/python/xorbits/_mars/deploy/kubernetes/worker.py deleted file mode 100644 index c47af9568..000000000 --- a/python/xorbits/_mars/deploy/kubernetes/worker.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2022-2023 XProbe Inc. -# derived from copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from ..oscar.worker import WorkerCommandRunner -from .core import K8SServiceMixin - -logger = logging.getLogger(__name__) - - -class K8SWorkerCommandRunner(K8SServiceMixin, WorkerCommandRunner): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - async def start_services(self): - from ...services.cluster import ClusterAPI - from ..oscar.worker import start_worker - - self.write_pid_file() - await start_worker( - self.pool.external_address, - self.args.supervisors, - self.band_to_resource, - list(self.args.load_modules), - self.config, - mark_ready=False, - ) - await self.wait_all_supervisors_ready() - - cluster_api = await ClusterAPI.create(self.args.endpoint) - await cluster_api.mark_node_ready() - - await self.start_readiness_server() - - async def stop_services(self): - await self.stop_readiness_server() - await super().stop_services() - - -main = K8SWorkerCommandRunner() - -if __name__ == "__main__": # pragma: no branch - main()