Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dacfd6a
"Stashing Changes"
John-Memphis Oct 21, 2023
e51ee6b
Merge branch 'master' into functions_wrapper_dev
John-Memphis Oct 25, 2023
e2a6eb9
"Added example wrapper for python function"
John-Memphis Oct 25, 2023
908974a
"Swapped comments out with a return statement of the lambda handler"
John-Memphis Oct 28, 2023
8e3eef8
"Fixed an issue with using bytearrays b/c those aren't serializiable …
John-Memphis Oct 29, 2023
86319be
"Made function take in the event and just calls it now..."
John-Memphis Nov 3, 2023
db7d395
Merge branch 'master' into functions_wrapper_dev
John-Memphis Nov 3, 2023
0a48a81
"Re-ordered function arguments so that python doesn't complain"
John-Memphis Nov 3, 2023
57d0df8
"Accessing event wrongly. Fixed"
John-Memphis Nov 3, 2023
5e525e1
"create_function documented and is using base64 encoding"
John-Memphis Nov 9, 2023
a2b3217
Merge branch 'master' into functions_wrapper_dev
John-Memphis Nov 9, 2023
e89a4e6
"Missing a newline in the types file for some reason... I didn't modi…
John-Memphis Nov 9, 2023
2390927
"removed extra json import"
John-Memphis Nov 9, 2023
4f5de9c
"removed extra string add"
John-Memphis Nov 9, 2023
5090b6c
"trying to make pylint happy by removing white space and renaming som…
John-Memphis Nov 9, 2023
1f9b08a
"removed unused import"
John-Memphis Nov 9, 2023
736e4a7
"removed unused schema import and removed functions.py example that i…
John-Memphis Nov 9, 2023
c965c6d
"moved my function to a new file"
John-Memphis Nov 9, 2023
fa85926
I left some new lines in memphis.py and didn't add one in functions...
John-Memphis Nov 9, 2023
7172f20
fixes
idanasulin2706 Nov 12, 2023
37b7e80
remove unused code
idanasulin2706 Nov 12, 2023
faccb45
pylint
idanasulin2706 Nov 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -919,3 +919,30 @@ consumer.destroy()
```python
memphis.is_connected()
```

### Creating a Memphis function
Utility for creating Memphis functions.
event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.<br>
An exception should be raised if the message should be considered failed and go into the dead-letter station.<br>
if all returned values are None the message will be filtered out from the station.

> Make sure to encode the bytes object that you will return with utf-8!

```python
import json
import base64
from memphis.functions import create_function

def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section
return create_function(event, user_func = event_handler)

def event_handler(msg_payload, msg_headers):
try:
payload = str(msg_payload, 'utf-8')
as_json = json.loads(payload)
as_json['modified'] = True

return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers
except Exception as e:
raise e
```
73 changes: 73 additions & 0 deletions memphis/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import json
import base64

def create_function(
event,
event_handler: callable
) -> None:
"""
This function creates a Memphis function and processes events with the passed-in event_handler function.

Args:
event (dict):
The event that is provided to the functionby Memphis.
event_handler (callable):
`create_function` assumes the function signature is in the format: <event_handler>(payload, headers) -> processed_payload, processed_headers.
This function will modify the payload and headers and return them in the modified format.

Args:
payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding.
headers (dict): The headers associated with the Memphis message.

Returns:
modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`.
modified_headers (dict): The headers will be passed in and returned as a Python dictionary.

Raises:
Error:
Raises an exception of any kind when something goes wrong with processing a message.
The unprocessed message and the exception will be sent to the dead-letter station.

Returns:
handler (callable):
The Memphis function handler
"""
class EncodeBase64(json.JSONEncoder):
def default(self, o):
if isinstance(o, bytes):
return str(base64.b64encode(o), encoding='utf-8')
return json.JSONEncoder.default(self, o)

def handler(event):
processed_events = {}
processed_events["messages"] = []
processed_events["failed_messages"] = []
for message in event["messages"]:
try:
payload = base64.b64decode(bytes(message['payload'], encoding='utf-8'))
processed_message, processed_headers = event_handler(payload, message['headers'])

if isinstance(processed_message, bytes) and isinstance(processed_headers, dict):
processed_events["messages"].append({
"headers": processed_headers,
"payload": processed_message
})
elif processed_message is None and processed_headers is None: # filter out empty messages
continue
else:
err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict"
raise Exception(err_msg)

except Exception as e:
processed_events["failed_messages"].append({
"headers": message["headers"],
"payload": message["payload"],
"error": str(e)
})

try:
return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8')
except Exception as e:
return f"Returned message types from user function are not able to be converted into JSON: {e}"

return handler(event)
1 change: 0 additions & 1 deletion memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,4 +945,3 @@ def unset_cached_consumer_station(self, station_name):
del self.consumers_map[key]
except Exception as e:
raise e