This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
132 changes: 128 additions & 4 deletions src/deepsparse/pipeline.py
@@ -18,11 +18,13 @@
"""


import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import numpy
from pydantic import BaseModel, Field

from deepsparse import Engine, Scheduler
from deepsparse.benchmark import ORTEngine
Expand All @@ -34,6 +36,7 @@
"ORT_ENGINE",
"SUPPORTED_PIPELINE_ENGINES",
"Pipeline",
"PipelineConfig",
]


@@ -171,7 +174,7 @@ def create(
        input_shapes: List[List[int]] = None,
        alias: Optional[str] = None,
        **kwargs,
    ) -> "Pipeline":
        """
        :param task: name of task to create a pipeline for
        :param model_path: path on local system or SparseZoo stub to load the model
@@ -185,10 +188,10 @@
            Pass None for the default
        :param input_shapes: list of shapes to set the ONNX inputs to. Pass None
            to use the model as-is. Default is None
        :param alias: optional name to give this pipeline instance, useful when
            inferencing with multiple models. Default is None
        :param kwargs: extra task specific kwargs to be passed to task Pipeline
            implementation
        :return: pipeline object initialized for the given task
        """
        task = task.lower().replace("-", "_")
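
For context (not part of this diff), a factory call might look like the sketch below; the task name and SparseZoo stub are illustrative placeholders, not values taken from this change:

from deepsparse.pipeline import Pipeline

# Hypothetical usage sketch: the task name and zoo stub below are
# placeholders, not values confirmed by this PR.
pipeline = Pipeline.create(
    task="text_classification",
    model_path="zoo:some/sparsezoo/stub",
    batch_size=1,
)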
@@ -264,6 +267,34 @@ def _register_pipeline_tasks_decorator(pipeline_class: Pipeline):

        return _register_pipeline_tasks_decorator

    @classmethod
    def from_config(cls, config: Union["PipelineConfig", str, Path]) -> "Pipeline":
        """
        :param config: PipelineConfig object, filepath to a json serialized
            PipelineConfig, or raw string of a json serialized PipelineConfig
        :return: loaded Pipeline object from the config
        """
        if isinstance(config, Path) or (
            isinstance(config, str) and os.path.exists(config)
        ):
            if isinstance(config, str):
                config = Path(config)
            config = PipelineConfig.parse_file(config)
        if isinstance(config, str):
            config = PipelineConfig.parse_raw(config)

        return cls.create(
            task=config.task,
            model_path=config.model_path,
            engine_type=config.engine_type,
            batch_size=config.batch_size,
            num_cores=config.num_cores,
            scheduler=config.scheduler,
            input_shapes=config.input_shapes,
            alias=config.alias,
            **config.kwargs,
        )
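
As a usage sketch (assumed, not part of this diff): from_config accepts a PipelineConfig object, a path to a serialized JSON file, or a raw JSON string. The file name and field values below are placeholders:

from deepsparse.pipeline import Pipeline

# config.json might contain, e.g.:
# {"task": "text_classification", "model_path": "zoo:some/sparsezoo/stub"}
pipeline = Pipeline.from_config("config.json")

# Equivalent call with a raw JSON string instead of a file path:
pipeline = Pipeline.from_config(
    '{"task": "text_classification", "model_path": "zoo:some/sparsezoo/stub"}'
)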

    @abstractmethod
    def setup_onnx_file_path(self) -> str:
        """
@@ -363,6 +394,37 @@ def onnx_file_path(self) -> str:
"""
return self._onnx_file_path

    def to_config(self) -> "PipelineConfig":
        """
        :return: PipelineConfig that can be used to reload this object
        """

        if not hasattr(self, "task"):
            raise RuntimeError(
                f"{self.__class__} instance has no attribute 'task'. Pipeline "
                "objects must have a task to be serialized to a config. Pipeline "
                "objects must be declared with the Pipeline.register decorator "
                "to be assigned a task"
            )

        # parse any additional properties as kwargs
        kwargs = {}
        for attr_name, attr in self.__class__.__dict__.items():
            if isinstance(attr, property) and attr_name not in dir(PipelineConfig):
                kwargs[attr_name] = getattr(self, attr_name)

        return PipelineConfig(
            task=self.task,
            model_path=self.model_path_orig,
            engine_type=self.engine_type,
            batch_size=self.batch_size,
            num_cores=self.num_cores,
            scheduler=self.scheduler,
            input_shapes=self.input_shapes,
            alias=self.alias,
            kwargs=kwargs,
        )
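
A round-trip sketch (assumed usage, not part of this diff), where pipeline is a previously created Pipeline instance and the file name is a placeholder:

from pathlib import Path

from deepsparse.pipeline import Pipeline

config = pipeline.to_config()                  # PipelineConfig instance
Path("config.json").write_text(config.json())  # pydantic JSON dump
reloaded = Pipeline.from_config(Path("config.json"))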

    def _initialize_engine(self) -> Union[Engine, ORTEngine]:
        engine_type = self.engine_type.lower()

@@ -375,3 +437,65 @@ def _initialize_engine(self) -> Union[Engine, ORTEngine]:
f"Unknown engine_type {self.engine_type}. Supported values include: "
f"{SUPPORTED_PIPELINE_ENGINES}"
)


class PipelineConfig(BaseModel):
    """
    Configuration for creating a Pipeline object

    Can be used to create a Pipeline from a config object or file with
    Pipeline.from_config(), or used as a building block for other configs
    such as for deepsparse.server
    """

    task: str = Field(
        description="name of task to create a pipeline for",
    )
    model_path: str = Field(
        description="path on local system or SparseZoo stub to load the model from",
    )
    engine_type: str = Field(
        default=DEEPSPARSE_ENGINE,
        description=(
            "inference engine to use. Currently supported values include "
            "'deepsparse' and 'onnxruntime'. Default is 'deepsparse'"
        ),
    )
    batch_size: int = Field(
        default=1,
        description="static batch size to use for inference. Default is 1",
    )
    num_cores: int = Field(
        default=None,
        description=(
            "number of CPU cores to allocate for inference engine. None "
            "specifies all available cores. Default is None"
        ),
    )
    scheduler: str = Field(
        default="async",
        description=(
            "(deepsparse only) kind of scheduler to execute with. Defaults to async"
        ),
    )
    input_shapes: List[List[int]] = Field(
        default=None,
        description=(
            "list of shapes to set the ONNX inputs to. Pass None to use the model "
            "as-is. Default is None"
        ),
    )
    alias: str = Field(
        default=None,
        description=(
            "optional name to give this pipeline instance, useful when inferencing "
            "with multiple models. Default is None"
        ),
    )
    kwargs: Dict[str, Any] = Field(
        default={},
        description=(
            "additional arguments for inference with the model that will be passed "
            "into the pipeline as kwargs"
        ),
    )
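
A construction sketch (assumed usage, not part of this diff): build a PipelineConfig directly and hand it to Pipeline.from_config(); the task and stub values are placeholders:

from deepsparse.pipeline import Pipeline, PipelineConfig

config = PipelineConfig(
    task="text_classification",            # placeholder task name
    model_path="zoo:some/sparsezoo/stub",  # placeholder SparseZoo stub
    batch_size=4,
)
pipeline = Pipeline.from_config(config)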