diff --git a/.gitignore b/.gitignore index 1dce76f4b8..3e995826da 100644 --- a/.gitignore +++ b/.gitignore @@ -198,3 +198,6 @@ mlstacks_reset.sh .local/ # PLEASE KEEP THIS LINE AT THE EOF: never include here src/zenml/zen_server/dashboard, since it is affecting release flow + + +dashboard/ \ No newline at end of file diff --git a/docs/book/user-guide/advanced-guide/best-practices/debug-and-solve-issues.md b/docs/book/user-guide/advanced-guide/best-practices/debug-and-solve-issues.md index 3941d59cdc..da027b660a 100644 --- a/docs/book/user-guide/advanced-guide/best-practices/debug-and-solve-issues.md +++ b/docs/book/user-guide/advanced-guide/best-practices/debug-and-solve-issues.md @@ -77,7 +77,7 @@ ANALYTICS_CLIENT_ID: xxxxxxx-xxxxxxx-xxxxxxx ANALYTICS_USER_ID: xxxxxxx-xxxxxxx-xxxxxxx ANALYTICS_SERVER_ID: xxxxxxx-xxxxxxx-xxxxxxx INTEGRATIONS: ['airflow', 'aws', 'azure', 'dash', 'evidently', 'facets', 'feast', 'gcp', 'github', -'graphviz', 'huggingface', 'kaniko', 'kserve', 'kubeflow', 'kubernetes', 'lightgbm', 'mlflow', +'graphviz', 'huggingface', 'kaniko', 'kubeflow', 'kubernetes', 'lightgbm', 'mlflow', 'neptune', 'neural_prophet', 'pillow', 'plotly', 'pytorch', 'pytorch_lightning', 's3', 'scipy', 'sklearn', 'slack', 'spark', 'tensorboard', 'tensorflow', 'vault', 'wandb', 'whylogs', 'xgboost'] ``` diff --git a/docs/mocked_libs.json b/docs/mocked_libs.json index 0bd5108ae5..7316925c61 100644 --- a/docs/mocked_libs.json +++ b/docs/mocked_libs.json @@ -115,7 +115,6 @@ "kfp_tekton", "kfp_tekton.compiler", "kfp_tekton.compiler.pipeline_utils", - "kserve", "kubernetes", "kubernetes.client", "kubernetes.client.rest", diff --git a/pyproject.toml b/pyproject.toml index 025e0c9a00..7ab18995a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -416,7 +416,6 @@ module = [ "flask.*", "kfp.*", "kubernetes.*", - "kserve.*", "urllib3.*", "kfp_server_api.*", "sagemaker.*", diff --git a/scripts/install-zenml-dev.sh b/scripts/install-zenml-dev.sh index 71ff086dbf..d6f61cb41b 100755 --- a/scripts/install-zenml-dev.sh +++ b/scripts/install-zenml-dev.sh @@ -40,7 +40,7 @@ install_integrations() { # figure out the python version python_version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:2])))") - ignore_integrations="feast label_studio bentoml seldon kserve pycaret skypilot_aws skypilot_gcp skypilot_azure" + ignore_integrations="feast label_studio bentoml seldon pycaret skypilot_aws skypilot_gcp skypilot_azure" # if python version is 3.11, exclude all integrations depending on kfp # because they are not yet compatible with python 3.11 if [ "$python_version" = "3.11" ]; then diff --git a/src/zenml/cli/stack.py b/src/zenml/cli/stack.py index 090c1db88b..07459499b1 100644 --- a/src/zenml/cli/stack.py +++ b/src/zenml/cli/stack.py @@ -1366,7 +1366,7 @@ def _get_deployment_params_interactively( "-md", "model_deployer", required=False, - type=click.Choice(["kserve", "seldon"]), + type=click.Choice(["mlflow", "seldon"]), help="The flavor of model deployer to use. 
", ) @click.option( diff --git a/src/zenml/integrations/__init__.py b/src/zenml/integrations/__init__.py index 786e4d86f7..3b2e37ca37 100644 --- a/src/zenml/integrations/__init__.py +++ b/src/zenml/integrations/__init__.py @@ -37,7 +37,6 @@ from zenml.integrations.huggingface import HuggingfaceIntegration # noqa from zenml.integrations.hyperai import HyperAIIntegration # noqa from zenml.integrations.kaniko import KanikoIntegration # noqa -from zenml.integrations.kserve import KServeIntegration # noqa from zenml.integrations.kubeflow import KubeflowIntegration # noqa from zenml.integrations.kubernetes import KubernetesIntegration # noqa from zenml.integrations.label_studio import LabelStudioIntegration # noqa diff --git a/src/zenml/integrations/bentoml/__init__.py b/src/zenml/integrations/bentoml/__init__.py index 24702f4288..bd05689562 100644 --- a/src/zenml/integrations/bentoml/__init__.py +++ b/src/zenml/integrations/bentoml/__init__.py @@ -42,7 +42,7 @@ def activate(cls) -> None: @classmethod def flavors(cls) -> List[Type[Flavor]]: - """Declare the stack component flavors for KServe. + """Declare the stack component flavors for BentoML. Returns: List of stack component flavors for this integration. diff --git a/src/zenml/integrations/constants.py b/src/zenml/integrations/constants.py index a833d08e2d..cb800d9b25 100644 --- a/src/zenml/integrations/constants.py +++ b/src/zenml/integrations/constants.py @@ -28,7 +28,6 @@ GITHUB = "github" GITLAB = "gitlab" GRAPHVIZ = "graphviz" -KSERVE = "kserve" HUGGINGFACE = "huggingface" HYPERAI = "hyperai" GREAT_EXPECTATIONS = "great_expectations" diff --git a/src/zenml/integrations/kserve/__init__.py b/src/zenml/integrations/kserve/__init__.py deleted file mode 100644 index fb0e5af3b2..0000000000 --- a/src/zenml/integrations/kserve/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of the KServe integration for ZenML. - -The KServe integration allows you to use the KServe model serving -platform to implement continuous model deployment. -""" -from typing import List, Type - -from zenml.enums import StackComponentType -from zenml.integrations.constants import KSERVE -from zenml.integrations.integration import Integration -from zenml.stack import Flavor - -KSERVE_MODEL_DEPLOYER_FLAVOR = "kserve" - - -class KServeIntegration(Integration): - """Definition of KServe integration for ZenML.""" - - NAME = KSERVE - REQUIREMENTS = [ - "kserve>=0.9.0,<=10", - "torch-model-archiver", - ] - - @classmethod - def activate(cls) -> None: - """Activate the KServe integration.""" - from zenml.integrations.kserve import model_deployers # noqa - from zenml.integrations.kserve import secret_schemas # noqa - from zenml.integrations.kserve import services # noqa - - @classmethod - def flavors(cls) -> List[Type[Flavor]]: - """Declare the stack component flavors for KServe. - - Returns: - List of stack component flavors for this integration. 
- """ - from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor - - return [KServeModelDeployerFlavor] - - -KServeIntegration.check_installation() diff --git a/src/zenml/integrations/kserve/constants.py b/src/zenml/integrations/kserve/constants.py deleted file mode 100644 index 567d09c689..0000000000 --- a/src/zenml/integrations/kserve/constants.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""KServe constants.""" - -KSERVE_DOCKER_IMAGE_KEY = "kserve_model_deployer" -KSERVE_CUSTOM_DEPLOYMENT = "kserve_custom_deployment" diff --git a/src/zenml/integrations/kserve/custom_deployer/__init__.py b/src/zenml/integrations/kserve/custom_deployer/__init__.py deleted file mode 100644 index 6e09d0497b..0000000000 --- a/src/zenml/integrations/kserve/custom_deployer/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of ZenML custom deployer.""" - -from zenml.integrations.kserve.custom_deployer.zenml_custom_model import ( # noqa - ZenMLCustomModel, -) diff --git a/src/zenml/integrations/kserve/custom_deployer/zenml_custom_model.py b/src/zenml/integrations/kserve/custom_deployer/zenml_custom_model.py deleted file mode 100644 index ee883d51a7..0000000000 --- a/src/zenml/integrations/kserve/custom_deployer/zenml_custom_model.py +++ /dev/null @@ -1,176 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Implements a custom model for the Kserve integration.""" - -from typing import Any, Dict - -import click -import kserve - -from zenml.logger import get_logger -from zenml.utils import source_utils - -logger = get_logger(__name__) - -DEFAULT_MODEL_NAME = "model" -DEFAULT_LOCAL_MODEL_DIR = "/mnt/models" - - -class ZenMLCustomModel(kserve.Model): # type: ignore[misc] - """Custom model class for ZenML and Kserve. 
- - This class is used to implement a custom model for the Kserve integration, - which is used as the main entry point for custom code execution. - - Attributes: - model_name: The name of the model. - model_uri: The URI of the model. - predict_func: The predict function of the model. - """ - - def __init__( - self, - model_name: str, - model_uri: str, - predict_func: str, - ): - """Initializes a ZenMLCustomModel object. - - Args: - model_name: The name of the model. - model_uri: The URI of the model. - predict_func: The predict function of the model. - """ - super().__init__(model_name) - self.name = model_name - self.model_uri = model_uri - self.predict_func = source_utils.load(predict_func) - self.model = None - self.ready = False - - def load(self) -> bool: - """Load the model. - - This function loads the model into memory and sets the ready flag to True. - - The model is loaded using the materializer, by saving the information of - the artifact to a YAML file in the same path as the model artifacts at - the preparing time and loading it again at the prediction time by - the materializer. - - Returns: - True if the model was loaded successfully, False otherwise. - - """ - try: - from zenml.artifacts.utils import load_model_from_metadata - - self.model = load_model_from_metadata(self.model_uri) - except Exception as e: - logger.error("Failed to load model: {}".format(e)) - return False - self.ready = True - return self.ready - - def predict(self, request: Dict[str, Any]) -> Dict[str, Any]: - """Predict the given request. - - The main predict function of the model. This function is called by the - KServe server when a request is received. Then inside this function, - the user-defined predict function is called. - - Args: - request: The request to predict in a dictionary. e.g. {"instances": []} - - Returns: - The prediction dictionary. - - Raises: - RuntimeError: If function could not be called. - NotImplementedError: If the model is not ready. - TypeError: If the request is not a dictionary. - """ - if self.predict_func is not None: - try: - prediction = { - "predictions": self.predict_func( - self.model, request["instances"] - ) - } - except RuntimeError as err: - raise RuntimeError("Failed to predict: {}".format(err)) - if isinstance(prediction, dict): - return prediction - else: - raise TypeError( - f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}" - ) - else: - raise NotImplementedError("Predict function is not implemented") - - -@click.command() -@click.option( - "--model_uri", - default=DEFAULT_LOCAL_MODEL_DIR, - type=click.STRING, - help="The directory where the model is stored locally.", -) -@click.option( - "--model_name", - default=DEFAULT_MODEL_NAME, - required=True, - type=click.STRING, - help="The name of the model to deploy. This is important for the KServe server.", -) -@click.option( - "--predict_func", - required=True, - type=click.STRING, - help="The path to the custom predict function defined by the user.", -) -def main(model_name: str, model_uri: str, predict_func: str) -> None: - """Main function responsible for starting the KServe server. - - The way the custom deployment server works with the KServe server is by - implementing a custom model class and passing it to the KServe server and then - starting the server. Because custom classes usually need some parameters to - be passed to the model, the parameters are passed from the entry point to the - main function as arguments and then passed to the model class constructor. 
- - The following is an example of the entry point: - ``` - entrypoint_command = [ - "python", - "-m", - "zenml.integrations.kserve.custom_deployer.zenml_custom_model", - "--model_name", - config.service_config.model_name, - "--predict_func", - config.custom_deploy_parameters.predict_function, - ] - ``` - - Args: - model_name: The name of the model. - model_uri: The URI of the model. - predict_func: The path to the predict function defined by the user. - """ - model = ZenMLCustomModel(model_name, model_uri, predict_func) - model.load() - kserve.ModelServer().start([model]) - - -if __name__ == "__main__": - main() diff --git a/src/zenml/integrations/kserve/flavors/__init__.py b/src/zenml/integrations/kserve/flavors/__init__.py deleted file mode 100644 index f29fd33e08..0000000000 --- a/src/zenml/integrations/kserve/flavors/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""KServe integration flavors.""" - -from zenml.integrations.kserve.flavors.kserve_model_deployer_flavor import ( - KServeModelDeployerConfig, - KServeModelDeployerFlavor, -) - -__all__ = [ - "KServeModelDeployerFlavor", - "KServeModelDeployerConfig", -] diff --git a/src/zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py b/src/zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py deleted file mode 100644 index 1080e5d7a9..0000000000 --- a/src/zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""KServe model deployer flavor.""" - -from typing import TYPE_CHECKING, Optional, Type - -from zenml.constants import KUBERNETES_CLUSTER_RESOURCE_TYPE -from zenml.integrations.kserve import KSERVE_MODEL_DEPLOYER_FLAVOR -from zenml.model_deployers.base_model_deployer import ( - BaseModelDeployerConfig, - BaseModelDeployerFlavor, -) -from zenml.models import ServiceConnectorRequirements - -if TYPE_CHECKING: - from zenml.integrations.kserve.model_deployers import KServeModelDeployer - - -class KServeModelDeployerConfig(BaseModelDeployerConfig): - """Configuration for the KServeModelDeployer. - - Attributes: - kubernetes_context: the Kubernetes context to use to contact the remote - KServe installation. If not specified, the current - configuration is used. 
Depending on where the KServe model deployer - is being used, this can be either a locally active context or an - in-cluster Kubernetes configuration (if running inside a pod). - If the model deployer stack component is linked to a Kubernetes - service connector, this field is ignored. - kubernetes_namespace: the Kubernetes namespace where the KServe - inference service CRDs are provisioned and managed by ZenML. If not - specified, the namespace set in the current configuration is used. - Depending on where the KServe model deployer is being used, this can - be either the current namespace configured in the locally active - context or the namespace in the context of which the pod is running - (if running inside a pod). - base_url: the base URL of the Kubernetes ingress used to expose the - KServe inference services. - secret: the name of the secret containing the credentials for the - KServe inference services. - """ - - kubernetes_context: Optional[str] = None - kubernetes_namespace: Optional[str] = None - base_url: str # TODO: unused? - secret: Optional[str] - custom_domain: Optional[str] # TODO: unused? - - -class KServeModelDeployerFlavor(BaseModelDeployerFlavor): - """Flavor for the KServe model deployer.""" - - @property - def name(self) -> str: - """Name of the flavor. - - Returns: - Name of the flavor. - """ - return KSERVE_MODEL_DEPLOYER_FLAVOR - - @property - def service_connector_requirements( - self, - ) -> Optional[ServiceConnectorRequirements]: - """Service connector resource requirements for service connectors. - - Specifies resource requirements that are used to filter the available - service connector types that are compatible with this flavor. - - Returns: - Requirements for compatible service connectors, if a service - connector is required for this flavor. - """ - return ServiceConnectorRequirements( - resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE, - ) - - @property - def docs_url(self) -> Optional[str]: - """A url to point at docs explaining this flavor. - - Returns: - A flavor docs url. - """ - return self.generate_default_docs_url() - - @property - def sdk_docs_url(self) -> Optional[str]: - """A url to point at SDK docs explaining this flavor. - - Returns: - A flavor SDK docs url. - """ - return self.generate_default_sdk_docs_url() - - @property - def logo_url(self) -> str: - """A url to represent the flavor in the dashboard. - - Returns: - The flavor logo. - """ - return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/kserve.png" - - @property - def config_class(self) -> Type[KServeModelDeployerConfig]: - """Returns `KServeModelDeployerConfig` config class. - - Returns: - The config class. - """ - return KServeModelDeployerConfig - - @property - def implementation_class(self) -> Type["KServeModelDeployer"]: - """Implementation class for this flavor. - - Returns: - The implementation class. - """ - from zenml.integrations.kserve.model_deployers import ( - KServeModelDeployer, - ) - - return KServeModelDeployer diff --git a/src/zenml/integrations/kserve/model_deployers/__init__.py b/src/zenml/integrations/kserve/model_deployers/__init__.py deleted file mode 100644 index fa2ceb5747..0000000000 --- a/src/zenml/integrations/kserve/model_deployers/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of the KServe Model Deployer.""" - -from zenml.integrations.kserve.model_deployers.kserve_model_deployer import ( # noqa - KServeModelDeployer, -) - -__all__ = ["KServeModelDeployer"] diff --git a/src/zenml/integrations/kserve/model_deployers/kserve_model_deployer.py b/src/zenml/integrations/kserve/model_deployers/kserve_model_deployer.py deleted file mode 100644 index f4c536f777..0000000000 --- a/src/zenml/integrations/kserve/model_deployers/kserve_model_deployer.py +++ /dev/null @@ -1,1004 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Implementation of the KServe Model Deployer.""" - -import base64 -import json -import re -from typing import ( - TYPE_CHECKING, - Any, - ClassVar, - Dict, - List, - Optional, - Type, - cast, -) -from uuid import UUID, uuid4 - -from kserve import KServeClient, V1beta1InferenceService, constants, utils -from kubernetes import client as k8s_client - -from zenml.analytics.enums import AnalyticsEvent -from zenml.analytics.utils import track_handler -from zenml.client import Client -from zenml.config.build_configuration import BuildConfiguration -from zenml.enums import StackComponentType -from zenml.integrations.kserve.constants import ( - KSERVE_CUSTOM_DEPLOYMENT, - KSERVE_DOCKER_IMAGE_KEY, -) -from zenml.integrations.kserve.flavors.kserve_model_deployer_flavor import ( - KServeModelDeployerConfig, - KServeModelDeployerFlavor, -) -from zenml.integrations.kserve.secret_schemas.secret_schemas import ( - KServeAzureSecretSchema, - KServeGSSecretSchema, - KServeS3SecretSchema, -) -from zenml.integrations.kserve.services.kserve_deployment import ( - KServeDeploymentConfig, - KServeDeploymentService, -) -from zenml.logger import get_logger -from zenml.model_deployers import BaseModelDeployer, BaseModelDeployerFlavor -from zenml.secret.base_secret import BaseSecretSchema -from zenml.services.service import BaseService, ServiceConfig -from zenml.stack import StackValidator - -if TYPE_CHECKING: - from zenml.models import PipelineDeploymentBase - -logger = get_logger(__name__) - -DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT = 300 - - -class KubeClientKServeClient(KServeClient): # type: ignore[misc] - """KServe client initialized from a Kubernetes client. - - This is a workaround for the fact that the native KServe client does not - support initialization from an existing Kubernetes client. - """ - - def __init__( - self, kube_client: k8s_client.ApiClient, *args: Any, **kwargs: Any - ) -> None: - """Initializes the KServe client from a Kubernetes client. 
- - Args: - kube_client: pre-configured Kubernetes client. - *args: standard KServe client positional arguments. - **kwargs: standard KServe client keyword arguments. - """ - from kubernetes import client - - self.core_api = client.CoreV1Api(kube_client) - self.app_api = client.AppsV1Api(kube_client) - self.api_instance = client.CustomObjectsApi(kube_client) - - -class KServeModelDeployer(BaseModelDeployer): - """KServe model deployer stack component implementation.""" - - NAME: ClassVar[str] = "KServe" - FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = KServeModelDeployerFlavor - - _client: Optional[KServeClient] = None - - @property - def config(self) -> KServeModelDeployerConfig: - """Returns the `KServeModelDeployerConfig` config. - - Returns: - The configuration. - """ - return cast(KServeModelDeployerConfig, self._config) - - @property - def validator(self) -> Optional[StackValidator]: - """Ensures there is a container registry and image builder in the stack. - - Returns: - A `StackValidator` instance. - """ - # Log deprecation warning - logger.warning( - "The KServe model deployer is deprecated and is no longer " - "being maintained by the ZenML core team. If you are looking for a " - "scalable Kubernetes-based model deployment solution, consider " - "using Seldon instead: " - "https://docs.zenml.io/stacks-and-components/component-guide/model-deployers/seldon", - ) - return StackValidator( - required_components={ - StackComponentType.IMAGE_BUILDER, - } - ) - - @staticmethod - def get_model_server_info( # type: ignore[override] - service_instance: "KServeDeploymentService", - ) -> Dict[str, Optional[str]]: - """Return implementation specific information on the model server. - - Args: - service_instance: KServe deployment service object - - Returns: - A dictionary containing the model server information. - """ - return { - "PREDICTION_URL": service_instance.prediction_url, - "PREDICTION_HOSTNAME": service_instance.prediction_hostname, - "MODEL_URI": service_instance.config.model_uri, - "MODEL_NAME": service_instance.config.model_name, - "KSERVE_INFERENCE_SERVICE": service_instance.crd_name, - } - - @property - def kserve_client(self) -> KServeClient: - """Get the KServe client associated with this model deployer. - - Returns: - The KServeclient. - - Raises: - RuntimeError: If the Kubernetes namespace is not configured in the - stack component when using a service connector to deploy models - with KServe. - """ - # Refresh the client also if the connector has expired - if self._client and not self.connector_has_expired(): - return self._client - - connector = self.get_connector() - if connector: - if not self.config.kubernetes_namespace: - raise RuntimeError( - "The Kubernetes namespace must be explicitly configured in " - "the stack component when using a service connector to " - "deploy models with KServe." - ) - client = connector.connect() - if not isinstance(client, k8s_client.ApiClient): - raise RuntimeError( - f"Expected a k8s_client.ApiClient while trying to use the " - f"linked connector, but got {type(client)}." - ) - self._client = KubeClientKServeClient( - kube_client=client, - ) - else: - self._client = KServeClient( - context=self.config.kubernetes_context, - ) - return self._client - - def get_docker_builds( - self, deployment: "PipelineDeploymentBase" - ) -> List["BuildConfiguration"]: - """Gets the Docker builds required for the component. - - Args: - deployment: The pipeline deployment for which to get the builds. - - Returns: - The required Docker builds. 
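In the `get_docker_builds` implementation that follows, only steps that opt in via the `kserve_custom_deployment` extra (the `KSERVE_CUSTOM_DEPLOYMENT` constant deleted above) receive a dedicated image build keyed on `KSERVE_DOCKER_IMAGE_KEY`. A hedged sketch of how a step would have opted in, assuming ZenML's `@step(extra=..., settings=...)` parameters; the step itself is hypothetical:

```python
# Illustrative only: a step opting into a custom KServe deployment image.
# `get_docker_builds` keys the resulting build on KSERVE_DOCKER_IMAGE_KEY.
from zenml import step
from zenml.config import DockerSettings


@step(
    extra={"kserve_custom_deployment": True},  # the KSERVE_CUSTOM_DEPLOYMENT flag
    settings={"docker": DockerSettings(requirements=["torch-model-archiver"])},
)
def custom_deployment_step() -> None:
    """Hypothetical deployment step; its image is built from these settings."""
    ...
```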
- """ - builds = [] - for step_name, step in deployment.step_configurations.items(): - if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True: - build = BuildConfiguration( - key=KSERVE_DOCKER_IMAGE_KEY, - settings=step.config.docker_settings, - step_name=step_name, - ) - builds.append(build) - - return builds - - def deploy_model( - self, - config: ServiceConfig, - replace: bool = False, - timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT, - ) -> BaseService: - """Create a new KServe deployment or update an existing one. - - This method has two modes of operation, depending on the `replace` - argument value: - - * if `replace` is False, calling this method will create a new KServe - deployment server to reflect the model and other configuration - parameters specified in the supplied KServe deployment `config`. - - * if `replace` is True, this method will first attempt to find an - existing KServe deployment that is *equivalent* to the supplied - configuration parameters. Two or more KServe deployments are - considered equivalent if they have the same `pipeline_name`, - `pipeline_step_name` and `model_name` configuration parameters. To - put it differently, two KServe deployments are equivalent if - they serve versions of the same model deployed by the same pipeline - step. If an equivalent KServe deployment is found, it will be - updated in place to reflect the new configuration parameters. This - allows an existing KServe deployment to retain its prediction - URL while performing a rolling update to serve a new model version. - - Callers should set `replace` to True if they want a continuous model - deployment workflow that doesn't spin up a new KServe deployment - server for each new model version. If multiple equivalent KServe - deployments are found, the most recently created deployment is selected - to be updated and the others are deleted. - - Args: - config: the configuration of the model to be deployed with KServe. - replace: set this flag to True to find and update an equivalent - KServeDeployment server with the new model instead of - starting a new deployment server. - timeout: the timeout in seconds to wait for the KServe server - to be provisioned and successfully started or updated. If set - to 0, the method will return immediately after the KServe - server is provisioned, without waiting for it to fully start. - - Returns: - The ZenML KServe deployment service object that can be used to - interact with the remote KServe server. - - Raises: - RuntimeError: if the KServe deployment server could not be stopped. - """ - with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler: - config = cast(KServeDeploymentConfig, config) - service = None - - # if replace is True, find equivalent KServe deployments - if replace is True: - equivalent_services = self.find_model_server( - running=False, - pipeline_name=config.pipeline_name, - pipeline_step_name=config.pipeline_step_name, - model_name=config.model_name, - ) - - for equivalent_service in equivalent_services: - if service is None: - # keep the most recently created service - service = equivalent_service - else: - try: - # delete the older services and don't wait for - # them to be deprovisioned - service.stop() - except RuntimeError as e: - raise RuntimeError( - "Failed to stop the KServe deployment " - "server:\n", - f"{e}\n", - "Please stop it manually and try again.", - ) - - if service: - # Reuse the service account and secret from the existing - # service. 
- assert isinstance(service, KServeDeploymentService) - config.k8s_service_account = service.config.k8s_service_account - config.k8s_secret = service.config.k8s_secret - - # configure the credentials for the KServe model server - self._create_or_update_kserve_credentials(config) - if service: - # update an equivalent service in place - service.update(config) - logger.info( - f"Updating an existing KServe deployment service: {service}" - ) - else: - # create a new service - service = KServeDeploymentService(config=config) - logger.info( - f"Creating a new KServe deployment service: {service}" - ) - - # start the service which in turn provisions the KServe - # deployment server and waits for it to reach a ready state - service.start(timeout=timeout) - - # Add telemetry with metadata that gets the stack metadata and - # differentiates between pure model and custom code deployments - stack = Client().active_stack - stack_metadata = { - component_type.value: component.flavor - for component_type, component in stack.components.items() - } - analytics_handler.metadata = { - "store_type": Client().zen_store.type.value, - **stack_metadata, - "is_custom_code_deployment": config.container is not None, - } - return service - - def get_kserve_deployments( - self, labels: Dict[str, str] - ) -> List[V1beta1InferenceService]: - """Get a list of KServe deployments that match the supplied labels. - - Args: - labels: a dictionary of labels to match against KServe deployments. - - Returns: - A list of KServe deployments that match the supplied labels. - - Raises: - RuntimeError: if an operational failure is encountered while - """ - label_selector = ( - ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None - ) - - namespace = ( - self.config.kubernetes_namespace - or utils.get_default_target_namespace() - ) - - try: - response = ( - self.kserve_client.api_instance.list_namespaced_custom_object( - constants.KSERVE_GROUP, - constants.KSERVE_V1BETA1_VERSION, - namespace, - constants.KSERVE_PLURAL, - label_selector=label_selector, - ) - ) - except k8s_client.rest.ApiException as e: - raise RuntimeError( - "Exception when retrieving KServe inference services\ - %s\n" - % e - ) - - # TODO[CRITICAL]: de-serialize each item into a complete - # V1beta1InferenceService object recursively using the OpenApi - # schema (this doesn't work right now) - inference_services: List[V1beta1InferenceService] = [] - for item in response.get("items", []): - snake_case_item = self._camel_to_snake(item) - inference_service = V1beta1InferenceService(**snake_case_item) - inference_services.append(inference_service) - return inference_services - - def _camel_to_snake(self, obj: Dict[str, Any]) -> Dict[str, Any]: - """Convert a camelCase dictionary to snake_case. - - Args: - obj: a dictionary with camelCase keys - - Returns: - a dictionary with snake_case keys - """ - if isinstance(obj, (str, int, float)): - return obj - if isinstance(obj, dict): - assert obj is not None - new = obj.__class__() - for k, v in obj.items(): - new[self._convert_to_snake(k)] = self._camel_to_snake(v) - elif isinstance(obj, (list, set, tuple)): - assert obj is not None - new = obj.__class__(self._camel_to_snake(v) for v in obj) - else: - return obj - return new - - def _convert_to_snake(self, k: str) -> str: - return re.sub(r"(? List[BaseService]: - """Find one or more KServe model services that match the given criteria. - - Args: - running: If true, only running services will be returned. 
- service_uuid: The UUID of the service that was originally used - to deploy the model. - pipeline_name: name of the pipeline that the deployed model was part - of. - run_name: name of the pipeline run which the deployed model was - part of. - pipeline_step_name: the name of the pipeline model deployment step - that deployed the model. - model_name: the name of the deployed model. - model_uri: URI of the deployed model. - predictor: the name of the predictor that was used to deploy the model. - - Returns: - One or more Service objects representing model servers that match - the input search criteria. - """ - config = KServeDeploymentConfig( - pipeline_name=pipeline_name or "", - run_name=run_name or "", - pipeline_run_id=run_name or "", - pipeline_step_name=pipeline_step_name or "", - model_uri=model_uri or "", - model_name=model_name or "", - predictor=predictor or "", - resources={}, - ) - labels = config.get_kubernetes_labels() - - if service_uuid: - labels["zenml.service_uuid"] = str(service_uuid) - - deployments = self.get_kserve_deployments(labels=labels) - - services: List[BaseService] = [] - for deployment in deployments: - # recreate the KServe deployment service object from the KServe - # deployment resource - service = KServeDeploymentService.create_from_deployment( - deployment=deployment - ) - if running and not service.is_running: - # skip non-running services - continue - services.append(service) - - return services - - def stop_model_server( - self, - uuid: UUID, - timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT, - force: bool = False, - ) -> None: - """Stop a KServe model server. - - Args: - uuid: UUID of the model server to stop. - timeout: timeout in seconds to wait for the service to stop. - force: if True, force the service to stop. - - Raises: - NotImplementedError: stopping on KServe model servers is not - supported. - """ - raise NotImplementedError( - "Stopping KServe model servers is not implemented. Try " - "deleting the KServe model server instead." - ) - - def start_model_server( - self, - uuid: UUID, - timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT, - ) -> None: - """Start a KServe model deployment server. - - Args: - uuid: UUID of the model server to start. - timeout: timeout in seconds to wait for the service to become - active. . If set to 0, the method will return immediately after - provisioning the service, without waiting for it to become - active. - - Raises: - NotImplementedError: since we don't support starting KServe - model servers - """ - raise NotImplementedError( - "Starting KServe model servers is not implemented" - ) - - def delete_model_server( - self, - uuid: UUID, - timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT, - force: bool = False, - ) -> None: - """Delete a KServe model deployment server. - - Args: - uuid: UUID of the model server to delete. - timeout: timeout in seconds to wait for the service to stop. If - set to 0, the method will return immediately after - deprovisioning the service, without waiting for it to stop. - force: if True, force the service to stop. 
- """ - services = self.find_model_server(service_uuid=uuid) - if len(services) == 0: - return - service = services[0] - service.stop(timeout=timeout, force=force) - assert isinstance(service, KServeDeploymentService) - if service.config.k8s_service_account: - self.delete_k8s_service_account(service.config.k8s_service_account) - if service.config.k8s_secret: - self.delete_k8s_secret(service.config.k8s_secret) - - def _create_or_update_kserve_credentials( - self, config: KServeDeploymentConfig - ) -> None: - """Create or update the KServe credentials used to access the artifact store. - - The way KServe allows configured credentials to be passed to the - model servers is a bit convoluted: - - * we need to create a Kubernetes secret object with credentials - in the correct format supported by KServe (only AWS, GCP, Azure are - supported) - * we need to create a Kubernetes service account object that - references the secret object - * we need to use the service account object in the KServe - deployment configuration - - This method will use a random name for every model server. This ensures - that we can create multiple KServe deployments with different - credentials without running into naming conflicts. - - If a ZenML secret is not explicitly configured for the model deployment - or the model deployer, this method attempts to fetch credentials from - the active artifact store and convert them into the appropriate secret - format expected by KServe. - - Args: - config: KServe deployment configuration. - - Raises: - RuntimeError: if the configured secret object is not found. - """ - secret_name = config.secret_name or self.config.secret - - if secret_name: - if config.secret_name: - secret_source = "model deployment" - else: - secret_source = "Model Deployer" - - logger.warning( - f"Your KServe {secret_source} is configured to use a " - f"ZenML secret `{secret_name}` that holds credentials needed " - "to access the artifact store. The recommended authentication " - "method is to configure credentials for the artifact store " - "stack component instead. The KServe model deployer will use " - "those credentials to authenticate to the artifact store " - "automatically." - ) - - try: - zenml_secret = Client().get_secret_by_name_and_scope( - secret_name - ) - except KeyError as e: - raise RuntimeError( - f"The ZenML secret '{secret_name}' specified in the " - f"KServe {secret_source} configuration was not found " - f"in the secrets store: {e}." 
- ) - - credentials = zenml_secret.secret_values - - else: - # if no secret is configured, try to fetch credentials from the - # active artifact store and convert them into the appropriate format - # expected by KServe - converted_secret = self._convert_artifact_store_secret() - - if not converted_secret: - # If a secret and service account were previously configured, we - # need to delete them before we can proceed - if config.k8s_service_account: - self.delete_k8s_service_account(config.k8s_service_account) - config.k8s_service_account = None - if config.k8s_secret: - self.delete_k8s_secret(config.k8s_secret) - config.k8s_secret = None - return - - credentials = converted_secret.get_values() - - # S3 credentials are special because part of them need to be passed - # as annotations - annotations: Dict[str, str] = {} - if "aws_access_key_id" in credentials: - if credentials.get("s3_region"): - annotations["serving.kubeflow.org/s3-region"] = ( - credentials.pop("s3_region") - ) - if credentials.get("s3_endpoint"): - annotations["serving.kubeflow.org/s3-endpoint"] = ( - credentials.pop("s3_endpoint") - ) - if credentials.get("s3_use_https"): - annotations["serving.kubeflow.org/s3-usehttps"] = ( - credentials.pop("s3_use_https") - ) - if credentials.get("s3_verify_ssl"): - annotations["serving.kubeflow.org/s3-verifyssl"] = ( - credentials.pop("s3_verify_ssl") - ) - - # Convert all keys to uppercase - credentials = {k.upper(): v for k, v in credentials.items()} - - # The GCP credentials need to use a specific key name - if "GOOGLE_APPLICATION_CREDENTIALS" in credentials: - credentials["gcloud-application-credentials.json"] = ( - credentials.pop("GOOGLE_APPLICATION_CREDENTIALS") - ) - - # Create or update the Kubernetes secret object - config.k8s_secret = self.create_or_update_k8s_secret( - name=config.k8s_secret, - annotations=annotations, - secret_values=credentials, - ) - - # Create or update the Kubernetes service account object - config.k8s_service_account = self.create_or_update_k8s_service_account( - name=config.k8s_service_account, - secret_name=config.k8s_secret, - ) - - def _convert_artifact_store_secret(self) -> Optional[BaseSecretSchema]: - """Convert the credentials configured for the artifact store into a ZenML secret. - - Returns: - The KServe credentials in the format expected by KServe or None if - no credentials are configured for the artifact store or if they - cannot be converted into the KServe format. 
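The S3 branch of the conversion below wraps the artifact store credentials in `KServeS3SecretSchema`, copying the endpoint and region from the store's `client_kwargs`. A standalone sketch against the pre-removal import path; all literal values are placeholders:

```python
# Standalone sketch of the S3 credential conversion (pre-removal import
# path); the credential strings are placeholders, not real secrets.
from zenml.integrations.kserve.secret_schemas import KServeS3SecretSchema

secret = KServeS3SecretSchema(
    aws_access_key_id="AKIA...",              # from artifact_store.get_credentials()
    aws_secret_access_key="<secret-key>",
    s3_endpoint="https://minio.example.com",  # from client_kwargs["endpoint_url"]
    s3_region="eu-central-1",                 # from client_kwargs["region_name"]
)

# `get_values()` yields the key/value pairs that later end up in the
# Kubernetes secret created by `create_or_update_k8s_secret`.
print(secret.get_values())
```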
- """ - artifact_store = Client().active_stack.artifact_store - - zenml_secret: BaseSecretSchema - - if artifact_store.flavor == "s3": - from zenml.integrations.s3.artifact_stores import S3ArtifactStore - - assert isinstance(artifact_store, S3ArtifactStore) - - ( - aws_access_key_id, - aws_secret_access_key, - _, - ) = artifact_store.get_credentials() - - if aws_access_key_id and aws_secret_access_key: - # Convert the credentials into the format expected by KServe - zenml_secret = KServeS3SecretSchema( - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - ) - if artifact_store.config.client_kwargs: - if "endpoint_url" in artifact_store.config.client_kwargs: - zenml_secret.s3_endpoint = str( - artifact_store.config.client_kwargs["endpoint_url"] - ) - if "region_name" in artifact_store.config.client_kwargs: - zenml_secret.s3_region = str( - artifact_store.config.client_kwargs["region_name"] - ) - if "use_ssl" in artifact_store.config.client_kwargs: - zenml_secret.s3_use_https = str( - artifact_store.config.client_kwargs["use_ssl"] - ) - - return zenml_secret - - logger.warning( - "No credentials are configured for the active S3 artifact " - "store. The KServe model deployer will assume an " - "implicit form of authentication is available in the " - "target Kubernetes cluster, but the served model may not " - "be able to access the model artifacts." - ) - - # Assume implicit in-cluster IAM authentication - return None - - elif artifact_store.flavor == "gcp": - from zenml.integrations.gcp.artifact_stores import GCPArtifactStore - - assert isinstance(artifact_store, GCPArtifactStore) - - gcp_credentials = artifact_store.get_credentials() - - if gcp_credentials: - # Convert the credentials into the format expected by KServe - return KServeGSSecretSchema( - google_application_credentials=json.dumps(gcp_credentials), - ) - - logger.warning( - "No credentials are configured for the active GCS artifact " - "store. The KServe model deployer will assume an " - "implicit form of authentication is available in the " - "target Kubernetes cluster, but the served model may not " - "be able to access the model artifacts." - ) - return None - - elif artifact_store.flavor == "azure": - from zenml.integrations.azure.artifact_stores import ( - AzureArtifactStore, - ) - - assert isinstance(artifact_store, AzureArtifactStore) - - azure_credentials = artifact_store.get_credentials() - - if azure_credentials: - # Convert the credentials into the format expected by KServe - if ( - azure_credentials.client_id is not None - and azure_credentials.client_secret is not None - and azure_credentials.tenant_id is not None - ): - return KServeAzureSecretSchema( - azure_client_id=azure_credentials.client_id, - azure_client_secret=azure_credentials.client_secret, - azure_tenant_id=azure_credentials.tenant_id, - ) - else: - logger.warning( - "The KServe model deployer could not use the " - "credentials currently configured in the active Azure " - "artifact store because it only supports service " - "principal Azure credentials. " - "Please configure Azure principal credentials for your " - "artifact store or specify a custom ZenML secret in " - "the model deployer configuration that holds the " - "credentials required to access the model artifacts. " - "The KServe model deployer will assume an implicit " - "form of authentication is available in the target " - "Kubernetes cluster, but the served model " - "may not be able to access the model artifacts." 
- ) - - return None - - logger.warning( - "No credentials are configured for the active Azure " - "artifact store. The Seldon Core model deployer will " - "assume an implicit form of authentication is available " - "in the target Kubernetes cluster, but the served model " - "may not be able to access the model artifacts." - ) - return None - - logger.warning( - "The KServe model deployer doesn't know how to configure " - f"credentials automatically for the `{artifact_store.flavor}` " - "active artifact store flavor. " - "Please use one of the supported artifact stores (S3 or GCP) " - "or specify a ZenML secret in the model deployer " - "configuration that holds the credentials required to access " - "the model artifacts. The KServe model deployer will " - "assume an implicit form of authentication is available " - "in the target Kubernetes cluster, but the served model " - "may not be able to access the model artifacts." - ) - - return None - - def create_or_update_k8s_secret( - self, - name: Optional[str] = None, - secret_values: Dict[str, Any] = {}, - annotations: Dict[str, str] = {}, - ) -> str: - """Create or update a Kubernetes Secret resource. - - Args: - name: the name of the Secret resource to create. If not - specified, a random name will be generated. - secret_values: secret key-values that should be - stored in the Secret resource. - annotations: optional annotations to add to the Secret resource. - - Returns: - The name of the created Secret resource. - - Raises: - RuntimeError: if an unknown error occurs during the creation of - the secret. - """ - name = name or f"zenml-kserve-{uuid4().hex}" - - try: - logger.debug(f"Creating Secret resource: {name}") - - core_api = k8s_client.CoreV1Api() - - secret_data = { - k: base64.b64encode(str(v).encode("utf-8")).decode("ascii") - for k, v in secret_values.items() - if v is not None - } - - secret = k8s_client.V1Secret( - metadata=k8s_client.V1ObjectMeta( - name=name, - labels={"app": "zenml"}, - annotations=annotations, - ), - type="Opaque", - data=secret_data, - ) - - try: - # check if the secret is already present - core_api.read_namespaced_secret( - name=name, - namespace=self.config.kubernetes_namespace, - ) - # if we got this far, the secret is already present, update it - # in place - response = core_api.replace_namespaced_secret( - name=name, - namespace=self.config.kubernetes_namespace, - body=secret, - ) - except k8s_client.rest.ApiException as e: - if e.status != 404: - # if an error other than 404 is raised here, treat it - # as an unexpected error - raise RuntimeError( - "Exception when reading Secret resource: %s", str(e) - ) - response = core_api.create_namespaced_secret( - namespace=self.config.kubernetes_namespace, - body=secret, - ) - logger.debug("Kubernetes API response: %s", response) - except k8s_client.rest.ApiException as e: - raise RuntimeError( - "Exception when creating Secret resource %s", str(e) - ) - - return name - - def delete_k8s_secret( - self, - name: str, - ) -> None: - """Delete a Kubernetes Secret resource managed by ZenML. - - Args: - name: the name of the Kubernetes Secret resource to delete. - - Raises: - RuntimeError: if an unknown error occurs during the removal - of the secret. 
- """ - try: - logger.debug(f"Deleting Secret resource: {name}") - - core_api = k8s_client.CoreV1Api() - - response = core_api.delete_namespaced_secret( - name=name, - namespace=self.config.kubernetes_namespace, - ) - logger.debug("Kubernetes API response: %s", response) - except k8s_client.rest.ApiException as e: - if e.status == 404: - # the secret is no longer present, nothing to do - return - raise RuntimeError( - f"Exception when deleting Secret resource {name}: {e}" - ) - - def create_or_update_k8s_service_account( - self, name: Optional[str] = None, secret_name: Optional[str] = None - ) -> str: - """Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML. - - Args: - name: the name of the ServiceAccount resource to create. If not - specified, a random name will be generated. - secret_name: the name of a secret to attach to the ServiceAccount. - - Returns: - The name of the created ServiceAccount resource. - - Raises: - RuntimeError: if an unknown error occurs during the creation of - the service account. - """ - name = name or f"zenml-kserve-{uuid4().hex}" - - service_account = k8s_client.V1ServiceAccount( - metadata=k8s_client.V1ObjectMeta( - name=name, - ), - ) - - if secret_name: - service_account.secrets = [ - k8s_client.V1ObjectReference(kind="Secret", name=secret_name) - ] - - core_api = k8s_client.CoreV1Api() - - try: - # check if the service account is already present - core_api.read_namespaced_service_account( - name=name, - namespace=self.config.kubernetes_namespace, - ) - # if we got this far, the service account is already present, update - # it in place - core_api.replace_namespaced_service_account( - name=name, - namespace=self.config.kubernetes_namespace, - body=service_account, - ) - except k8s_client.rest.ApiException as e: - if e.status != 404: - # if an error other than 404 is raised here, treat it - # as an unexpected error - raise RuntimeError( - "Exception when reading ServiceAccount resource: %s", - str(e), - ) - core_api.create_namespaced_service_account( - namespace=self.config.kubernetes_namespace, - body=service_account, - ) - - return name - - def delete_k8s_service_account( - self, - name: str, - ) -> None: - """Delete a Kubernetes ServiceAccount resource managed by ZenML. - - Args: - name: the name of the Kubernetes ServiceAccount resource to delete. - - Raises: - RuntimeError: if an unknown error occurs during the removal - of the service account. - """ - try: - logger.debug(f"Deleting ServiceAccount resource: {name}") - - core_api = k8s_client.CoreV1Api() - - response = core_api.delete_namespaced_service_account( - name=name, - namespace=self.config.kubernetes_namespace, - ) - logger.debug("Kubernetes API response: %s", response) - except k8s_client.rest.ApiException as e: - if e.status == 404: - # the service account is no longer present, nothing to do - return - raise RuntimeError( - f"Exception when deleting ServiceAccount resource {name}: {e}" - ) diff --git a/src/zenml/integrations/kserve/secret_schemas/__init__.py b/src/zenml/integrations/kserve/secret_schemas/__init__.py deleted file mode 100644 index b5d29156c6..0000000000 --- a/src/zenml/integrations/kserve/secret_schemas/__init__.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization of Kserve Secret Schemas. - -These are secret schemas that can be used to authenticate Kserve to the -Artifact Store used to store served ML models. -""" -from zenml.integrations.kserve.secret_schemas.secret_schemas import ( - KServeAzureSecretSchema, - KServeGSSecretSchema, - KServeS3SecretSchema, -) - -__all__ = [ - "KServeAzureSecretSchema", - "KServeGSSecretSchema", - "KServeS3SecretSchema", -] diff --git a/src/zenml/integrations/kserve/secret_schemas/secret_schemas.py b/src/zenml/integrations/kserve/secret_schemas/secret_schemas.py deleted file mode 100644 index 2ef28b95b0..0000000000 --- a/src/zenml/integrations/kserve/secret_schemas/secret_schemas.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Implementation for KServe secret schemas.""" - -from typing import Optional - -from zenml.secret.base_secret import BaseSecretSchema - - -class KServeS3SecretSchema(BaseSecretSchema): - """KServe S3 credentials. - - Attributes: - aws_access_key_id: the AWS access key ID. - aws_secret_access_key: the AWS secret access key. - s3_endpoint: the S3 endpoint. - s3_region: the S3 region. - s3_use_https: whether to use HTTPS. - s3_verify_ssl: whether to verify SSL. - """ - - aws_access_key_id: Optional[str] = None - aws_secret_access_key: Optional[str] = None - s3_endpoint: Optional[str] = None - s3_region: Optional[str] = None - s3_use_https: Optional[str] = None - s3_verify_ssl: Optional[str] = None - - -class KServeGSSecretSchema(BaseSecretSchema): - """KServe GCS credentials. - - Attributes: - google_application_credentials: the GCP application credentials to use, - in JSON format. - """ - - google_application_credentials: Optional[str] - - -class KServeAzureSecretSchema(BaseSecretSchema): - """KServe Azure Blob Storage credentials. - - Attributes: - azure_client_id: the Azure client ID. - azure_client_secret: the Azure client secret. - azure_tenant_id: the Azure tenant ID. - azure_subscription_id: the Azure subscription ID. - """ - - azure_client_id: Optional[str] = None - azure_client_secret: Optional[str] = None - azure_tenant_id: Optional[str] = None - azure_subscription_id: Optional[str] = None diff --git a/src/zenml/integrations/kserve/services/__init__.py b/src/zenml/integrations/kserve/services/__init__.py deleted file mode 100644 index ce74e315f3..0000000000 --- a/src/zenml/integrations/kserve/services/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization for KServe services.""" - -from zenml.integrations.kserve.services.kserve_deployment import ( # noqa - KServeDeploymentConfig, - KServeDeploymentService, -) diff --git a/src/zenml/integrations/kserve/services/kserve_deployment.py b/src/zenml/integrations/kserve/services/kserve_deployment.py deleted file mode 100644 index 65ce3ceeab..0000000000 --- a/src/zenml/integrations/kserve/services/kserve_deployment.py +++ /dev/null @@ -1,596 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Implementation for the KServe inference service.""" - -import json -import os -import re -from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Tuple, cast -from uuid import UUID - -import requests -from kserve import ( - KServeClient, - V1beta1InferenceService, - V1beta1InferenceServiceSpec, - V1beta1PredictorExtensionSpec, - V1beta1PredictorSpec, - constants, -) -from kubernetes import client as k8s_client -from pydantic import Field, ValidationError - -from zenml import __version__ -from zenml.logger import get_logger -from zenml.services import ( - ServiceConfig, - ServiceState, - ServiceStatus, - ServiceType, -) -from zenml.services.service import BaseDeploymentService - -if TYPE_CHECKING: - from zenml.integrations.kserve.model_deployers.kserve_model_deployer import ( # noqa - KServeModelDeployer, - ) - -logger = get_logger(__name__) - - -class KServeDeploymentConfig(ServiceConfig): - """KServe deployment service configuration. - - Attributes: - model_uri: URI of the model (or models) to serve. - model_name: the name of the model. Multiple versions of the same model - should use the same model name. Model name must use only lowercase - alphanumeric characters and dashes. - secret_name: the name of the ZenML secret containing credentials - required to authenticate to the artifact store. - k8s_secret: the name of the Kubernetes secret to use for the prediction - service. - k8s_service_account: the name of the Kubernetes service account to use - for the prediction service. - predictor: the KServe predictor used to serve the model. The - predictor type can be one of the following: `tensorflow`, `pytorch`, - `sklearn`, `xgboost`, `custom`. - replicas: number of replicas to use for the prediction service. - resources: the Kubernetes resources to allocate for the prediction service. - container: the container to use for the custom prediction services. 
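Given the field definitions that follow, a minimal configuration instance might look like the sketch below. All values are placeholders, and base `ServiceConfig` fields (pipeline name, run name, and similar) are omitted; depending on the ZenML version they may need to be supplied as well:

```python
# Minimal sketch of the deleted KServeDeploymentConfig (pre-removal import
# path); the URI, names, and resource amounts are placeholders.
from zenml.integrations.kserve.services import KServeDeploymentConfig

config = KServeDeploymentConfig(
    model_uri="s3://my-bucket/path/to/model",  # placeholder model location
    model_name="sklearn-model",                # lowercase alphanumerics and dashes
    predictor="sklearn",                       # tensorflow|pytorch|sklearn|xgboost|custom
    replicas=1,
    resources={"requests": {"cpu": "100m", "memory": "100Mi"}},
)
```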
- """ - - model_uri: str = "" - model_name: str - secret_name: Optional[str] = None - k8s_secret: Optional[str] = None - k8s_service_account: Optional[str] = None - predictor: str - replicas: int = 1 - container: Optional[Dict[str, Any]] = None - resources: Optional[Dict[str, Any]] = None - - @staticmethod - def sanitize_labels(labels: Dict[str, str]) -> None: - """Update the label values to be valid Kubernetes labels. - - See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set - - Args: - labels: The labels to sanitize. - """ - # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils. - for key, value in labels.items(): - # Kubernetes labels must be alphanumeric, no longer than - # 63 characters, and must begin and end with an alphanumeric - # character ([a-z0-9A-Z]) - labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip( - "-_." - ) - - def get_kubernetes_labels(self) -> Dict[str, str]: - """Generate the labels for the KServe inference CRD from the service configuration. - - These labels are attached to the KServe inference service CRD - and may be used as label selectors in lookup operations. - - Returns: - The labels for the KServe inference service CRD. - """ - labels = {"app": "zenml"} - if self.pipeline_name: - labels["zenml.pipeline_name"] = self.pipeline_name - if self.run_name: - labels["zenml.run_name"] = self.run_name - if self.pipeline_step_name: - labels["zenml.pipeline_step_name"] = self.pipeline_step_name - if self.model_name: - labels["zenml.model_name"] = self.model_name - if self.model_uri: - labels["zenml.model_uri"] = self.model_uri - if self.predictor: - labels["zenml.model_type"] = self.predictor - self.sanitize_labels(labels) - return labels - - def get_kubernetes_annotations(self) -> Dict[str, str]: - """Generate the annotations for the KServe inference CRD the service configuration. - - The annotations are used to store additional information about the - KServe ZenML service associated with the deployment that is - not available on the labels. One annotation is particularly important - is the serialized Service configuration itself, which is used to - recreate the service configuration from a remote KServe inference - service CRD. - - Returns: - The annotations for the KServe inference service CRD. - """ - annotations = { - "zenml.service_config": self.json(), - "zenml.version": __version__, - } - return annotations - - @classmethod - def create_from_deployment( - cls, deployment: V1beta1InferenceService - ) -> "KServeDeploymentConfig": - """Recreate a KServe service from a KServe deployment resource. - - Args: - deployment: the KServe inference service CRD. - - Returns: - The KServe ZenML service configuration corresponding to the given - KServe inference service CRD. - - Raises: - ValueError: if the given deployment resource does not contain - the expected annotations or it contains an invalid or - incompatible KServe ZenML service configuration. 
- """ - config_data = deployment.metadata.get("annotations").get( - "zenml.service_config" - ) - if not config_data: - raise ValueError( - f"The given deployment resource does not contain a " - f"'zenml.service_config' annotation: {deployment}" - ) - try: - service_config = cls.parse_raw(config_data) - except ValidationError as e: - raise ValueError( - f"The loaded KServe Inference Service resource contains an " - f"invalid or incompatible KServe ZenML service configuration: " - f"{config_data}" - ) from e - return service_config - - -class KServeDeploymentService(BaseDeploymentService): - """A ZenML service that represents a KServe inference service CRD. - - Attributes: - config: service configuration. - status: service status. - """ - - SERVICE_TYPE = ServiceType( - name="kserve-deployment", - type="model-serving", - flavor="kserve", - description="KServe inference service", - ) - - config: KServeDeploymentConfig - status: ServiceStatus = Field(default_factory=lambda: ServiceStatus()) - - def _get_model_deployer(self) -> "KServeModelDeployer": - """Get the active KServe model deployer. - - Returns: - The active KServeModelDeployer. - """ - from zenml.integrations.kserve.model_deployers.kserve_model_deployer import ( - KServeModelDeployer, - ) - - return cast( - KServeModelDeployer, - KServeModelDeployer.get_active_model_deployer(), - ) - - def _get_client(self) -> KServeClient: - """Get the KServe client from the active KServe model deployer. - - Returns: - The KServe client. - """ - return self._get_model_deployer().kserve_client - - def _get_namespace(self) -> Optional[str]: - """Get the Kubernetes namespace from the active KServe model deployer. - - Returns: - The Kubernetes namespace, or None, if the default namespace is - used. - """ - return self._get_model_deployer().config.kubernetes_namespace - - def check_status(self) -> Tuple[ServiceState, str]: - """Check the state of the KServe inference service. - - This method Checks the current operational state of the external KServe - inference service and translate it into a `ServiceState` value and a printable message. - - This method should be overridden by subclasses that implement concrete service tracking functionality. - - Returns: - The operational state of the external service and a message - providing additional information about that state (e.g. a - description of the error if one is encountered while checking the - service status). 
- """ - client = self._get_client() - namespace = self._get_namespace() - - name = self.crd_name - try: - deployment = client.get(name=name, namespace=namespace) - except RuntimeError: - return (ServiceState.INACTIVE, "") - - # TODO[MEDIUM]: Implement better operational status checking that also - # cover errors - if "status" not in deployment: - return (ServiceState.INACTIVE, "No operational status available") - status = "Unknown" - for condition in deployment["status"].get("conditions", {}): - if condition.get("type", "") == "PredictorReady": - status = condition.get("status", "Unknown") - if status.lower() == "true": - return ( - ServiceState.ACTIVE, - f"Inference service '{name}' is available", - ) - - elif status.lower() == "false": - return ( - ServiceState.PENDING_STARTUP, - f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}", - ) - return ( - ServiceState.PENDING_STARTUP, - f"Inference service '{name}' still starting up", - ) - - @property - def crd_name(self) -> str: - """Get the name of the KServe inference service CRD that uniquely corresponds to this service instance. - - Returns: - The name of the KServe inference service CRD. - """ - return ( - self._get_kubernetes_labels().get("zenml.model_name") - or f"zenml-{str(self.uuid)[:8]}" - ) - - def _get_kubernetes_labels(self) -> Dict[str, str]: - """Generate the labels for the KServe inference service CRD from the service configuration. - - Returns: - The labels for the KServe inference service. - """ - labels = self.config.get_kubernetes_labels() - labels["zenml.service_uuid"] = str(self.uuid) - KServeDeploymentConfig.sanitize_labels(labels) - return labels - - @classmethod - def create_from_deployment( - cls, deployment: V1beta1InferenceService - ) -> "KServeDeploymentService": - """Recreate the configuration of a KServe Service from a deployed instance. - - Args: - deployment: the KServe deployment resource. - - Returns: - The KServe service configuration corresponding to the given - KServe deployment resource. - - Raises: - ValueError: if the given deployment resource does not contain - the expected annotations or it contains an invalid or - incompatible KServe service configuration. - """ - config = KServeDeploymentConfig.create_from_deployment(deployment) - uuid = deployment.metadata.get("labels").get("zenml.service_uuid") - if not uuid: - raise ValueError( - f"The given deployment resource does not contain a valid " - f"'zenml.service_uuid' label: {deployment}" - ) - service = cls(uuid=UUID(uuid), config=config) - service.update_status() - return service - - def provision(self) -> None: - """Provision or update remote KServe deployment instance. - - This should then match the current configuration. 
- """ - client = self._get_client() - namespace = self._get_namespace() - - api_version = constants.KSERVE_GROUP + "/" + "v1beta1" - name = self.crd_name - - # All supported model specs seem to have the same fields - # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec) - if self.config.container is not None: - predictor_kwargs = { - "containers": [ - k8s_client.V1Container( - name=self.config.container.get("name"), - image=self.config.container.get("image"), - command=self.config.container.get("command"), - args=self.config.container.get("args"), - env=[ - k8s_client.V1EnvVar( - name="STORAGE_URI", - value=self.config.container.get("storage_uri"), - ) - ], - ) - ], - "service_account_name": self.config.k8s_service_account, - } - else: - predictor_kwargs = { - self.config.predictor: V1beta1PredictorExtensionSpec( - storage_uri=self.config.model_uri, - resources=self.config.resources, - ), - "service_account_name": self.config.k8s_service_account, - } - - isvc = V1beta1InferenceService( - api_version=api_version, - kind=constants.KSERVE_KIND, - metadata=k8s_client.V1ObjectMeta( - name=name, - namespace=namespace, - labels=self._get_kubernetes_labels(), - annotations=self.config.get_kubernetes_annotations(), - ), - spec=V1beta1InferenceServiceSpec( - predictor=V1beta1PredictorSpec(**predictor_kwargs) - ), - ) - - # TODO[HIGH]: better error handling when provisioning KServe instances - try: - client.get(name=name, namespace=namespace) - # update the existing deployment - client.replace(name, isvc, namespace=namespace) - except RuntimeError: - client.create(isvc) - - def deprovision(self, force: bool = False) -> None: - """Deprovisions all resources used by the service. - - Args: - force: if True, the service will be deprovisioned even if it is - still in use. - - Raises: - ValueError: if the service is still in use and force is False. - """ - client = self._get_client() - namespace = self._get_namespace() - name = self.crd_name - - # TODO[HIGH]: catch errors if deleting a KServe instance that is no - # longer available - try: - client.delete(name=name, namespace=namespace) - except RuntimeError: - raise ValueError( - f"Could not delete KServe instance '{name}' from namespace: '{namespace}'." - ) - - def _get_deployment_logs( - self, - name: str, - follow: bool = False, - tail: Optional[int] = None, - ) -> Generator[str, bool, None]: - """Get the logs of a KServe deployment resource. - - Args: - name: the name of the KServe deployment to get logs for. - follow: if True, the logs will be streamed as they are written - tail: only retrieve the last NUM lines of log output. - - Returns: - A generator that can be accessed to get the service logs. - - Raises: - Exception: if an unknown error occurs while fetching the logs. - - Yields: - The logs of the given deployment. 
- """ - client = self._get_client() - namespace = self._get_namespace() - - logger.debug(f"Retrieving logs for InferenceService resource: {name}") - try: - response = client.core_api.list_namespaced_pod( - namespace=namespace, - label_selector=f"zenml.service_uuid={self.uuid}", - ) - logger.debug("Kubernetes API response: %s", response) - pods = response.items - if not pods: - raise Exception( - f"The KServe deployment {name} is not currently " - f"running: no Kubernetes pods associated with it were found" - ) - pod = pods[0] - pod_name = pod.metadata.name - - containers = [c.name for c in pod.spec.containers] - init_containers = [c.name for c in pod.spec.init_containers] - container_statuses = { - c.name: c.started or c.restart_count - for c in pod.status.container_statuses - } - - container = "default" - if container not in containers: - container = containers[0] - - if not container_statuses[container]: - container = init_containers[0] - - logger.info( - f"Retrieving logs for pod: `{pod_name}` and container " - f"`{container}` in namespace `{namespace}`" - ) - response = client.core_api.read_namespaced_pod_log( - name=pod_name, - namespace=namespace, - container=container, - follow=follow, - tail_lines=tail, - _preload_content=False, - ) - except k8s_client.rest.ApiException as e: - logger.error( - "Exception when fetching logs for InferenceService resource " - "%s: %s", - name, - str(e), - ) - raise Exception( - f"Unexpected exception when fetching logs for InferenceService " - f"resource: {name}" - ) from e - - try: - while True: - line = response.readline().decode("utf-8").rstrip("\n") - if not line: - return - stop = yield line - if stop: - return - finally: - response.release_conn() - - def get_logs( - self, follow: bool = False, tail: Optional[int] = None - ) -> Generator[str, bool, None]: - """Retrieve the logs from the remote KServe inference service instance. - - Args: - follow: if True, the logs will be streamed as they are written. - tail: only retrieve the last NUM lines of log output. - - Returns: - A generator that can be accessed to get the service logs. - """ - return self._get_deployment_logs( - self.crd_name, - follow=follow, - tail=tail, - ) - - @property - def prediction_url(self) -> Optional[str]: - """The prediction URI exposed by the prediction service. - - Returns: - The prediction URI exposed by the prediction service, or None if - the service is not yet ready. - """ - if not self.is_running: - return None - - model_deployer = self._get_model_deployer() - return os.path.join( - model_deployer.config.base_url, - "v1/models", - f"{self.crd_name}:predict", - ) - - @property - def prediction_hostname(self) -> Optional[str]: - """The prediction hostname exposed by the prediction service. - - Returns: - The prediction hostname exposed by the prediction service status - that will be used in the headers of the prediction request. - """ - if not self.is_running: - return None - - namespace = self._get_namespace() - - model_deployer = self._get_model_deployer() - custom_domain = model_deployer.config.custom_domain or "example.com" - return f"{self.crd_name}.{namespace}.{custom_domain}" - - def predict(self, request: str) -> Any: - """Make a prediction using the service. - - Args: - request: a NumPy array representing the request - - Returns: - A NumPy array represents the prediction returned by the service. - - Raises: - Exception: if the service is not yet ready. - ValueError: if the prediction_url is not set. 
- """ - if not self.is_running: - raise Exception( - "KServe prediction service is not running. " - "Please start the service before making predictions." - ) - - if self.prediction_url is None: - raise ValueError("`self.prediction_url` is not set, cannot post.") - if self.prediction_hostname is None: - raise ValueError( - "`self.prediction_hostname` is not set, cannot post." - ) - headers = {"Host": self.prediction_hostname} - if isinstance(request, str): - request = json.loads(request) - else: - raise ValueError("Request must be a json string.") - response = requests.post( # nosec - self.prediction_url, - headers=headers, - json={"instances": request}, - ) - response.raise_for_status() - return response.json() diff --git a/src/zenml/integrations/kserve/steps/__init__.py b/src/zenml/integrations/kserve/steps/__init__.py deleted file mode 100644 index 8570f0769e..0000000000 --- a/src/zenml/integrations/kserve/steps/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Initialization for KServe steps.""" - -from zenml.integrations.kserve.steps.kserve_deployer import ( - CustomDeployParameters, - KServeDeployerStepParameters, - TorchServeParameters, - kserve_custom_model_deployer_step, - kserve_model_deployer_step, -) diff --git a/src/zenml/integrations/kserve/steps/kserve_deployer.py b/src/zenml/integrations/kserve/steps/kserve_deployer.py deleted file mode 100644 index 43ab47d8d7..0000000000 --- a/src/zenml/integrations/kserve/steps/kserve_deployer.py +++ /dev/null @@ -1,473 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. 
-"""Implementation of the KServe Deployer step.""" - -import os -from typing import List, Optional, cast - -from pydantic import BaseModel, validator - -from zenml import get_step_context -from zenml.artifacts.unmaterialized_artifact import UnmaterializedArtifact -from zenml.artifacts.utils import save_model_metadata -from zenml.client import Client -from zenml.constants import MODEL_METADATA_YAML_FILE_NAME -from zenml.exceptions import DoesNotExistException -from zenml.integrations.kserve.constants import ( - KSERVE_CUSTOM_DEPLOYMENT, - KSERVE_DOCKER_IMAGE_KEY, -) -from zenml.integrations.kserve.model_deployers.kserve_model_deployer import ( - DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT, - KServeModelDeployer, -) -from zenml.integrations.kserve.services.kserve_deployment import ( - KServeDeploymentConfig, - KServeDeploymentService, -) -from zenml.io import fileio -from zenml.logger import get_logger -from zenml.steps import ( - BaseParameters, - StepContext, - step, -) -from zenml.utils import io_utils, source_utils - -logger = get_logger(__name__) - -TORCH_HANDLERS = [ - "image_classifier", - "image_segmenter", - "object_detector", - "text_classifier", -] - - -class TorchServeParameters(BaseModel): - """KServe PyTorch model deployer step configuration. - - Attributes: - model_class: Path to Python file containing model architecture. - handler: TorchServe's handler file to handle custom TorchServe inference - logic. - extra_files: Comma separated path to extra dependency files. - model_version: Model version. - requirements_file: Path to requirements file. - torch_config: TorchServe configuration file path. - """ - - model_class: str - handler: str - extra_files: Optional[List[str]] = None - requirements_file: Optional[str] = None - model_version: Optional[str] = "1.0" - torch_config: Optional[str] = None - - @validator("model_class") - def model_class_validate(cls, v: str) -> str: - """Validate model class file path. - - Args: - v: model class file path - - Returns: - model class file path - - Raises: - ValueError: if model class file path is not valid - """ - if not v: - raise ValueError("Model class file path is required.") - if not Client.is_inside_repository(v): - raise ValueError( - "Model class file path must be inside the repository." - ) - return v - - @validator("handler") - def handler_validate(cls, v: str) -> str: - """Validate handler. - - Args: - v: handler file path - - Returns: - handler file path - - Raises: - ValueError: if handler file path is not valid - """ - if v: - if v in TORCH_HANDLERS: - return v - elif Client.is_inside_repository(v): - return v - else: - raise ValueError( - "Handler must be one of the TorchServe handlers", - "or a file that exists inside the repository.", - ) - else: - raise ValueError("Handler is required.") - - @validator("extra_files") - def extra_files_validate( - cls, v: Optional[List[str]] - ) -> Optional[List[str]]: - """Validate extra files. - - Args: - v: extra files path - - Returns: - extra files path - - Raises: - ValueError: if the extra files path is not valid - """ - extra_files = [] - if v is not None: - for file_path in v: - if Client.is_inside_repository(file_path): - extra_files.append(file_path) - else: - raise ValueError( - "Extra file path must be inside the repository." - ) - return extra_files - return v - - @validator("torch_config") - def torch_config_validate(cls, v: Optional[str]) -> Optional[str]: - """Validate torch config file. 
- - Args: - v: torch config file path - - Returns: - torch config file path - - Raises: - ValueError: if torch config file path is not valid. - """ - if v: - if Client.is_inside_repository(v): - return v - else: - raise ValueError( - "Torch config file path must be inside the repository." - ) - return v - - -class CustomDeployParameters(BaseModel): - """Custom model deployer step extra parameters. - - Attributes: - predict_function: Path to Python file containing predict function. - """ - - predict_function: str - - @validator("predict_function") - def predict_function_validate(cls, predict_func_path: str) -> str: - """Validate predict function. - - Args: - predict_func_path: predict function path - - Returns: - predict function path - - Raises: - ValueError: if predict function path is not valid - TypeError: if predict function path is not a callable function - """ - try: - predict_function = source_utils.load(predict_func_path) - except AttributeError: - raise ValueError("Predict function can't be found.") - if not callable(predict_function): - raise TypeError("Predict function must be callable.") - return predict_func_path - - -class KServeDeployerStepParameters(BaseParameters): - """KServe model deployer step parameters. - - Attributes: - service_config: KServe deployment service configuration. - torch_serve_params: TorchServe set of parameters to deploy model. - timeout: Timeout for model deployment. - """ - - service_config: KServeDeploymentConfig - custom_deploy_parameters: Optional[CustomDeployParameters] = None - torch_serve_parameters: Optional[TorchServeParameters] = None - timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT - - -@step(enable_cache=False) -def kserve_model_deployer_step( - deploy_decision: bool, - params: KServeDeployerStepParameters, - context: StepContext, - model: UnmaterializedArtifact, -) -> KServeDeploymentService: - """KServe model deployer pipeline step. - - This step can be used in a pipeline to implement continuous - deployment for an ML model with KServe. - - Args: - deploy_decision: whether to deploy the model or not - params: parameters for the deployer step - model: the model artifact to deploy - context: the step context - - Returns: - KServe deployment service - """ - model_deployer = cast( - KServeModelDeployer, KServeModelDeployer.get_active_model_deployer() - ) - - # get pipeline name, step name and run id - step_context = get_step_context() - pipeline_name = step_context.pipeline.name - run_name = step_context.pipeline_run.name - step_name = step_context.step_run.name - - # update the step configuration with the real pipeline runtime information - params.service_config.pipeline_name = pipeline_name - params.service_config.run_name = run_name - params.service_config.pipeline_step_name = step_name - - # fetch existing services with same pipeline name, step name and - # model name - existing_services = model_deployer.find_model_server( - pipeline_name=pipeline_name, - pipeline_step_name=step_name, - model_name=params.service_config.model_name, - ) - - # even when the deploy decision is negative if an existing model server - # is not running for this pipeline/step, we still have to serve the - # current model, to ensure that a model server is available at all times - if not deploy_decision and existing_services: - logger.info( - f"Skipping model deployment because the model quality does not " - f"meet the criteria. 
Reusing the last model server deployed by step " - f"'{step_name}' and pipeline '{pipeline_name}' for model " - f"'{params.service_config.model_name}'..." - ) - service = cast(KServeDeploymentService, existing_services[0]) - # even when the deploy decision is negative, we still need to start - # the previous model server if it is no longer running, to ensure that - # a model server is available at all times - if not service.is_running: - service.start(timeout=params.timeout) - return service - - # invoke the KServe model deployer to create a new service - # or update an existing one that was previously deployed for the same - # model - if params.service_config.predictor == "pytorch": - # import the prepare function from the step utils - from zenml.integrations.kserve.steps.kserve_step_utils import ( - prepare_torch_service_config, - ) - - # prepare the service config - service_config = prepare_torch_service_config( - model_uri=model.uri, - output_artifact_uri=context.get_output_artifact_uri(), - params=params, - ) - else: - # import the prepare function from the step utils - from zenml.integrations.kserve.steps.kserve_step_utils import ( - prepare_service_config, - ) - - # prepare the service config - service_config = prepare_service_config( - model_uri=model.uri, - output_artifact_uri=context.get_output_artifact_uri(), - params=params, - ) - service = cast( - KServeDeploymentService, - model_deployer.deploy_model( - service_config, replace=True, timeout=params.timeout - ), - ) - - logger.info( - f"KServe deployment service started and reachable at:\n" - f" {service.prediction_url}\n" - f" With the hostname: {service.prediction_hostname}." - ) - - return service - - -@step(enable_cache=False, extra={KSERVE_CUSTOM_DEPLOYMENT: True}) -def kserve_custom_model_deployer_step( - deploy_decision: bool, - params: KServeDeployerStepParameters, - context: StepContext, - model: UnmaterializedArtifact, -) -> KServeDeploymentService: - """KServe custom model deployer pipeline step. - - This step can be used in a pipeline to implement the - process required to deploy a custom model with KServe. 
- - Args: - deploy_decision: whether to deploy the model or not - params: parameters for the deployer step - model: the model artifact to deploy - context: the step context - - Raises: - ValueError: if the custom deployer parameters is not defined - DoesNotExistException: if no active stack is found - RuntimeError: if the build is missing for the pipeline run - - - Returns: - KServe deployment service - """ - # verify that a custom deployer is defined - if not params.custom_deploy_parameters: - raise ValueError( - "Custom deploy parameter which contains the path of the ", - "custom predict function is required for custom model deployment.", - ) - - # get the active model deployer - model_deployer = cast( - KServeModelDeployer, KServeModelDeployer.get_active_model_deployer() - ) - - # get pipeline name, step name, run id - step_context = get_step_context() - pipeline_name = step_context.pipeline.name - run_name = step_context.pipeline_run.name - step_name = step_context.step_run.name - - # update the step configuration with the real pipeline runtime information - params.service_config.pipeline_name = pipeline_name - params.service_config.run_name = run_name - params.service_config.pipeline_step_name = step_name - - # fetch existing services with same pipeline name, step name and - # model name - existing_services = model_deployer.find_model_server( - pipeline_name=pipeline_name, - pipeline_step_name=step_name, - model_name=params.service_config.model_name, - ) - - # even when the deploy decision is negative if an existing model server - # is not running for this pipeline/step, we still have to serve the - # current model, to ensure that a model server is available at all times - if not deploy_decision and existing_services: - logger.info( - f"Skipping model deployment because the model quality does not " - f"meet the criteria. Reusing the last model server deployed by step " - f"'{step_name}' and pipeline '{pipeline_name}' for model " - f"'{params.service_config.model_name}'..." - ) - service = cast(KServeDeploymentService, existing_services[0]) - # even when the deploy decision is negative, we still need to start - # the previous model server if it is no longer running, to ensure that - # a model server is available at all times - if not service.is_running: - service.start(timeout=params.timeout) - return service - - # entrypoint for starting KServe server deployment for custom model - entrypoint_command = [ - "python", - "-m", - "zenml.integrations.kserve.custom_deployer.zenml_custom_model", - "--model_name", - params.service_config.model_name, - "--predict_func", - params.custom_deploy_parameters.predict_function, - ] - - # verify if there is an active stack before starting the service - if not Client().active_stack: - raise DoesNotExistException( - "No active stack is available. " - "Please make sure that you have registered and set a stack." - ) - - pipeline_run = step_context.pipeline_run - if not pipeline_run.build: - raise RuntimeError( - f"Missing build for run {pipeline_run.id}. This is probably " - "because the build was manually deleted." 
- ) - - image_name = pipeline_run.build.get_image( - component_key=KSERVE_DOCKER_IMAGE_KEY, step=step_name - ) - - # copy the model files to a new specific directory for the deployment - served_model_uri = os.path.join( - context.get_output_artifact_uri(), "kserve" - ) - fileio.makedirs(served_model_uri) - io_utils.copy_dir(model.uri, served_model_uri) - - # save the model artifact metadata to the YAML file and copy it to the - # deployment directory - model_metadata_file = save_model_metadata(model) - fileio.copy( - model_metadata_file, - os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME), - ) - - # prepare the service configuration for the deployment - service_config = params.service_config.copy() - service_config.model_uri = served_model_uri - - # Prepare container config for custom model deployment - service_config.container = { - "name": service_config.model_name, - "image": image_name, - "command": entrypoint_command, - "storage_uri": service_config.model_uri, - } - - # deploy the service - service = cast( - KServeDeploymentService, - model_deployer.deploy_model( - service_config, replace=True, timeout=params.timeout - ), - ) - - logger.info( - f"KServe deployment service started and reachable at:\n" - f" {service.prediction_url}\n" - f" With the hostname: {service.prediction_hostname}." - ) - - return service diff --git a/src/zenml/integrations/kserve/steps/kserve_step_utils.py b/src/zenml/integrations/kserve/steps/kserve_step_utils.py deleted file mode 100644 index 359c3217a3..0000000000 --- a/src/zenml/integrations/kserve/steps/kserve_step_utils.py +++ /dev/null @@ -1,294 +0,0 @@ -# Copyright (c) ZenML GmbH 2022. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -# or implied. See the License for the specific language governing -# permissions and limitations under the License. -"""Utility functions used by the KServe deployer step.""" - -import os -import re -import tempfile -from typing import List, Optional - -from model_archiver.model_packaging import package_model -from model_archiver.model_packaging_utils import ModelExportUtils -from pydantic import BaseModel - -from zenml.exceptions import ValidationError -from zenml.integrations.kserve.services.kserve_deployment import ( - KServeDeploymentConfig, -) -from zenml.integrations.kserve.steps.kserve_deployer import ( - KServeDeployerStepParameters, -) -from zenml.io import fileio -from zenml.utils import io_utils - - -def is_valid_model_name(model_name: str) -> bool: - """Checks if the model name is valid. - - Args: - model_name: the model name to check - - Returns: - True if the model name is valid, False otherwise. - """ - pattern = re.compile("^[a-z0-9-]+$") - return pattern.match(model_name) is not None - - -def prepare_service_config( - model_uri: str, - output_artifact_uri: str, - params: KServeDeployerStepParameters, -) -> KServeDeploymentConfig: - """Prepare the model files for model serving. - - This function ensures that the model files are in the correct format - and file structure required by the KServe server implementation - used for model serving. 
-
-    Args:
-        model_uri: the URI of the model artifact being served
-        output_artifact_uri: the URI of the output artifact
-        params: the KServe deployer step parameters
-
-    Returns:
-        The KServe service configuration for the model that is ready to be
-        served.
-
-    Raises:
-        RuntimeError: if the model files cannot be prepared.
-        ValidationError: if the model name is invalid.
-    """
-    served_model_uri = os.path.join(output_artifact_uri, "kserve")
-    fileio.makedirs(served_model_uri)
-
-    # TODO [ENG-773]: determine how to formalize how models are organized into
-    #   folders and sub-folders depending on the model type/format and the
-    #   KServe protocol used to serve the model.
-
-    # TODO [ENG-791]: an auto-detect built-in KServe server implementation
-    #   from the model artifact type
-
-    # TODO [ENG-792]: validate the model artifact type against the
-    #   supported built-in KServe server implementations
-    if params.service_config.predictor == "tensorflow":
-        # the TensorFlow server expects model artifacts to be
-        # stored in numbered subdirectories, each representing a model
-        # version
-        served_model_uri = os.path.join(
-            served_model_uri,
-            params.service_config.predictor,
-            params.service_config.model_name,
-        )
-        fileio.makedirs(served_model_uri)
-        io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
-    elif params.service_config.predictor == "sklearn":
-        # the sklearn server expects model artifacts to be
-        # stored in a file called model.joblib
-        model_uri = os.path.join(model_uri, "model")
-        if not fileio.exists(model_uri):
-            raise RuntimeError(
-                f"Expected sklearn model artifact was not found at "
-                f"{model_uri}"
-            )
-        served_model_uri = os.path.join(
-            served_model_uri,
-            params.service_config.predictor,
-            params.service_config.model_name,
-        )
-        fileio.makedirs(served_model_uri)
-        fileio.copy(model_uri, os.path.join(served_model_uri, "model.joblib"))
-    elif not is_valid_model_name(params.service_config.model_name):
-        raise ValidationError(
-            f"Model name '{params.service_config.model_name}' is invalid. "
-            f"The model name can only include lowercase alphanumeric "
-            "characters and hyphens. Please rename your model and try again."
-        )
-    else:
-        # default treatment for all other server implementations is to
-        # simply reuse the model from the artifact store path where it
-        # is originally stored
-        served_model_uri = os.path.join(
-            served_model_uri,
-            params.service_config.predictor,
-            params.service_config.model_name,
-        )
-        fileio.makedirs(served_model_uri)
-        fileio.copy(model_uri, served_model_uri)
-
-    service_config = params.service_config.copy()
-    service_config.model_uri = served_model_uri
-    return service_config
-
-
-def prepare_torch_service_config(
-    model_uri: str,
-    output_artifact_uri: str,
-    params: KServeDeployerStepParameters,
-) -> KServeDeploymentConfig:
-    """Prepare the PyTorch model files for model serving.
-
-    This function ensures that the model files are in the correct format
-    and file structure required by the KServe server implementation
-    used for model serving.
-
-    Args:
-        model_uri: the URI of the model artifact being served
-        output_artifact_uri: the URI of the output artifact
-        params: the KServe deployer step parameters
-
-    Returns:
-        The KServe service configuration for the model that is ready to be
-        served.
-
-    Raises:
-        RuntimeError: if the model files cannot be prepared.
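Summarizing the branches above, the on-disk layout produced under the step's output artifact URI was roughly:

```python
# kserve/
#   tensorflow/<model_name>/1/           <- TF artifacts copied into a
#                                           numbered version subdirectory
#   sklearn/<model_name>/model.joblib    <- sklearn artifact renamed to the
#                                           model.joblib the server expects
#   <predictor>/<model_name>/            <- every other predictor: artifact
#                                           copied over unchanged
```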
- """ - deployment_folder_uri = os.path.join(output_artifact_uri, "kserve") - served_model_uri = os.path.join(deployment_folder_uri, "model-store") - config_properties_uri = os.path.join(deployment_folder_uri, "config") - fileio.makedirs(served_model_uri) - fileio.makedirs(config_properties_uri) - - if params.torch_serve_parameters is None: - raise RuntimeError("No torch serve parameters provided") - else: - # Create a temporary folder - temp_dir = tempfile.mkdtemp(prefix="zenml-pytorch-temp-") - tmp_model_uri = os.path.join( - str(temp_dir), f"{params.service_config.model_name}.pt" - ) - - # Copy from artifact store to temporary file - fileio.copy(f"{model_uri}/checkpoint.pt", tmp_model_uri) - - torch_archiver_args = TorchModelArchiver( - model_name=params.service_config.model_name, - serialized_file=tmp_model_uri, - model_file=params.torch_serve_parameters.model_class, - handler=params.torch_serve_parameters.handler, - export_path=temp_dir, - version=params.torch_serve_parameters.model_version, - ) - - manifest = ModelExportUtils.generate_manifest_json(torch_archiver_args) - package_model(torch_archiver_args, manifest=manifest) - - # Copy from temporary file to artifact store - archived_model_uri = os.path.join( - temp_dir, f"{params.service_config.model_name}.mar" - ) - if not fileio.exists(archived_model_uri): - raise RuntimeError( - f"Expected torch archived model artifact was not found at " - f"{archived_model_uri}" - ) - - # Copy the torch model archive artifact to the model store - fileio.copy( - archived_model_uri, - os.path.join( - served_model_uri, f"{params.service_config.model_name}.mar" - ), - ) - - # Get or Generate the config file - if params.torch_serve_parameters.torch_config: - # Copy the torch model config to the model store - fileio.copy( - params.torch_serve_parameters.torch_config, - os.path.join(config_properties_uri, "config.properties"), - ) - else: - # Generate the config file - config_file_uri = generate_model_deployer_config( - model_name=params.service_config.model_name, - directory=temp_dir, - ) - # Copy the torch model config to the model store - fileio.copy( - config_file_uri, - os.path.join(config_properties_uri, "config.properties"), - ) - - service_config = params.service_config.copy() - service_config.model_uri = deployment_folder_uri - return service_config - - -class TorchModelArchiver(BaseModel): - """Model Archiver for PyTorch models. - - Attributes: - model_name: Model name. - model_version: Model version. - serialized_file: Serialized model file. - handler: TorchServe's handler file to handle custom TorchServe inference logic. - extra_files: Comma separated path to extra dependency files. - requirements_file: Path to requirements file. - export_path: Path to export model. - runtime: Runtime of the model. - force: Force export of the model. - archive_format: Archive format. - """ - - model_name: str - serialized_file: str - model_file: str - handler: str - export_path: str - extra_files: Optional[List[str]] = None - version: Optional[str] = None - requirements_file: Optional[str] = None - runtime: Optional[str] = "python" - force: Optional[bool] = None - archive_format: Optional[str] = "default" - - -def generate_model_deployer_config( - model_name: str, - directory: str, -) -> str: - """Generate a model deployer config. 
-
-    Args:
-        model_name: the name of the model
-        directory: the directory where the model is stored
-
-    Returns:
-        The path to the generated config file.
-    """
-    config_lines = [
-        "inference_address=http://0.0.0.0:8085",
-        "management_address=http://0.0.0.0:8085",
-        "metrics_address=http://0.0.0.0:8082",
-        "grpc_inference_port=7070",
-        "grpc_management_port=7071",
-        "enable_metrics_api=true",
-        "metrics_format=prometheus",
-        "number_of_netty_threads=4",
-        "job_queue_size=10",
-        "enable_envvars_config=true",
-        "install_py_dep_per_model=true",
-        "model_store=/mnt/models/model-store",
-    ]
-
-    with tempfile.NamedTemporaryFile(
-        suffix=".properties", mode="w+", dir=directory, delete=False
-    ) as f:
-        for line in config_lines:
-            f.write(line + "\n")
-        f.write(
-            f'model_snapshot={{"name":"startup.cfg","modelCount":1,"models":{{"{model_name}":{{"1.0":{{"defaultVersion":true,"marName":"{model_name}.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"maxBatchDelay":10,"responseTimeout":120}}}}}}}}'
-        )
-        f.close()
-    return f.name
diff --git a/src/zenml/utils/mlstacks_utils.py b/src/zenml/utils/mlstacks_utils.py
index 9806acd2b2..046dd5894c 100644
--- a/src/zenml/utils/mlstacks_utils.py
+++ b/src/zenml/utils/mlstacks_utils.py
@@ -193,7 +193,6 @@ def _add_extra_config_to_components(
                 "mlflow-password",
             ],
             ("model_deployer", "seldon", "k3d"): ["seldon-secret-name"],
-            ("model_deployer", "kserve", "k3d"): ["kserve-secret-name"],
         }
 
     def _add_config(
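Circling back to the `model_snapshot` line written into `config.properties` in the deleted `kserve_step_utils.py` above: the hand-escaped f-string embeds JSON, so an equivalent and less error-prone construction is to build the structure and let `json.dumps` handle the escaping:

```python
import json

snapshot = {
    "name": "startup.cfg",
    "modelCount": 1,
    "models": {
        "my-model": {
            "1.0": {
                "defaultVersion": True,
                "marName": "my-model.mar",
                "minWorkers": 1,
                "maxWorkers": 5,
                "batchSize": 1,
                "maxBatchDelay": 10,
                "responseTimeout": 120,
            }
        }
    },
}
print(f"model_snapshot={json.dumps(snapshot, separators=(',', ':'))}")
```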