Skip to content

Commit

Permalink
Merge pull request #227 from lwinterface/feat/validation-error-callback
Browse files Browse the repository at this point in the history
Added validation error callback
  • Loading branch information
artas728 committed Feb 22, 2022
2 parents f37019a + 0fec33b commit de156b0
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
8 changes: 6 additions & 2 deletions examples/simple_examples/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestValidator(Validator):


message = {
"key1": "value1",
"key1": ["value1",],
"key2": 2,
"key3": 3.0,
"key4": [1, 2, 3, 4],
Expand All @@ -50,7 +50,11 @@ async def publish_periodically():
await app.publish(subject="some.publish.subject", message=message)


@app.listen("some.publish.subject", validator=TestValidator)
def validation_error_cb(msg, error):
return {"success": False, "error": "validation_error_cb"}


@app.listen("some.publish.subject", validator=TestValidator, validation_error_cb=validation_error_cb)
async def requests_listener(msg):
log.info(f"got message {msg.data}")
await asyncio.sleep(1)
Expand Down
10 changes: 6 additions & 4 deletions panini/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import typing
import uuid
from types import CoroutineType
from types import CoroutineType, FunctionType


from panini.managers.nats_client import NATSClient
Expand Down Expand Up @@ -33,7 +33,6 @@ def __init__(
max_reconnect_attempts: int = 60,
reconnecting_time_sleep: int = 2,
allocation_queue_group: str = "",

logger_required: bool = True,
logger_files_path: str = None,
logger_in_separate_process: bool = False,
Expand Down Expand Up @@ -183,13 +182,16 @@ def set_logger(
def listen(
self,
subject: list or str,
data_type="json",
validator: type = None,
data_type = "json"
validation_error_cb: FunctionType = None,
):
return self._event_manager.listen(
subject=subject,
data_type=data_type,
validator=validator,
data_type=data_type
validation_error_cb=validation_error_cb

)

async def publish(
Expand Down
21 changes: 14 additions & 7 deletions panini/managers/event_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio

from types import FunctionType
from panini import exceptions
from panini.exceptions import NotReadyError, ValidationError

Expand All @@ -19,11 +19,12 @@ def subscriptions(self):
def listen(
self,
subject: list or str,
validator: type = None,
data_type="json",
validator: type = None,
validation_error_cb: FunctionType = None,
):
def wrapper(function):
function = self.wrap_function_by_validator(function, validator)
function = self.wrap_function_by_validator(function, validator, validation_error_cb)
if type(subject) is list:
for t in subject:
self._check_subscription(t)
Expand All @@ -35,24 +36,30 @@ def wrapper(function):
return function
return wrapper

def wrap_function_by_validator(self, function, validator):
def wrap_function_by_validator(self, function, validator, validation_error_cb):
def validate_message(msg):
try:
if validator is not None:
validator.validated_message(msg.data)
except exceptions.ValidationError as se:
if validation_error_cb:
return validation_error_cb(msg, se)
error = f"subject: {msg.subject} error: {str(se)}"
return {"success": False, "error": error}
except Exception as e:
raise ValidationError(e)
return msg
return True

def wrapper(msg):
validate_message(msg)
validation_result = validate_message(msg)
if not validation_result is True:
return validation_result
return function(msg)

async def wrapper_async(msg):
validate_message(msg)
validation_result = validate_message(msg)
if not validation_result is True:
return validation_result
return await function(msg)

if asyncio.iscoroutinefunction(function):
Expand Down
17 changes: 16 additions & 1 deletion tests/test_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class DataValidator(Validator):
async def publish(msg):
return {"success": True}

def validation_error_cb(msg, error):
return {"success": False, "error": "validation_error_cb"}

@app.listen("test_validator.foo-with-error-cb", validator=DataValidator, validation_error_cb=validation_error_cb)
async def publish(msg):
return {"success": True}

@app.listen("test_validator.check")
async def check(msg):
try:
Expand Down Expand Up @@ -58,11 +65,19 @@ def test_correct_message(client):

def test_request_with_correct_message(client):
response = client.request("test_validator.foo", {"data": "string"})
assert response["success"] is True
assert response["success"] is False


def test_request_with_incorrect_message(client):
response = client.request("test_validator.foo", {"data": 1})
assert (
response["success"] is True
) # both should be success = True, validator do not stop the request


def test_request_with_incorrect_message_error_cb(client):
response = client.request("test_validator.foo-with-error-cb", {"notdata": "string"})
assert response["success"] is False
assert "error" in response
assert "validation_error_cb" in response["error"]

0 comments on commit de156b0

Please sign in to comment.