Fix linting in examples folder #180

Merged: 7 commits, Jul 7, 2022
2 changes: 2 additions & 0 deletions Makefile
@@ -54,9 +54,11 @@ format:
    cargo +nightly fmt --all

lint:
+    pip install -e .
    isort --check --diff --project=mosec ${PY_SOURCE_FILES}
    black --check --diff ${PY_SOURCE_FILES}
    pylint -j 8 --recursive=y mosec
+    pylint -j 8 --recursive=y --disable=import-error examples --generated-members=numpy.*,torch.*,cv2.*,cv.*
    pydocstyle mosec
    @-rm mosec/_version.py
    mypy --install-types --non-interactive ${PY_SOURCE_FILES}
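A note on the new pylint invocation: `--disable=import-error` keeps the lint target usable even when the heavy example dependencies are not installed, and `--generated-members=numpy.*,torch.*,cv2.*,cv.*` suppresses false `no-member` (E1101) reports on modules whose attributes are created at runtime by C extensions. A minimal sketch of the kind of code that trips E1101 without the flag (hypothetical snippet, not part of this PR):

import cv2  # type: ignore
import numpy as np
import torch

# Attributes like torch.float32 and cv2.IMREAD_COLOR live in compiled
# extensions, so pylint cannot statically verify them and may emit E1101.
tensor = torch.zeros(3, dtype=torch.float32)
flag = cv2.IMREAD_COLOR
array = np.zeros((2, 2), dtype=np.float32)
print(tensor.shape, flag, array.sum())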
12 changes: 7 additions & 5 deletions examples/custom_env.py
@@ -11,12 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Custom Environment setup"""

import logging
import os

from mosec import Server, Worker
-from mosec.errors import ValidationError

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
@@ -29,28 +29,30 @@


class Inference(Worker):
+    """Customisable inference class."""
+
    def __init__(self):
        super().__init__()
        # initialize your models here and allocate dedicated device to it
        device = os.environ["CUDA_VISIBLE_DEVICES"]
-        logger.info(f"Initializing model on device={device}")
+        logger.info("Initializing model on device=%s", device)

    def forward(self, _: dict) -> dict:
        device = os.environ["CUDA_VISIBLE_DEVICES"]
        # NOTE self.worker_id is 1-indexed
-        logger.info(f"Worker={self.worker_id} on device={device} is processing...")
+        logger.info("Worker=%d on device=%s is processing...", self.worker_id, device)
        return {"device": device}


if __name__ == "__main__":
-    num_device = 4
+    NUM_DEVICE = 4

    def _get_cuda_device(cid: int) -> dict:
        return {"CUDA_VISIBLE_DEVICES": str(cid)}

    server = Server()

    server.append_worker(
-        Inference, num=num_device, env=[_get_cuda_device(x) for x in range(num_device)]
+        Inference, num=NUM_DEVICE, env=[_get_cuda_device(x) for x in range(NUM_DEVICE)]
    )
    server.run()
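The f-string-to-percent-style changes in this file (and in the files below) silence pylint W1203 (logging-fstring-interpolation): with %-style arguments, the message is only interpolated when the record is actually emitted, while an f-string is built eagerly on every call. A minimal sketch of the difference, assuming nothing beyond the standard library:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

device = "0"
# Eager: the f-string is formatted even though DEBUG is disabled.
logger.debug(f"Initializing model on device={device}")
# Lazy: formatting is skipped entirely when DEBUG is disabled.
logger.debug("Initializing model on device=%s", device)

The `num_device` to `NUM_DEVICE` rename likewise satisfies pylint C0103 (invalid-name), which expects UPPER_CASE for module-level constants.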
7 changes: 6 additions & 1 deletion examples/distil_bert_server_pytorch.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Mosec with Pytorch Distil BERT."""

import logging
from typing import List, TypeVar
@@ -38,6 +39,8 @@


class Preprocess(Worker):
+    """Preprocess BERT on current setup."""
+
    def __init__(self):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(
@@ -55,12 +58,14 @@ def forward(self, data: str) -> T:


class Inference(Worker):
+    """Pytorch Inference class"""
+
    def __init__(self):
        super().__init__()
        self.device = (
            torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        )
-        logger.info(f"using computing device: {self.device}")
+        logger.info("using computing device: %s", self.device)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "distilbert-base-uncased-finetuned-sst-2-english"
        )
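To exercise the server above once it is running, a client call along these lines should work; the `/inference` route on port 8000 is mosec's default, so adjust it if your deployment differs (hypothetical client, not part of this PR):

import requests

# Query the DistilBERT sentiment server above; assumes the default
# mosec endpoint on localhost:8000.
resp = requests.post(
    "http://localhost:8000/inference",
    data="mosec makes model serving easy",
)
print(resp.status_code, resp.text)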
19 changes: 13 additions & 6 deletions examples/echo.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Sample structures for using mosec server."""

import logging
import time
@@ -29,26 +30,32 @@


class Preprocess(Worker):
+    """Sample Class."""
+
    def forward(self, data: dict) -> float:
-        logger.debug(f"pre received {data}")
+        logger.debug("pre received %s", data)
        # Customized, simple input validation
        try:
-            time = float(data["time"])
+            count_time = float(data["time"])
        except KeyError as err:
-            raise ValidationError(f"cannot find key {err}")
-        return time
+            raise ValidationError(f"cannot find key {err}") from err
+        return count_time


class Inference(Worker):
+    """Sample Class."""
+
    def forward(self, data: float) -> float:
-        logger.info(f"sleeping for {data} seconds")
+        logger.info("sleeping for %d seconds", data)
        time.sleep(data)
        return data


class Postprocess(Worker):
+    """Sample Class."""
+
    def forward(self, data: float) -> dict:
-        logger.debug(f"post received {data}")
+        logger.debug("post received %f", data)
        return {"msg": f"sleep {data} seconds"}


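The `raise ... from err` additions fix pylint W0707 (raise-missing-from): explicit chaining records the original exception as `__cause__`, so the traceback shows the root failure instead of the ambiguous "during handling of the above exception" context. A small self-contained sketch:

def parse(data: dict) -> float:
    try:
        return float(data["time"])
    except KeyError as err:
        # "from err" makes the KeyError the explicit __cause__.
        raise ValueError(f"cannot find key {err}") from err

try:
    parse({})
except ValueError as exc:
    print(repr(exc.__cause__))  # KeyError('time')

The `time` to `count_time` rename in Preprocess also stops the local variable from shadowing the imported `time` module.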
23 changes: 15 additions & 8 deletions examples/plasma_shm_ipc.py
@@ -12,6 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+"""Example: Using Plasma store with mosec plugin PlasmaShmWrapper.
+
+We start a subprocess for the plasma server, and pass the path
+to the plasma client, which serves as the shm wrapper.
+We also register the plasma server process as a daemon, so
+that when it exits the service can gracefully shut down
+and be restarted by the orchestrator.
+"""
+
from functools import partial

from pyarrow import plasma  # type: ignore
@@ -22,27 +31,25 @@


class DataProducer(Worker):
+    """Sample Data Producer."""
+
    def forward(self, data: dict) -> bytes:
        try:
            data_bytes = b"a" * int(data["size"])
        except KeyError as err:
-            raise ValidationError(err)
+            raise ValidationError(err) from err
        return data_bytes


class DataConsumer(Worker):
+    """Sample Data Consumer."""
+
    def forward(self, data: bytes) -> dict:
        return {"ipc test data length": len(data)}


if __name__ == "__main__":
-    """
-    We start a subprocess for the plasma server, and pass the path
-    to the plasma client which serves as the shm wrapper.
-    We also register the plasma server process as a daemon, so
-    that when it exits the service is able to gracefully shutdown
-    and restarted by the orchestrator.
-    """

    # 200 Mb store, adjust the size according to your requirement
    with plasma.start_plasma_store(plasma_store_memory=200 * 1000 * 1000) as (
        shm_path,
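Moving the triple-quoted block from the `if __name__` body to the top of the module fixes pylint W0105 (pointless-string-statement): only the first statement of a module, class, or function is a docstring; anywhere else a bare string literal is a no-op expression. A minimal sketch of the distinction:

"""A module docstring: the first statement of the file."""

if __name__ == "__main__":
    """Not a docstring: a no-op string expression, flagged as W0105."""
    print("the string literal above has no runtime effect")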
26 changes: 17 additions & 9 deletions examples/python_side_metrics.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Adding metrics service."""

import logging
import os
@@ -20,6 +21,14 @@
from typing import List
from wsgiref.simple_server import make_server

+from prometheus_client import (  # type: ignore
+    CONTENT_TYPE_LATEST,
+    CollectorRegistry,
+    Counter,
+    generate_latest,
+    multiprocess,
+)
+
from mosec import Server, Worker
from mosec.errors import ValidationError

@@ -39,20 +48,15 @@
pathlib.Path(metric_dir_path).mkdir(parents=True, exist_ok=True)
os.environ["PROMETHEUS_MULTIPROC_DIR"] = metric_dir_path

-from prometheus_client import (  # type: ignore
-    CONTENT_TYPE_LATEST,
-    CollectorRegistry,
-    Counter,
-    generate_latest,
-    multiprocess,
-)

metric_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(metric_registry)
counter = Counter("inference_result", "statistic of result", ("status", "worker_id"))


-def metric_app(environ, start_response):
+def metric_app(start_response):
+    """Sample Metric function."""
+
    data = generate_latest(metric_registry)
    start_response(
        "200 OK",
@@ -62,11 +66,15 @@ def metric_app(environ, start_response):


def metric_service(host="", port=8080):
+    """Sample metric service."""
+
    with make_server(host, port, metric_app) as httpd:
        httpd.serve_forever()


class Inference(Worker):
+    """Sample Inference Worker."""
+
    def __init__(self):
        super().__init__()
        self.worker_id = str(self.worker_id)
@@ -76,7 +84,7 @@ def deserialize(self, data: bytes) -> int:
        try:
            res = int(json_data.get("num"))
        except Exception as err:
-            raise ValidationError(err)
+            raise ValidationError(err) from err
        return res

    def forward(self, data: List[int]) -> List[bool]:
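Once this example is running, the exported counter can be checked with a plain HTTP request; this assumes the `metric_service` above is reachable on port 8080 (hypothetical client, not part of this PR):

import requests

# prometheus_client exposes a Counter named "inference_result" as
# "inference_result_total" in the exposition format.
resp = requests.get("http://localhost:8080")
for line in resp.text.splitlines():
    if line.startswith("inference_result"):
        print(line)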
1 change: 1 addition & 0 deletions examples/resnet50_client.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Sample Resnet client."""

import base64

41 changes: 24 additions & 17 deletions examples/resnet50_server_pytorch.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""Example: Sample Resnet server."""

import base64
import logging
@@ -38,35 +39,39 @@


class Preprocess(Worker):
-    def forward(self, req: dict) -> np.ndarray:
+    """Sample Preprocess worker"""
+
+    def forward(self, data: dict) -> np.ndarray:
        # Customized validation for input key and field content; raise
        # ValidationError so that the client can get 422 as http status
        try:
-            image = req["image"]
-            im = np.frombuffer(base64.b64decode(image), np.uint8)
-            im = cv2.imdecode(im, cv2.IMREAD_COLOR)[:, :, ::-1]  # bgr -> rgb
+            image = data["image"]
+            img = np.frombuffer(base64.b64decode(image), np.uint8)
+            img = cv2.imdecode(img, cv2.IMREAD_COLOR)[:, :, ::-1]  # bgr -> rgb
        except KeyError as err:
-            raise ValidationError(f"cannot find key {err}")
+            raise ValidationError(f"cannot find key {err}") from err
        except Exception as err:
-            raise ValidationError(f"cannot decode as image data: {err}")
+            raise ValidationError(f"cannot decode as image data: {err}") from err

-        im = cv2.resize(im, (256, 256))
-        crop_im = (
-            im[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255
+        img = cv2.resize(img, (256, 256))
+        crop_img = (
+            img[16 : 16 + 224, 16 : 16 + 224].astype(np.float32) / 255
        )  # center crop
-        crop_im -= [0.485, 0.456, 0.406]
-        crop_im /= [0.229, 0.224, 0.225]
-        crop_im = np.transpose(crop_im, (2, 0, 1))
-        return crop_im
+        crop_img -= [0.485, 0.456, 0.406]
+        crop_img /= [0.229, 0.224, 0.225]
+        crop_img = np.transpose(crop_img, (2, 0, 1))
+        return crop_img


class Inference(Worker):
+    """Sample Inference worker"""
+
    def __init__(self):
        super().__init__()
        self.device = (
            torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        )
-        logger.info(f"using computing device: {self.device}")
+        logger.info("using computing device: %s", self.device)
        self.model = torchvision.models.resnet50(pretrained=True)
        self.model.eval()
        self.model.to(self.device)
@@ -77,7 +82,7 @@ def __init__(self):
        ] * INFERENCE_BATCH_SIZE

    def forward(self, data: List[np.ndarray]) -> List[int]:
-        logger.info(f"processing batch with size: {len(data)}")
+        logger.info("processing batch with size: %d", len(data))
        with torch.no_grad():
            batch = torch.stack([torch.tensor(arr, device=self.device) for arr in data])
            output = self.model(batch)
@@ -86,15 +91,17 @@ def forward(self, data: List[np.ndarray]) -> List[int]:


class Postprocess(Worker):
+    """Sample Postprocess worker"""
+
    def __init__(self):
        super().__init__()
        logger.info("loading categories file...")
        local_filename, _ = urlretrieve(
            "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
        )

-        with open(local_filename) as f:
-            self.categories = list(map(lambda x: x.strip(), f.readlines()))
+        with open(local_filename, encoding="utf8") as file:
+            self.categories = list(map(lambda x: x.strip(), file.readlines()))

    def forward(self, data: int) -> dict:
        return {"category": self.categories[data]}
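The `open(local_filename, encoding="utf8")` change addresses pylint W1514 (unspecified-encoding): without an explicit encoding, `open` falls back on `locale.getpreferredencoding()`, which differs across platforms. The `f` to `file` rename satisfies the same invalid-name check seen earlier. A minimal sketch, assuming only the standard library:

from pathlib import Path

path = Path("imagenet_classes.txt")
path.write_text("tench\ngoldfish\n", encoding="utf8")

# An explicit encoding makes the read deterministic on every platform;
# a bare open(path) would depend on the process locale.
with open(path, encoding="utf8") as file:
    categories = [line.strip() for line in file]
print(categories)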