-
Notifications
You must be signed in to change notification settings - Fork 0
/
callback_decorator.py
146 lines (105 loc) · 4.11 KB
/
callback_decorator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from abc import ABC, abstractmethod
from functools import singledispatchmethod
import json
from typing import Union
from pydantic import BaseModel, ValidationError
class ErrorHandlerCallback(ABC):
    """Wrap a message callback and settle the message according to its outcome.

    An instance is called as ``(channel, method_frame, header_frame, body)``.
    It forwards ``body`` and ``header_frame.headers`` to the wrapped callback
    and then delegates to one of three hooks that subclasses must implement:

    * ``acknowledge_message`` -- the callback returned without raising;
    * ``reject_message`` -- the callback raised ``FatalError``;
    * ``resend_message_later`` -- the callback raised ``TryAgainError``.

    Any other exception raised by the callback propagates to the caller and
    none of the hooks is invoked.
    """

    class FatalError(Exception):
        """Signal that the message cannot be processed and must be rejected."""

    class TryAgainError(Exception):
        """Signal that the message should be processed again later."""

    def __init__(self, do_callback):
        """Remember the callable that is invoked as ``do_callback(body, headers)``."""
        self.do_callback = do_callback

    def __call__(self, channel, method_frame, header_frame, body):
        """Run the wrapped callback, then call the hook matching its outcome."""
        # Every hook receives the same four delivery arguments.
        delivery = (channel, method_frame, header_frame, body)
        try:
            self.do_callback(body, header_frame.headers)
        except self.FatalError:
            self.reject_message(*delivery)
        except self.TryAgainError:
            self.resend_message_later(*delivery)
        else:
            self.acknowledge_message(*delivery)

    @staticmethod
    @abstractmethod
    def reject_message(channel, method_frame, header_frame, body):
        """Hook called when the wrapped callback raised ``FatalError``."""

    @staticmethod
    @abstractmethod
    def resend_message_later(channel, method_frame, header_frame, body):
        """Hook called when the wrapped callback raised ``TryAgainError``."""

    @staticmethod
    @abstractmethod
    def acknowledge_message(channel, method_frame, header_frame, body):
        """Hook called when the wrapped callback returned without raising."""
class JsonCallback:
    """Decorate a callback with a JSON decoding layer.

    An instance is called as ``(body, headers)``. It decodes ``body`` with
    ``json.loads`` and passes the decoded value, together with the unchanged
    ``headers``, to the wrapped callback.
    """

    def __init__(self, do_callback):
        """Remember the callable invoked as ``do_callback(decoded_body, headers)``."""
        self.do_callback = do_callback

    def __call__(self, body, headers):
        """Decode ``body`` as JSON and forward the result to the wrapped callback.

        Raises:
            ErrorHandlerCallback.FatalError: ``body`` is not a valid JSON
                document, or is a byte string that cannot be decoded to text.
        """
        try:
            body = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # json.loads decodes byte input before parsing, so undecodable
            # bytes surface as UnicodeDecodeError instead of JSONDecodeError.
            # Either way the payload is malformed, so report it as fatal
            # rather than letting the error escape the error-handling layer.
            raise ErrorHandlerCallback.FatalError(exc) from exc
        self.do_callback(body, headers)
class ModelCallback:
    """Decorate a callback with a message parsing layer.

    Constructed with one or more message types, the instance is then applied
    to a callback. The resulting wrapper validates the incoming body against
    the union of those types and passes the parsed message, together with the
    unchanged headers, to the callback.
    """

    def __init__(self, *message_types):
        """Remember the message types the body may be parsed into."""
        self.message_types = message_types

    def __call__(self, do_callback):
        """Return a ``(body, headers)`` wrapper around ``do_callback``.

        The wrapper raises ``ErrorHandlerCallback.FatalError`` when the body
        does not validate against any of the message types.
        """

        # Build the envelope model once per decorated callback instead of once
        # per message: defining a model class is invariant work. An invalid
        # set of message types therefore fails here, when the decorator is
        # applied, rather than on the first message.
        class Messages(BaseModel):
            message: Union[self.message_types]

        def wrapper(body, headers):
            try:
                message = Messages(message=body).message
            except ValidationError as exc:
                raise ErrorHandlerCallback.FatalError(exc) from exc
            do_callback(message, headers)

        return wrapper
class CallbackMeta(type(ABC)):
    """Metaclass that lets a callback class be applied with or without arguments.

    Calling the class dispatches on the type of the first positional argument.
    If it is an instance of ``type(BaseModel)``, i.e. a model class, a
    decorator with a parsing layer is returned; anything else is treated as
    the callback itself and the class is instantiated around it directly.
    """

    @singledispatchmethod
    def __call__(cls, do_callback: callable):
        """Fallback: instantiate the class around ``do_callback`` as is."""
        return super().__call__(do_callback)

    @__call__.register
    def _(cls, message_type: type(BaseModel), *message_types):
        """Return a decorator that adds JSON decoding and model parsing."""
        # Zero-argument super() is resolved here, in the method itself, and
        # the bound constructor is captured for use inside the closure.
        instantiate = super().__call__

        def decorator(do_callback):
            parsing_layer = ModelCallback(message_type, *message_types)
            json_layer = JsonCallback(parsing_layer(do_callback))
            return instantiate(json_layer)

        return decorator
class Callback(ErrorHandlerCallback, metaclass=CallbackMeta):
    """Decorator base that adds error handling and optional message parsing.

    To obtain a usable decorator, derive from this class and implement the
    three abstract static methods.

    Example:

    >>> class MyCallback(Callback):
    ...     @staticmethod
    ...     def reject_message(channel, method_frame, header_frame, body):
    ...         channel.basic_reject(method_frame.delivery_tag)
    ...     @staticmethod
    ...     def resend_message_later(channel, method_frame, header_frame, body):
    ...         pass
    ...     @staticmethod
    ...     def acknowledge_message(channel, method_frame, header_frame, body):
    ...         channel.basic_ack(method_frame.delivery_tag)

    Applied without arguments, the decorator passes the original message body
    and the headers to the callback function.

    Example:

    >>> @MyCallback
    ... def do_my_callback(body, headers):
    ...     ...

    Applied with arguments, the decorator passes the parsed message and the
    headers instead. Every argument has to be a subclass of ``BaseModel``.

    Example:

    >>> from pydantic import BaseModel
    >>>
    >>>
    >>> class MyModel(BaseModel):
    ...     my_field: int
    ...
    >>>
    >>> @MyCallback(MyModel)
    ... def do_my_callback(my_object, headers):
    ...     ...

    To report a failure from the callback function, raise
    ``Callback.TryAgainError`` or ``Callback.FatalError``.
    """