From dacfd6a191aedc3916180799a14a7b8865bb5d84 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 20 Oct 2023 20:11:45 -0500 Subject: [PATCH 01/21] "Stashing Changes" --- examples/functions.py | 8 ++++++++ memphis/memphis.py | 14 ++++++++++++-- memphis/types.py | 6 ++++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 examples/functions.py diff --git a/examples/functions.py b/examples/functions.py new file mode 100644 index 0000000..4dba43e --- /dev/null +++ b/examples/functions.py @@ -0,0 +1,8 @@ +from memphis import memphis, MemphisConnectError, MemphisError, MemphisHeaderError +from memphis.message import Message + +# JSON validated or no schema +def default_function(headers: dict, payload: dict): + pass + +# def protobuf_function(headers: dict, payload: ) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index a4c27b3..d972c82 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -32,7 +32,7 @@ from memphis.headers import Headers from memphis.producer import Producer from memphis.station import Station -from memphis.types import Retention, Storage +from memphis.types import Retention, Storage, Schema from memphis.utils import get_internal_name, random_bytes from memphis.partition_generator import PartitionGenerator @@ -904,4 +904,14 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - \ No newline at end of file + + def create_function( + self, + function: callable, + functions_schema_type, + functions_schema: Schema = None, + schema_type: Schema = Schema.NO_VALIDATION, + schema = None + ) -> None: + if schema is not None: + \ No newline at end of file diff --git a/memphis/types.py b/memphis/types.py index 79fad6a..d94cd5a 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -25,3 +25,9 @@ class Retention(Enum): class Storage(Enum): DISK = "file" MEMORY = "memory" + +class Schema(Enum): + PROTOBUF: "protobuf" + AVRO: "avro" + JSON: 
"json" + NO_VALIDATION: None \ No newline at end of file From e2a6eb96bb639e2b0316268bc702e2dc6a9b773b Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 24 Oct 2023 21:32:17 -0500 Subject: [PATCH 02/21] "Added example wrapper for python function" --- memphis/memphis.py | 76 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 7 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index d972c82..bddfa0f 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -22,6 +22,7 @@ import base64 import re import warnings +import dill import nats as broker from google.protobuf import descriptor_pb2, descriptor_pool @@ -907,11 +908,72 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, - function: callable, - functions_schema_type, - functions_schema: Schema = None, - schema_type: Schema = Schema.NO_VALIDATION, - schema = None + user_func: callable, + dependencies: list = [], ) -> None: - if schema is not None: - \ No newline at end of file + def lambda_handler(event, context): + import json + + processed_events = {} + processed_events["successfullMessages"] = [] + processed_events["errorMessages"] = [] + for message in event.messages: + try: + processed_message = user_func(message["payload"]) + byte_message = None + + if isinstance(processed_message, bytearray): + byte_message = processed_message + elif isinstance(processed_message, dict): + byte_message = bytearray(json.dumps(processed_message), encoding="utf-8") + elif isinstance(processed_message, str): + byte_message = bytearray(processed_message, encoding="utf-8") + else: + raise Exception("No valid data formats") + + processed_events["successfullMessages"].append({ + "headers": message["headers"], + "payload": byte_message + }) + + except Exception as e: + processed_events["errorMessages"].append({ + "headers": message["headers"], + "payload": message["payload"], + "error": str(e) + }) + + return json.dumps(processed_events) + + # Now get 
string representation of the user function and the lambda handler + lambda_handler_string = dill.source.getsource(lambda_handler) + user_func_string = dill.source.getsource(user_func) + + # Do whatever http request you need to do here to register the function... + + +# func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { +# var processedEvent MemphisEvent +# for _, msg := range event.Messages { +# msgStr := string(msg.Payload) + +# flattenedMessages, err := FlattenMessages([]byte(msgStr)) + +# if err != nil{ +# processedEvent.FailedMessages = append(processedEvent.FailedMessages, MemphisMsgWithError{ +# Headers: msg.Headers, +# Payload: []byte(msgStr), +# Error: err.Error(), +# }) + +# continue +# } + +# processedEvent.Messages = append(processedEvent.Messages, MemphisMsg{ +# Headers: msg.Headers, +# Payload: flattenedMessages, +# }) +# } + +# return &processedEvent, nil +# } From 908974a5b74b08b47dab0adf215bd494282f48e4 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 27 Oct 2023 19:55:41 -0500 Subject: [PATCH 03/21] "Swapped comments out with a return statement of the lambda handler" --- memphis/memphis.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index bddfa0f..b79b6f2 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,7 +909,6 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, user_func: callable, - dependencies: list = [], ) -> None: def lambda_handler(event, context): import json @@ -945,11 +944,7 @@ def lambda_handler(event, context): return json.dumps(processed_events) - # Now get string representation of the user function and the lambda handler - lambda_handler_string = dill.source.getsource(lambda_handler) - user_func_string = dill.source.getsource(user_func) - - # Do whatever http request you need to do here to register the function... 
+ return lambda_handler # func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { From 8e3eef8c5c63676e072856643dc859728313113b Mon Sep 17 00:00:00 2001 From: John Peters Date: Sun, 29 Oct 2023 00:54:10 -0500 Subject: [PATCH 04/21] "Fixed an issue with using bytearrays b/c those aren't serializiable to json with dumps" --- memphis/memphis.py | 45 ++++++--------------------------------------- 1 file changed, 6 insertions(+), 39 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index b79b6f2..e391fa6 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -919,20 +919,10 @@ def lambda_handler(event, context): for message in event.messages: try: processed_message = user_func(message["payload"]) - byte_message = None - - if isinstance(processed_message, bytearray): - byte_message = processed_message - elif isinstance(processed_message, dict): - byte_message = bytearray(json.dumps(processed_message), encoding="utf-8") - elif isinstance(processed_message, str): - byte_message = bytearray(processed_message, encoding="utf-8") - else: - raise Exception("No valid data formats") processed_events["successfullMessages"].append({ "headers": message["headers"], - "payload": byte_message + "payload": processed_message }) except Exception as e: @@ -941,34 +931,11 @@ def lambda_handler(event, context): "payload": message["payload"], "error": str(e) }) - - return json.dumps(processed_events) + + try: + return json.dumps(processed_events).encode('utf-8') + except Exception as e: + return f"Returned message types from user function are not able to be converted into JSON: {e}" return lambda_handler - -# func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { -# var processedEvent MemphisEvent -# for _, msg := range event.Messages { -# msgStr := string(msg.Payload) - -# flattenedMessages, err := FlattenMessages([]byte(msgStr)) - -# if err != nil{ -# processedEvent.FailedMessages = 
append(processedEvent.FailedMessages, MemphisMsgWithError{ -# Headers: msg.Headers, -# Payload: []byte(msgStr), -# Error: err.Error(), -# }) - -# continue -# } - -# processedEvent.Messages = append(processedEvent.Messages, MemphisMsg{ -# Headers: msg.Headers, -# Payload: flattenedMessages, -# }) -# } - -# return &processedEvent, nil -# } From 86319beca4caf8dc32dbdfde3b792f8c7fa14847 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 2 Nov 2023 23:52:31 -0500 Subject: [PATCH 05/21] "Made function take in the event and just calls it now..." --- memphis/memphis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index e391fa6..d67bddc 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,8 +909,9 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, user_func: callable, + event ) -> None: - def lambda_handler(event, context): + def lambda_handler(event): import json processed_events = {} @@ -937,5 +938,5 @@ def lambda_handler(event, context): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler + return lambda_handler(event) From 0a48a818be2105d75989ffefe81dba72c0eac8e4 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 3 Nov 2023 00:02:47 -0500 Subject: [PATCH 06/21] "Re-ordered function arguments so that python doesn't complain" --- memphis/memphis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 521c83c..29d3841 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -908,8 +908,8 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, - user_func: callable, - event + event, + user_func: callable ) -> None: def lambda_handler(event): import json From 57d0df80153c4262e6c50debc01877b12850f2e7 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 3 Nov 2023 00:04:59 
-0500 Subject: [PATCH 07/21] "Accessing event wrongly. Fixed" --- memphis/memphis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 29d3841..1c0b8cf 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -917,7 +917,7 @@ def lambda_handler(event): processed_events = {} processed_events["successfullMessages"] = [] processed_events["errorMessages"] = [] - for message in event.messages: + for message in event["messages"]: try: processed_message = user_func(message["payload"]) From 5e525e1916a191c204827f404949c713d4c88fcd Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:07:46 -0600 Subject: [PATCH 08/21] "create_function documented and is using base64 encoding" --- README.md | 21 +++++++++++++++++ memphis/memphis.py | 56 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 96204f9..13b1581 100644 --- a/README.md +++ b/README.md @@ -919,3 +919,24 @@ consumer.destroy() ```python memphis.is_connected() ``` + +### Using the create_function utility function +The create_function utility function is used to take a function that the user creates to process one message, into one which processes a batch of messages. When writing a Memphis Function, the logic in the main python file is expected to look like this example, with a lambda_handler, the user function and a call to create_function: + +> Make sure to encode the bytes object that you will return with utf-8! 
+ +```python +import json +import base64 +from memphis.Memphis import create_function + +def lambda_handler(event, context): + return create_function(event, user_func = modify_message) + +def modify_message(message_payload): + payload = str(message_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True + + return bytes(json.dumps(as_json), encoding='utf-8') +``` \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 1c0b8cf..c388fca 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,8 +909,43 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, event, - user_func: callable + user_func: callable ) -> None: + """ + This function handles a batch of messages and processes them with the passed-in user function. + + Args: + event (dict): + The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. + For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html + user_func (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + This function will modify the payload and headers and return them in the modified format. + + Args: + payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. + headers (dict): The headers associated with the Memphis message. + + Returns: + modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. + modified_headers (dict): The headers will be passed in and returned as a Python dictionary. + + Raises: + Error: + Raises an exception of any kind when something goes wrong with processing a message. + The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. 
+ + Returns: + lambda_handler (callable): + A function that handles the batching of data and manages the returned messages. + Return the result of this function in the Lambda handler. + """ + class EncodeBase64(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, bytes): + return str(base64.b64encode(obj), encoding='utf-8') + return json.JSONEncoder.default(self, obj) + def lambda_handler(event): import json @@ -919,12 +954,18 @@ def lambda_handler(event): processed_events["errorMessages"] = [] for message in event["messages"]: try: - processed_message = user_func(message["payload"]) + payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) + processed_message, processed_headers = user_func(payload, message['headers']) - processed_events["successfullMessages"].append({ - "headers": message["headers"], - "payload": processed_message - }) + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): + processed_events["successfullMessages"].append({ + "headers": processed_headers, + "payload": processed_message + }) + else: + err_msg = "The returned processed_message or processed_headers were not in the right format. " + + "processed_message must be bytes and processed_headers, dict" + raise Exception(err_msg) except Exception as e: processed_events["errorMessages"].append({ @@ -934,9 +975,8 @@ def lambda_handler(event): }) try: - return json.dumps(processed_events).encode('utf-8') + return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" return lambda_handler(event) - From e89a4e6b7cd3baf8f3d1bef36900e9e72082eead Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:09:51 -0600 Subject: [PATCH 09/21] "Missing a newline in the types file for some reason... 
I didn't modify it but pylint is complaining so I'm adding it" --- memphis/types.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/memphis/types.py b/memphis/types.py index d94cd5a..7ab1b13 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -30,4 +30,5 @@ class Schema(Enum): PROTOBUF: "protobuf" AVRO: "avro" JSON: "json" - NO_VALIDATION: None \ No newline at end of file + NO_VALIDATION: None + \ No newline at end of file From 2390927a9262f6363eb116c7c76897148302bd74 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:13:07 -0600 Subject: [PATCH 10/21] "removed extra json import" --- memphis/memphis.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index d47e991..b5ff2c3 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -988,8 +988,6 @@ def default(self, obj): return json.JSONEncoder.default(self, obj) def lambda_handler(event): - import json - processed_events = {} processed_events["successfullMessages"] = [] processed_events["errorMessages"] = [] From 4f5de9c5d49a6addba365c82853e07c5338308fb Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:17:54 -0600 Subject: [PATCH 11/21] "removed extra string add" --- memphis/memphis.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index b5ff2c3..2063536 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -1002,8 +1002,7 @@ def lambda_handler(event): "payload": processed_message }) else: - err_msg = "The returned processed_message or processed_headers were not in the right format. " - + "processed_message must be bytes and processed_headers, dict" + err_msg = "The returned processed_message or processed_headers were not in the right format. 
processed_message must be bytes and processed_headers, dict" raise Exception(err_msg) except Exception as e: From 5090b6c55ebad330238b3762e4d64e5e35b6ad7e Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:21:46 -0600 Subject: [PATCH 12/21] "trying to make pylint happy by removing white space and renaming some parameters" --- memphis/memphis.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 2063536..6aedcaa 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -946,7 +946,7 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - + def create_function( self, event, @@ -982,11 +982,11 @@ def create_function( Return the result of this function in the Lambda handler. """ class EncodeBase64(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, bytes): - return str(base64.b64encode(obj), encoding='utf-8') - return json.JSONEncoder.default(self, obj) - + def default(self, o): + if isinstance(o, bytes): + return str(base64.b64encode(o), encoding='utf-8') + return json.JSONEncoder.default(self, o) + def lambda_handler(event): processed_events = {} processed_events["successfullMessages"] = [] @@ -995,7 +995,7 @@ def lambda_handler(event): try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) processed_message, processed_headers = user_func(payload, message['headers']) - + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): processed_events["successfullMessages"].append({ "headers": processed_headers, @@ -1011,7 +1011,7 @@ def lambda_handler(event): "payload": message["payload"], "error": str(e) }) - + try: return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') except Exception as e: From 1f9b08a5c93f0dc9be9c97ed72bb9516d4415419 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:23:01 -0600 Subject: [PATCH 13/21] 
"removed unused import" --- memphis/memphis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 6aedcaa..3e0629b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -22,7 +22,6 @@ import base64 import re import warnings -import dill import nats as broker from google.protobuf import descriptor_pb2, descriptor_pool From 736e4a799fa2831d088aa58127f753ed35914083 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:26:20 -0600 Subject: [PATCH 14/21] "removed unused schema import and removed functions.py example that is not being used" --- examples/functions.py | 8 -------- memphis/memphis.py | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) delete mode 100644 examples/functions.py diff --git a/examples/functions.py b/examples/functions.py deleted file mode 100644 index 4dba43e..0000000 --- a/examples/functions.py +++ /dev/null @@ -1,8 +0,0 @@ -from memphis import memphis, MemphisConnectError, MemphisError, MemphisHeaderError -from memphis.message import Message - -# JSON validated or no schema -def default_function(headers: dict, payload: dict): - pass - -# def protobuf_function(headers: dict, payload: ) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 3e0629b..745826b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -32,7 +32,7 @@ from memphis.headers import Headers from memphis.producer import Producer from memphis.station import Station -from memphis.types import Retention, Storage, Schema +from memphis.types import Retention, Storage from memphis.utils import get_internal_name, random_bytes from memphis.partition_generator import PartitionGenerator From c965c6d2c92b706a9e6b96cffecfa5a57685b908 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:31:07 -0600 Subject: [PATCH 15/21] "moved my function to a new file" --- README.md | 2 +- memphis/functions.py | 73 ++++++++++++++++++++++++++++++++++++++++++++ memphis/memphis.py | 71 
------------------------------------------ 3 files changed, 74 insertions(+), 72 deletions(-) create mode 100644 memphis/functions.py diff --git a/README.md b/README.md index 13b1581..e1b72b8 100644 --- a/README.md +++ b/README.md @@ -928,7 +928,7 @@ The create_function utility function is used to take a function that the user cr ```python import json import base64 -from memphis.Memphis import create_function +from memphis.functions import create_function def lambda_handler(event, context): return create_function(event, user_func = modify_message) diff --git a/memphis/functions.py b/memphis/functions.py new file mode 100644 index 0000000..d9f2c87 --- /dev/null +++ b/memphis/functions.py @@ -0,0 +1,73 @@ +import json +import base64 + +def create_function( + event, + user_func: callable +) -> None: + """ + This function handles a batch of messages and processes them with the passed-in user function. + + Args: + event (dict): + The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. + For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html + user_func (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + This function will modify the payload and headers and return them in the modified format. + + Args: + payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. + headers (dict): The headers associated with the Memphis message. + + Returns: + modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. + modified_headers (dict): The headers will be passed in and returned as a Python dictionary. + + Raises: + Error: + Raises an exception of any kind when something goes wrong with processing a message. 
+ The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. + + Returns: + lambda_handler (callable): + A function that handles the batching of data and manages the returned messages. + Return the result of this function in the Lambda handler. + """ + class EncodeBase64(json.JSONEncoder): + def default(self, o): + if isinstance(o, bytes): + return str(base64.b64encode(o), encoding='utf-8') + return json.JSONEncoder.default(self, o) + + def lambda_handler(event): + processed_events = {} + processed_events["successfullMessages"] = [] + processed_events["errorMessages"] = [] + for message in event["messages"]: + try: + payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) + processed_message, processed_headers = user_func(payload, message['headers']) + + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): + processed_events["successfullMessages"].append({ + "headers": processed_headers, + "payload": processed_message + }) + else: + err_msg = "The returned processed_message or processed_headers were not in the right format. 
processed_message must be bytes and processed_headers, dict" + raise Exception(err_msg) + + except Exception as e: + processed_events["errorMessages"].append({ + "headers": message["headers"], + "payload": message["payload"], + "error": str(e) + }) + + try: + return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') + except Exception as e: + return f"Returned message types from user function are not able to be converted into JSON: {e}" + + return lambda_handler(event) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 745826b..828c86b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -946,74 +946,3 @@ def unset_cached_consumer_station(self, station_name): except Exception as e: raise e - def create_function( - self, - event, - user_func: callable - ) -> None: - """ - This function handles a batch of messages and processes them with the passed-in user function. - - Args: - event (dict): - The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. - For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html - user_func (callable): - `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. - This function will modify the payload and headers and return them in the modified format. - - Args: - payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. - headers (dict): The headers associated with the Memphis message. - - Returns: - modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. - modified_headers (dict): The headers will be passed in and returned as a Python dictionary. - - Raises: - Error: - Raises an exception of any kind when something goes wrong with processing a message. 
- The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. - - Returns: - lambda_handler (callable): - A function that handles the batching of data and manages the returned messages. - Return the result of this function in the Lambda handler. - """ - class EncodeBase64(json.JSONEncoder): - def default(self, o): - if isinstance(o, bytes): - return str(base64.b64encode(o), encoding='utf-8') - return json.JSONEncoder.default(self, o) - - def lambda_handler(event): - processed_events = {} - processed_events["successfullMessages"] = [] - processed_events["errorMessages"] = [] - for message in event["messages"]: - try: - payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = user_func(payload, message['headers']) - - if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): - processed_events["successfullMessages"].append({ - "headers": processed_headers, - "payload": processed_message - }) - else: - err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" - raise Exception(err_msg) - - except Exception as e: - processed_events["errorMessages"].append({ - "headers": message["headers"], - "payload": message["payload"], - "error": str(e) - }) - - try: - return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') - except Exception as e: - return f"Returned message types from user function are not able to be converted into JSON: {e}" - - return lambda_handler(event) From fa859267cfdbdd104bb15aa7c1a80afbe028a861 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:33:07 -0600 Subject: [PATCH 16/21] I left some new lines in memphis.py and didn't add one in functions... 
--- memphis/functions.py | 2 +- memphis/memphis.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/memphis/functions.py b/memphis/functions.py index d9f2c87..2b01329 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -70,4 +70,4 @@ def lambda_handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler(event) \ No newline at end of file + return lambda_handler(event) diff --git a/memphis/memphis.py b/memphis/memphis.py index 828c86b..7314a4b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -945,4 +945,3 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - From 7172f20551a4b920e21108929206c9bc65daf502 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:46:18 +0200 Subject: [PATCH 17/21] fixes --- README.md | 24 +++++++++++++++--------- memphis/functions.py | 34 +++++++++++++++++----------------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index e1b72b8..40105bc 100644 --- a/README.md +++ b/README.md @@ -920,8 +920,11 @@ consumer.destroy() memphis.is_connected() ``` -### Using the create_function utility function -The create_function utility function is used to take a function that the user creates to process one message, into one which processes a batch of messages. When writing a Memphis Function, the logic in the main python file is expected to look like this example, with a lambda_handler, the user function and a call to create_function: +### Creating a Memphis function +Utility for creating Memphis functions. +event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.
+An exception should be raised if the message should be considered failed and go into the dead-letter station.
+if all returned values are None the message will be filtered out from the station. > Make sure to encode the bytes object that you will return with utf-8! @@ -930,13 +933,16 @@ import json import base64 from memphis.functions import create_function -def lambda_handler(event, context): - return create_function(event, user_func = modify_message) +def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section + return create_function(event, user_func = event_handler) -def modify_message(message_payload): - payload = str(message_payload, 'utf-8') - as_json = json.loads(payload) - as_json['modified'] = True +def event_handler(msg_payload, msg_headers): + try: + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True - return bytes(json.dumps(as_json), encoding='utf-8') + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers + except Exception as e: + raise e ``` \ No newline at end of file diff --git a/memphis/functions.py b/memphis/functions.py index 2b01329..afec2dc 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -3,17 +3,16 @@ def create_function( event, - user_func: callable + event_handler: callable ) -> None: """ - This function handles a batch of messages and processes them with the passed-in user function. + This function creates a Memphis function and processes events with the passed-in event_handler function. Args: event (dict): - The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. - For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html - user_func (callable): - `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + The event that is provided to the functionby Memphis. 
+ event_handler (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. This function will modify the payload and headers and return them in the modified format. Args: @@ -27,12 +26,11 @@ def create_function( Raises: Error: Raises an exception of any kind when something goes wrong with processing a message. - The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. + The unprocessed message and the exception will be sent to the dead-letter station. Returns: - lambda_handler (callable): - A function that handles the batching of data and manages the returned messages. - Return the result of this function in the Lambda handler. + handler (callable): + The Memphis function handler """ class EncodeBase64(json.JSONEncoder): def default(self, o): @@ -40,26 +38,28 @@ def default(self, o): return str(base64.b64encode(o), encoding='utf-8') return json.JSONEncoder.default(self, o) - def lambda_handler(event): + def handler(event): processed_events = {} - processed_events["successfullMessages"] = [] - processed_events["errorMessages"] = [] + processed_events["messages"] = [] + processed_events["failed_messages"] = [] for message in event["messages"]: try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = user_func(payload, message['headers']) + processed_message, processed_headers = event_handler(payload, message['headers']) if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): - processed_events["successfullMessages"].append({ + processed_events["messages"].append({ "headers": processed_headers, "payload": processed_message }) + elif processed_message is None and processed_headers is None: # filter out empty messages + continue else: err_msg = "The returned processed_message or processed_headers were not in the right format. 
processed_message must be bytes and processed_headers, dict" raise Exception(err_msg) except Exception as e: - processed_events["errorMessages"].append({ + processed_events["failed_messages"].append({ "headers": message["headers"], "payload": message["payload"], "error": str(e) @@ -70,4 +70,4 @@ def lambda_handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler(event) + return handler(event) From 37b7e80c9fcf915bc0ccf81afda536b113df1e07 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:48:47 +0200 Subject: [PATCH 18/21] remove unused code --- memphis/types.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/memphis/types.py b/memphis/types.py index 7ab1b13..4e7a142 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -24,11 +24,4 @@ class Retention(Enum): class Storage(Enum): DISK = "file" - MEMORY = "memory" - -class Schema(Enum): - PROTOBUF: "protobuf" - AVRO: "avro" - JSON: "json" - NO_VALIDATION: None - \ No newline at end of file + MEMORY = "memory" \ No newline at end of file From faccb45dde0f51df3e0257189918dcfa4b8b3adc Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:51:48 +0200 Subject: [PATCH 19/21] pylint --- memphis/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/types.py b/memphis/types.py index 4e7a142..79fad6a 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -24,4 +24,4 @@ class Retention(Enum): class Storage(Enum): DISK = "file" - MEMORY = "memory" \ No newline at end of file + MEMORY = "memory" From 15d6c23c746b5caa91b140c8d81f4626c3d369ca Mon Sep 17 00:00:00 2001 From: John Peters Date: Sun, 12 Nov 2023 12:24:37 -0600 Subject: [PATCH 20/21] "Fixed some syntax and added afew more examples in the readme to make it easier for users" --- README.md | 90 +++++++++++++++++++++++++++++++++++++++----- memphis/functions.py | 32 
+++++++++++++++- 2 files changed, 111 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 40105bc..95d2b4c 100644 --- a/README.md +++ b/README.md @@ -921,28 +921,100 @@ memphis.is_connected() ``` ### Creating a Memphis function -Utility for creating Memphis functions. -event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.
-An exception should be raised if the message should be considered failed and go into the dead-letter station.
-if all returned values are None the message will be filtered out from the station. +Memphis provides a create_function utility for more easily creatin Memphis Functions. -> Make sure to encode the bytes object that you will return with utf-8! +The user created `event_handler` will be called for every message in the given batch of events. The user `event_handler` will take in a `msg_payload` as bytes and `msg_headers` as a dict, and should return a modified version of the payload and headers in the same data types. + +The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be placed in the dead letter station=. + +If the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station. + +> Make sure to encode the modified `msg_payload` bytes object with utf-8 encoding! + +This example function takes the bytes object `msg_payload` and encodes it into a string so that it may be parsed as JSON. ```python import json import base64 from memphis.functions import create_function -def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section +def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file return create_function(event, user_func = event_handler) def event_handler(msg_payload, msg_headers): - try: payload = str(msg_payload, 'utf-8') as_json = json.loads(payload) as_json['modified'] = True return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers - except Exception as e: - raise e +``` + +If the user would want to have a message that they would want to validate and send to the dead letter station if the validation fails then the user can raise an exception. In the following example, the field `check` is simply a boolean. 
The following function will send any messages which fail the `check` to the dead letter station. + +```python +import json +import base64 +from memphis.functions import create_function + +def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file + return create_function(event, event_handler = event_handler) + +def event_handler(msg_payload, msg_headers): + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + if as_json['check'] == False: + raise Exception("Validation Failed!") + + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers +``` + +If a user would rather just skip the message and not have it be sent to the station or dead letter station, the user could instead return `None`, `None`: + +```python +import json +import base64 +from memphis.functions import create_function + +def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file + return create_function(event, event_handler = event_handler) + +def event_handler(msg_payload, msg_headers): + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + if as_json['check'] == False: + return None, None + + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers +``` + +Lastly, if the user is using another data format like Protocol Buffers, the user may simply decode the `msg_payload` into that format instead of JSON. 
Assuming we have a .proto definition like this: +```proto +syntax = "proto3"; +package protobuf_example; + +message Message{ + string data_field = 1; +} +``` + +We can decode this and get the data_field out like this: + +```python +import json +import base64 +from memphis.functions import create_function +import message_pb2 + +def handler(event, context): # The name of this function should match the handler field in the memphis.yaml file + return create_function(event, event_handler = event_handler) + +def event_handler(msg_payload, msg_headers): + message = message_pb2.Message() + message.ParseFromString(msg_payload) + + # Arbitrarily changing the data_field + message.data_field = "my new data" + + # SerializeToString returns bytes, which is the type we want + return message.SerializeToString(), msg_headers ``` \ No newline at end of file diff --git a/memphis/functions.py b/memphis/functions.py index afec2dc..ff1705d 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -10,7 +10,16 @@ def create_function( Args: event (dict): - The event that is provided to the functionby Memphis. + A dict of events given to the Function in the format: + { + messages: [ + { + headers: {}, + payload: "base64_encoded_payload" + }, + ... + ] + } event_handler (callable): `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. This function will modify the payload and headers and return them in the modified format. @@ -30,7 +39,26 @@ def create_function( Returns: handler (callable): - The Memphis function handler + The Memphis function handler which is responsible for iterating over the messages in the event and passing them to the user provided event handler. + Returns: + The Memphis function handler returns a JSON string which represents the successful and failed messages. This is in the format: + { + messages: [ + { + headers: {}, + payload: "base64_encoded_payload" + }, + ... 
+ ], + failed_messages: [ + { + headers: {}, + payload: "base64_encoded_payload" + }, + ... + ] + } + All failed_messages will be sent to the dead letter station, and the messages will be sent to the station. """ class EncodeBase64(json.JSONEncoder): def default(self, o): From a922520b507d1a84e13af4ba1fb260d87d565c19 Mon Sep 17 00:00:00 2001 From: John Peters Date: Sun, 12 Nov 2023 12:37:08 -0600 Subject: [PATCH 21/21] "Extra clarity with one of the error messages and a type is fixed in the readme" --- README.md | 4 ++-- memphis/functions.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index fc8b41a..12b3f09 100644 --- a/README.md +++ b/README.md @@ -923,9 +923,9 @@ memphis.is_connected() ### Creating a Memphis function Memphis provides a create_function utility for more easily creatin Memphis Functions. -The user created `event_handler` will be called for every message in the given batch of events. The user `event_handler` will take in a `msg_payload` as bytes and `msg_headers` as a dict, and should return a modified version of the payload and headers in the same data types. +The user created `event_handler` will be called for every message in the given batch of events. The user's `event_handler` will take in a `msg_payload` as bytes and `msg_headers` as a dict, and should return a modified version of the payload and headers in the same data types. -The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be placed in the dead letter station=. +The user function should raise an exception if the message processing has failed. If any exception is raised (deliberately or by a failed operation) the message will be sent to the dead letter station. 
If the returned modified version of the `msg_payload` or `msg_headers` are returned as `None`, then the message will be skipped and will not be sent to the station or dead letter station. diff --git a/memphis/functions.py b/memphis/functions.py index ff1705d..fdc67ea 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -82,10 +82,12 @@ def handler(event): }) elif processed_message is None and processed_headers is None: # filter out empty messages continue + elif processed_message is None or processed_headers is None: + err_msg = f"processed_messages is of type {type(processed_message)} and processed_headers is {type(processed_headers)}. Either both of these should be None or neither" + raise Exception(err_msg) else: err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" raise Exception(err_msg) - except Exception as e: processed_events["failed_messages"].append({ "headers": message["headers"],