Skip to content

Commit

Permalink
[#1159] Support cloud events for KFServing custom framework (#1343)
Browse files Browse the repository at this point in the history
* parent a0a52f8
author Manasvi Tickoo <MTICKOO@bloomberg.net> 1611275174 -0500
committer Manasvi Tickoo <MTICKOO@bloomberg.net> 1612370131 -0500

1159 - cloudevent support ofr kfserving

PR comments

Fix clpudevent headers

Add unit test for cloud event messages

Add avro unit test

Add avro to kfserving requirements.txt

PR comments

kfserving check ce-contenttyp before unmarshalling

Update k8s libraries to 0.19.2 (#1305)

* go mod tidy

* Update k8s libs to 0.19.2, fix issues, and run make tests

* Handle ce to_binary return type wile building resp
  • Loading branch information
mtickoobb committed Feb 6, 2021
1 parent c1b9fa0 commit 7e23cd5
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 10 deletions.
53 changes: 45 additions & 8 deletions python/kfserving/kfserving/handlers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@

import inspect
import tornado.web
import typing
import json
import pytz
import cloudevents.exceptions as ce
from cloudevents.http import CloudEvent, from_http, is_binary, is_structured, to_binary, to_structured
from cloudevents.sdk.converters.util import has_binary_headers
from http import HTTPStatus
from kfserving.kfmodel_repository import KFModelRepository
from datetime import datetime


class HTTPHandler(tornado.web.RequestHandler):
Expand All @@ -43,21 +49,52 @@ def validate(self, request):
)
return request


class PredictHandler(HTTPHandler):
async def post(self, name: str):
if has_binary_headers(self.request.headers):
try:
#Use default unmarshaller if contenttype is set in header
if "ce-contenttype" in self.request.headers:
body = from_http(self.request.headers, self.request.body)
else:
body = from_http(self.request.headers, self.request.body, lambda x: x)
except (ce.MissingRequiredFields, ce.InvalidRequiredFields, ce.InvalidStructuredJSON, ce.InvalidHeadersFormat, ce.DataMarshallerError, ce.DataUnmarshallerError) as e:
raise tornado.web.HTTPError(
status_code=HTTPStatus.BAD_REQUEST,
reason="Cloud Event Exceptions: %s" % e
)
else:
try:
body = json.loads(self.request.body)
except json.decoder.JSONDecodeError as e:
raise tornado.web.HTTPError(
status_code=HTTPStatus.BAD_REQUEST,
reason="Unrecognized request format: %s" % e
)

model = self.get_model(name)
try:
body = json.loads(self.request.body)
except json.decoder.JSONDecodeError as e:
raise tornado.web.HTTPError(
status_code=HTTPStatus.BAD_REQUEST,
reason="Unrecognized request format: %s" % e
)
request = model.preprocess(body)
request = self.validate(request)
response = (await model.predict(request)) if inspect.iscoroutinefunction(model.predict) else model.predict(request)
response = model.postprocess(response)

if has_binary_headers(self.request.headers):
event = CloudEvent(body._attributes, response)
if is_binary(self.request.headers):
eventheader, eventbody = to_binary(event)
elif is_structured(self.request.headers):
eventheader, eventbody = to_structured(event)
for k, v in eventheader.items():
if k != "ce-time":
self.set_header(k, v)
else: #utc now() timestamp
self.set_header('ce-time', datetime.utcnow().replace(tzinfo=pytz.utc).strftime('%Y-%m-%dT%H:%M:%S.%f%z'))

if isinstance(eventbody, (bytes, bytearray)):
response = eventbody
else:
response = eventbody.data

self.write(response)


Expand Down
12 changes: 11 additions & 1 deletion python/kfserving/kfserving/kfmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ def load(self) -> bool:
return self.ready

def preprocess(self, request: Dict) -> Dict:
return request
# If cloudevent dict, then parse 'data' field. Otherwise, pass through.
if "data" in request \
and "time" in request \
and "type" in request \
and "source" in request \
and "id" in request \
and "specversion" in request:
return request["data"]
else:
return request

def postprocess(self, request: Dict) -> Dict:
return request
Expand Down Expand Up @@ -91,3 +100,4 @@ async def explain(self, request: Dict) -> Dict:
status_code=response.code,
reason=response.body)
return json.loads(response.body)

2 changes: 2 additions & 0 deletions python/kfserving/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ adal>=1.2.2
table_logger>=0.3.5
numpy>=1.17.3
azure-storage-blob>=1.3.0,<=2.1.0
cloudevents>=1.2.0
avro>=1.10.1
150 changes: 149 additions & 1 deletion python/kfserving/test/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,41 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import avro.io, avro.schema, io, pytest, requests
from cloudevents.http import CloudEvent, to_binary, to_structured
from kfserving import kfmodel
from kfserving import kfserver
from tornado.httpclient import HTTPClientError
from kfserving.kfmodel_repository import KFModelRepository

test_avsc_schema = '''
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
'''

def dummy_cloud_event(data, set_contenttype=False):
# This data defines a binary cloudevent
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
"specversion": "1.0",
"id": "36077800-0c23-4f38-a0b4-01f4369f670a",
"time": "2021-01-28T21:04:43.144141+00:00"
}
if set_contenttype:
attributes["contenttype"] = "application/json"

