From 635ffbd98a3ea9703d1eec74ecbfc4afbfd31321 Mon Sep 17 00:00:00 2001 From: Heiru Wu Date: Wed, 27 Mar 2024 05:33:13 +0800 Subject: [PATCH] feat(ray): use env for resource and deprecate deploy/undeploy --- instill/helpers/Dockerfile | 2 +- instill/helpers/const.py | 20 ++-- instill/helpers/ray_config.py | 167 +++++++++++++--------------------- 3 files changed, 74 insertions(+), 115 deletions(-) diff --git a/instill/helpers/Dockerfile b/instill/helpers/Dockerfile index 9d487d9..5095285 100644 --- a/instill/helpers/Dockerfile +++ b/instill/helpers/Dockerfile @@ -11,5 +11,5 @@ RUN for package in ${PACKAGES}; do \ pip install --default-timeout=1000 --no-cache-dir $package; \ done; -WORKDIR /home/ray +WORKDIR /home/ray/model COPY . . diff --git a/instill/helpers/const.py b/instill/helpers/const.py index 7b71469..416ee7f 100644 --- a/instill/helpers/const.py +++ b/instill/helpers/const.py @@ -81,7 +81,7 @@ class VisualQuestionAnsweringInput: extra_params: Dict[str, str] = {} -DEFAULT_RAY_ACTOR_OPRTIONS = { +DEFAULT_RAY_ACTOR_OPTIONS = { "num_cpus": 2, } DEFAULT_AUTOSCALING_CONFIG = { @@ -109,14 +109,12 @@ class VisualQuestionAnsweringInput: VRAM_MINIMUM_RESERVE = 2 # GB VRAM_UPSCALE_FACTOR = 1.25 -MODEL_VRAM_OVERRIDE_LIST = { - "stable-diffusion-xl": 0.375, - "controlnet-canny": 0.375, - "llava-1-6-7b": 0.2, - "llava-1-6-13b": 0.7, - "llama2-7b-chat": 0.3, - "llama2-7b": 0.4, - "zephyr-7b": 0.4, -} - DEFAULT_DEPENDENCIES = ["protobuf==4.25.3", "grpcio-tools==1.62.0"] + +ENV_MEMORY = "RAY_MEMORY" +ENV_TOTAL_VRAM = "RAY_TOTAL_VRAM" +ENV_RAY_ACCELERATOR_TYPE = "RAY_ACCELERATOR_TYPE" +ENV_NUM_OF_GPUS = "RAY_NUM_OF_GPUS" +ENV_NUM_OF_CPUS = "RAY_NUM_OF_CPUS" +ENV_NUM_OF_MIN_REPLICAS = "RAY_NUM_OF_MIN_REPLICAS" +ENV_NUM_OF_MAX_REPLICAS = "RAY_NUM_OF_MAX_REPLICAS" diff --git a/instill/helpers/ray_config.py b/instill/helpers/ray_config.py index 1abea32..c00223f 100644 --- a/instill/helpers/ray_config.py +++ b/instill/helpers/ray_config.py @@ -1,19 +1,21 @@ -# pylint: disable=unused-argument import os from typing import Callable, Optional from warnings import warn -import ray -from ray import serve from ray.serve import Deployment from ray.serve import deployment as ray_deployment from instill.helpers.const import ( DEFAULT_AUTOSCALING_CONFIG, DEFAULT_MAX_CONCURRENT_QUERIES, - DEFAULT_RAY_ACTOR_OPRTIONS, - DEFAULT_RUNTIME_ENV, - MODEL_VRAM_OVERRIDE_LIST, + DEFAULT_RAY_ACTOR_OPTIONS, + ENV_MEMORY, + ENV_NUM_OF_CPUS, + ENV_NUM_OF_GPUS, + ENV_NUM_OF_MAX_REPLICAS, + ENV_NUM_OF_MIN_REPLICAS, + ENV_RAY_ACCELERATOR_TYPE, + ENV_TOTAL_VRAM, RAM_MINIMUM_RESERVE, RAM_UPSCALE_FACTOR, VRAM_MINIMUM_RESERVE, @@ -24,58 +26,36 @@ class InstillDeployable: - def __init__( - self, - deployable: Deployment, - model_weight_or_folder_name: str, # kept for backward compatibility - use_gpu: bool, - ) -> None: + def __init__(self, deployable: Deployment) -> None: self._deployment: Deployment = deployable - self.use_gpu = use_gpu - # params - if use_gpu: - self.update_num_cpus(0.25) - self.update_num_gpus(0.2) - else: - self.update_num_cpus(0.25) - - accelerator_type = os.getenv("RAY_ACCELERATOR_TYPE") + + accelerator_type = os.getenv(ENV_RAY_ACCELERATOR_TYPE) if accelerator_type is not None and accelerator_type != "": self.update_accelerator_type(accelerator_type) - def update_num_cpus(self, num_cpus: float): - if self._deployment.ray_actor_options is not None: - self._deployment.ray_actor_options.update({"num_cpus": num_cpus}) + num_of_gpus = os.getenv(ENV_NUM_OF_GPUS) + if num_of_gpus is not None and num_of_gpus != "": + self.update_num_gpus(float(num_of_gpus)) - return self + num_of_cpus = os.getenv(ENV_NUM_OF_CPUS) + if num_of_cpus is not None and num_of_cpus != "": + self.update_num_cpus(float(num_of_cpus)) - def update_memory(self, memory: float): - if self._deployment.ray_actor_options is not None: - self._deployment.ray_actor_options.update({"memory": memory}) + memory = os.getenv(ENV_MEMORY) + if memory is not None and memory != "": + self.update_memory(float(memory)) - return self + num_of_min_replicas = os.getenv(ENV_NUM_OF_MIN_REPLICAS) + if num_of_min_replicas is not None and num_of_min_replicas != "": + self.update_min_replicas(int(num_of_min_replicas)) - def update_num_gpus(self, num_gpus: float): - if self._deployment.ray_actor_options is not None: - self._deployment.ray_actor_options.update({"num_gpus": num_gpus}) + num_of_max_replicas = os.getenv(ENV_NUM_OF_MAX_REPLICAS) + if num_of_max_replicas is not None and num_of_max_replicas != "": + self.update_max_replicas(int(num_of_max_replicas)) - return self - - def update_accelerator_type(self, accelerator_type: str): - if self._deployment.ray_actor_options is not None: - self._deployment.ray_actor_options.update( - {"accelerator_type": accelerator_type} - ) - - return self - - def update_num_custom_resource(self, resource_name: str, num: float): - if self._deployment.ray_actor_options is not None: - self._deployment.ray_actor_options.update( - {"resources": {resource_name: num}} - ) - - return self + vram = os.getenv(ENV_TOTAL_VRAM) + if vram is not None and vram != "": + self.update_num_gpus(self._determine_vram_usage(os.getcwd(), vram)) def _determine_vram_usage(self, model_path: str, total_vram: str): warn( @@ -123,6 +103,40 @@ def _determine_ram_usage(self, model_path: str): ) raise ModelPathException + def update_num_cpus(self, num_cpus: float): + if self._deployment.ray_actor_options is not None: + self._deployment.ray_actor_options.update({"num_cpus": num_cpus}) + + return self + + def update_memory(self, memory: float): + if self._deployment.ray_actor_options is not None: + self._deployment.ray_actor_options.update({"memory": memory}) + + return self + + def update_num_gpus(self, num_gpus: float): + if self._deployment.ray_actor_options is not None: + self._deployment.ray_actor_options.update({"num_gpus": num_gpus}) + + return self + + def update_accelerator_type(self, accelerator_type: str): + if self._deployment.ray_actor_options is not None: + self._deployment.ray_actor_options.update( + {"accelerator_type": accelerator_type} + ) + + return self + + def update_num_custom_resource(self, resource_name: str, num: float): + if self._deployment.ray_actor_options is not None: + self._deployment.ray_actor_options.update( + {"resources": {resource_name: num}} + ) + + return self + def update_min_replicas(self, num_replicas: int): new_autoscaling_config = DEFAULT_AUTOSCALING_CONFIG new_autoscaling_config["min_replicas"] = num_replicas @@ -144,66 +158,13 @@ def update_max_replicas(self, num_replicas: int): def get_deployment_handle(self): return self._deployment.bind() - def deploy(self, model_folder_path: str, ray_addr: str, total_vram: str): - warn( - "Deploy/Undeploy will soon be remove from the scope of SDK", - PendingDeprecationWarning, - ) - if not ray.is_initialized(): - ray_addr = "ray://" + ray_addr.replace("9000", "10001") - ray.init(address=ray_addr, runtime_env=DEFAULT_RUNTIME_ENV) - - # /model-repository/{owner_type}/{owner_uid}/{model_id} - model_path_string_parts = model_folder_path.split("/") - application_name = "_".join(model_path_string_parts[3:]) - model_name = application_name.split("_")[1] - - if self.use_gpu: - if model_name in MODEL_VRAM_OVERRIDE_LIST: - self.update_num_gpus(MODEL_VRAM_OVERRIDE_LIST[model_name]) - else: - self.update_num_gpus( - self._determine_vram_usage(model_folder_path, total_vram) - ) - else: - self.update_memory(self._determine_ram_usage(model_folder_path)) - - if model_name in MODEL_VRAM_OVERRIDE_LIST: - self.update_min_replicas(1) - self.update_max_replicas(1) - - serve.run( - # kept model_folder_path for backward compatibility - self._deployment.options(name=model_name).bind(model_folder_path), - name=application_name, - route_prefix=f"/{application_name}", - ) - - def undeploy(self, model_folder_path: str, ray_addr: str): - warn( - "Deploy/Undeploy will soon be remove from the scope of SDK", - PendingDeprecationWarning, - ) - if not ray.is_initialized(): - ray_addr = "ray://" + ray_addr.replace("9000", "10001") - ray.init(address=ray_addr, runtime_env=DEFAULT_RUNTIME_ENV) - # /model-repository/{owner_type}/{owner_uid}/{model_id} - model_path_string_parts = model_folder_path.split("/") - application_name = "_".join(model_path_string_parts[3:]) - serve.delete(application_name) - - def __call__(self): - raise RuntimeError( - "Deployments cannot be constructed directly. Use `deploy()` instead." - ) - def instill_deployment( _func_or_class: Optional[Callable] = None, ) -> Callable[[Callable], InstillDeployable]: return ray_deployment( _func_or_class=_func_or_class, - ray_actor_options=DEFAULT_RAY_ACTOR_OPRTIONS, + ray_actor_options=DEFAULT_RAY_ACTOR_OPTIONS, autoscaling_config=DEFAULT_AUTOSCALING_CONFIG, max_concurrent_queries=DEFAULT_MAX_CONCURRENT_QUERIES, )