diff --git a/README.md b/README.md
index 6c0ce53..12b3f09 100644
--- a/README.md
+++ b/README.md
@@ -921,28 +921,100 @@ memphis.is_connected()
```
### Creating a Memphis function
-Utility for creating Memphis functions.
-event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.
-An exception should be raised if the message should be considered failed and go into the dead-letter station.
-if all returned values are None the message will be filtered out from the station.
+Memphis provides a create_function utility for more easily creatin Memphis Functions.
-> Make sure to encode the bytes object that you will return with utf-8!
+The user created `event_handler` will be called for every message in the given batch of events. The user's `event_handler` will take in a `msg_payload` as bytes and `msg_headers` as a dict, and should return a modified version of the payload and headers in the same data types.
+
+The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be sent to the dead letter station.
+
+If the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station.
+
+> Make sure to encode the modified `msg_payload` bytes object with utf-8 encoding!
+
+This example function takes the bytes object `msg_payload` and encodes it into a string so that it may be parsed as JSON.
```python
import json
import base64
from memphis.functions import create_function
-def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section
- return create_function(event, event_handler = event_handler)
+def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
+ return create_function(event, user_func = event_handler)
def event_handler(msg_payload, msg_headers):
- try:
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True
return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
- except Exception as e:
- raise e
+```
+
+If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station.
+
+```python
+import json
+import base64
+from memphis.functions import create_function
+
+def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
+ return create_function(event, user_func = event_handler)
+
+def event_handler(msg_payload, msg_headers):
+ payload = str(msg_payload, 'utf-8')
+ as_json = json.loads(payload)
+ if as_json['check'] == False:
+ raise Exception("Validation Failed!")
+
+ return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
+```
+
+If a user would rather just skip the message and not have it be sent to the station or dead letter station, the cuser could instead return `None`, `None`:
+
+```python
+import json
+import base64
+from memphis.functions import create_function
+
+def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
+ return create_function(event, user_func = event_handler)
+
+def event_handler(msg_payload, msg_headers):
+ payload = str(msg_payload, 'utf-8')
+ as_json = json.loads(payload)
+ if as_json['check'] == False:
+ return None, None
+
+ return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
+```
+
+Lastly, if the user is using another data format like Protocol Buffers, the user may simply decode the `msg_payload` into that format instead of JSON. Assuming we have a .proto definition like this:
+```proto
+syntax = "proto3";
+package protobuf_example;
+
+message Message{
+ string data_field = 1;
+}
+```
+
+We can decode this and get the data_field out like this:
+
+```python
+import json
+import base64
+from memphis.functions import create_function
+import message_pb2
+
+def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
+ return create_function(event, user_func = event_handler)
+
+def event_handler(msg_payload, msg_headers):
+ message = message_pb2.Message()
+ message.ParseFromString(base64.b64decode(encoded_str))
+
+ # Arbitrarily changing the data_field
+ message.data_field = "my new data"
+
+ # SerializeToString returns bytes, which is the type we want
+ return message.SerializeToString(), msg_headers
```
diff --git a/memphis/functions.py b/memphis/functions.py
index afec2dc..fdc67ea 100644
--- a/memphis/functions.py
+++ b/memphis/functions.py
@@ -10,7 +10,16 @@ def create_function(
Args:
event (dict):
- The event that is provided to the functionby Memphis.
+ A dict of events given to the Function in the format:
+ {
+ messages: [
+ {
+ headers: {},
+ payload: "base64_encoded_payload"
+ },
+ ...
+ ]
+ }
event_handler (callable):
`create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers.
This function will modify the payload and headers and return them in the modified format.
@@ -30,7 +39,26 @@ def create_function(
Returns:
handler (callable):
- The Memphis function handler
+ The Memphis function handler which is responsible for iterating over the messages in the event and passing them to the user provided event handler.
+ Returns:
+ The Memphis function handler returns a JSON string which represents the successful and failed messages. This is in the format:
+ {
+ messages: [
+ {
+ headers: {},
+ payload: "base64_encoded_payload"
+ },
+ ...
+ ],
+ failed_messages[
+ {
+ headers: {},
+ payload: "base64_encoded_payload"
+ },
+ ...
+ ]
+ }
+ All failed_messages will be sent to the dead letter station, and the messages will be sent to the station.
"""
class EncodeBase64(json.JSONEncoder):
def default(self, o):
@@ -54,10 +82,12 @@ def handler(event):
})
elif processed_message is None and processed_headers is None: # filter out empty messages
continue
+ elif processed_message is None or processed_headers is None:
+ err_msg = f"processed_messages is of type {type(processed_message)} and processed_headers is {type(processed_headers)}. Either both of these should be None or neither"
+ raise Exception(err_msg)
else:
err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict"
raise Exception(err_msg)
-
except Exception as e:
processed_events["failed_messages"].append({
"headers": message["headers"],