diff --git a/src/deepsparse/pipeline.py b/src/deepsparse/pipeline.py
index 4ffca2af4a..6e210a721c 100644
--- a/src/deepsparse/pipeline.py
+++ b/src/deepsparse/pipeline.py
@@ -18,11 +18,13 @@
 """
 
 
+import os
 from abc import ABC, abstractmethod
+from pathlib import Path
 from typing import Any, Dict, List, Optional, Union
 
 import numpy
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 from deepsparse import Engine, Scheduler
 from deepsparse.benchmark import ORTEngine
@@ -34,6 +36,7 @@
     "ORT_ENGINE",
     "SUPPORTED_PIPELINE_ENGINES",
     "Pipeline",
+    "PipelineConfig",
 ]
 
 
@@ -171,7 +174,7 @@ def create(
         input_shapes: List[List[int]] = None,
         alias: Optional[str] = None,
         **kwargs,
-    ):
+    ) -> "Pipeline":
         """
         :param task: name of task to create a pipeline for
         :param model_path: path on local system or SparseZoo stub to load the model
@@ -185,10 +188,10 @@ def create(
             Pass None for the default
         :param input_shapes: list of shapes to set the ONNX inputs to. Pass None
             to use model as-is. Default is None
-        :param kwargs: extra task specific kwargs to be passed to task Pipeline
-            implementation
         :param alias: optional name to give this pipeline instance, useful when
             inferencing with multiple models. Default is None
+        :param kwargs: extra task specific kwargs to be passed to task Pipeline
+            implementation
         :return: pipeline object initialized for the given task
         """
         task = task.lower().replace("-", "_")
@@ -264,6 +267,34 @@ def _register_pipeline_tasks_decorator(pipeline_class: Pipeline):
 
         return _register_pipeline_tasks_decorator
 
+    @classmethod
+    def from_config(cls, config: Union["PipelineConfig", str, Path]) -> "Pipeline":
+        """
+        :param config: PipelineConfig object, filepath to a json serialized
+            PipelineConfig, or raw string of a json serialized PipelineConfig
+        :return: loaded Pipeline object from the config
+        """
+        if isinstance(config, Path) or (
+            isinstance(config, str) and os.path.exists(config)
+        ):
+            if isinstance(config, str):
+                config = Path(config)
+            config = PipelineConfig.parse_file(config)
+        if isinstance(config, str):
+            config = PipelineConfig.parse_raw(config)
+
+        return cls.create(
+            task=config.task,
+            model_path=config.model_path,
+            engine_type=config.engine_type,
+            batch_size=config.batch_size,
+            num_cores=config.num_cores,
+            scheduler=config.scheduler,
+            input_shapes=config.input_shapes,
+            alias=config.alias,
+            **config.kwargs,
+        )
+
     @abstractmethod
     def setup_onnx_file_path(self) -> str:
         """
@@ -363,6 +394,37 @@ def onnx_file_path(self) -> str:
         """
         return self._onnx_file_path
 
+    def to_config(self) -> "PipelineConfig":
+        """
+        :return: PipelineConfig that can be used to reload this object
+        """
+
+        if not hasattr(self, "task"):
+            raise RuntimeError(
+                f"{self.__class__} instance has no attribute task. Pipeline objects "
+                "must have a task to be serialized to a config. Pipeline objects "
+                "must be declared with the Pipeline.register decorator to be "
+                "assigned a task"
+            )
+
+        # parse any additional properties as kwargs
+        kwargs = {}
+        for attr_name, attr in self.__class__.__dict__.items():
+            if isinstance(attr, property) and attr_name not in dir(PipelineConfig):
+                kwargs[attr_name] = getattr(self, attr_name)
+
+        return PipelineConfig(
+            task=self.task,
+            model_path=self.model_path_orig,
+            engine_type=self.engine_type,
+            batch_size=self.batch_size,
+            num_cores=self.num_cores,
+            scheduler=self.scheduler,
+            input_shapes=self.input_shapes,
+            alias=self.alias,
+            kwargs=kwargs,
+        )
+
     def _initialize_engine(self) -> Union[Engine, ORTEngine]:
         engine_type = self.engine_type.lower()
 
@@ -375,3 +437,65 @@ def _initialize_engine(self) -> Union[Engine, ORTEngine]:
             f"Unknown engine_type {self.engine_type}. Supported values include: "
             f"{SUPPORTED_PIPELINE_ENGINES}"
         )
+
+
+class PipelineConfig(BaseModel):
+    """
+    Configuration for creating a Pipeline object
+
+    Can be used to create a Pipeline from a config object or file with
+    Pipeline.from_config(), or used as a building block for other configs
+    such as for deepsparse.server
+    """
+
+    task: str = Field(
+        description="name of task to create a pipeline for",
+    )
+    model_path: str = Field(
+        description="path on local system or SparseZoo stub to load the model from",
+    )
+    engine_type: str = Field(
+        default=DEEPSPARSE_ENGINE,
+        description=(
+            "inference engine to use. Currently supported values include "
+            "'deepsparse' and 'onnxruntime'. Default is 'deepsparse'"
+        ),
+    )
+    batch_size: int = Field(
+        default=1,
+        description="static batch size to use for inference. Default is 1",
+    )
+    num_cores: int = Field(
+        default=None,
+        description=(
+            "number of CPU cores to allocate for inference engine. None "
+            "specifies all available cores. Default is None"
+        ),
+    )
+    scheduler: str = Field(
+        default="async",
+        description=(
+            "(deepsparse only) kind of scheduler to execute with. Defaults to async"
+        ),
+    )
+    input_shapes: List[List[int]] = Field(
+        default=None,
+        description=(
+            "list of shapes to set the ONNX inputs to. Pass None to use model "
+            "as-is. Default is None"
+        ),
+    )
+    alias: str = Field(
+        default=None,
+        description=(
+            "optional name to give this pipeline instance, useful when inferencing "
+            "with multiple models. Default is None"
+        ),
+    )
+    kwargs: Dict[str, Any] = Field(
+        default={},
+        description=(
+            "Additional arguments for inference with the model that will be passed "
+            "into the pipeline as kwargs"
+        ),
+    )
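For context, a minimal sketch of the round-trip this patch enables. The SparseZoo stub and the `question_answering` task below are placeholders for illustration; the config serialization runs standalone, but the `from_config` call only succeeds with a resolvable model and the relevant task extras installed:

```python
from deepsparse import Pipeline
from deepsparse.pipeline import PipelineConfig

# build a config by hand; model_path here is a placeholder stub
config = PipelineConfig(
    task="question_answering",
    model_path="zoo:placeholder/stub",  # placeholder, swap in a real stub or path
    batch_size=1,
)

# PipelineConfig is a pydantic model, so it round-trips through json
config_json = config.json()
assert PipelineConfig.parse_raw(config_json) == config

# from_config accepts a PipelineConfig object, a raw json string, or a
# filepath to a serialized config; all three funnel into Pipeline.create()
# (requires a real model_path to actually load the model)
pipeline = Pipeline.from_config(config_json)

# an instantiated pipeline can be snapshotted back into a config
assert pipeline.to_config().task == config.task
```

Note the design choice in `from_config`: a plain string is treated as a filepath only when `os.path.exists` matches it, so json strings and paths can share the same argument without an extra flag.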