Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
dacfd6a
"Stashing Changes"
John-Memphis Oct 21, 2023
e51ee6b
Merge branch 'master' into functions_wrapper_dev
John-Memphis Oct 25, 2023
e2a6eb9
"Added example wrapper for python function"
John-Memphis Oct 25, 2023
908974a
"Swapped comments out with a return statement of the lambda handler"
John-Memphis Oct 28, 2023
8e3eef8
"Fixed an issue with using bytearrays b/c those aren't serializiable …
John-Memphis Oct 29, 2023
86319be
"Made function take in the event and just calls it now..."
John-Memphis Nov 3, 2023
db7d395
Merge branch 'master' into functions_wrapper_dev
John-Memphis Nov 3, 2023
0a48a81
"Re-ordered function arguments so that python doesn't complain"
John-Memphis Nov 3, 2023
57d0df8
"Accessing event wrongly. Fixed"
John-Memphis Nov 3, 2023
5e525e1
"create_function documented and is using base64 encoding"
John-Memphis Nov 9, 2023
a2b3217
Merge branch 'master' into functions_wrapper_dev
John-Memphis Nov 9, 2023
e89a4e6
"Missing a newline in the types file for some reason... I didn't modi…
John-Memphis Nov 9, 2023
2390927
"removed extra json import"
John-Memphis Nov 9, 2023
4f5de9c
"removed extra string add"
John-Memphis Nov 9, 2023
5090b6c
"trying to make pylint happy by removing white space and renaming som…
John-Memphis Nov 9, 2023
1f9b08a
"removed unused import"
John-Memphis Nov 9, 2023
736e4a7
"removed unused schema import and removed functions.py example that i…
John-Memphis Nov 9, 2023
c965c6d
"moved my function to a new file"
John-Memphis Nov 9, 2023
fa85926
I left some new lines in memphis.py and didn't add one in functions...
John-Memphis Nov 9, 2023
7172f20
fixes
idanasulin2706 Nov 12, 2023
37b7e80
remove unused code
idanasulin2706 Nov 12, 2023
faccb45
pylint
idanasulin2706 Nov 12, 2023
15d6c23
"Fixed some syntax and added afew more examples in the readme to make…
John-Memphis Nov 12, 2023
13ebf09
"merge conflicts with master resolved"
John-Memphis Nov 12, 2023
a922520
"Extra clarity with one of the error messages and a type is fixed in …
John-Memphis Nov 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 82 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -921,28 +921,100 @@ memphis.is_connected()
```

### Creating a Memphis function
Utility for creating Memphis functions.
event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.<br>
An exception should be raised if the message should be considered failed and go into the dead-letter station.<br>
if all returned values are None the message will be filtered out from the station.
Memphis provides a create_function utility for more easily creatin Memphis Functions.

> Make sure to encode the bytes object that you will return with utf-8!
The user created `event_handler` will be called for every message in the given batch of events. The user's `event_handler` will take in a `msg_payload` as bytes and `msg_headers` as a dict, and should return a modified version of the payload and headers in the same data types.

The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be sent to the dead letter station.

If the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station.

> Make sure to encode the modified `msg_payload` bytes object with utf-8 encoding!

This example function takes the bytes object `msg_payload` and encodes it into a string so that it may be parsed as JSON.

```python
import json
import base64
from memphis.functions import create_function

def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section
return create_function(event, event_handler = event_handler)
def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
return create_function(event, user_func = event_handler)

def event_handler(msg_payload, msg_headers):
try:
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True

return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
except Exception as e:
raise e
```

If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. The following function will send any messages which fail the `check` to the dead letter station.

```python
import json
import base64
from memphis.functions import create_function

def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
return create_function(event, user_func = event_handler)

def event_handler(msg_payload, msg_headers):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
if as_json['check'] == False:
raise Exception("Validation Failed!")

return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```

If a user would rather just skip the message and not have it be sent to the station or dead letter station, the cuser could instead return `None`, `None`:

```python
import json
import base64
from memphis.functions import create_function

def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
return create_function(event, user_func = event_handler)

def event_handler(msg_payload, msg_headers):
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
if as_json['check'] == False:
return None, None

return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
```

Lastly, if the user is using another data format like Protocol Buffers, the user may simply decode the `msg_payload` into that format instead of JSON. Assuming we have a .proto definition like this:
```proto
syntax = "proto3";
package protobuf_example;

message Message{
string data_field = 1;
}
```

We can decode this and get the data_field out like this:

```python
import json
import base64
from memphis.functions import create_function
import message_pb2

def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file
return create_function(event, user_func = event_handler)

def event_handler(msg_payload, msg_headers):
message = message_pb2.Message()
message.ParseFromString(base64.b64decode(encoded_str))

# Arbitrarily changing the data_field
message.data_field = "my new data"

# SerializeToString returns bytes, which is the type we want
return message.SerializeToString(), msg_headers
```
36 changes: 33 additions & 3 deletions memphis/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@ def create_function(

Args:
event (dict):
The event that is provided to the functionby Memphis.
A dict of events given to the Function in the format:
{
messages: [
{
headers: {},
payload: "base64_encoded_payload"
},
...
]
}
event_handler (callable):
`create_function` assumes the function signature is in the format: <event_handler>(payload, headers) -> processed_payload, processed_headers.
This function will modify the payload and headers and return them in the modified format.
Expand All @@ -30,7 +39,26 @@ def create_function(

Returns:
handler (callable):
The Memphis function handler
The Memphis function handler which is responsible for iterating over the messages in the event and passing them to the user provided event handler.
Returns:
The Memphis function handler returns a JSON string which represents the successful and failed messages. This is in the format:
{
messages: [
{
headers: {},
payload: "base64_encoded_payload"
},
...
],
failed_messages[
{
headers: {},
payload: "base64_encoded_payload"
},
...
]
}
All failed_messages will be sent to the dead letter station, and the messages will be sent to the station.
"""
class EncodeBase64(json.JSONEncoder):
def default(self, o):
Expand All @@ -54,10 +82,12 @@ def handler(event):
})
elif processed_message is None and processed_headers is None: # filter out empty messages
continue
elif processed_message is None or processed_headers is None:
err_msg = f"processed_messages is of type {type(processed_message)} and processed_headers is {type(processed_headers)}. Either both of these should be None or neither"
raise Exception(err_msg)
else:
err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict"
raise Exception(err_msg)

except Exception as e:
processed_events["failed_messages"].append({
"headers": message["headers"],
Expand Down