diff --git a/README.md b/README.md index 96204f9..40105bc 100644 --- a/README.md +++ b/README.md @@ -919,3 +919,30 @@ consumer.destroy() ```python memphis.is_connected() ``` + +### Creating a Memphis function +Utility for creating Memphis functions. +event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.
+An exception should be raised if the message should be considered failed and go into the dead-letter station.
+if all returned values are None the message will be filtered out from the station. + +> Make sure to encode the bytes object that you will return with utf-8! + +```python +import json +import base64 +from memphis.functions import create_function + +def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section + return create_function(event, user_func = event_handler) + +def event_handler(msg_payload, msg_headers): + try: + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True + + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers + except Exception as e: + raise e +``` \ No newline at end of file diff --git a/memphis/functions.py b/memphis/functions.py new file mode 100644 index 0000000..afec2dc --- /dev/null +++ b/memphis/functions.py @@ -0,0 +1,73 @@ +import json +import base64 + +def create_function( + event, + event_handler: callable +) -> None: + """ + This function creates a Memphis function and processes events with the passed-in event_handler function. + + Args: + event (dict): + The event that is provided to the functionby Memphis. + event_handler (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + This function will modify the payload and headers and return them in the modified format. + + Args: + payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. + headers (dict): The headers associated with the Memphis message. + + Returns: + modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. + modified_headers (dict): The headers will be passed in and returned as a Python dictionary. + + Raises: + Error: + Raises an exception of any kind when something goes wrong with processing a message. + The unprocessed message and the exception will be sent to the dead-letter station. + + Returns: + handler (callable): + The Memphis function handler + """ + class EncodeBase64(json.JSONEncoder): + def default(self, o): + if isinstance(o, bytes): + return str(base64.b64encode(o), encoding='utf-8') + return json.JSONEncoder.default(self, o) + + def handler(event): + processed_events = {} + processed_events["messages"] = [] + processed_events["failed_messages"] = [] + for message in event["messages"]: + try: + payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) + processed_message, processed_headers = event_handler(payload, message['headers']) + + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): + processed_events["messages"].append({ + "headers": processed_headers, + "payload": processed_message + }) + elif processed_message is None and processed_headers is None: # filter out empty messages + continue + else: + err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" + raise Exception(err_msg) + + except Exception as e: + processed_events["failed_messages"].append({ + "headers": message["headers"], + "payload": message["payload"], + "error": str(e) + }) + + try: + return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') + except Exception as e: + return f"Returned message types from user function are not able to be converted into JSON: {e}" + + return handler(event) diff --git a/memphis/memphis.py b/memphis/memphis.py index 3edbf05..7314a4b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -945,4 +945,3 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - \ No newline at end of file