diff --git a/.gitignore b/.gitignore index dfcfd56..4708147 100644 --- a/.gitignore +++ b/.gitignore @@ -1,350 +1,146 @@ -## Ignore Visual Studio temporary files, build results, and -## files generated by popular Visual Studio add-ons. -## -## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore - -# User-specific files -*.rsuser -*.suo -*.user -*.userosscache -*.sln.docstates - -# User-specific files (MonoDevelop/Xamarin Studio) -*.userprefs - -# Mono auto generated files -mono_crash.* - -# Build results -[Dd]ebug/ -[Dd]ebugPublic/ -[Rr]elease/ -[Rr]eleases/ -x64/ -x86/ -[Aa][Rr][Mm]/ -[Aa][Rr][Mm]64/ -bld/ -[Bb]in/ -[Oo]bj/ -[Ll]og/ -[Ll]ogs/ - -# Visual Studio 2015/2017 cache/options directory -.vs/ -# Uncomment if you have tasks that create the project's static files in wwwroot -#wwwroot/ - -# Visual Studio 2017 auto generated files -Generated\ Files/ - -# MSTest test Results -[Tt]est[Rr]esult*/ -[Bb]uild[Ll]og.* - -# NUnit -*.VisualState.xml -TestResult.xml -nunit-*.xml - -# Build Results of an ATL Project -[Dd]ebugPS/ -[Rr]eleasePS/ -dlldata.c - -# Benchmark Results -BenchmarkDotNet.Artifacts/ - -# .NET Core -project.lock.json -project.fragment.lock.json -artifacts/ - -# StyleCop -StyleCopReport.xml - -# Files built by Visual Studio -*_i.c -*_p.c -*_h.h -*.ilk -*.meta -*.obj -*.iobj -*.pch -*.pdb -*.ipdb -*.pgc -*.pgd -*.rsp -*.sbr -*.tlb -*.tli -*.tlh -*.tmp -*.tmp_proj -*_wpftmp.csproj +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: *.log -*.vspscc -*.vssscc -.builds -*.pidb -*.svclog -*.scc - -# Chutzpah Test files -_Chutzpah* - -# Visual C++ cache files -ipch/ -*.aps -*.ncb -*.opendb -*.opensdf -*.sdf -*.cachefile -*.VC.db -*.VC.VC.opendb - -# Visual Studio profiler -*.psess -*.vsp -*.vspx -*.sap - -# Visual Studio Trace Files -*.e2e - -# TFS 2012 Local Workspace -$tf/ - -# Guidance Automation Toolkit -*.gpState - -# ReSharper is a .NET coding add-in -_ReSharper*/ -*.[Rr]e[Ss]harper -*.DotSettings.user - -# TeamCity is a build add-in -_TeamCity* - -# DotCover is a Code Coverage Tool -*.dotCover - -# AxoCover is a Code Coverage Tool -.axoCover/* -!.axoCover/settings.json - -# Visual Studio code coverage results -*.coverage -*.coveragexml +local_settings.py +db.sqlite3 +db.sqlite3-journal -# NCrunch -_NCrunch_* -.*crunch*.local.xml -nCrunchTemp_* +# Flask stuff: +instance/ +.webassets-cache -# MightyMoose -*.mm.* -AutoTest.Net/ +# Scrapy stuff: +.scrapy -# Web workbench (sass) -.sass-cache/ +# Sphinx documentation +docs/_build/ -# Installshield output folder -[Ee]xpress/ +# PyBuilder +target/ -# DocProject is a documentation generator add-in -DocProject/buildhelp/ -DocProject/Help/*.HxT -DocProject/Help/*.HxC -DocProject/Help/*.hhc -DocProject/Help/*.hhk -DocProject/Help/*.hhp -DocProject/Help/Html2 -DocProject/Help/html +# Jupyter Notebook +.ipynb_checkpoints -# Click-Once directory -publish/ +# IPython +profile_default/ +ipython_config.py -# Publish Web Output -*.[Pp]ublish.xml -*.azurePubxml -# Note: Comment the next line if you want to checkin your web deploy settings, -# but database connection strings (with potential passwords) will be unencrypted -*.pubxml -*.publishproj +# pyenv +.python-version -# Microsoft Azure Web App publish settings. Comment the next line if you want to -# checkin your Azure Web App publish settings, but sensitive information contained -# in these scripts will be unencrypted -PublishScripts/ - -# NuGet Packages -*.nupkg -# NuGet Symbol Packages -*.snupkg -# The packages folder can be ignored because of Package Restore -**/[Pp]ackages/* -# except build/, which is used as an MSBuild target. -!**/[Pp]ackages/build/ -# Uncomment if necessary however generally it will be regenerated when needed -#!**/[Pp]ackages/repositories.config -# NuGet v3's project.json files produces more ignorable files -*.nuget.props -*.nuget.targets - -# Microsoft Azure Build Output -csx/ -*.build.csdef - -# Microsoft Azure Emulator -ecf/ -rcf/ - -# Windows Store app package directories and files -AppPackages/ -BundleArtifacts/ -Package.StoreAssociation.xml -_pkginfo.txt -*.appx -*.appxbundle -*.appxupload - -# Visual Studio cache files -# files ending in .cache can be ignored -*.[Cc]ache -# but keep track of directories ending in .cache -!?*.[Cc]ache/ - -# Others -ClientBin/ -~$* -*~ -*.dbmdl -*.dbproj.schemaview -*.jfm -*.pfx -*.publishsettings -orleans.codegen.cs - -# Including strong name files can present a security risk -# (https://github.com/github/gitignore/pull/2483#issue-259490424) -#*.snk - -# Since there are multiple workflows, uncomment next line to ignore bower_components -# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) -#bower_components/ - -# RIA/Silverlight projects -Generated_Code/ - -# Backup & report files from converting an old project file -# to a newer Visual Studio version. Backup files are not needed, -# because we have git ;-) -_UpgradeReport_Files/ -Backup*/ -UpgradeLog*.XML -UpgradeLog*.htm -ServiceFabricBackup/ -*.rptproj.bak - -# SQL Server files -*.mdf -*.ldf -*.ndf - -# Business Intelligence projects -*.rdl.data -*.bim.layout -*.bim_*.settings -*.rptproj.rsuser -*- [Bb]ackup.rdl -*- [Bb]ackup ([0-9]).rdl -*- [Bb]ackup ([0-9][0-9]).rdl - -# Microsoft Fakes -FakesAssemblies/ - -# GhostDoc plugin setting file -*.GhostDoc.xml - -# Node.js Tools for Visual Studio -.ntvs_analysis.dat -node_modules/ - -# Visual Studio 6 build log -*.plg - -# Visual Studio 6 workspace options file -*.opt - -# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) -*.vbw - -# Visual Studio LightSwitch build output -**/*.HTMLClient/GeneratedArtifacts -**/*.DesktopClient/GeneratedArtifacts -**/*.DesktopClient/ModelManifest.xml -**/*.Server/GeneratedArtifacts -**/*.Server/ModelManifest.xml -_Pvt_Extensions - -# Paket dependency manager -.paket/paket.exe -paket-files/ - -# FAKE - F# Make -.fake/ - -# CodeRush personal settings -.cr/personal - -# Python Tools for Visual Studio (PTVS) -__pycache__/ -*.pyc +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock -# Cake - Uncomment if you are using it -# tools/** -# !tools/packages.config +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ -# Tabs Studio -*.tss +# Celery stuff +celerybeat-schedule +celerybeat.pid -# Telerik's JustMock configuration file -*.jmconfig +# SageMath parsed files +*.sage.py -# BizTalk build output -*.btp.cs -*.btm.cs -*.odx.cs -*.xsd.cs +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ -# OpenCover UI analysis results -OpenCover/ +# Spyder project settings +.spyderproject +.spyproject -# Azure Stream Analytics local run output -ASALocalRun/ +# Rope project settings +.ropeproject -# MSBuild Binary and Structured Log -*.binlog +# mkdocs documentation +/site -# NVidia Nsight GPU debugger configuration file -*.nvuser +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json -# MFractors (Xamarin productivity tool) working folder -.mfractor/ +# Pyre type checker +.pyre/ -# Local History for Visual Studio -.localhistory/ +# VS Code +.vscode/ -# BeatPulse healthcheck temp database -healthchecksdb +# temp lock files +lockfile_* -# Backup folder for Package Reference Convert tool in Visual Studio 2017 -MigrationBackup/ +mlperf_log_* +log.* +.idea/ -# Ionide (cross platform F# VS Code tools) working folder -.ionide/ +runs/ +log/ +output* +*.out +pyflame* +*.cprofile \ No newline at end of file diff --git a/.markdownlint.yaml b/.markdownlint.yaml new file mode 100644 index 0000000..f30749b --- /dev/null +++ b/.markdownlint.yaml @@ -0,0 +1,8 @@ +MD013: + code_blocks: false + headers: false + line_length: 120 + tables: false + +MD046: + style: fenced diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..ccb15c8 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,69 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-ast + - id: check-builtin-literals + - id: check-docstring-first + - id: check-merge-conflict + - id: check-yaml + - id: check-toml + - id: debug-statements + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/asottile/pyupgrade + rev: v3.3.1 + hooks: + - id: pyupgrade + args: ["--py37-plus"] + - repo: https://github.com/PyCQA/isort + rev: 5.12.0 + hooks: + - id: isort + - repo: https://github.com/psf/black + rev: 23.1.0 + hooks: + - id: black + args: [--safe] + - repo: https://github.com/asottile/blacken-docs + rev: 1.13.0 + hooks: + - id: blacken-docs + additional_dependencies: [black==23.1] + # - repo: https://github.com/pre-commit/pygrep-hooks + # rev: v1.10.0 + # hooks: + # - id: rst-backticks + - repo: https://github.com/tox-dev/pyproject-fmt + rev: "0.9.2" + hooks: + - id: pyproject-fmt + # - repo: https://github.com/PyCQA/flake8 + # rev: 6.0.0 + # hooks: + # - id: flake8 + # additional_dependencies: + # - flake8-bugbear==23.3.12 + # - flake8-comprehensions==3.11.1 + # - flake8-pytest-style==1.7.2 + # - flake8-spellcheck==0.28 + # - flake8-unused-arguments==0.0.13 + # - flake8-noqa==1.3.1 + # - pep8-naming==0.13.3 + # - flake8-pyproject==1.2.3 + - repo: https://github.com/pre-commit/mirrors-prettier + rev: "v2.7.1" + hooks: + - id: prettier + additional_dependencies: + - prettier@2.7.1 + - "@prettier/plugin-xml@2.2" + args: ["--print-width=120", "--prose-wrap=always"] + - repo: https://github.com/igorshubovych/markdownlint-cli + rev: v0.33.0 + hooks: + - id: markdownlint + - repo: meta + hooks: + - id: check-hooks-apply + - id: check-useless-excludes diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..ab30113 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,15 @@ +version: 2 +build: + os: ubuntu-22.04 + tools: + python: "3" +python: + install: + - method: pip + path: . + extra_requirements: + - docs +sphinx: + builder: html + configuration: docs/conf.py + fail_on_warning: true diff --git a/README.md b/README.md index 5cd7cec..6bf08be 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,68 @@ -# Project +# Batch Inference Toolkit -> This repo has been populated by an initial template to help get you started. Please -> make sure to update the content to build a great experience for community-building. +Batch Inference Toolkit(batch-inference) is a Python package that batches model input tensors coming from multiple users dynamically, executes the model, un-batches output tensors and then returns them back to each user respectively. This will improve system throughput because of a better cache locality. The entire process is transparent to developers. -As the maintainer of this project, please make a few updates: +## Installation -- Improving this README.MD file to provide a great experience -- Updating SUPPORT.MD with content about this project's support experience -- Understanding the security reporting process in SECURITY.MD -- Remove this section from the README +**Install from Pip** _(Coming Soon)_ -## Contributing +```bash +python -m pip install batch-inference --upgrade +``` -This project welcomes contributions and suggestions. Most contributions require you to agree to a -Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us -the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com. +**Build and Install from Source** _(for developers)_ -When you submit a pull request, a CLA bot will automatically determine whether you need to provide -a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions -provided by the bot. You will only need to do this once across all repos using our CLA. +```bash +git clone https://github.com/microsoft/batch-inference.git +python -m pip install -e .[docs,testing] -This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). -For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or -contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. +# if you want to format the code before commit +pip install pre-commit +pre-commit install -## Trademarks +# run unittests +python -m unittest discover tests +``` -This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft -trademarks or logos is subject to and must follow -[Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general). -Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. -Any use of third-party trademarks or logos are subject to those third-party's policies. +## Example + +```python +import threading +import numpy as np +from batch_inference import batching + + +@batching(max_batch_size=32) +class MyModel: + def __init__(self, k, n): + self.weights = np.random.randn((k, n)).astype("f") + + # x: [batch_size, m, k], self.weights: [k, n] + def predict_batch(self, x): + y = np.matmul(x, self.weights) + return y + + +with MyModel.host(3, 3) as host: + def send_requests(): + for _ in range(0, 10): + x = np.random.randn(1, 3, 3).astype("f") + y = host.predict(x) + + threads = [threading.Thread(target=send_requests) for i in range(0, 32)] + [th.start() for th in threads] + [th.join() for th in threads] + +``` + +## Build the Docs + +Run the following commands and open `docs/_build/html/index.html` in browser. + +```bash +pip install sphinx myst-parser sphinx-rtd-theme sphinxemoji +cd docs/ + +make html # for linux +.\make.bat html # for windows +``` diff --git a/SUPPORT.md b/SUPPORT.md deleted file mode 100644 index 291d4d4..0000000 --- a/SUPPORT.md +++ /dev/null @@ -1,25 +0,0 @@ -# TODO: The maintainer of this repo has not yet edited this file - -**REPO OWNER**: Do you want Customer Service & Support (CSS) support for this product/project? - -- **No CSS support:** Fill out this template with information about how to file issues and get help. -- **Yes CSS support:** Fill out an intake form at [aka.ms/onboardsupport](https://aka.ms/onboardsupport). CSS will work with/help you to determine next steps. -- **Not sure?** Fill out an intake as though the answer were "Yes". CSS will help you decide. - -*Then remove this first heading from this SUPPORT.MD file before publishing your repo.* - -# Support - -## How to file issues and get help - -This project uses GitHub Issues to track bugs and feature requests. Please search the existing -issues before filing new issues to avoid duplicates. For new issues, file your bug or -feature request as a new Issue. - -For help and questions about using this project, please **REPO MAINTAINER: INSERT INSTRUCTIONS HERE -FOR HOW TO ENGAGE REPO OWNERS OR COMMUNITY FOR HELP. COULD BE A STACK OVERFLOW TAG OR OTHER -CHANNEL. WHERE WILL YOU HELP PEOPLE?**. - -## Microsoft Support Policy - -Support for this **PROJECT or PRODUCT** is limited to the resources listed above. diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/README.rst b/docs/README.rst new file mode 100644 index 0000000..e16d4ba --- /dev/null +++ b/docs/README.rst @@ -0,0 +1,78 @@ +============================= +Batch Inference Toolkit +============================= + +Batch Inference Toolkit(batch-inference) is a Python package that batches model input tensors coming from multiple users dynamically, executes the model, un-batches output tensors and then returns them back to each user respectively. This will improve system throughput because of a better cache locality. The entire process is transparent to developers. + +.. figure:: figures/batching_overview.png + :width: 500 + :align: center + :alt: How Batching Inference Works + +Installation +============================ + +**Install from Pip** *(Coming Soon)* + +.. code:: bash + + python -m pip install batch-inference --upgrade + +**Build and Install from Source** + +.. code:: bash + + git clone https://github.com/microsoft/batch-inference.git + cd python + python -m pip install -e . + + # if you want to format the code before commit + pip install pre-commit + pre-commit install + + # run unittests + python -m unittest discover tests + +Example +============================ + +.. code:: python + + import threading + import numpy as np + from batch_inference import batching + + + @batching(max_batch_size=32) + class MyModel: + def __init__(self, k, n): + self.weights = np.random.randn((k, n)).astype("f") + + # x: [batch_size, m, k], self.weights: [k, n] + def predict_batch(self, x): + y = np.matmul(x, self.weights) + return y + + + with MyModel.host(3, 3) as host: + def send_requests(): + for _ in range(0, 10): + x = np.random.randn(1, 3, 3).astype("f") + y = host.predict(x) + + threads = [threading.Thread(target=send_requests) for i in range(0, 32)] + [th.start() for th in threads] + [th.join() for th in threads] + +Build the Docs +============================= + +Run the following commands and open ``docs/_build/html/index.html`` in browser. + +.. code:: bash + + pip install sphinx myst-parser sphinx-rtd-theme sphinxemoji + cd docs/ + + make html # for linux + .\make.bat html # for windows diff --git a/docs/batcher/bucket_seq_batcher.rst b/docs/batcher/bucket_seq_batcher.rst new file mode 100644 index 0000000..ba3e2d4 --- /dev/null +++ b/docs/batcher/bucket_seq_batcher.rst @@ -0,0 +1,10 @@ +========================== +Bucket Sequence Batcher +========================== + +Similar to `SeqBatcher`, `BucketSeqBatcher` provides batching support for sequence inputs with variant lengths. The difference is that it will group sequences with similar lengths into the same batch, instead of having all input sequences into a single batch, to reduce the padding cost. This is useful for sequences with significantly different lengths, where some of the sequences are short, but the others are very long. + +The following example defines 4 buckets to accommodate sequence of different lenghts: `<=1024`, `(1024, 2048]`, `(2048, 4096]` and `>4096`. The `BucketSeqBatcher` will sort input sequences by lengths, put them in corresponding buckets and then batch sequences within the same bucket. For example, if the sequence lenght is 2000, the `BucketSeqBatcher` will put it into the 2nd bucket. It won't be batched with a sequence of length 500, which is in the 1st bucket. + +.. literalinclude:: ./bucket_seq_batcher_example.py + :language: python diff --git a/docs/batcher/bucket_seq_batcher_example.py b/docs/batcher/bucket_seq_batcher_example.py new file mode 100644 index 0000000..58bb064 --- /dev/null +++ b/docs/batcher/bucket_seq_batcher_example.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from batch_inference import ModelHost +from batch_inference.batcher import BucketSeqBatcher + + +class MyModel: + def __init__(self): + pass + + # input: [batch_size, n] + def predict_batch(self, seq): + res = seq + return res + + +model_host = ModelHost( + MyModel, + batcher=BucketSeqBatcher(padding_tokens=[0, 0], buckets=[1024, 2048, 4096]), + max_batch_size=32, +)() diff --git a/docs/batcher/concat_batcher.rst b/docs/batcher/concat_batcher.rst new file mode 100644 index 0000000..dc8798c --- /dev/null +++ b/docs/batcher/concat_batcher.rst @@ -0,0 +1,8 @@ +========================== +Concatenate Batcher +========================== + +The `ConcatBatcher` simply concatenates input numpy arrays into larger ones. It requires the input arrays to have campatible shapes. No padding will be performed. + +.. literalinclude:: ./concat_batcher_example.py + :language: python diff --git a/docs/batcher/concat_batcher_example.py b/docs/batcher/concat_batcher_example.py new file mode 100644 index 0000000..8f7c046 --- /dev/null +++ b/docs/batcher/concat_batcher_example.py @@ -0,0 +1,24 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import numpy as np + +from batch_inference import ModelHost +from batch_inference.batcher import ConcatBatcher + + +class MyModel: + def __init__(self): + self.op = np.matmul + + # x.shape: [batch_size, m, k], y.shape: [batch_size, k, n] + def predict_batch(self, x, y): + res = self.op(x, y) + return res + + +model_host = ModelHost( + MyModel, + batcher=ConcatBatcher(), + max_batch_size=32, +)() diff --git a/docs/batcher/customized_batcher.py b/docs/batcher/customized_batcher.py new file mode 100644 index 0000000..7b0e640 --- /dev/null +++ b/docs/batcher/customized_batcher.py @@ -0,0 +1,72 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + +import numpy as np + +from batch_inference import ModelHost +from batch_inference.batcher import Batcher + + +class MyModel: + def __init__(self): + self.op = np.matmul + + # x.shape: [batch_size, m, k], y.shape: [batch_size, k, n] + def predict_batch(self, x, y): + res = self.op(x, y) + return res + + +class MyBatcher(Batcher): + def __init__(self): + super().__init__() + + def batch(self, requests: List[Tuple[np.ndarray]]): + """Batch n requests into 1 batched request + + Args: + requests: [(x, y)], each request is a (x, y) from predict method + + Returns: + batched requests: (x_batched, y_batched) for predict_batch method + context for unbatch: List[int], the batch sizes of each original (x, y) + """ + + x_batched = np.concatenate([item[0] for item in requests], axis=0) + y_batched = np.concatenate([item[1] for item in requests], axis=0) + batch_sizes = [item[0].shape[0] for item in requests] + return (x_batched, y_batched), batch_sizes + + def unbatch( + self, + batched_response: np.ndarray, + unbatch_ctx: List[int], + ): + """Unbatch 1 batched response into n responses + + Args: + batched_responses: batched_res from predict_batch method, + batched_res=batched_x * batched_y + unbatch_ctx: batch_sizes of n original requests + + Returns: + responses: [res1, res2, ...], each res will be returned by predict method, + res=x * y + """ + + batch_sizes = unbatch_ctx + responses = [] + start = 0 + for n in batch_sizes: + responses.append(batched_response[start : start + n]) + start += n + return responses + + +model_host = ModelHost( + MyModel, + batcher=MyBatcher(), + max_batch_size=32, +)() diff --git a/docs/batcher/seq_batcher.rst b/docs/batcher/seq_batcher.rst new file mode 100644 index 0000000..cb1805e --- /dev/null +++ b/docs/batcher/seq_batcher.rst @@ -0,0 +1,8 @@ +========================== +Sequence Batcher +========================== + +The `SeqBatcher` provide batching support for sequence inputs with variant lengths, which is common in Natural Language Tasks. It is a wrapper of `ConcatBatcher`. It will first pad the inputs with padding tokens, then concatenate the batched inputs with `ConcatBatcher`. + +.. literalinclude:: ./seq_batcher_example.py + :language: python diff --git a/docs/batcher/seq_batcher_example.py b/docs/batcher/seq_batcher_example.py new file mode 100644 index 0000000..10835cd --- /dev/null +++ b/docs/batcher/seq_batcher_example.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from batch_inference import ModelHost +from batch_inference.batcher import SeqBatcher + + +class MyModel: + def __init__(self): + pass + + # input: [batch_size, n] + def predict_batch(self, input): + res = input + return res + + +model_host = ModelHost( + MyModel, + batcher=SeqBatcher(), + max_batch_size=32, +)() diff --git a/docs/batcher/what_is_batcher.rst b/docs/batcher/what_is_batcher.rst new file mode 100644 index 0000000..304b880 --- /dev/null +++ b/docs/batcher/what_is_batcher.rst @@ -0,0 +1,33 @@ +========================== +What is Batcher? +========================== + +The `Batcher` class defines how mutiple requests from users are merged into a batch request, and how the batch response is splitted into multiple responses for users. It exposes two interface functions for developers to implement their own batcher that fits a specific scenario. + +* `batch`. Merge multiple requests into a batch request, and attach a context object for unbatching. +* `unbatch`. Split a batch response into multiple responses, with the help of the context object. + +Built-in Batchers +=================================== + +The following built-in batchers are provided in `batch_inference.batcher` module. Both `numpy.ndarry` and `torch.Tensor` are supported as input date types. + +* :doc:`ConcatBatcher <./concat_batcher>`. Simply concatenate multiple requests into a single batch request. +* :doc:`SeqBatcher <./seq_batcher>`. Pad sequences of different lengths with padding tokens before concatenation. +* :doc:`BucketSeqBatcher <./bucket_seq_batcher>`. Group sequences of similar lengths, pad them with padding tokens and then concatenate. + +Implement Customized Batcher +====================================== + +The following example shows how to implement a customized batcher. The batcher merges multiple requests into a single batch request, and splits the batch response into multiple responses. + +.. literalinclude:: ./customized_batcher.py + :language: python + + +The MultiBatcher Class +======================================= + +In most cases, we merge multiple requests from users into a single batch request. :doc:`ConcatBatcher <./concat_batcher>` and :doc:`SeqBatcher <./seq_batcher>` are examples of this. + +Sometimes, we want to group multiple requests first and then merged requests in each group into a batch request. In this case, the Batcher should inherits the `batcher.MultiBatcher` class instead of `batche.Batcher`. :doc:`BucketSeqBatcher <./bucket_seq_batcher>` is an example of this. diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..d91e7fb --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,85 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Path setup -------------------------------------------------------------- + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +# import os +# import sys +# sys.path.insert(0, os.path.abspath('.')) +import sphinx_rtd_theme + +# -- Project information ----------------------------------------------------- + +project = "Batch Inference Toolkit" +copyright = "2023, AI Platform Team, STCA, Microsoft" +author = "Yong Huang, Xi Chen, Lu Ye, Ze Tao" + +# The full version, including alpha/beta/rc tags +release = "1.0rc0" + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + "myst_parser", + "sphinx_rtd_theme", + "sphinx.ext.autodoc", + "sphinx.ext.napoleon", + "sphinxemoji.sphinxemoji", +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ["_templates"] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = "sphinx_rtd_theme" + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = [] + +html_theme_options = {} + +autoclass_content = "both" + + +def process_signature(app, what, name, obj, options, signature, return_annotation): + print(f">>>> {return_annotation} {name}{signature}") + + if isinstance(signature, str): + signature = signature.replace("pyis.python.lib.pyis_python.ops", "ops") + signature = signature.replace("pyis.python.ops", "ops") + + if isinstance(return_annotation, str): + return_annotation = return_annotation.replace( + "pyis.python.lib.pyis_python.ops", + "ops", + ) + return_annotation = return_annotation.replace("pyis.python.ops", "ops") + + return (signature, return_annotation) + + +def setup(app): + app.connect("autodoc-process-signature", process_signature) diff --git a/docs/examples/fc.py b/docs/examples/fc.py new file mode 100644 index 0000000..d28f3e4 --- /dev/null +++ b/docs/examples/fc.py @@ -0,0 +1,72 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import torch + +from batch_inference import ModelHost +from batch_inference.batcher import ConcatBatcher + + +class BatchFCModule: + def __init__(self, input_size, output_size, batch): + self.batch = batch + self.weights = torch.randn(input_size, output_size) + self.host = ModelHost( + model_obj=self, + batcher=ConcatBatcher(tensor="pt"), + max_batch_size=32, + wait_ms=-1, + wait_n=8, + ) + self.compute_time = {} + self.lock = threading.Lock() + + def predict_batch(self, input): + start_time = time.perf_counter_ns() + res = torch.matmul(input, self.weights) + compute_time = time.perf_counter_ns() - start_time + # print(f"input size: {input.size()}, compute time: {compute_time:.6f} seconds, thread: {threading.get_ident()}") + with self.lock: + self.compute_time[threading.get_ident()] = ( + 0 + if threading.get_ident() not in self.compute_time + else self.compute_time[threading.get_ident()] + compute_time + ) + return res + + # input: [batch_size, input_size] + def predict(self, input): + if self.batch: + return self.host.predict(input) + else: + return self.predict_batch(input) + + +def main(): + input_size, output_size = 1024, 10240 + sut = BatchFCModule(input_size, output_size, batch=True) + sut.host.start() + + def request(): + input = torch.rand(2, input_size) + return sut.predict(input) + + print("Start Running") + start_time = time.time() + with ThreadPoolExecutor(max_workers=32) as executor: + futures = [executor.submit(request) for i in range(10000)] + result = [f.result() for f in as_completed(futures)] + end_time = time.time() + sut.host.stop() + + print(f"Total time: {end_time - start_time:.6f} seconds") + compute_time = {k: v / 1e9 for k, v in sut.compute_time.items()} + print(f"Compute time ({len(compute_time)}): {compute_time} seconds") + + +if __name__ == "__main__": + main() diff --git a/docs/examples/fc_async.py b/docs/examples/fc_async.py new file mode 100644 index 0000000..50f8483 --- /dev/null +++ b/docs/examples/fc_async.py @@ -0,0 +1,67 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import threading +import time + +import torch + +from batch_inference.aio import ModelHost +from batch_inference.batcher import ConcatBatcher + + +class BatchFCModule: + def __init__(self, input_size, output_size): + self.weights = torch.randn(input_size, output_size) + self.host = ModelHost( + model_obj=self, + batcher=ConcatBatcher(tensor="pt"), + max_batch_size=32, + wait_ms=-1, + wait_n=8, + ) + self.compute_time = {} + + def predict_batch(self, input): + start_time = time.perf_counter_ns() + res = torch.matmul(input, self.weights) + compute_time = time.perf_counter_ns() - start_time + # print(f"input size: {input.size()}, compute time: {compute_time:.6f} seconds, thread: {threading.get_ident()}") + self.compute_time[threading.get_ident()] = ( + 0 + if threading.get_ident() not in self.compute_time + else self.compute_time[threading.get_ident()] + compute_time + ) + return res + + async def predict(self, input): + await self.host.predict(input) + + +async def main(): + input_size, output_size = 1024, 10240 + sut = BatchFCModule(input_size, output_size) + await sut.host.start() + + async def request(): + input = torch.randn(2, input_size) + await sut.predict(input) + + print("Start Running") + start_time = time.time() + tasks = [asyncio.create_task(request()) for i in range(10000)] + await asyncio.wait(tasks) + end_time = time.time() + await sut.host.stop() + + print(f"Total time: {end_time - start_time:.6f} seconds") + compute_time = {k: v / 1e9 for k, v in sut.compute_time.items()} + print(f"Compute time ({len(compute_time)}): {compute_time} seconds") + + +if __name__ == "__main__": + # cProfile.run('asyncio.run(main())', 'fc_stats.cprofile') + # p = pstats.Stats('fc_stats.cprofile') + # p.strip_dirs().sort_stats(SortKey.TIME).print_stats() + asyncio.run(main()) diff --git a/docs/examples/llm_embedding.py b/docs/examples/llm_embedding.py new file mode 100644 index 0000000..fa07bdb --- /dev/null +++ b/docs/examples/llm_embedding.py @@ -0,0 +1,53 @@ +from typing import Tuple + +import torch +from flask import Flask, jsonify, request +from transformers import BertModel, BertTokenizer + +from batch_inference import batching +from batch_inference.batcher.bucket_seq_batcher import BucketSeqBatcher + + +@batching( + batcher=BucketSeqBatcher(padding_tokens=[0, 0], buckets=[25, 50, 100, 200, 500], tensor='pt'), + max_batch_size=5, +) +class BertEmbeddingModel: + def __init__(self): + self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased") + self.model = BertModel.from_pretrained("bert-base-uncased") + + def preprocessing(self, text) -> Tuple[torch.Tensor, torch.Tensor]: + encoded_text = self.tokenizer.encode_plus( + text, return_attention_mask=True, return_tensors="pt" + ) + return encoded_text["input_ids"], encoded_text["attention_mask"] + + def predict_batch( + self, input_ids: torch.Tensor, attention_mask: torch.Tensor + ) -> Tuple[torch.Tensor]: + with torch.no_grad(): + outputs = self.model(input_ids, attention_mask) + embedding = outputs[0] + return embedding + + +bert_host = BertEmbeddingModel.host() +bert_host.start() +app = Flask(__name__) + + +@app.route("/embed", methods=["POST"]) +def embed(): + text = request.json["text"] + input_ids, attention_mask = bert_host.model_obj.preprocessing(text) + embedding = bert_host.predict(input_ids, attention_mask) + return jsonify({"embedding": embedding.tolist()}) + + +if __name__ == "__main__": + # send warm up query to verify + input_ids, attention_mask = bert_host.model_obj.preprocessing("hello world") + embedding = bert_host.predict(input_ids, attention_mask) + print(embedding) + app.run(threaded=True) diff --git a/docs/examples/llm_embedding.rst b/docs/examples/llm_embedding.rst new file mode 100644 index 0000000..fe09ba7 --- /dev/null +++ b/docs/examples/llm_embedding.rst @@ -0,0 +1,6 @@ +========================== +LLM Embedding +========================== + +.. literalinclude:: ./llm_embedding.py + :language: python diff --git a/docs/examples/test_gpt2.py b/docs/examples/test_gpt2.py new file mode 100644 index 0000000..9cc1183 --- /dev/null +++ b/docs/examples/test_gpt2.py @@ -0,0 +1,170 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import unittest +from typing import Any, List, Tuple + +import torch +from transformers import GPT2LMHeadModel, GPT2Tokenizer + +from batch_inference import batching +from batch_inference.batcher import Batcher + + +class Gpt2Batcher(Batcher): + def __init__(self) -> None: + super().__init__() + self.pad_token = [0] + + def batch(self, requests: List[Tuple]) -> Tuple[List[Tuple], Any]: + """Batch multiple M requests into 1 batched requests + Pad and concat input_ids from predict method, and create attention_masks based on input_ids + + Args: + requests: a list of M requests, each request is a tuple of input_id + + Returns: + batched requests: a list of 1 batched requests, each batched request is a tuple of input_ids and attention_masks + context for unbatch: will be passed to unbatch method + """ + lengths = [len(request[0]) for request in requests] + max_len = max(lengths) + input_ids = [] + attention_masks = [] + # pad input id and create attention mask + for request in requests: + ids = request[0] + pad_len = max_len - len(ids) + input_id = ids + pad_len * self.pad_token + attention_mask = [1] * len(ids) + [0] * pad_len + input_ids.append(input_id) + attention_masks.append(attention_mask) + return [input_ids, attention_masks], None + + def unbatch(self, batched_responses: List, unbatch_ctx: Any) -> List: + """Unbatch 1 batched responses into M responses + batched_responses is already a list of M responses, return it directly + """ + return batched_responses + + +# eos_token = 50256 # Token of <|endoftext|> +eos_token = 13 # Token of . Use for debugging + + +@batching(batcher=Gpt2Batcher(), max_batch_size=4) +class Gpt2Model: + def __init__(self): + self.model = GPT2LMHeadModel.from_pretrained("gpt2") + self.max_output_length = 64 + self.eos_token = eos_token + + def predict_batch(self, input_ids, attention_masks): + past_key_values = None + length = len(input_ids) + input_ids = torch.tensor(input_ids) + attention_masks = torch.tensor(attention_masks) + results = [] + for i in range(length): + results.append([]) + + processing = list(range(length)) + + for i in range(self.max_output_length): + output = self.model( + input_ids, + attention_mask=attention_masks, + past_key_values=past_key_values, + use_cache=True, + ) + past_key_values = output.past_key_values + # Simply take token with max prod as generated token + logits = output.logits[..., -1, :] + tokens = torch.argmax(logits, dim=1) + + tokens = tokens.tolist() + finished = [] + for i, actual_index in enumerate(processing): + results[actual_index].append(tokens[i]) + if tokens[i] == eos_token: + finished.append(i) + + if finished: + finished.reverse() + # Delete finished requests + for index in finished: + del processing[index] + del tokens[index] + past_key_values = self.delete_index_past_key_values( + past_key_values, index + ) + attention_masks = torch.cat( + [attention_masks[:index], attention_masks[index + 1 :]] + ) + if not processing: + break + + # input_ids will contain generated token id, while attention_masks contains historical masks + input_ids = torch.tensor(tokens, dtype=torch.int32).unsqueeze(1) + new_mask = torch.ones(len(processing), dtype=torch.int32).unsqueeze(1) + attention_masks = torch.cat([attention_masks, new_mask], dim=1) + + return results + + def delete_index_past_key_values(self, past_key_values, index): + # Shape: (layer, k&v, [batchsize, head, token length, head dim]), for example: (12, 2, [batchsize, 12, n, 64]) for GPT2 small + deleted = [] + for i, layer in enumerate(past_key_values): + deleted.append([]) + for tensor in layer: + deleted[i].append(torch.cat([tensor[:index], tensor[index + 1 :]])) + deleted[i] = tuple(deleted[i]) + return tuple(deleted) + + +class TestModelHost(unittest.TestCase): + def setUp(self) -> None: + self.model_host = Gpt2Model.host() + self.model_host.start() + + def tearDown(self) -> None: + self.model_host.stop() + + def test_simple(self): + text = "The Manhattan bridge" + tokenizer = GPT2Tokenizer.from_pretrained("gpt2") + input_ids = tokenizer.encode(text) + output_ids = self.model_host.predict(input_ids) + result = tokenizer.decode(output_ids) + self.assertTrue(len(result) > 0) + + def test_concurrent(self): + def send_requests(): + texts = [ + "The Manhattan bridge", + "Python lists are a data structure similar to dynamically", + "Tuples in Python are a data structure used to store multiple elements in a single variable. Just like list data structure, a tuple is", + "Even though List and Tuple are different data structures", + "An operating system (OS) is the program that", + "An operating system brings powerful benefits to computer software", + "As long as each application accesses the same resources and services", + "An operating system provides three essential capabilities: ", + "The GUI is most frequently used by casual or end users that are primarily", + "An operating system can", + ] + tokenizer = GPT2Tokenizer.from_pretrained("gpt2") + for i in range(0, 10): + input_ids = tokenizer.encode(texts[i]) + output_ids = self.model_host.predict(input_ids) + result = tokenizer.decode(output_ids) + # print("Input: " + texts[i] + "------- Output: " + result) + self.assertTrue(len(result) > 0) + + threads = [threading.Thread(target=send_requests) for i in range(0, 10)] + [th.start() for th in threads] + [th.join() for th in threads] + + +if __name__ == "__main__": + unittest.main() diff --git a/docs/figures/batching_overview.png b/docs/figures/batching_overview.png new file mode 100644 index 0000000..48fd2cd Binary files /dev/null and b/docs/figures/batching_overview.png differ diff --git a/docs/figures/model_host.png b/docs/figures/model_host.png new file mode 100644 index 0000000..2359123 Binary files /dev/null and b/docs/figures/model_host.png differ diff --git a/docs/figures/model_host_class.png b/docs/figures/model_host_class.png new file mode 100644 index 0000000..6185727 Binary files /dev/null and b/docs/figures/model_host_class.png differ diff --git a/docs/figures/remote_model_host.png b/docs/figures/remote_model_host.png new file mode 100644 index 0000000..17fbe58 Binary files /dev/null and b/docs/figures/remote_model_host.png differ diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..0b0dd80 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,33 @@ +.. include:: README.rst + +.. toctree:: + :maxdepth: 1 + :titlesonly: + :caption: Overview + + ./README.rst + +.. toctree:: + :maxdepth: 1 + :titlesonly: + :caption: Batching Modes + + ./model_host.rst + ./remote_model_host.rst + +.. toctree:: + :maxdepth: 1 + :titlesonly: + :caption: Batching Strategies + + ./batcher/what_is_batcher.rst + ./batcher/concat_batcher.rst + ./batcher/seq_batcher.rst + ./batcher/bucket_seq_batcher.rst + +.. toctree:: + :maxdepth: 1 + :titlesonly: + :caption: Examples + + ./examples/llm_embedding.rst diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..922152e --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/model_host.rst b/docs/model_host.rst new file mode 100644 index 0000000..8c3e75e --- /dev/null +++ b/docs/model_host.rst @@ -0,0 +1,46 @@ +========================== +Local Mode (ModelHost) +========================== + +.. figure:: figures/model_host_class.png + :width: 600 + :align: center + :alt: Model Host + + The ModelHost and aio.ModelHost classes + +Multi-Thread Workers +========================== + +To reach `max_batch_size`, you need to create at least the same `num_threads` as `max_batch_size`. For example, if the max batch size is 8, you need to create at least 8 worker threads. Otherwise, max batch size could never be reached. + +.. figure:: figures/model_host.png + :width: 600 + :align: center + :alt: Model Host + + ModelHost with Multi-Process + Multi-Thread Workers + +AsyncIO Workers +========================== + +Too many threads might be a performance bottleneck for you. For example, if you want the max batch size to be 1024, you need at least 1024 threads to support it. This won't a good idea especially when the model inference time is small and Python GIL overhead can't be ignored. + +In this case, you should use the async version of model host `aio.ModelHost` implemented by Python AsyncIO instead, which has the same APIs as `ModelHost`. Meanwhile, you should also consider to use an async worker in your web server, for example the `uvicorn `__ async worker. + +Multi-Process Workers +========================== + +When a significant amount of the system latency is contributed by pure Python code, such as pre-prcessing or post-processing, instead of model inference, multiple worker processes should be created to avoid the side effect of Python GIL. In this way, each process maintains its own `ModelHost` instance and handles requests separately. If you use Gunicorn to serve the model, the default `synchronous workers `__ are the corresponding choice here. The reason is that machine learning workload is alomost CPU-bound. To make the most efficient use of the CPU cores, you should create as many worker processes as `(2 x $num_cores) + 1`. This is the suggested setting of Gunicorn. Meanwhile, you need to consider the thread pool managed by the deep learning frameworks as well, such as PyTorch, ONNXRuntime or TensorFlow. + +.. code-block:: bash + + gunicorn --workers=5 main:app + +As discussed in previous section, you can still create multiple threads inside each worker process to help each work process reach the max batch size. If you use Gunicorn to serve the model, choose the sync worker type and set the threads setting to more than 1, the `gthread workers `__ type will be used. In this case, the Python application is loaded once per worker process, and each of the threads spawned by the same worker process shares the same memory space. + +.. code-block:: bash + + gunicorn --workers=5 --threads=2 --worker-class=gthread main:app + +Although `Python AsyncIO` is supported by `aio.ModelHost`, Gunicorn only supports gevent backed async workers. Python AsyncIO and gevent might not be compatible and we don't verify that. It is not recommended to use `aio.ModelHost` with Gunicorn. diff --git a/docs/remote_model_host.rst b/docs/remote_model_host.rst new file mode 100644 index 0000000..26d4151 --- /dev/null +++ b/docs/remote_model_host.rst @@ -0,0 +1,23 @@ +================================ +Remote Mode (RemoteModelHost) +================================ + +Setting the number of worker processes to be `(2 x $num_cores) + 1` is perfect for running preprocessing and postprocessing code which is in Python efficient. However, if you plan to run tensor compluations on a GPU device, you will want to have only one DNN model instance created on the GPU device. In this scenario, you should use `RemoteModelHost` instead of `ModelHost`. + +With `RemoteModelHost`, each worker process will still create its own `RemoteModelHost` instance. But internally, only one of the worker processes will really load the model onto GPU and start as the grpc server. All the other worker processes will be grpc clients and won't load the model. When `RemoteModelHost.predict` is called in client worker processes, the query is serialized and sent to the remote grpc server to process. + +Note that you should set `num_processes * num_threads_per_process` to be `max batch size`. Otherwise, it's unable to reach max batch size. + +.. figure:: figures/remote_model_host.png + :width: 600 + :align: center + :alt: Remote Model Host + + RemoteModelHost with Multi-Process + Multi-Thread Workers + +Multiple GPU Devices +========================== + +If you have many GPU cards, to make the best use of them, you will still want to create multiple model instances, with each GPU device serving one model instance. However, `RemoteModelHost` doesn't support the creation of multiple model instances on a single machine node. To reduce the implementaion complexity, it is singleton. + +In this case, we recommend you to apply virtulization techniques. For example, creating multiple VMs or dockers and bind one GPU device to each of them respectively. diff --git a/isort.cfg b/isort.cfg new file mode 100644 index 0000000..98a2542 --- /dev/null +++ b/isort.cfg @@ -0,0 +1,16 @@ +# This is config file for isort - python imports sorting tool +# See https://pycqa.github.io/isort/docs/configuration/options/ for reference +[settings] +profile=black +src_paths=. +line_length=140 +skip_gitignore=True + +# Note! FIRSTPARTY explicilty omiited to merge it with default section THIRDPARTY +# The reason for that is weak support of monorepos by isort. Isort detects firstparty modules +# by looking at src_paths, but we cannot specify all source roots, because it is too dynamic. +# Ideally we should run our custom hook that will detect project boundaries and run isort +# for each project individually. Untill then third-party and first-party imports are combined. +# The only exception is polymer which in separate section and overall good to keep it that way. +sections=FUTURE,STDLIB,POLYMER,THIRDPARTY,LOCALFOLDER +known_polymer=polymer diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..16a027d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,49 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = [ + "setuptools>=61", + "wheel", +] + +[project] +name = "batch-inference" +version = "1.0rc1" +description = "Batch Inference" +readme = "README.md" +authors = [{ name = "Xi Chen", email = "xichen5@microsoft.com" }, + { name = "Lu Ye", email = "luye@microsoft.com" }, + { name = "Yong Huang", email = "yohuan@microsoft.com" }, + { name = "Ze Tao", email = "zetao@microsoft.com" }] +requires-python = ">=3.7" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] +dependencies = [ + "filelock", + "grpcio", + "msgpack", + "msgpack-numpy", + "numpy", +] +optional-dependencies.docs = [ + "flask>=2.0.2", + "furo>=2022.12.7", + "sphinx>=6.1.3", + "sphinx-autodoc-typehints!=1.23.4,>=1.22", + "transformers>=4.27.4", +] +optional-dependencies.testing = [ + "onnxruntime", + "pytest>=7.2.2", + "torch", +] +[project.urls] +"Homepage" = "https://github.com/microsoft/batch-inference.git" + +[tool.setuptools] +include-package-data = false + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..5b7f7a9 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/src/batch_inference/__init__.py b/src/batch_inference/__init__.py new file mode 100644 index 0000000..fb7772f --- /dev/null +++ b/src/batch_inference/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +__version__ = "1.0rc1" + +from .decorators import batching, remote_batching +from .model_host import ModelHost +from .remote_model_host import RemoteModelHost diff --git a/src/batch_inference/aio/__init__.py b/src/batch_inference/aio/__init__.py new file mode 100644 index 0000000..89d5bb3 --- /dev/null +++ b/src/batch_inference/aio/__init__.py @@ -0,0 +1,2 @@ +from .model_host import ModelHost +from .remote_model_host.remote_model_host import RemoteModelHost diff --git a/src/batch_inference/aio/batch_context.py b/src/batch_inference/aio/batch_context.py new file mode 100644 index 0000000..3fa52e9 --- /dev/null +++ b/src/batch_inference/aio/batch_context.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from typing import Any + + +class BatchContext: + def __init__(self) -> None: + self.requests = [] + self.responses = [] + self.result_ready = asyncio.Event() + self.error = None + + def size(self): + return len(self.requests) + + def add_request(self, request: Any): + self.requests.append(request) + return len(self.requests) - 1 + + def set_result_ready(self): + self.result_ready.set() + + def set_error(self, error: Exception): + self.error = error + self.result_ready.set() diff --git a/src/batch_inference/aio/model_host.py b/src/batch_inference/aio/model_host.py new file mode 100644 index 0000000..74a5379 --- /dev/null +++ b/src/batch_inference/aio/model_host.py @@ -0,0 +1,160 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# -------------------------------------------------------------------------- +import asyncio +import sys +import threading +from types import TracebackType +from typing import Any, Optional, Type + +from ..batcher.batcher import Batcher +from ..batcher.multi_batcher import MultiBatcher +from ..logger import logger +from .batch_context import BatchContext + + +class ModelHost: + def __init__( + self, + model_cls=None, + model_obj=None, + batcher: Batcher = None, + max_batch_size=32, + wait_ms: int = 5, + wait_n: int = 16, + event_loop=None, + ): + if model_cls is None and model_obj is None: + raise RuntimeError(f"Either model_cls or model_obj must be provided") + self.model_cls = model_cls + self.model_obj = model_obj + self.batcher = batcher + self.max_batch_size = max_batch_size + # wait for `wait_ms` ms to see if there's more requests to batch + self.wait_ms = wait_ms + # during wait for `wait_ms` ms, if there's more than `wait_n` requests, + # we stop waiting and start to process the batch immediately + self.wait_n = wait_n + if self.wait_n > self.max_batch_size: + self.wait_n = self.max_batch_size + + self.event_loop = event_loop + self.cv = None + self.batch_queue = [] + self.thread = None + + def __call__(self, *args: Any, **kwds: Any) -> Any: + if self.model_obj is not None: + raise RuntimeError( + f"model_obj has already been set, you can't call __call__() to create it again", + ) + # pass the originl arguments and construtor the origial model instance. + self.model_obj = self.model_cls(*args, **kwds) + return self + + async def predict(self, *input_list): + async with self.cv: + is_first_request = len(self.batch_queue) == 0 + if is_first_request or self.batch_queue[-1].size() >= self.max_batch_size: + self.batch_queue.append(BatchContext()) + # logger.debug(f'I am the first query of a new batch') + # else: + # logger.debug(f'I am added to an existing batch') + last_batch: BatchContext = self.batch_queue[-1] + idx = last_batch.add_request(input_list) + self.cv.notify(n=1) + + await last_batch.result_ready.wait() + if last_batch.error is not None: + raise last_batch.error + return last_batch.responses[idx] + + async def start(self): + if self.event_loop is None: + self.event_loop = asyncio.get_event_loop() + + if sys.version_info >= (3, 10): + batch_queue_lock = asyncio.Lock() + self.cv = asyncio.Condition(lock=batch_queue_lock) + else: + batch_queue_lock = asyncio.Lock(loop=self.event_loop) + self.cv = asyncio.Condition(lock=batch_queue_lock, loop=self.event_loop) + self.thread = threading.Thread(target=self._wait_batch_ready_and_process) + self.thread.start() + + async def stop(self): + logger.debug(f"stop() is called") + async with self.cv: + self.batch_queue.append(None) + self.cv.notify(n=1) + logger.debug(f"notify worker thread to stop") + await self.event_loop.run_in_executor(None, self.thread.join) + logger.debug(f"worker thread is stopped") + + async def _get_new_batch(self): + # WorkerLoop will be notified when there's a new request + # take first BatchContext out of quueue + early_ret_n = self.wait_n + wait_timeout = -1 + if self.wait_ms > 0: + wait_timeout = self.wait_ms / 1000.0 # in seconds + + async with self.cv: + await self.cv.wait_for(lambda: len(self.batch_queue) > 0) + try: + await asyncio.wait_for( + self.cv.wait_for( + lambda: self.batch_queue[0] + is None # should exit, no more request + or len(self.batch_queue[0].requests) >= early_ret_n, + ), + wait_timeout, + ) + except asyncio.TimeoutError: + # logger.info(f'wait batch size to reach {early_ret_n} timeout, actual batch size={len(self.batch_queue[0].requests)}') + pass + batch_ctx: BatchContext = self.batch_queue.pop(0) + + return batch_ctx + + def _wait_batch_ready_and_process(self): + while True: + f = asyncio.run_coroutine_threadsafe(self._get_new_batch(), self.event_loop) + batch_ctx = f.result() + + if batch_ctx is None: + return + + # logger.info(f"get batch of size {len(batch_ctx.requests)}") + + try: + batched_requests, unbatch_ctx = self.batcher.batch(batch_ctx.requests) + if issubclass(type(self.batcher), MultiBatcher): + batched_responses = [] + for i in batched_requests: + batched_responses.append(self.model_obj.predict_batch(*i)) + else: + batched_responses = self.model_obj.predict_batch(*batched_requests) + batch_ctx.responses = self.batcher.unbatch( + batched_responses, + unbatch_ctx, + ) + self.event_loop.call_soon_threadsafe(batch_ctx.set_result_ready) + except Exception as e: + self.event_loop.call_soon_threadsafe( + lambda: batch_ctx.set_error(error=e), + ) + logger.error(e, exc_info=True) + + async def __aenter__(self): + self.start() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.stop() diff --git a/src/batch_inference/aio/remote_model_host/__init__.py b/src/batch_inference/aio/remote_model_host/__init__.py new file mode 100644 index 0000000..5b7f7a9 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/src/batch_inference/aio/remote_model_host/model_host.proto b/src/batch_inference/aio/remote_model_host/model_host.proto new file mode 100644 index 0000000..7e7dec4 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +message Request { + bytes ndarrays = 1; +} + +message Response { + bytes ndarrays = 1; +} + +service ModelHost { + rpc predict (Request) returns (Response) {} +} diff --git a/src/batch_inference/aio/remote_model_host/model_host_client.py b/src/batch_inference/aio/remote_model_host/model_host_client.py new file mode 100644 index 0000000..809d2d0 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host_client.py @@ -0,0 +1,39 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import grpc +import msgpack + +from ...logger import logger +from . import model_host_pb2, model_host_pb2_grpc, msgpack_serialization + + +class ModelHostClient: + def __init__(self, grpc_port): + self.grpc_port = grpc_port + self.channel = None + self.stub = None + + # read this for details of creating async grpc server: + # https://github.com/grpc/grpc/blob/master/examples/python/helloworld/async_greeter_client.py + async def start(self): + self.channel = grpc.aio.insecure_channel(f"localhost:{self.grpc_port}") + self.stub = model_host_pb2_grpc.ModelHostStub(self.channel) + logger.info(f"model host started as client, will talk to {self.grpc_port}") + + async def stop(self): + pass + + async def predict(self, *input_list): + request_packed = msgpack.packb( + input_list, + use_bin_type=True, + default=msgpack_serialization.encode, + ) + response_pb = await self.stub.predict( + model_host_pb2.Request(ndarrays=request_packed), + ) + response = msgpack.unpackb( + response_pb.ndarrays, object_hook=msgpack_serialization.decode + ) + return response diff --git a/src/batch_inference/aio/remote_model_host/model_host_pb2.py b/src/batch_inference/aio/remote_model_host/model_host_pb2.py new file mode 100644 index 0000000..7b3fd22 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host_pb2.py @@ -0,0 +1,32 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: batch_inference/aio/remote_model_host/model_host.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n6batch_inference/aio/remote_model_host/model_host.proto"\x1b\n\x07Request\x12\x10\n\x08ndarrays\x18\x01 \x01(\x0c"\x1c\n\x08Response\x12\x10\n\x08ndarrays\x18\x01 \x01(\x0c\x32-\n\tModelHost\x12 \n\x07predict\x12\x08.Request\x1a\t.Response"\x00\x62\x06proto3', +) + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, + "batch_inference.aio.remote_model_host.model_host_pb2", + globals(), +) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _REQUEST._serialized_start = 58 + _REQUEST._serialized_end = 85 + _RESPONSE._serialized_start = 87 + _RESPONSE._serialized_end = 115 + _MODELHOST._serialized_start = 117 + _MODELHOST._serialized_end = 162 +# @@protoc_insertion_point(module_scope) diff --git a/src/batch_inference/aio/remote_model_host/model_host_pb2.pyi b/src/batch_inference/aio/remote_model_host/model_host_pb2.pyi new file mode 100644 index 0000000..a69ccb3 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host_pb2.pyi @@ -0,0 +1,19 @@ +from typing import ClassVar as _ClassVar +from typing import Optional as _Optional + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message + +DESCRIPTOR: _descriptor.FileDescriptor + +class Request(_message.Message): + __slots__ = ["ndarrays"] + NDARRAYS_FIELD_NUMBER: _ClassVar[int] + ndarrays: bytes + def __init__(self, ndarrays: _Optional[bytes] = ...) -> None: ... + +class Response(_message.Message): + __slots__ = ["ndarrays"] + NDARRAYS_FIELD_NUMBER: _ClassVar[int] + ndarrays: bytes + def __init__(self, ndarrays: _Optional[bytes] = ...) -> None: ... diff --git a/src/batch_inference/aio/remote_model_host/model_host_pb2_grpc.py b/src/batch_inference/aio/remote_model_host/model_host_pb2_grpc.py new file mode 100644 index 0000000..733884a --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host_pb2_grpc.py @@ -0,0 +1,82 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from batch_inference.aio.remote_model_host import \ + model_host_pb2 as \ + batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2 + + +class ModelHostStub: + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.predict = channel.unary_unary( + "/ModelHost/predict", + request_serializer=batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Request.SerializeToString, + response_deserializer=batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Response.FromString, + ) + + +class ModelHostServicer: + """Missing associated documentation comment in .proto file.""" + + def predict(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_ModelHostServicer_to_server(servicer, server): + rpc_method_handlers = { + "predict": grpc.unary_unary_rpc_method_handler( + servicer.predict, + request_deserializer=batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Request.FromString, + response_serializer=batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Response.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "ModelHost", + rpc_method_handlers, + ) + server.add_generic_rpc_handlers((generic_handler,)) + + +# This class is part of an EXPERIMENTAL API. +class ModelHost: + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def predict( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/ModelHost/predict", + batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Request.SerializeToString, + batch__inference_dot_aio_dot_remote__model__host_dot_model__host__pb2.Response.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/src/batch_inference/aio/remote_model_host/model_host_server.py b/src/batch_inference/aio/remote_model_host/model_host_server.py new file mode 100644 index 0000000..54b5a6b --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/model_host_server.py @@ -0,0 +1,99 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from typing import Any + +import grpc +import msgpack + +from ...batcher.batcher import Batcher +from ...logger import logger +from ..model_host import ModelHost +from . import model_host_pb2, model_host_pb2_grpc, msgpack_serialization + + +class GrpcServicer(model_host_pb2_grpc.ModelHostServicer): + def __init__(self, host: ModelHost) -> None: + self.host = host + + async def predict(self, request, context): + # logger.debug(f'received request from remote client') + request_py = msgpack.unpackb( + request.ndarrays, object_hook=msgpack_serialization.decode + ) + # request_py is a python list/tuple, unpack it to be positional arguments + response_py = await self.host.predict(*request_py) + respone_pb = msgpack.packb( + response_py, use_bin_type=True, default=msgpack_serialization.encode + ) + return model_host_pb2.Response(ndarrays=respone_pb) + + async def stop(self): + await self.host.stop() + + +class ModelHostServer: + def __init__( + self, + file_lock, + model_cls, + grpc_port, + batcher: Batcher, + max_batch_size=32, + wait_ms: int = 5, + wait_n: int = 16, + event_loop=None, + ): + self.model_cls = model_cls + self.batcher = batcher + self.max_batch_size = max_batch_size + self.wait_ms = wait_ms + self.wait_n = wait_n + + if event_loop is None: + self.event_loop = asyncio.get_event_loop() + else: + self.event_loop = event_loop + + self.grpc_port = grpc_port + self.file_lock = file_lock + + self.model_host: ModelHost = None + self.server = None + + def __call__(self, *args: Any, **kwds: Any) -> Any: + self.model_host = ModelHost( + model_cls=self.model_cls, + batcher=self.batcher, + max_batch_size=self.max_batch_size, + wait_ms=self.wait_ms, + wait_n=self.wait_n, + event_loop=self.event_loop, + )(*args, **kwds) + return self + + # read this for details of creating async grpc server: + # https://github.com/grpc/grpc/blob/master/examples/python/helloworld/async_greeter_client.py + async def start(self): + await self.model_host.start() + self.server = grpc.aio.server() + model_host_pb2_grpc.add_ModelHostServicer_to_server( + GrpcServicer(self.model_host), + self.server, + ) + self.server.add_insecure_port(f"[::]:{self.grpc_port}") + await self.server.start() + logger.info(f"model host started as server, listening on {self.grpc_port}") + + async def stop(self): + if self.server is not None: + await self.server.stop(grace=3) + + if self.model_host is not None: + await self.model_host.stop() + + self.file_lock.release() + + async def predict(self, *input_list): + return await self.model_host.predict(*input_list) diff --git a/src/batch_inference/aio/remote_model_host/msgpack_serialization.py b/src/batch_inference/aio/remote_model_host/msgpack_serialization.py new file mode 100644 index 0000000..677ae76 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/msgpack_serialization.py @@ -0,0 +1,29 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import pickle + +import msgpack_numpy + +# Batch Inference Toolkit doesn't depend on PyTorch to run, The try...catch... is to +# make sure the code works no matter PyTorch is installed or not. +try: + import torch + + has_torch = True +except ImportError: + has_torch = False + + +def encode(obj): + if has_torch and isinstance(obj, torch.Tensor): + return {"torch.Tensor": True, "data": pickle.dumps(obj)} + else: + return msgpack_numpy.encode(obj) + + +def decode(obj): + if "torch.Tensor" in obj: + return pickle.loads(obj["data"]) + else: + return msgpack_numpy.decode(obj) diff --git a/src/batch_inference/aio/remote_model_host/readme.txt b/src/batch_inference/aio/remote_model_host/readme.txt new file mode 100644 index 0000000..35b6312 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/readme.txt @@ -0,0 +1,2 @@ +# Run this command in source root to generate protobuf py files: +python -m grpc_tools.protoc -I src --python_out=src --pyi_out=src --grpc_python_out=src src/batch_inference/aio/remote_model_host/*.proto diff --git a/src/batch_inference/aio/remote_model_host/remote_model_host.py b/src/batch_inference/aio/remote_model_host/remote_model_host.py new file mode 100644 index 0000000..75a4c02 --- /dev/null +++ b/src/batch_inference/aio/remote_model_host/remote_model_host.py @@ -0,0 +1,88 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +import tempfile +import threading +from types import TracebackType +from typing import Any, Optional, Type + +from filelock import FileLock, Timeout + +from ...batcher.batcher import Batcher +from ...logger import logger +from .model_host_client import ModelHostClient +from .model_host_server import ModelHostServer + + +class RemoteModelHost: + def __init__( + self, + model_cls, + grpc_port, + batcher: Batcher, + max_batch_size=32, + wait_ms: int = 5, + wait_n: int = 16, + event_loop=None, + ): + file_lock = self._get_server_file_lock(grpc_port) + self.is_server = file_lock is not None + if self.is_server: + self.processor = ModelHostServer( + file_lock, + model_cls, + grpc_port, + batcher, + max_batch_size=max_batch_size, + wait_ms=wait_ms, + wait_n=wait_n, + event_loop=event_loop, + ) + else: + self.processor = ModelHostClient(grpc_port) + + def __call__(self, *args: Any, **kwds: Any) -> Any: + if self.is_server: + self.processor(*args, **kwds) + return self + + async def start(self): + await self.processor.start() + + async def stop(self): + await self.processor.stop() + + async def predict(self, *input_list): + return await self.processor.predict(*input_list) + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + await self.stop() + + def _get_server_file_lock(self, grpc_port): + # need file lock to ensure that only one RemoteModelHost is the server. + # In case the file does not exist, a+ is needed to ensure that the file lock works. r+ will throw an exception. + # don't unlock it until: + # 1. RemoteModelHost.stop is called + # 2. The proecss exits either normally or exceptionally, then the file is close by OS and lock is released. + # This is to make sure that you could re-create the server. + lock_file = os.path.join(tempfile.gettempdir(), f"filelock_{grpc_port}") + file_lock = FileLock(lock_file, timeout=1) + try: + file_lock.acquire() + logger.info( + f"file lock acquired, process: {os.getpid()}, thread: {threading.get_ident()}", + ) + return file_lock + except Timeout as e: + logger.info(f"can't get server file lock: {e}") + return diff --git a/src/batch_inference/batch_context.py b/src/batch_inference/batch_context.py new file mode 100644 index 0000000..35a377c --- /dev/null +++ b/src/batch_inference/batch_context.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from threading import Condition +from typing import Any + + +class BatchContext: + def __init__(self, request: Any) -> None: + self.request = request + self.response = None + self.error = None + self.response_ready = Condition() + + def get_response(self) -> Any: + with self.response_ready: + self.response_ready.wait() + if self.error is not None: + raise self.error + return self.response + + def set_response(self, response: Any = None): + with self.response_ready: + self.response = response + self.response_ready.notify() + + def set_error(self, error: Exception): + with self.response_ready: + self.error = error + self.response_ready.notify() diff --git a/src/batch_inference/batcher/__init__.py b/src/batch_inference/batcher/__init__.py new file mode 100644 index 0000000..78fdb0d --- /dev/null +++ b/src/batch_inference/batcher/__init__.py @@ -0,0 +1,5 @@ +from .batcher import Batcher +from .bucket_seq_batcher import BucketSeqBatcher +from .concat_batcher import ConcatBatcher +from .seq_batcher import SeqBatcher +from .multi_batcher import MultiBatcher \ No newline at end of file diff --git a/src/batch_inference/batcher/batcher.py b/src/batch_inference/batcher/batcher.py new file mode 100644 index 0000000..6337046 --- /dev/null +++ b/src/batch_inference/batcher/batcher.py @@ -0,0 +1,32 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + + +class Batcher: + """A batcher should contain a pair of batch and unbatch method""" + + def batch(self, requests: List[Tuple[Any]]) -> Tuple[Tuple[Any], Any]: + """Batch multiple N requests into 1 batched requests + + Args: + requests: a list of N requests, each request is a tuple of args from predict method + + Returns: + batched requests: 1 batched requests, which is a tuple of args for predict_batch method + context for unbatch: will be passed to unbatch method + """ + raise NotImplementedError() + + def unbatch(self, batched_response: Any, unbatch_ctx: Any) -> List: + """Unbatch 1 batched response into N responses + + Args: + batched_response: 1 batched responses from predict_batch method + unbatch_ctx: context from batch method + + Returns: + responses: a list of N responses, each response will be returned by predict method + """ + raise NotImplementedError() diff --git a/src/batch_inference/batcher/bucket_seq_batcher.py b/src/batch_inference/batcher/bucket_seq_batcher.py new file mode 100644 index 0000000..10b2733 --- /dev/null +++ b/src/batch_inference/batcher/bucket_seq_batcher.py @@ -0,0 +1,117 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + +from .multi_batcher import MultiBatcher +from .seq_batcher import SeqBatcher + + +class BucketSeqBatcher(MultiBatcher): + def __init__( + self, padding_tokens: List[Any], buckets: List[int], tensor: str = "np" + ) -> None: + """ + Pad and Concat sequence requests. Put requests with same length into one bucket + + Args: + padding_tokens: a List of M padding tokens for corresponding request arg + buckets: a List of request length for each bucket + """ + super().__init__() + self.buckets = buckets + self.seq_batcher = SeqBatcher(padding_tokens=padding_tokens, tensor=tensor) + + def batch(self, requests: List[Tuple[Any]]) -> Tuple[List[Tuple[Any]], Any]: + """Batch multiple M requests into N batched requests (M>=N) + + Args: + requests: a list of M requests, each request is a tuple of args from predict method + + Returns: + batched requests: a list of N batched requests, each batched request is a tuple of args for predict_batch method + context for unbatch: will be passed to unbatch method + """ + + # Sort requests by sequence length + requests = [ + ( + item, + idx, + self.seq_batcher.get_seq_length(item), + self.get_batch_size(item), + ) + for idx, item in enumerate(requests) + ] + sorted_requests = sorted(requests, key=lambda x: x[2]) + groups = [[]] + + # Group requests by sequence length + i, j = 0, 0 + while i < len(self.buckets) and j < len(sorted_requests): + if sorted_requests[j][2] <= self.buckets[i]: + groups[-1].append(sorted_requests[j]) + j += 1 + else: + groups.append([]) + i += 1 + + groups.append(sorted_requests[j:]) + groups = [group for group in groups if len(group) != 0] + + # Batch requests + batched_requests = [] + unbatch_ctx = [] + for group in groups: + concated, _ = self.seq_batcher.batch([item[0] for item in group]) + batched_requests.append(tuple(concated)) + unbatch_ctx.append([(item[1], item[3]) for item in group]) + + return batched_requests, unbatch_ctx + + def unbatch(self, batched_responses: List[Any], unbatch_ctx: Any) -> List: + """Unbatch responses from the batched_responses. The order of the responses + should be the same as the order of the inputs. + + Args: + batched_responses (List): The output from the batched responses. + unbatch_ctx (Any): The unbatch context. + + Returns: + List: The unbatched responses. + """ + responses = [] + + for batched_response, group_info in zip(batched_responses, unbatch_ctx): + start = 0 + for slice_info in group_info: # slice_info: [(original index, batch_size)] + response = self.slice(batched_response, start, start + slice_info[1]) + response = ( + response, + slice_info[0], + ) # append original index for reordering + responses.append(response) + start += slice_info[1] + + sorted_responses = sorted(responses, key=lambda x: x[-1]) + return [x[0] for x in sorted_responses] + + def get_batch_size(self, single_request): + return self.seq_batcher.tensor_size(single_request[0], dim=0) + + def slice(self, batched_response, start, end): + return_type = type(batched_response) + if return_type is tuple: + return tuple( + [ + self.seq_batcher.tensor_slice(output, start, end) + for output in batched_response + ] + ) + elif return_type is list: + return [ + self.seq_batcher.tensor_slice(output, start, end) + for output in batched_response + ] + else: + return self.seq_batcher.tensor_slice(batched_response, start, end) diff --git a/src/batch_inference/batcher/concat_batcher.py b/src/batch_inference/batcher/concat_batcher.py new file mode 100644 index 0000000..3823c46 --- /dev/null +++ b/src/batch_inference/batcher/concat_batcher.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + +from . import tensor_ops_np, tensor_ops_pt +from .batcher import Batcher + + +class ConcatBatcher(Batcher): + def __init__(self, tensor="np") -> None: + super().__init__() + if tensor == "np": + self.tensor_concat = tensor_ops_np.TensorOps.concat + self.tensor_slice = tensor_ops_np.TensorOps.slice + self.tensor_size = tensor_ops_np.TensorOps.size + elif tensor == "pt": + self.tensor_concat = tensor_ops_pt.TensorOps.concat + self.tensor_slice = tensor_ops_pt.TensorOps.slice + self.tensor_size = tensor_ops_pt.TensorOps.size + else: + raise NotImplementedError(f"unsupported tensor type {tensor}") + + def batch(self, requests: List[Tuple[Any]]) -> Tuple[Tuple[Any], Any]: + """Batch multiple N requests into 1 batched requests. + Each requests can contains M args, concat N ndarray requests into 1 ndarray for each arg + + Args: + requests: a list of N requests, each request is a tuple of M numpy.ndarray from predict method + + Returns: + batched requests: 1 batched requests, which is a tuple of numpy.ndarray for predict_batch method + context for unbatch: will be passed to unbatch method + """ + num_of_inputs = len(requests[0]) + batched_request = [] + batch_sizes = [self.tensor_size(item[0], dim=0) for item in requests] + for idx in range(0, num_of_inputs): + batched_request.append(self.tensor_concat([item[idx] for item in requests])) + return tuple(batched_request), batch_sizes + + def unbatch(self, batched_response: Any, unbatch_ctx: Any) -> List: + """Unbatch 1 batched response into N responses + If the batched respone contains K output variables, then each response will also contains K output variables. + + Args: + batched_response: 1 batched response from predict_batch method + unbatch_ctx: context from batch method + + Returns: + responses: a list of N responses, each response will be returned by predict method + """ + batch_sizes = unbatch_ctx + return_type = type(batched_response) + responses = [] + + start = 0 + for n in batch_sizes: + if return_type is tuple: + response = tuple( + [ + self.tensor_slice(output, start, start + n) + for output in batched_response + ] + ) + elif return_type is list: + response = [ + self.tensor_slice(output, start, start + n) + for output in batched_response + ] + else: # single output + response = self.tensor_slice(batched_response, start, start + n) + responses.append(response) + start += n + + return responses diff --git a/src/batch_inference/batcher/multi_batcher.py b/src/batch_inference/batcher/multi_batcher.py new file mode 100644 index 0000000..8e60115 --- /dev/null +++ b/src/batch_inference/batcher/multi_batcher.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + +from .batcher import Batcher + + +class MultiBatcher(Batcher): + """A batcher should contain a pair of batch and unbatch method""" + + def batch(self, requests: List[Tuple[Any]]) -> Tuple[List[Tuple[Any]], Any]: + """Batch multiple M requests into N batched requests (M>=N) + + Args: + requests: a list of M requests, each request is a tuple of args from predict method + + Returns: + batched requests: a list of N batched requests, each batched request is a tuple of args for predict_batch method + context for unbatch: will be passed to unbatch method + """ + raise NotImplementedError() + + def unbatch(self, batched_responses: List[Any], unbatch_ctx: Any) -> List: + """Unbatch N batched responses into M responses + + Args: + batched_responses: a list of N batched responses from predict_batch method + unbatch_ctx: context from batch method + + Returns: + responses: a list of M responses, each response will be returned by predict method + """ + raise NotImplementedError() diff --git a/src/batch_inference/batcher/seq_batcher.py b/src/batch_inference/batcher/seq_batcher.py new file mode 100644 index 0000000..7cdaa32 --- /dev/null +++ b/src/batch_inference/batcher/seq_batcher.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, List, Tuple + +import numpy as np + +from . import tensor_ops_np, tensor_ops_pt +from .concat_batcher import ConcatBatcher + + +class SeqBatcher(ConcatBatcher): + def __init__(self, padding_tokens=List[Any], tensor: str = "np") -> None: + """ + Pad and Concat sequence requests + + Args: + padding_tokens: a List of M padding tokens for corresponding request arg. Use None if an argument doesn't need padding + """ + super().__init__(tensor=tensor) + if tensor == "np": + self.tensor_pad = tensor_ops_np.TensorOps.pad + elif tensor == "pt": + self.tensor_pad = tensor_ops_pt.TensorOps.pad + else: + raise NotImplementedError(f"unsupported tensor type {tensor}") + self.padding_tokens = padding_tokens + + def batch(self, requests: List[Tuple[Any]]) -> Tuple[Tuple[Any], Any]: + """Batch multiple N requests into 1 batched requests. + Each requests can contains M args, concat N ndarray requests into 1 ndarray for each arg + + Args: + requests: a list of N requests, each request is a tuple of M numpy.ndarray from predict method + + Returns: + batched requests: 1 batched requests, which is a tuple of numpy.ndarray for predict_batch method + context for unbatch: will be passed to unbatch method + """ + + padded_requests = requests + if self.padding_tokens: + max_len = -1 + for req in requests: + seq_len = self.get_seq_length(req) + if seq_len > max_len: + max_len = seq_len + padded_requests = [self.pad(req, max_len) for req in requests] + + return super().batch(padded_requests) + + def get_seq_length(self, single_request: List[Any]) -> int: + """ + Return length of first argument, which means other arguments will be padded to same length of first argument + """ + return self.tensor_size(single_request[0], dim=-1) + + def pad(self, single_request: Tuple[Any], seq_len: int) -> Tuple[Any]: + # original request: (input1, input2, ...) + + padded_inputs = [] + for idx, input in enumerate(single_request): + pad_length = seq_len - self.tensor_size(input, dim=-1) + if pad_length > 0: + p1d = (0, pad_length) + padded_inputs.append( + self.tensor_pad(input, p1d, self.padding_tokens[idx]), + ) + else: + padded_inputs.append(input) + return padded_inputs diff --git a/src/batch_inference/batcher/tensor_ops_np.py b/src/batch_inference/batcher/tensor_ops_np.py new file mode 100644 index 0000000..1649645 --- /dev/null +++ b/src/batch_inference/batcher/tensor_ops_np.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import numpy + + +class TensorOps: + @staticmethod + def concat(tensors): + return numpy.concatenate(tensors, axis=0) + + @staticmethod + def slice(tensor, start, end): + return tensor[start:end] + + @staticmethod + def size(tensor, dim: int): + return tensor.shape[dim] + + @staticmethod + def pad(tensor, pad, value): + pad_width = [(0, 0)] * (tensor.ndim - 1) + [pad] + return numpy.pad(tensor, pad_width, "constant", constant_values=value) diff --git a/src/batch_inference/batcher/tensor_ops_pt.py b/src/batch_inference/batcher/tensor_ops_pt.py new file mode 100644 index 0000000..4ebb3cb --- /dev/null +++ b/src/batch_inference/batcher/tensor_ops_pt.py @@ -0,0 +1,25 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +try: + import torch +except: + pass + + +class TensorOps: + @staticmethod + def concat(tensors): + return torch.cat(tensors, dim=0) + + @staticmethod + def slice(tensor, start, end): + return tensor[start:end] + + @staticmethod + def size(tensor, dim: int): + return tensor.size(dim=dim) + + @staticmethod + def pad(tensor, pad, value): + return torch.nn.functional.pad(tensor, pad, "constant", value) diff --git a/src/batch_inference/decorators.py b/src/batch_inference/decorators.py new file mode 100644 index 0000000..285192e --- /dev/null +++ b/src/batch_inference/decorators.py @@ -0,0 +1,68 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Any, Callable, Dict, Tuple, Type + +from .batcher.concat_batcher import Batcher, ConcatBatcher +from .logger import logger +from .model_host import ModelHost +from .remote_model_host import RemoteModelHost + + +def batching(*arg: Tuple, **kwargs: Dict[str, Any]) -> Callable[[Type], Type]: + if len(arg) > 0 and _valid_model(arg[0]): + return _decorator()(arg[0]) + else: + return _decorator(*arg, **kwargs) + + +def remote_batching( + grpc_port: int, batcher: Batcher = None, max_batch_size: int = 32 +) -> Callable[[Type], Type]: + if batcher is None: + batcher = ConcatBatcher() + + def add_host(model_cls: Type) -> Type: + def host(*args, **kwargs): + logger.debug(f"create ModelHost for {str(model_cls)}") + model_host = RemoteModelHost( + model_cls=model_cls, + grpc_port=grpc_port, + batcher=batcher, + max_batch_size=max_batch_size, + ) + model_host(*args, **kwargs) + return model_host + + setattr(model_cls, "host", host) + return model_cls + + return add_host + + +def _valid_model(obj): + method = getattr(obj, "predict_batch", None) + return callable(method) + + +def _decorator( + batcher: Batcher = None, max_batch_size: int = 32 +) -> Callable[[Type], Type]: + if batcher is None: + batcher = ConcatBatcher() + + def add_host(model_cls: Type) -> Type: + def host(*args, **kwargs) -> ModelHost: + logger.debug(f"create ModelHost for {str(model_cls)}") + model_host = ModelHost( + model_cls=model_cls, + batcher=batcher, + max_batch_size=max_batch_size, + ) + model_host(*args, **kwargs) + return model_host + + setattr(model_cls, "host", host) + return model_cls + + return add_host diff --git a/src/batch_inference/logger.py b/src/batch_inference/logger.py new file mode 100644 index 0000000..a4c62ae --- /dev/null +++ b/src/batch_inference/logger.py @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging +import sys + +logger = logging.getLogger("batch-inference") +logger.setLevel(logging.INFO) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter( + "%(asctime)s - %(process)d:%(thread)d - %(levelname)s - %(message)s", +) +handler.setFormatter(formatter) +logger.addHandler(handler) diff --git a/src/batch_inference/model_host.py b/src/batch_inference/model_host.py new file mode 100644 index 0000000..6148038 --- /dev/null +++ b/src/batch_inference/model_host.py @@ -0,0 +1,151 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging +import queue +import threading +import time +from types import TracebackType +from typing import Any, List, Optional, Type + +from .batch_context import BatchContext +from .batcher.batcher import Batcher +from .batcher.multi_batcher import MultiBatcher +from .logger import logger + + +class ModelHost: + def __init__( + self, + model_cls=None, + model_obj=None, + batcher: Batcher = None, + max_batch_size=32, + wait_n=8, + wait_ms: int = 5, + num_workers: int = 1, + ): + if model_cls is None and model_obj is None: + raise RuntimeError(f"Either model_cls or model_obj must be provided") + self.model_cls = model_cls + self.model_obj = model_obj + self.batcher = batcher + self.max_batch_size = max_batch_size + self.wait_n = wait_n + self.num_workers = num_workers + self.wait_ms = wait_ms + self.cycle_time = 0.1 + self.stopped = True + # double queue size to avoid blocking + self.batch_queue = [queue.Queue(maxsize=2 * max_batch_size)] * num_workers + self.threads = [] + + def __call__(self, *args: Any, **kwargs: Any) -> Any: + if self.model_obj is not None: + raise RuntimeError( + f"model_obj has already been set, you can't call __call__() to create it again", + ) + # pass the original arguments and constructor the original model instance. + self.model_obj = self.model_cls(*args, **kwargs) + return self + + def predict(self, *input_list): + if self.stopped: + raise RuntimeError(f"model host is stopped, can't predict anymore") + batch_ctx = BatchContext(input_list) + idx = batch_ctx.__hash__() % self.num_workers + self.batch_queue[idx].put(batch_ctx) + return batch_ctx.get_response() + + def start(self): + if not self.stopped: + raise RuntimeError(f"model host is already started") + + self.stopped = False + for i in range(self.num_workers): + thread = threading.Thread( + target=self._wait_batch_ready_and_process, args=(i,) + ) + thread.start() + self.threads.append(thread) + + def stop(self): + logger.debug(f"stop() is called") + self.stopped = True + logger.debug(f"notify worker thread to stop") + for thread in self.threads: + thread.join() + logger.debug(f"worker thread is stopped") + + def _get_new_batch(self, idx) -> List[BatchContext]: + # blocking util get at least request + try: + batch_list: List[BatchContext] = [ + self.batch_queue[idx].get(block=True, timeout=self.cycle_time) + ] + except queue.Empty: + return [] + + # append the left requests + while ( + len(batch_list) < self.max_batch_size and not self.batch_queue[idx].empty() + ): + try: + batch_list.append(self.batch_queue[idx].get(block=False)) + except queue.Empty: + break + + # check if we still need to wait + if self.wait_ms < 0 or len(batch_list) >= self.wait_n: + return batch_list + + # wait for `wait_ms` ms to see if there's more requests to batch + current_time = time.time() + end_time = current_time + self.wait_ms / 1000.0 + while end_time - current_time > 0 and len(batch_list) < self.wait_n: + try: + batch_list.append( + self.batch_queue[idx].get(timeout=end_time - current_time) + ) + current_time = time.time() + except queue.Empty: + break + return batch_list + + def _wait_batch_ready_and_process(self, idx): + while not self.stopped: + batch_list = self._get_new_batch(idx) + if len(batch_list) == 0: + continue + logging.debug(f"Batch Size: {len(batch_list)}") + try: + requests = [batch_ctx.request for batch_ctx in batch_list] + batched_requests, unbatch_ctx = self.batcher.batch(requests) + if issubclass(type(self.batcher), MultiBatcher): + batched_responses = [] + for i in batched_requests: + batched_responses.append(self.model_obj.predict_batch(*i)) + else: + batched_responses = self.model_obj.predict_batch(*batched_requests) + responses = self.batcher.unbatch( + batched_responses, + unbatch_ctx, + ) + for i, ctx in enumerate(batch_list): + ctx.set_response(responses[i]) + except Exception as e: + for ctx in batch_list: + ctx.set_error(e) + logger.error(e, exc_info=True) + + def __enter__(self): + self.start() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.stop() diff --git a/src/batch_inference/remote_model_host.py b/src/batch_inference/remote_model_host.py new file mode 100644 index 0000000..b6e8618 --- /dev/null +++ b/src/batch_inference/remote_model_host.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import threading +from types import TracebackType +from typing import Any, Optional, Tuple, Type + +from .aio.remote_model_host import remote_model_host +from .batcher.batcher import Batcher +from .logger import logger + + +class RemoteModelHost(remote_model_host.RemoteModelHost): + def __init__( + self, + model_cls: Type, + grpc_port: int, + batcher: Batcher, + max_batch_size=32, + wait_ms: int = 5, + wait_n: int = 32, + ): + # Event loop is coupled to thread. You cannot share the loop between different threads, + # except call_soon_threadsafe()/run_coroutine_threadsafe() calls. + # With sync APIs, we need another dedicated thread to run the event loop. + # We can't run event loop in the caller thread that instantiates the class + # and calls the predict API, it will block the caller thread. + self.event_loop = asyncio.new_event_loop() + super().__init__( + model_cls, + grpc_port=grpc_port, + batcher=batcher, + max_batch_size=max_batch_size, + wait_ms=wait_ms, + wait_n=wait_n, + event_loop=self.event_loop, + ) + self.event_loop_thread = None + + def start(self): + if self.event_loop_thread: + raise RuntimeError("Don't call start method twice in remote model host!") + + def run_event_loop(): + asyncio.set_event_loop(self.event_loop) + self.event_loop.run_forever() + logger.info(f"event loop execution thread is terminated") + + self.event_loop_thread = threading.Thread(target=run_event_loop) + self.event_loop_thread.start() + asyncio.run_coroutine_threadsafe(super().start(), self.event_loop).result() + + def predict(self, *input_list: Tuple) -> Any: + f = asyncio.run_coroutine_threadsafe( + super().predict(*input_list), + self.event_loop, + ) + return f.result() + + def stop(self): + asyncio.run_coroutine_threadsafe(super().stop(), self.event_loop).result() + self.event_loop.call_soon_threadsafe(self.event_loop.stop) + self.event_loop_thread.join() + + def __enter__(self) -> None: + self.start() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.stop() diff --git a/tests/gen_test_model.py b/tests/gen_test_model.py new file mode 100644 index 0000000..b16fed7 --- /dev/null +++ b/tests/gen_test_model.py @@ -0,0 +1,29 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import torch + + +class Mul(torch.nn.Module): + def forward(self, x, y): + res = torch.matmul(x, y) + return res + + +x = torch.randn(2, 3, 3) +y = torch.randn(2, 3, 3) + +model = Mul() + +torch.onnx.export( + model, # model being run + (x, y), # model input (or a tuple for multiple inputs) + "matmul.onnx", # where to save the model (can be a file or file-like object) + input_names=["x", "y"], # the model's input names + output_names=["res"], + dynamic_axes={ + "x": {0: "batch_size"}, # variable length axes + "y": {0: "batch_size"}, + "res": {0: "batch_size"}, + }, +) # the model's output names diff --git a/tests/matmul.onnx b/tests/matmul.onnx new file mode 100644 index 0000000..0a2a8c0 --- /dev/null +++ b/tests/matmul.onnx @@ -0,0 +1,22 @@ +pytorch1.13.1:” + +x +yres/MatMul"MatMul torch_jitZ! +x + +  +batch_size + +Z! +y + +  +batch_size + +b# +res + +  +batch_size + +B \ No newline at end of file diff --git a/tests/test_bucket_seq_batcher.py b/tests/test_bucket_seq_batcher.py new file mode 100644 index 0000000..0706936 --- /dev/null +++ b/tests/test_bucket_seq_batcher.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import unittest + +import numpy as np + +from batch_inference.batcher import BucketSeqBatcher + + +class TestModelHost(unittest.TestCase): + def test_batch(self): + buckets = [3, 5] + batcher = BucketSeqBatcher(padding_tokens=[0], buckets=buckets) + requests = [ + (np.random.random([1, 1, 8]),), + (np.random.random([1, 1, 4]),), + (np.random.random([1, 1, 9]),), + (np.random.random([1, 1, 5]),), + (np.random.random([1, 1, 2]),), + ] + batched_requests, unbatch_ctx = batcher.batch(requests) + self.assertEqual(len(batched_requests), 3) + for batched_request in batched_requests: + self.assertEqual(len(batched_request), 1) + self.assertEqual(batched_requests[0][0].shape, (1, 1, 2)) + self.assertEqual(batched_requests[1][0].shape, (2, 1, 5)) + self.assertEqual(batched_requests[2][0].shape, (2, 1, 9)) + + def test_unbatch(self): + buckets = [3, 5] + batcher = BucketSeqBatcher(padding_tokens=[0], buckets=buckets) + requests = [ + (np.random.random([1, 1, 8]),), + (np.random.random([1, 1, 4]),), + (np.random.random([1, 1, 9]),), + (np.random.random([1, 1, 5]),), + (np.random.random([1, 1, 2]),), + ] + batched_requests, unbatch_ctx = batcher.batch(requests) + responses = batcher.unbatch(batched_requests, unbatch_ctx) + self.assertEqual(len(responses), len(requests)) + for i in range(len(requests)): + padded_request = np.zeros(responses[i][0].shape) + padded_request[:, :, : requests[i][0].shape[2]] = requests[i][0] + self.assertTrue(np.equal(responses[i][0], padded_request).all()) + + def test_prebatched_input(self): + buckets = [5, 10] + batcher = BucketSeqBatcher(padding_tokens=[0], buckets=buckets) + requests = [ + [np.random.random([3, 1, 8])], + [np.random.random([1, 1, 4])], + [np.random.random([2, 1, 9])], + [np.random.random([1, 1, 5])], + [np.random.random([3, 1, 2])], + ] + batched_requests, unbatch_ctx = batcher.batch(requests) + self.assertEqual(len(batched_requests), 2) + for batched_request in batched_requests: + self.assertEqual(len(batched_request), 1) + self.assertEqual(batched_requests[0][0].shape, (5, 1, 5)) + self.assertEqual(batched_requests[1][0].shape, (5, 1, 9)) + + responses = batcher.unbatch(batched_requests, unbatch_ctx) + self.assertEqual(len(responses), len(requests)) + for i in range(len(requests)): + padded_request = np.zeros(responses[i][0].shape) + padded_request[:, :, : requests[i][0].shape[2]] = requests[i][0] + self.assertTrue(np.equal(responses[i][0], padded_request).all()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_decorator.py b/tests/test_decorator.py new file mode 100644 index 0000000..f35d8f9 --- /dev/null +++ b/tests/test_decorator.py @@ -0,0 +1,77 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +import unittest + +import numpy as np +import onnxruntime as ort + +from batch_inference import batching +from batch_inference.batcher import ConcatBatcher + + +@batching +class DecoratorModel: + def __init__(self, model_file): + self.ort_sess = ort.InferenceSession(model_file) + + def predict_batch(self, x): + res = self.ort_sess.run(None, {"x": x, "y": x}) + print(f"{type(res)}, {type(res[0])}") + return res[0] # 'res' + + +@batching() +class DecoratorModelDefaultArgs: + def __init__(self, model_file): + self.ort_sess = ort.InferenceSession(model_file) + + def predict_batch(self, x): + res = self.ort_sess.run(None, {"x": x, "y": x}) + return res[0] # 'res' + + +@batching(ConcatBatcher()) +class DecoratorModelWithArg: + def __init__(self, model_file): + self.ort_sess = ort.InferenceSession(model_file) + + def predict_batch(self, x): + res = self.ort_sess.run(None, {"x": x, "y": x}) + return res[0] # 'res' + + +class TestDecorator(unittest.TestCase): + def setUp(self) -> None: + model_file = os.path.join(os.path.dirname(__file__), "matmul.onnx") + self.decorator_model = DecoratorModel.host(model_file) + self.decorator_model.start() + self.decorator_model_default_args = DecoratorModelDefaultArgs.host(model_file) + self.decorator_model_default_args.start() + self.decorator_model_with_args = DecoratorModelWithArg.host(model_file) + self.decorator_model_with_args.start() + + def tearDown(self) -> None: + self.decorator_model.stop() + self.decorator_model_default_args.stop() + self.decorator_model_with_args.stop() + + def test_decorator(self): + x = np.random.randn(1, 3, 3).astype("f") + res = self.decorator_model.predict(x) + self.assertTrue(np.allclose(res, np.matmul(x, x), rtol=1e-05, atol=1e-05)) + + def test_decorator_default_args(self): + x = np.random.randn(1, 3, 3).astype("f") + res = self.decorator_model_default_args.predict(x) + self.assertTrue(np.allclose(res, np.matmul(x, x), rtol=1e-05, atol=1e-05)) + + def test_decorator_with_args(self): + x = np.random.randn(1, 3, 3).astype("f") + res = self.decorator_model_with_args.predict(x) + self.assertTrue(np.allclose(res, np.matmul(x, x), rtol=1e-05, atol=1e-05)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_model_host.py b/tests/test_model_host.py new file mode 100644 index 0000000..cd19baa --- /dev/null +++ b/tests/test_model_host.py @@ -0,0 +1,77 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import unittest + +import numpy as np + +from batch_inference import ModelHost +from batch_inference.batcher import ConcatBatcher + + +class MyModel: + def __init__(self, weights): + self.weights = weights + + # x: [batch_size, m, k], self.weights: [k, n] + def predict_batch(self, x): + y = np.matmul(x, self.weights) + return y + + +class TestModelHost(unittest.TestCase): + def setUp(self) -> None: + self.num_workers = 10 + self.weights = np.random.randn(3, 3).astype("f") + self.model_host = ModelHost( + MyModel, + batcher=ConcatBatcher(), + max_batch_size=self.num_workers, + )(self.weights) + self.model_host.start() + + def tearDown(self) -> None: + self.model_host.stop() + + def test_simple(self): + x = np.random.randn(1, 3, 3).astype("f") + y = self.model_host.predict(x) + print(y) + self.assertTrue( + np.allclose(y, np.matmul(x, self.weights), rtol=1e-05, atol=1e-05) + ) + + def test_concurrent(self): + def send_requests(): + for _ in range(0, 10): + x = np.random.randn(1, 3, 3).astype("f") + y = self.model_host.predict(x) + self.assertTrue( + np.allclose(y, np.matmul(x, self.weights), rtol=1e-05, atol=1e-05), + ) + + threads = [ + threading.Thread(target=send_requests) for i in range(0, self.num_workers) + ] + [th.start() for th in threads] + [th.join() for th in threads] + + +class TestModelHostWithAs(unittest.TestCase): + def test_withas(self): + weights = np.random.randn(3, 3).astype("f") + with ModelHost( + MyModel, + batcher=ConcatBatcher(), + max_batch_size=10, + )(weights) as model_host: + x = np.random.randn(1, 3, 3).astype("f") + y = model_host.predict(x) + self.assertTrue( + np.allclose(y, np.matmul(x, weights), rtol=1e-05, atol=1e-05) + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_ort_model.py b/tests/test_ort_model.py new file mode 100644 index 0000000..73bd3a0 --- /dev/null +++ b/tests/test_ort_model.py @@ -0,0 +1,62 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +import threading +import unittest + +import numpy as np +import onnxruntime as ort + +from batch_inference import batching + + +@batching +class MyOrtModel: + def __init__(self, model_file): + self.ort_sess = ort.InferenceSession(model_file) + + def predict_batch(self, x, y): + res = self.ort_sess.run(None, {"x": x, "y": y}) + return res[0] # 'res' + + +class TestModelHost(unittest.TestCase): + def setUp(self) -> None: + model_file = os.path.join(os.path.dirname(__file__), "matmul.onnx") + self.model_host = MyOrtModel.host(model_file) + self.model_host.start() + + def tearDown(self) -> None: + self.model_host.stop() + + def test_simple(self): + x = np.random.randn(1, 3, 3).astype("f") + y = np.random.randn(1, 3, 3).astype("f") + res = self.model_host.predict(x, y) + print(res) + self.assertTrue(np.allclose(res, np.matmul(x, y), rtol=1e-05, atol=1e-05)) + + def test_concurrent(self): + def send_requests(): + for _ in range(0, 10): + x = np.random.randn(1, 3, 3).astype("f") + y = np.random.randn(1, 3, 3).astype("f") + res = self.model_host.predict(x, y) + self.assertTrue( + np.allclose(res, np.matmul(x, y), rtol=1e-05, atol=1e-05), + ) + + threads = [] + for _ in range(0, 10): + threads.append(threading.Thread(target=send_requests)) + + for th in threads: + th.start() + + for th in threads: + th.join() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_remote_model_host.py b/tests/test_remote_model_host.py new file mode 100644 index 0000000..2a51546 --- /dev/null +++ b/tests/test_remote_model_host.py @@ -0,0 +1,84 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import unittest +from typing import List + +import numpy as np + +from batch_inference import RemoteModelHost +from batch_inference.batcher import ConcatBatcher + + +class MyModel: + def __init__(self, weights): + self.weights = weights + + # x: [batch_size, m, k], self.weights: [k, n] + def predict_batch(self, x): + y = np.matmul(x, self.weights) + return y + + +class TestRemoteModelHostSimple(unittest.TestCase): + def setUp(self) -> None: + self.weights = np.random.randn(3, 3).astype("f") + self.model_host: RemoteModelHost = RemoteModelHost( + MyModel, + grpc_port=23333, + batcher=ConcatBatcher(), + max_batch_size=32, + )(self.weights) + self.model_host.start() + + def tearDown(self) -> None: + self.model_host.stop() + + def test_simple(self): + x = np.random.randn(1, 3, 3).astype("f") + y = self.model_host.predict(x) + print(y) + self.assertTrue( + np.allclose(y, np.matmul(x, self.weights), rtol=1e-05, atol=1e-05) + ) + + +class TestRemoteModelHostConcurrent(unittest.TestCase): + def setUp(self) -> None: + self.num_workers = 10 + self.hosts: List[RemoteModelHost] = [] + self.weights = np.random.randn(3, 3).astype("f") + for _ in range(self.num_workers): + self.hosts.append( + RemoteModelHost( + MyModel, + grpc_port=23333, + batcher=ConcatBatcher(), + max_batch_size=self.num_workers, + )(self.weights), + ) + [host.start() for host in self.hosts] + + def tearDown(self) -> None: + [host.stop() for host in self.hosts] + + def test_concurrent(self): + def send_requests(i): + for _ in range(0, 100): + x = np.random.randn(1, 3, 3).astype("f") + y = self.hosts[i].predict(x) + self.assertTrue( + np.allclose(y, np.matmul(x, self.weights), rtol=1e-05, atol=1e-05), + ) + + threads = [ + threading.Thread(target=send_requests, args=(i,)) + for i in range(0, self.num_workers) + ] + [th.start() for th in threads] + [th.join() for th in threads] + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_start_twice.py b/tests/test_start_twice.py new file mode 100644 index 0000000..932b76e --- /dev/null +++ b/tests/test_start_twice.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +import unittest + +import onnxruntime as ort + +from batch_inference import ModelHost, RemoteModelHost +from batch_inference.batcher import ConcatBatcher + + +class MyModel: + def __init__(self, model_file): + self.ort_sess = ort.InferenceSession(model_file) + + def predict_batch(self, x, y): + res = self.ort_sess.run(None, {"x": x, "y": y}) + return res[0] # 'res' + + +class TestRemoteModelHostStartTwice(unittest.TestCase): + def test_remote_start_twice(self): + try: + model_file = os.path.join(os.path.dirname(__file__), "matmul.onnx") + self.model_host: RemoteModelHost = RemoteModelHost( + MyModel, + grpc_port=23333, + batcher=ConcatBatcher(), + max_batch_size=32, + )(model_file) + self.model_host.start() + self.model_host.start() + raise AssertionError + except RuntimeError as e: + print(e) + finally: + self.model_host.stop() + + +class TestModelHostStartTwice(unittest.TestCase): + def test_local_start_twice(self): + try: + model_file = os.path.join(os.path.dirname(__file__), "matmul.onnx") + self.model_host: ModelHost = ModelHost( + MyModel, + batcher=ConcatBatcher(), + max_batch_size=32, + )(model_file) + self.model_host.start() + self.model_host.start() + raise AssertionError + except RuntimeError as e: + print(e) + finally: + self.model_host.stop() diff --git a/tests/test_torch_model.py b/tests/test_torch_model.py new file mode 100644 index 0000000..ac89136 --- /dev/null +++ b/tests/test_torch_model.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import threading +import unittest +from typing import List + +import torch + +from batch_inference import ModelHost, RemoteModelHost +from batch_inference.batcher import ConcatBatcher + + +class MyTorchModel: + def __init__(self): + self.weights = torch.ones(3, 3) + + def predict_batch(self, x): + res = torch.matmul(x, self.weights) + return res + + +class TestModelHost(unittest.TestCase): + def setUp(self) -> None: + self.model_host = ModelHost( + MyTorchModel, batcher=ConcatBatcher(tensor="pt"), max_batch_size=32 + )() + self.model_host.start() + + def tearDown(self) -> None: + self.model_host.stop() + + def test_simple(self): + x = torch.randn(1, 3, 3) + res = self.model_host.predict(x) + expected_res = torch.matmul(x, torch.ones(3, 3)) + self.assertTrue(torch.allclose(res, expected_res, rtol=1e-05, atol=1e-05)) + + def test_concurrent(self): + def send_requests(): + for _ in range(0, 10): + x = torch.randn(1, 3, 3) + res = self.model_host.predict(x) + self.assertTrue( + torch.allclose( + res, torch.matmul(x, torch.ones(3, 3)), rtol=1e-05, atol=1e-05 + ), + ) + + threads = [threading.Thread(target=send_requests) for i in range(0, 10)] + [th.start() for th in threads] + [th.join() for th in threads] + + +class TestRemoteModelHostConcurrent(unittest.TestCase): + def setUp(self) -> None: + self.num_workers = 2 + self.hosts: List[RemoteModelHost] = [] + for _ in range(self.num_workers): + self.hosts.append( + RemoteModelHost( + MyTorchModel, + grpc_port=23333, + batcher=ConcatBatcher(tensor="pt"), + max_batch_size=32, + )(), + ) + [host.start() for host in self.hosts] + + def tearDown(self) -> None: + [host.stop() for host in self.hosts] + + def test_concurrent(self): + def send_requests(i): + for _ in range(0, 100): + x = torch.randn(1, 3, 3) + res = self.hosts[i].predict(x) + expected_res = torch.matmul(x, torch.ones(3, 3)) + self.assertTrue( + torch.allclose(res, expected_res, rtol=1e-05, atol=1e-05), + ) + + threads = [ + threading.Thread(target=send_requests, args=(i,)) + for i in range(0, self.num_workers) + ] + [th.start() for th in threads] + [th.join() for th in threads] + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_torch_rnn_batcher.py b/tests/test_torch_rnn_batcher.py new file mode 100644 index 0000000..9baeee0 --- /dev/null +++ b/tests/test_torch_rnn_batcher.py @@ -0,0 +1,130 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import threading +import unittest +from typing import Any, List, Tuple + +import numpy as np +import torch +from torch.nn.utils.rnn import pack_sequence, pad_packed_sequence + +from batch_inference import batching +from batch_inference.batcher import Batcher + + +class RnnModel(torch.nn.Module): + """ + A sample to show batching with pad_packed_sequence for RNN model + """ + + def __init__(self): + super().__init__() + self.rnn = torch.nn.RNN(1, 8, 1, batch_first=True) + self.nn = torch.nn.Linear(1, 8) + + def forward(self, input_ids, langs): + out, h = self.rnn(input_ids) + out_pad = pad_packed_sequence(out, batch_first=True) + embed = self.nn(langs) + label = torch.matmul(h, embed.t()) + return ( + out_pad[0].detach().numpy(), + out_pad[1].detach().numpy(), + label.detach().numpy(), + ) + + +class MyRnnBatcher(Batcher): + def batch(self, requests: List[Tuple[Any]]) -> Tuple[Tuple[Any], Any]: + """Batch multiple N requests into 1 batched requests + + Args: + requests: a list of N requests, each request is a tuple of args from predict method + + Returns: + batched requests: 1 batched requests, which is a tuple of args for predict_batch method + context for unbatch: will be passed to unbatch method + """ + # sort on input_id length before pack_sequence + combined = sorted(enumerate(requests), key=lambda i: i[1][0].size, reverse=True) + # save order to unbatch it later + order = [i[0] for i in combined] + langs = [i[1][1] for i in combined] + + # concat langs from multiple requests + batched_langs = np.concatenate(langs, axis=0) + batched_langs = torch.from_numpy(batched_langs) + + # Apply pack_sequence on input_ids + input_ids = [i[1][0][0] for i in combined] + input_ids = [torch.from_numpy(x).unsqueeze(-1) for x in input_ids] + packed_input_ids = pack_sequence(input_ids, enforce_sorted=True) + return (packed_input_ids, batched_langs), order + + def unbatch(self, batched_response: Any, unbatch_ctx: Any) -> List: + """Unbatch 1 batched response into N responses + + Args: + batched_response: 1 batched responses from predict_batch method + unbatch_ctx: context from batch method + + Returns: + responses: a list of N responses, each response will be returned by predict method + """ + order = unbatch_ctx + responses = [ + ( + batched_response[0][i : i + 1], + batched_response[1][i : i + 1], + batched_response[2][i : i + 1], + ) + for i in range(0, len(order)) + ] + # sort to original order + responses = [responses[i] for i in order] + return responses + + +@batching(batcher=MyRnnBatcher()) +class BatchedModel: + def __init__(self): + self.model = RnnModel() + + def predict_batch(self, input_ids, langs): + return self.model.forward(input_ids, langs) + + +class TestModelHost(unittest.TestCase): + def setUp(self) -> None: + self.batched_model = BatchedModel.host() + self.batched_model.start() + + def tearDown(self) -> None: + self.batched_model.stop() + + def test_simple(self): + x = np.random.randn(1, 4).astype("f") + y = np.random.randn(1, 1).astype("f") + result_batch = self.batched_model.predict(x, y) + print(result_batch) + self.assertTrue(result_batch) + + def test_concurrent(self): + def send_requests(): + for i in range(1, 10): + x = np.random.randn(1, i).astype("f") + y = np.random.randn(1, 1).astype("f") + result_batch = self.batched_model.predict(x, y) + self.assertTrue(result_batch) + + threads = [threading.Thread(target=send_requests) for i in range(0, 10)] + + for th in threads: + th.start() + + for th in threads: + th.join() + + +if __name__ == "__main__": + unittest.main()