From 7e23cd5220f8346fc36dc95a14a4d1767219789c Mon Sep 17 00:00:00 2001 From: mtickoobb Date: Sat, 6 Feb 2021 07:31:11 -0500 Subject: [PATCH] [#1159] Support cloud events for KFServing custom framework (#1343) * parent a0a52f8e7b97276b89393447524c78f2b702c257 author Manasvi Tickoo 1611275174 -0500 committer Manasvi Tickoo 1612370131 -0500 1159 - cloudevent support for kfserving PR comments Fix cloudevent headers Add unit test for cloud event messages Add avro unit test Add avro to kfserving requirements.txt PR comments kfserving check ce-contenttype before unmarshalling Update k8s libraries to 0.19.2 (#1305) * go mod tidy * Update k8s libs to 0.19.2, fix issues, and run make tests * Handle ce to_binary return type while building resp --- python/kfserving/kfserving/handlers/http.py | 53 +++++-- python/kfserving/kfserving/kfmodel.py | 12 +- python/kfserving/requirements.txt | 2 + python/kfserving/test/test_server.py | 150 +++++++++++++++++++- 4 files changed, 207 insertions(+), 10 deletions(-) diff --git a/python/kfserving/kfserving/handlers/http.py b/python/kfserving/kfserving/handlers/http.py index e1b4d772a1..c02b9aefee 100644 --- a/python/kfserving/kfserving/handlers/http.py +++ b/python/kfserving/kfserving/handlers/http.py @@ -14,9 +14,15 @@ import inspect import tornado.web +import typing import json +import pytz +import cloudevents.exceptions as ce +from cloudevents.http import CloudEvent, from_http, is_binary, is_structured, to_binary, to_structured +from cloudevents.sdk.converters.util import has_binary_headers from http import HTTPStatus from kfserving.kfmodel_repository import KFModelRepository +from datetime import datetime class HTTPHandler(tornado.web.RequestHandler): @@ -43,21 +49,52 @@ def validate(self, request): ) return request - class PredictHandler(HTTPHandler): async def post(self, name: str): + if has_binary_headers(self.request.headers): + try: + #Use default unmarshaller if contenttype is set in header + if "ce-contenttype" in 
self.request.headers: + body = from_http(self.request.headers, self.request.body) + else: + body = from_http(self.request.headers, self.request.body, lambda x: x) + except (ce.MissingRequiredFields, ce.InvalidRequiredFields, ce.InvalidStructuredJSON, ce.InvalidHeadersFormat, ce.DataMarshallerError, ce.DataUnmarshallerError) as e: + raise tornado.web.HTTPError( + status_code=HTTPStatus.BAD_REQUEST, + reason="Cloud Event Exceptions: %s" % e + ) + else: + try: + body = json.loads(self.request.body) + except json.decoder.JSONDecodeError as e: + raise tornado.web.HTTPError( + status_code=HTTPStatus.BAD_REQUEST, + reason="Unrecognized request format: %s" % e + ) + model = self.get_model(name) - try: - body = json.loads(self.request.body) - except json.decoder.JSONDecodeError as e: - raise tornado.web.HTTPError( - status_code=HTTPStatus.BAD_REQUEST, - reason="Unrecognized request format: %s" % e - ) request = model.preprocess(body) request = self.validate(request) response = (await model.predict(request)) if inspect.iscoroutinefunction(model.predict) else model.predict(request) response = model.postprocess(response) + + if has_binary_headers(self.request.headers): + event = CloudEvent(body._attributes, response) + if is_binary(self.request.headers): + eventheader, eventbody = to_binary(event) + elif is_structured(self.request.headers): + eventheader, eventbody = to_structured(event) + for k, v in eventheader.items(): + if k != "ce-time": + self.set_header(k, v) + else: #utc now() timestamp + self.set_header('ce-time', datetime.utcnow().replace(tzinfo=pytz.utc).strftime('%Y-%m-%dT%H:%M:%S.%f%z')) + + if isinstance(eventbody, (bytes, bytearray)): + response = eventbody + else: + response = eventbody.data + self.write(response) diff --git a/python/kfserving/kfserving/kfmodel.py b/python/kfserving/kfserving/kfmodel.py index 87e183bb0e..45490afc20 100644 --- a/python/kfserving/kfserving/kfmodel.py +++ b/python/kfserving/kfserving/kfmodel.py @@ -51,7 +51,16 @@ def load(self) -> 
bool: return self.ready def preprocess(self, request: Dict) -> Dict: - return request + # If cloudevent dict, then parse 'data' field. Otherwise, pass through. + if "data" in request \ + and "time" in request \ + and "type" in request \ + and "source" in request \ + and "id" in request \ + and "specversion" in request: + return request["data"] + else: + return request def postprocess(self, request: Dict) -> Dict: return request @@ -91,3 +100,4 @@ async def explain(self, request: Dict) -> Dict: status_code=response.code, reason=response.body) return json.loads(response.body) + diff --git a/python/kfserving/requirements.txt b/python/kfserving/requirements.txt index 4a7116542e..bc26663ab2 100644 --- a/python/kfserving/requirements.txt +++ b/python/kfserving/requirements.txt @@ -12,3 +12,5 @@ adal>=1.2.2 table_logger>=0.3.5 numpy>=1.17.3 azure-storage-blob>=1.3.0,<=2.1.0 +cloudevents>=1.2.0 +avro>=1.10.1 diff --git a/python/kfserving/test/test_server.py b/python/kfserving/test/test_server.py index d8f5fd122a..212cb00d17 100644 --- a/python/kfserving/test/test_server.py +++ b/python/kfserving/test/test_server.py @@ -12,12 +12,41 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import pytest +import avro.io, avro.schema, io, pytest, requests +from cloudevents.http import CloudEvent, to_binary, to_structured from kfserving import kfmodel from kfserving import kfserver from tornado.httpclient import HTTPClientError from kfserving.kfmodel_repository import KFModelRepository +test_avsc_schema = ''' + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + } + ''' + +def dummy_cloud_event(data, set_contenttype=False): + # This data defines a binary cloudevent + attributes = { + "type": "com.example.sampletype1", + "source": "https://example.com/event-producer", + "specversion": "1.0", + "id": "36077800-0c23-4f38-a0b4-01f4369f670a", + "time": "2021-01-28T21:04:43.144141+00:00" + } + if set_contenttype: + attributes["contenttype"] = "application/json" + + event = CloudEvent(attributes, data) + return event + class DummyModel(kfmodel.KFModel): def __init__(self, name): @@ -34,6 +63,47 @@ async def predict(self, request): async def explain(self, request): return {"predictions": request["instances"]} +class DummyCEModel(kfmodel.KFModel): + def __init__(self, name): + super().__init__(name) + self.name = name + self.ready = False + + def load(self): + self.ready = True + + async def predict(self, request): + return {"predictions": request.data["instances"]} + + async def explain(self, request): + return {"predictions": request.data["instances"]} + +class DummyAvroCEModel(kfmodel.KFModel): + def __init__(self, name): + super().__init__(name) + self.name = name + self.ready = False + + def load(self): + self.ready = True + + def _parserequest(self, request): + schema = avro.schema.parse(test_avsc_schema) + raw_bytes = request.data + bytes_reader = io.BytesIO(raw_bytes) + decoder = avro.io.BinaryDecoder(bytes_reader) + reader = avro.io.DatumReader(schema) + record1 = 
reader.read(decoder) + return record1 + + async def predict(self, request): + record1 = self._parserequest(request) + return {"predictions": [[record1['name'] , record1['favorite_number'], record1['favorite_color']]]} + + async def explain(self, request): + record1 = self._parserequest(request) + return {"predictions": [[record1['name'] , record1['favorite_number'], record1['favorite_color']]]} + class DummyKFModelRepository(KFModelRepository): def __init__(self, test_load_success: bool): @@ -76,6 +146,19 @@ async def test_predict(self, http_server_client): assert resp.body == b'{"predictions": [[1, 2]]}' assert resp.headers['content-type'] == "application/json; charset=UTF-8" + async def test_predict_ce_structured(self, http_server_client): + + event = dummy_cloud_event({"instances":[[1,2]]}) + headers, body = to_structured(event) + resp = await http_server_client.fetch('/v1/models/TestModel:predict', + method="POST", + headers=headers, + body=body) + + assert resp.code == 200 + assert resp.body == b'{"predictions": [[1, 2]]}' + assert resp.headers['content-type'] == "application/json; charset=UTF-8" + async def test_explain(self, http_server_client): resp = await http_server_client.fetch('/v1/models/TestModel:explain', method="POST", @@ -141,3 +224,68 @@ async def test_model_not_ready_error(self, http_server_client): with pytest.raises(HTTPClientError) as excinfo: _ = await http_server_client.fetch('/v1/models/TestModel') assert excinfo.value.code == 503 + +class TestTFHttpServerCloudEvent(): + + @pytest.fixture(scope="class") + def app(self): # pylint: disable=no-self-use + model = DummyCEModel("TestModel") + server = kfserver.KFServer() + server.register_model(model) + return server.create_application() + + async def test_predict_ce_binary(self, http_server_client): + event = dummy_cloud_event({"instances":[[1,2]]}, set_contenttype=True) + headers, body = to_binary(event) + resp = await http_server_client.fetch('/v1/models/TestModel:predict', + method="POST", + 
headers=headers, + body=body) + + assert resp.code == 200 + assert resp.body == b'{"predictions": [[1, 2]]}' + assert resp.headers['content-type'] == "application/x-www-form-urlencoded" + assert resp.headers['ce-specversion'] == "1.0" + assert resp.headers['ce-id'] == "36077800-0c23-4f38-a0b4-01f4369f670a" + assert resp.headers['ce-source'] == "https://example.com/event-producer" + assert resp.headers['ce-type'] == "com.example.sampletype1" + assert resp.headers['ce-datacontenttype'] == "application/x-www-form-urlencoded" + assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00" + +class TestTFHttpServerAvroCloudEvent(): + + @pytest.fixture(scope="class") + def app(self): # pylint: disable=no-self-use + model = DummyAvroCEModel("TestModel") + server = kfserver.KFServer() + server.register_model(model) + return server.create_application() + + async def test_predict_ce_avro_binary(self, http_server_client): + schema = avro.schema.parse(test_avsc_schema) + msg = {"name": "foo", "favorite_number": 1, "favorite_color": "pink"} + + writer = avro.io.DatumWriter(schema) + bytes_writer = io.BytesIO() + encoder = avro.io.BinaryEncoder(bytes_writer) + writer.write(msg, encoder) + data = bytes_writer.getvalue() + + event = dummy_cloud_event(data, set_contenttype=True) + + # Creates the HTTP request representation of the CloudEvent in binary content mode + headers, body = to_binary(event) + resp = await http_server_client.fetch('/v1/models/TestModel:predict', + method="POST", + headers=headers, + body=body) + + assert resp.code == 200 + assert resp.body == b'{"predictions": [["foo", 1, "pink"]]}' + assert resp.headers['content-type'] == "application/x-www-form-urlencoded" + assert resp.headers['ce-specversion'] == "1.0" + assert resp.headers['ce-id'] == "36077800-0c23-4f38-a0b4-01f4369f670a" + assert resp.headers['ce-source'] == "https://example.com/event-producer" + assert resp.headers['ce-type'] == "com.example.sampletype1" + assert 
resp.headers['ce-datacontenttype'] == "application/x-www-form-urlencoded" + assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00"