event = CloudEvent(attributes, data)
return event


class DummyModel(kfmodel.KFModel):
def __init__(self, name):
Expand All @@ -34,6 +63,47 @@ async def predict(self, request):
async def explain(self, request):
return {"predictions": request["instances"]}

class DummyCEModel(kfmodel.KFModel):
def __init__(self, name):
super().__init__(name)
self.name = name
self.ready = False

def load(self):
self.ready = True

async def predict(self, request):
return {"predictions": request.data["instances"]}

async def explain(self, request):
return {"predictions": request.data["instances"]}

class DummyAvroCEModel(kfmodel.KFModel):
def __init__(self, name):
super().__init__(name)
self.name = name
self.ready = False

def load(self):
self.ready = True

def _parserequest(self, request):
schema = avro.schema.parse(test_avsc_schema)
raw_bytes = request.data
bytes_reader = io.BytesIO(raw_bytes)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
record1 = reader.read(decoder)
return record1

async def predict(self, request):
record1 = self._parserequest(request)
return {"predictions": [[record1['name'] , record1['favorite_number'], record1['favorite_color']]]}

async def explain(self, request):
record1 = self._parserequest(request)
return {"predictions": [[record1['name'] , record1['favorite_number'], record1['favorite_color']]]}


class DummyKFModelRepository(KFModelRepository):
def __init__(self, test_load_success: bool):
Expand Down Expand Up @@ -76,6 +146,19 @@ async def test_predict(self, http_server_client):
assert resp.body == b'{"predictions": [[1, 2]]}'
assert resp.headers['content-type'] == "application/json; charset=UTF-8"

async def test_predict_ce_structured(self, http_server_client):

event = dummy_cloud_event({"instances":[[1,2]]})
headers, body = to_structured(event)
resp = await http_server_client.fetch('/v1/models/TestModel:predict',
method="POST",
headers=headers,
body=body)

assert resp.code == 200
assert resp.body == b'{"predictions": [[1, 2]]}'
assert resp.headers['content-type'] == "application/json; charset=UTF-8"

async def test_explain(self, http_server_client):
resp = await http_server_client.fetch('/v1/models/TestModel:explain',
method="POST",
Expand Down Expand Up @@ -141,3 +224,68 @@ async def test_model_not_ready_error(self, http_server_client):
with pytest.raises(HTTPClientError) as excinfo:
_ = await http_server_client.fetch('/v1/models/TestModel')
assert excinfo.value.code == 503

class TestTFHttpServerCloudEvent():

@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
model = DummyCEModel("TestModel")
server = kfserver.KFServer()
server.register_model(model)
return server.create_application()

async def test_predict_ce_binary(self, http_server_client):
event = dummy_cloud_event({"instances":[[1,2]]}, set_contenttype=True)
headers, body = to_binary(event)
resp = await http_server_client.fetch('/v1/models/TestModel:predict',
method="POST",
headers=headers,
body=body)

assert resp.code == 200
assert resp.body == b'{"predictions": [[1, 2]]}'
assert resp.headers['content-type'] == "application/x-www-form-urlencoded"
assert resp.headers['ce-specversion'] == "1.0"
assert resp.headers['ce-id'] == "36077800-0c23-4f38-a0b4-01f4369f670a"
assert resp.headers['ce-source'] == "https://example.com/event-producer"
assert resp.headers['ce-type'] == "com.example.sampletype1"
assert resp.headers['ce-datacontenttype'] == "application/x-www-form-urlencoded"
assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00"

class TestTFHttpServerAvroCloudEvent():

@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
model = DummyAvroCEModel("TestModel")
server = kfserver.KFServer()
server.register_model(model)
return server.create_application()

async def test_predict_ce_avro_binary(self, http_server_client):
schema = avro.schema.parse(test_avsc_schema)
msg = {"name": "foo", "favorite_number": 1, "favorite_color": "pink"}

writer = avro.io.DatumWriter(schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(msg, encoder)
data = bytes_writer.getvalue()

event = dummy_cloud_event(data, set_contenttype=True)

# Creates the HTTP request representation of the CloudEvent in binary content mode
headers, body = to_binary(event)
resp = await http_server_client.fetch('/v1/models/TestModel:predict',
method="POST",
headers=headers,
body=body)

assert resp.code == 200
assert resp.body == b'{"predictions": [["foo", 1, "pink"]]}'
assert resp.headers['content-type'] == "application/x-www-form-urlencoded"
assert resp.headers['ce-specversion'] == "1.0"
assert resp.headers['ce-id'] == "36077800-0c23-4f38-a0b4-01f4369f670a"
assert resp.headers['ce-source'] == "https://example.com/event-producer"
assert resp.headers['ce-type'] == "com.example.sampletype1"
assert resp.headers['ce-datacontenttype'] == "application/x-www-form-urlencoded"
assert resp.headers['ce-time'] > "2021-01-28T21:04:43.144141+00:00"

0 comments on commit 7e23cd5

Please sign in to comment.