Skip to content
This repository was archived by the owner on Jun 3, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ deepsparse.benchmark [-h] [-b BATCH_SIZE] [-shapes INPUT_SHAPES]
## 👩‍💻 NLP Inference Example

```python
from deepsparse.transformers import pipeline
from deepsparse import Pipeline

# SparseZoo model stub or path to ONNX file
model_path = "zoo:nlp/question_answering/bert-base/pytorch/huggingface/squad/12layer_pruned80_quant-none-vnni"

qa_pipeline = pipeline(
qa_pipeline = Pipeline.create(
task="question-answering",
model_path=model_path,
)
Expand Down
1 change: 1 addition & 0 deletions src/deepsparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
cpu_vnni_compatible,
)
from .engine import *
from .pipeline import *
from .version import __version__, is_release


Expand Down
82 changes: 56 additions & 26 deletions src/deepsparse/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import numpy
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -72,7 +72,8 @@ class Pipeline(ABC):
* `engine` <- `_initialize_engine`

- on __call__:
* `pre_processed_inputs` <- `process_inputs(inputs: input_model)`
* `parsed_inputs: input_model` <- `parse_inputs(*args, **kwargs)`
* `pre_processed_inputs` <- `process_inputs(parsed_inputs)`
* `engine_outputs` <- `engine(pre_processed_inputs)`
* `outputs: output_model` <- `process_engine_outputs(engine_outputs)`

Expand Down Expand Up @@ -133,25 +134,29 @@ def __init__(
self._engine_args["scheduler"] = scheduler

self.onnx_file_path = self.setup_onnx_file_path()
self._engine = self._initialize_engine()
self.engine = self._initialize_engine()

def __call__(self, pipeline_inputs: BaseModel = None, **kwargs) -> BaseModel:
if pipeline_inputs is None and kwargs:
# parse kwarg inputs into the expected input format
pipeline_inputs = self.input_model(**kwargs)

# validate inputs format
def __call__(self, *args, **kwargs) -> BaseModel:
# parse inputs into input_model schema if necessary
pipeline_inputs = self.parse_inputs(*args, **kwargs)
if not isinstance(pipeline_inputs, self.input_model):
raise ValueError(
f"Calling {self.__class__} requires passing inputs as an "
f"{self.input_model} object or a list of kwargs used to create "
f"a {self.input_model} object"
raise RuntimeError(
f"Unable to parse {self.__class__} inputs into a "
f"{self.input_model} object. Inputs parsed to {type(pipeline_inputs)}"
)

# run pipeline
engine_inputs: List[numpy.ndarray] = self.process_inputs(pipeline_inputs)

if isinstance(engine_inputs, tuple):
engine_inputs, postprocess_kwargs = engine_inputs
else:
postprocess_kwargs = {}

engine_outputs: List[numpy.ndarray] = self.engine(engine_inputs)
pipeline_outputs = self.process_engine_outputs(engine_outputs)
pipeline_outputs = self.process_engine_outputs(
engine_outputs, **postprocess_kwargs
)

# validate outputs format
if not isinstance(pipeline_outputs, self.output_model):
Expand Down Expand Up @@ -306,17 +311,27 @@ class properties into an inference ready onnx file to be compiled by the
raise NotImplementedError()

@abstractmethod
def process_inputs(self, inputs: BaseModel) -> List[numpy.ndarray]:
def process_inputs(
self,
inputs: BaseModel,
) -> Union[List[numpy.ndarray], Tuple[List[numpy.ndarray], Dict[str, Any]]]:
"""
:param inputs: inputs to the pipeline. Must be the type of the `input_model`
of this pipeline
:return: inputs of this model processed into a list of numpy arrays that
can be directly passed into the forward pass of the pipeline engine
can be directly passed into the forward pass of the pipeline engine. Can
also include a tuple with engine inputs and special key word arguments
to pass to process_engine_outputs to facilitate information from the raw
inputs to postprocessing that may not be included in the engine inputs
"""
raise NotImplementedError()

@abstractmethod
def process_engine_outputs(self, engine_outputs: List[numpy.ndarray]) -> BaseModel:
def process_engine_outputs(
self,
engine_outputs: List[numpy.ndarray],
**kwargs,
) -> BaseModel:
"""
:param engine_outputs: list of numpy arrays that are the output of the engine
forward pass
Expand All @@ -327,15 +342,15 @@ def process_engine_outputs(self, engine_outputs: List[numpy.ndarray]) -> BaseMod

@property
@abstractmethod
def input_model(self) -> BaseModel:
def input_model(self) -> Type[BaseModel]:
"""
:return: pydantic model class that inputs to this pipeline must comply to
"""
raise NotImplementedError()

@property
@abstractmethod
def output_model(self) -> BaseModel:
def output_model(self) -> Type[BaseModel]:
"""
:return: pydantic model class that outputs of this pipeline must comply to
"""
Expand Down Expand Up @@ -365,13 +380,6 @@ def model_path(self) -> str:
"""
return self._model_path

@property
def engine(self) -> Union[Engine, ORTEngine]:
"""
:return: engine instance used for model forward pass in pipeline
"""
return self._engine

@property
def engine_args(self) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -417,6 +425,28 @@ def to_config(self) -> "PipelineConfig":
kwargs=kwargs,
)

def parse_inputs(self, *args, **kwargs) -> BaseModel:
"""
:param args: ordered arguments to pipeline, only an input_model object
is supported as an arg for this function
:param kwargs: keyword arguments to pipeline
:return: pipeline arguments parsed into the given `input_model`
schema if necessary. If an instance of the `input_model` is provided
it will be returned
"""
# passed input_model schema directly
if len(args) == 1 and isinstance(args[0], self.input_model) and not kwargs:
return args[0]

if args:
raise ValueError(
f"pipeline {self.__class__} only supports either only a "
f"{self.input_model} object. or keyword arguments to be construct one. "
f"Found {len(args)} args and {len(kwargs)} kwargs"
)

return self.input_model(**kwargs)

def _initialize_engine(self) -> Union[Engine, ORTEngine]:
engine_type = self.engine_type.lower()

Expand Down
2 changes: 1 addition & 1 deletion src/deepsparse/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _add_pipeline_route(
async def _predict_func(request: pipeline.input_model):
results = await execute_async(
pipeline,
**vars(request),
request,
)
return serializable_response(results)

Expand Down
1 change: 0 additions & 1 deletion src/deepsparse/transformers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,3 @@ def _check_transformers_install():
from .helpers import *
from .loaders import *
from .pipelines import *
from .server import *
34 changes: 17 additions & 17 deletions src/deepsparse/transformers/eval_downstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

from tqdm.auto import tqdm

from deepsparse.transformers import pipeline
from deepsparse import Pipeline


from datasets import load_dataset, load_metric # isort: skip
Expand All @@ -79,14 +79,14 @@ def squad_eval(args):
squad_metrics = load_metric("squad")

# load QA pipeline
question_answer = pipeline(
question_answer = Pipeline.create(
task="question-answering",
model_path=args.onnx_filepath,
engine_type=args.engine,
num_cores=args.num_cores,
max_length=args.max_sequence_length,
sequence_length=args.max_sequence_length,
)
print(f"Engine info: {question_answer.model}")
print(f"Engine info: {question_answer.engine}")

for idx, sample in enumerate(tqdm(squad)):
pred = question_answer(
Expand All @@ -96,7 +96,7 @@ def squad_eval(args):
)

squad_metrics.add_batch(
predictions=[{"prediction_text": pred["answer"], "id": sample["id"]}],
predictions=[{"prediction_text": pred.answer, "id": sample["id"]}],
references=[{"answers": sample["answers"], "id": sample["id"]}],
)

Expand All @@ -114,21 +114,21 @@ def mnli_eval(args):
mnli_metrics = load_metric("glue", "mnli")

# load pipeline
text_classify = pipeline(
text_classify = Pipeline.create(
task="text-classification",
model_path=args.onnx_filepath,
engine_type=args.engine,
num_cores=args.num_cores,
max_length=args.max_sequence_length,
sequence_length=args.max_sequence_length,
)
print(f"Engine info: {text_classify.model}")
print(f"Engine info: {text_classify.engine}")

label_map = {"entailment": 0, "neutral": 1, "contradiction": 2}

for idx, sample in enumerate(tqdm(mnli_matched)):
pred = text_classify([[sample["premise"], sample["hypothesis"]]])
mnli_metrics.add_batch(
predictions=[label_map.get(pred[0]["label"])],
predictions=[label_map.get(pred.labels[0])],
references=[sample["label"]],
)

Expand All @@ -154,22 +154,22 @@ def qqp_eval(args):
qqp_metrics = load_metric("glue", "qqp")

# load pipeline
text_classify = pipeline(
text_classify = Pipeline.create(
task="text-classification",
model_path=args.onnx_filepath,
engine_type=args.engine,
num_cores=args.num_cores,
max_length=args.max_sequence_length,
sequence_length=args.max_sequence_length,
)
print(f"Engine info: {text_classify.model}")
print(f"Engine info: {text_classify.engine}")

label_map = {"not_duplicate": 0, "duplicate": 1}

for idx, sample in enumerate(tqdm(qqp)):
pred = text_classify([[sample["question1"], sample["question2"]]])

qqp_metrics.add_batch(
predictions=[label_map.get(pred[0]["label"])],
predictions=[label_map.get(pred.labels[0])],
references=[sample["label"]],
)

Expand All @@ -185,14 +185,14 @@ def sst2_eval(args):
sst2_metrics = load_metric("glue", "sst2")

# load pipeline
text_classify = pipeline(
text_classify = Pipeline.create(
task="text-classification",
model_path=args.onnx_filepath,
engine_type=args.engine,
num_cores=args.num_cores,
max_length=args.max_sequence_length,
sequence_length=args.max_sequence_length,
)
print(f"Engine info: {text_classify.model}")
print(f"Engine info: {text_classify.engine}")

label_map = {"negative": 0, "positive": 1}

Expand All @@ -202,7 +202,7 @@ def sst2_eval(args):
)

sst2_metrics.add_batch(
predictions=[label_map.get(pred[0]["label"])],
predictions=[label_map.get(pred.labels[0])],
references=[sample["label"]],
)

Expand Down
Loading