From dacfd6a191aedc3916180799a14a7b8865bb5d84 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 20 Oct 2023 20:11:45 -0500 Subject: [PATCH 01/19] "Stashing Changes" --- examples/functions.py | 8 ++++++++ memphis/memphis.py | 14 ++++++++++++-- memphis/types.py | 6 ++++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 examples/functions.py diff --git a/examples/functions.py b/examples/functions.py new file mode 100644 index 0000000..4dba43e --- /dev/null +++ b/examples/functions.py @@ -0,0 +1,8 @@ +from memphis import memphis, MemphisConnectError, MemphisError, MemphisHeaderError +from memphis.message import Message + +# JSON validated or no schema +def default_function(headers: dict, payload: dict): + pass + +# def protobuf_function(headers: dict, payload: ) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index a4c27b3..d972c82 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -32,7 +32,7 @@ from memphis.headers import Headers from memphis.producer import Producer from memphis.station import Station -from memphis.types import Retention, Storage +from memphis.types import Retention, Storage, Schema from memphis.utils import get_internal_name, random_bytes from memphis.partition_generator import PartitionGenerator @@ -904,4 +904,14 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - \ No newline at end of file + + def create_function( + self, + function: callable, + functions_schema_type, + functions_schema: Schema = None, + schema_type: Schema = Schema.NO_VALIDATION, + schema = None + ) -> None: + if schema is not None: + \ No newline at end of file diff --git a/memphis/types.py b/memphis/types.py index 79fad6a..d94cd5a 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -25,3 +25,9 @@ class Retention(Enum): class Storage(Enum): DISK = "file" MEMORY = "memory" + +class Schema(Enum): + PROTOBUF: "protobuf" + AVRO: "avro" + JSON: "json" + NO_VALIDATION: None \ No newline at end of file From e2a6eb96bb639e2b0316268bc702e2dc6a9b773b Mon Sep 17 00:00:00 2001 From: John Peters Date: Tue, 24 Oct 2023 21:32:17 -0500 Subject: [PATCH 02/19] "Added example wrapper for python function" --- memphis/memphis.py | 76 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 7 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index d972c82..bddfa0f 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -22,6 +22,7 @@ import base64 import re import warnings +import dill import nats as broker from google.protobuf import descriptor_pb2, descriptor_pool @@ -907,11 +908,72 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, - function: callable, - functions_schema_type, - functions_schema: Schema = None, - schema_type: Schema = Schema.NO_VALIDATION, - schema = None + user_func: callable, + dependencies: list = [], ) -> None: - if schema is not None: - \ No newline at end of file + def lambda_handler(event, context): + import json + + processed_events = {} + processed_events["successfullMessages"] = [] + processed_events["errorMessages"] = [] + for message in event.messages: + try: + processed_message = user_func(message["payload"]) + byte_message = None + + if isinstance(processed_message, bytearray): + byte_message = processed_message + elif isinstance(processed_message, dict): + byte_message = bytearray(json.dumps(processed_message), encoding="utf-8") + elif isinstance(processed_message, str): + byte_message = bytearray(processed_message, encoding="utf-8") + else: + raise Exception("No valid data formats") + + processed_events["successfullMessages"].append({ + "headers": message["headers"], + "payload": byte_message + }) + + except Exception as e: + processed_events["errorMessages"].append({ + "headers": message["headers"], + "payload": message["payload"], + "error": str(e) + }) + + return json.dumps(processed_events) + + # Now get string representation of the user function and the lambda handler + lambda_handler_string = dill.source.getsource(lambda_handler) + user_func_string = dill.source.getsource(user_func) + + # Do whatever http request you need to do here to register the function... + + +# func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { +# var processedEvent MemphisEvent +# for _, msg := range event.Messages { +# msgStr := string(msg.Payload) + +# flattenedMessages, err := FlattenMessages([]byte(msgStr)) + +# if err != nil{ +# processedEvent.FailedMessages = append(processedEvent.FailedMessages, MemphisMsgWithError{ +# Headers: msg.Headers, +# Payload: []byte(msgStr), +# Error: err.Error(), +# }) + +# continue +# } + +# processedEvent.Messages = append(processedEvent.Messages, MemphisMsg{ +# Headers: msg.Headers, +# Payload: flattenedMessages, +# }) +# } + +# return &processedEvent, nil +# } From 908974a5b74b08b47dab0adf215bd494282f48e4 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 27 Oct 2023 19:55:41 -0500 Subject: [PATCH 03/19] "Swapped comments out with a return statement of the lambda handler" --- memphis/memphis.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index bddfa0f..b79b6f2 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,7 +909,6 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, user_func: callable, - dependencies: list = [], ) -> None: def lambda_handler(event, context): import json @@ -945,11 +944,7 @@ def lambda_handler(event, context): return json.dumps(processed_events) - # Now get string representation of the user function and the lambda handler - lambda_handler_string = dill.source.getsource(lambda_handler) - user_func_string = dill.source.getsource(user_func) - - # Do whatever http request you need to do here to register the function... + return lambda_handler # func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { From 8e3eef8c5c63676e072856643dc859728313113b Mon Sep 17 00:00:00 2001 From: John Peters Date: Sun, 29 Oct 2023 00:54:10 -0500 Subject: [PATCH 04/19] "Fixed an issue with using bytearrays b/c those aren't serializiable to json with dumps" --- memphis/memphis.py | 45 ++++++--------------------------------------- 1 file changed, 6 insertions(+), 39 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index b79b6f2..e391fa6 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -919,20 +919,10 @@ def lambda_handler(event, context): for message in event.messages: try: processed_message = user_func(message["payload"]) - byte_message = None - - if isinstance(processed_message, bytearray): - byte_message = processed_message - elif isinstance(processed_message, dict): - byte_message = bytearray(json.dumps(processed_message), encoding="utf-8") - elif isinstance(processed_message, str): - byte_message = bytearray(processed_message, encoding="utf-8") - else: - raise Exception("No valid data formats") processed_events["successfullMessages"].append({ "headers": message["headers"], - "payload": byte_message + "payload": processed_message }) except Exception as e: @@ -941,34 +931,11 @@ def lambda_handler(event, context): "payload": message["payload"], "error": str(e) }) - - return json.dumps(processed_events) + + try: + return json.dumps(processed_events).encode('utf-8') + except Exception as e: + return f"Returned message types from user function are not able to be converted into JSON: {e}" return lambda_handler - -# func FlattenHandler(ctx context.Context, event *MemphisEvent) (*MemphisEvent, error) { -# var processedEvent MemphisEvent -# for _, msg := range event.Messages { -# msgStr := string(msg.Payload) - -# flattenedMessages, err := FlattenMessages([]byte(msgStr)) - -# if err != nil{ -# processedEvent.FailedMessages = append(processedEvent.FailedMessages, MemphisMsgWithError{ -# Headers: msg.Headers, -# Payload: []byte(msgStr), -# Error: err.Error(), -# }) - -# continue -# } - -# processedEvent.Messages = append(processedEvent.Messages, MemphisMsg{ -# Headers: msg.Headers, -# Payload: flattenedMessages, -# }) -# } - -# return &processedEvent, nil -# } From 86319beca4caf8dc32dbdfde3b792f8c7fa14847 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 2 Nov 2023 23:52:31 -0500 Subject: [PATCH 05/19] "Made function take in the event and just calls it now..." --- memphis/memphis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index e391fa6..d67bddc 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,8 +909,9 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, user_func: callable, + event ) -> None: - def lambda_handler(event, context): + def lambda_handler(event): import json processed_events = {} @@ -937,5 +938,5 @@ def lambda_handler(event, context): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler + return lambda_handler(event) From 0a48a818be2105d75989ffefe81dba72c0eac8e4 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 3 Nov 2023 00:02:47 -0500 Subject: [PATCH 06/19] "Re-ordered function arguments so that python doesn't complain" --- memphis/memphis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 521c83c..29d3841 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -908,8 +908,8 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, - user_func: callable, - event + event, + user_func: callable ) -> None: def lambda_handler(event): import json From 57d0df80153c4262e6c50debc01877b12850f2e7 Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 3 Nov 2023 00:04:59 -0500 Subject: [PATCH 07/19] "Accessing event wrongly. Fixed" --- memphis/memphis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 29d3841..1c0b8cf 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -917,7 +917,7 @@ def lambda_handler(event): processed_events = {} processed_events["successfullMessages"] = [] processed_events["errorMessages"] = [] - for message in event.messages: + for message in event["messages"]: try: processed_message = user_func(message["payload"]) From 5e525e1916a191c204827f404949c713d4c88fcd Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:07:46 -0600 Subject: [PATCH 08/19] "create_function documented and is using base64 encoding" --- README.md | 21 +++++++++++++++++ memphis/memphis.py | 56 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 96204f9..13b1581 100644 --- a/README.md +++ b/README.md @@ -919,3 +919,24 @@ consumer.destroy() ```python memphis.is_connected() ``` + +### Using the create_function utility function +The create_function utility function is used to take a function that the user creates to process one message, into one which processes a batch of messages. When writing a Memphis Function, the logic in the main python file is expected to look like this example, with a lambda_handler, the user function and a call to create_function: + +> Make sure to encode the bytes object that you will return with utf-8! + +```python +import json +import base64 +from memphis.Memphis import create_function + +def lambda_handler(event, context): + return create_function(event, user_func = modify_message) + +def modify_message(message_payload): + payload = str(message_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True + + return bytes(json.dumps(as_json), encoding='utf-8') +``` \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 1c0b8cf..c388fca 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -909,8 +909,43 @@ def unset_cached_consumer_station(self, station_name): def create_function( self, event, - user_func: callable + user_func: callable ) -> None: + """ + This function handles a batch of messages and processes them with the passed-in user function. + + Args: + event (dict): + The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. + For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html + user_func (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + This function will modify the payload and headers and return them in the modified format. + + Args: + payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. + headers (dict): The headers associated with the Memphis message. + + Returns: + modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. + modified_headers (dict): The headers will be passed in and returned as a Python dictionary. + + Raises: + Error: + Raises an exception of any kind when something goes wrong with processing a message. + The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. + + Returns: + lambda_handler (callable): + A function that handles the batching of data and manages the returned messages. + Return the result of this function in the Lambda handler. + """ + class EncodeBase64(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, bytes): + return str(base64.b64encode(obj), encoding='utf-8') + return json.JSONEncoder.default(self, obj) + def lambda_handler(event): import json @@ -919,12 +954,18 @@ def lambda_handler(event): processed_events["errorMessages"] = [] for message in event["messages"]: try: - processed_message = user_func(message["payload"]) + payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) + processed_message, processed_headers = user_func(payload, message['headers']) - processed_events["successfullMessages"].append({ - "headers": message["headers"], - "payload": processed_message - }) + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): + processed_events["successfullMessages"].append({ + "headers": processed_headers, + "payload": processed_message + }) + else: + err_msg = "The returned processed_message or processed_headers were not in the right format. " + + "processed_message must be bytes and processed_headers, dict" + raise Exception(err_msg) except Exception as e: processed_events["errorMessages"].append({ @@ -934,9 +975,8 @@ def lambda_handler(event): }) try: - return json.dumps(processed_events).encode('utf-8') + return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" return lambda_handler(event) - From e89a4e6b7cd3baf8f3d1bef36900e9e72082eead Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:09:51 -0600 Subject: [PATCH 09/19] "Missing a newline in the types file for some reason... I didn't modify it but pylint is complaining so I'm adding it" --- memphis/types.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/memphis/types.py b/memphis/types.py index d94cd5a..7ab1b13 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -30,4 +30,5 @@ class Schema(Enum): PROTOBUF: "protobuf" AVRO: "avro" JSON: "json" - NO_VALIDATION: None \ No newline at end of file + NO_VALIDATION: None + \ No newline at end of file From 2390927a9262f6363eb116c7c76897148302bd74 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:13:07 -0600 Subject: [PATCH 10/19] "removed extra json import" --- memphis/memphis.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index d47e991..b5ff2c3 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -988,8 +988,6 @@ def default(self, obj): return json.JSONEncoder.default(self, obj) def lambda_handler(event): - import json - processed_events = {} processed_events["successfullMessages"] = [] processed_events["errorMessages"] = [] From 4f5de9c5d49a6addba365c82853e07c5338308fb Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:17:54 -0600 Subject: [PATCH 11/19] "removed extra string add" --- memphis/memphis.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index b5ff2c3..2063536 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -1002,8 +1002,7 @@ def lambda_handler(event): "payload": processed_message }) else: - err_msg = "The returned processed_message or processed_headers were not in the right format. " - + "processed_message must be bytes and processed_headers, dict" + err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" raise Exception(err_msg) except Exception as e: From 5090b6c55ebad330238b3762e4d64e5e35b6ad7e Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:21:46 -0600 Subject: [PATCH 12/19] "trying to make pylint happy by removing white space and renaming some parameters" --- memphis/memphis.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 2063536..6aedcaa 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -946,7 +946,7 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - + def create_function( self, event, @@ -982,11 +982,11 @@ def create_function( Return the result of this function in the Lambda handler. """ class EncodeBase64(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, bytes): - return str(base64.b64encode(obj), encoding='utf-8') - return json.JSONEncoder.default(self, obj) - + def default(self, o): + if isinstance(o, bytes): + return str(base64.b64encode(o), encoding='utf-8') + return json.JSONEncoder.default(self, o) + def lambda_handler(event): processed_events = {} processed_events["successfullMessages"] = [] @@ -995,7 +995,7 @@ def lambda_handler(event): try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) processed_message, processed_headers = user_func(payload, message['headers']) - + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): processed_events["successfullMessages"].append({ "headers": processed_headers, @@ -1011,7 +1011,7 @@ def lambda_handler(event): "payload": message["payload"], "error": str(e) }) - + try: return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') except Exception as e: From 1f9b08a5c93f0dc9be9c97ed72bb9516d4415419 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:23:01 -0600 Subject: [PATCH 13/19] "removed unused import" --- memphis/memphis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 6aedcaa..3e0629b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -22,7 +22,6 @@ import base64 import re import warnings -import dill import nats as broker from google.protobuf import descriptor_pb2, descriptor_pool From 736e4a799fa2831d088aa58127f753ed35914083 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:26:20 -0600 Subject: [PATCH 14/19] "removed unused schema import and removed functions.py example that is not being used" --- examples/functions.py | 8 -------- memphis/memphis.py | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) delete mode 100644 examples/functions.py diff --git a/examples/functions.py b/examples/functions.py deleted file mode 100644 index 4dba43e..0000000 --- a/examples/functions.py +++ /dev/null @@ -1,8 +0,0 @@ -from memphis import memphis, MemphisConnectError, MemphisError, MemphisHeaderError -from memphis.message import Message - -# JSON validated or no schema -def default_function(headers: dict, payload: dict): - pass - -# def protobuf_function(headers: dict, payload: ) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 3e0629b..745826b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -32,7 +32,7 @@ from memphis.headers import Headers from memphis.producer import Producer from memphis.station import Station -from memphis.types import Retention, Storage, Schema +from memphis.types import Retention, Storage from memphis.utils import get_internal_name, random_bytes from memphis.partition_generator import PartitionGenerator From c965c6d2c92b706a9e6b96cffecfa5a57685b908 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:31:07 -0600 Subject: [PATCH 15/19] "moved my function to a new file" --- README.md | 2 +- memphis/functions.py | 73 ++++++++++++++++++++++++++++++++++++++++++++ memphis/memphis.py | 71 ------------------------------------------ 3 files changed, 74 insertions(+), 72 deletions(-) create mode 100644 memphis/functions.py diff --git a/README.md b/README.md index 13b1581..e1b72b8 100644 --- a/README.md +++ b/README.md @@ -928,7 +928,7 @@ The create_function utility function is used to take a function that the user cr ```python import json import base64 -from memphis.Memphis import create_function +from memphis.functions import create_function def lambda_handler(event, context): return create_function(event, user_func = modify_message) diff --git a/memphis/functions.py b/memphis/functions.py new file mode 100644 index 0000000..d9f2c87 --- /dev/null +++ b/memphis/functions.py @@ -0,0 +1,73 @@ +import json +import base64 + +def create_function( + event, + user_func: callable +) -> None: + """ + This function handles a batch of messages and processes them with the passed-in user function. + + Args: + event (dict): + The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. + For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html + user_func (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + This function will modify the payload and headers and return them in the modified format. + + Args: + payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. + headers (dict): The headers associated with the Memphis message. + + Returns: + modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. + modified_headers (dict): The headers will be passed in and returned as a Python dictionary. + + Raises: + Error: + Raises an exception of any kind when something goes wrong with processing a message. + The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. + + Returns: + lambda_handler (callable): + A function that handles the batching of data and manages the returned messages. + Return the result of this function in the Lambda handler. + """ + class EncodeBase64(json.JSONEncoder): + def default(self, o): + if isinstance(o, bytes): + return str(base64.b64encode(o), encoding='utf-8') + return json.JSONEncoder.default(self, o) + + def lambda_handler(event): + processed_events = {} + processed_events["successfullMessages"] = [] + processed_events["errorMessages"] = [] + for message in event["messages"]: + try: + payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) + processed_message, processed_headers = user_func(payload, message['headers']) + + if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): + processed_events["successfullMessages"].append({ + "headers": processed_headers, + "payload": processed_message + }) + else: + err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" + raise Exception(err_msg) + + except Exception as e: + processed_events["errorMessages"].append({ + "headers": message["headers"], + "payload": message["payload"], + "error": str(e) + }) + + try: + return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') + except Exception as e: + return f"Returned message types from user function are not able to be converted into JSON: {e}" + + return lambda_handler(event) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index 745826b..828c86b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -946,74 +946,3 @@ def unset_cached_consumer_station(self, station_name): except Exception as e: raise e - def create_function( - self, - event, - user_func: callable - ) -> None: - """ - This function handles a batch of messages and processes them with the passed-in user function. - - Args: - event (dict): - The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. - For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html - user_func (callable): - `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. - This function will modify the payload and headers and return them in the modified format. - - Args: - payload (bytes): The payload of the message. It will be encoded as bytes, and the user can assume UTF-8 encoding. - headers (dict): The headers associated with the Memphis message. - - Returns: - modified_message (bytes): The modified message must be encoded into bytes before being returned from the `user_func`. - modified_headers (dict): The headers will be passed in and returned as a Python dictionary. - - Raises: - Error: - Raises an exception of any kind when something goes wrong with processing a message. - The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. - - Returns: - lambda_handler (callable): - A function that handles the batching of data and manages the returned messages. - Return the result of this function in the Lambda handler. - """ - class EncodeBase64(json.JSONEncoder): - def default(self, o): - if isinstance(o, bytes): - return str(base64.b64encode(o), encoding='utf-8') - return json.JSONEncoder.default(self, o) - - def lambda_handler(event): - processed_events = {} - processed_events["successfullMessages"] = [] - processed_events["errorMessages"] = [] - for message in event["messages"]: - try: - payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = user_func(payload, message['headers']) - - if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): - processed_events["successfullMessages"].append({ - "headers": processed_headers, - "payload": processed_message - }) - else: - err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" - raise Exception(err_msg) - - except Exception as e: - processed_events["errorMessages"].append({ - "headers": message["headers"], - "payload": message["payload"], - "error": str(e) - }) - - try: - return json.dumps(processed_events, cls=EncodeBase64).encode('utf-8') - except Exception as e: - return f"Returned message types from user function are not able to be converted into JSON: {e}" - - return lambda_handler(event) From fa859267cfdbdd104bb15aa7c1a80afbe028a861 Mon Sep 17 00:00:00 2001 From: John Peters Date: Thu, 9 Nov 2023 11:33:07 -0600 Subject: [PATCH 16/19] I left some new lines in memphis.py and didn't add one in functions... --- memphis/functions.py | 2 +- memphis/memphis.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/memphis/functions.py b/memphis/functions.py index d9f2c87..2b01329 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -70,4 +70,4 @@ def lambda_handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler(event) \ No newline at end of file + return lambda_handler(event) diff --git a/memphis/memphis.py b/memphis/memphis.py index 828c86b..7314a4b 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -945,4 +945,3 @@ def unset_cached_consumer_station(self, station_name): del self.consumers_map[key] except Exception as e: raise e - From 7172f20551a4b920e21108929206c9bc65daf502 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:46:18 +0200 Subject: [PATCH 17/19] fixes --- README.md | 24 +++++++++++++++--------- memphis/functions.py | 34 +++++++++++++++++----------------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index e1b72b8..40105bc 100644 --- a/README.md +++ b/README.md @@ -920,8 +920,11 @@ consumer.destroy() memphis.is_connected() ``` -### Using the create_function utility function -The create_function utility function is used to take a function that the user creates to process one message, into one which processes a batch of messages. When writing a Memphis Function, the logic in the main python file is expected to look like this example, with a lambda_handler, the user function and a call to create_function: +### Creating a Memphis function +Utility for creating Memphis functions. +event_handler gets the message payload as bytes and message headers as a dict and should return the modified payload and headers.
+An exception should be raised if the message should be considered failed and go into the dead-letter station.
+if all returned values are None the message will be filtered out from the station. > Make sure to encode the bytes object that you will return with utf-8! @@ -930,13 +933,16 @@ import json import base64 from memphis.functions import create_function -def lambda_handler(event, context): - return create_function(event, user_func = modify_message) +def handler(event, context): # this function name should be passed to the memphis.yaml file in the handler section + return create_function(event, user_func = event_handler) -def modify_message(message_payload): - payload = str(message_payload, 'utf-8') - as_json = json.loads(payload) - as_json['modified'] = True +def event_handler(msg_payload, msg_headers): + try: + payload = str(msg_payload, 'utf-8') + as_json = json.loads(payload) + as_json['modified'] = True - return bytes(json.dumps(as_json), encoding='utf-8') + return bytes(json.dumps(as_json), encoding='utf-8'), msg_headers + except Exception as e: + raise e ``` \ No newline at end of file diff --git a/memphis/functions.py b/memphis/functions.py index 2b01329..afec2dc 100644 --- a/memphis/functions.py +++ b/memphis/functions.py @@ -3,17 +3,16 @@ def create_function( event, - user_func: callable + event_handler: callable ) -> None: """ - This function handles a batch of messages and processes them with the passed-in user function. + This function creates a Memphis function and processes events with the passed-in event_handler function. Args: event (dict): - The Lambda event that is provided to the function Lambda is calling, usually named `lambda_handler`. - For more information, refer to: https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html - user_func (callable): - `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. + The event that is provided to the functionby Memphis. + event_handler (callable): + `create_function` assumes the function signature is in the format: (payload, headers) -> processed_payload, processed_headers. This function will modify the payload and headers and return them in the modified format. Args: @@ -27,12 +26,11 @@ def create_function( Raises: Error: Raises an exception of any kind when something goes wrong with processing a message. - The unprocessed message and the exception will be appended to the list of failed messages that is returned to the broker. + The unprocessed message and the exception will be sent to the dead-letter station. Returns: - lambda_handler (callable): - A function that handles the batching of data and manages the returned messages. - Return the result of this function in the Lambda handler. + handler (callable): + The Memphis function handler """ class EncodeBase64(json.JSONEncoder): def default(self, o): @@ -40,26 +38,28 @@ def default(self, o): return str(base64.b64encode(o), encoding='utf-8') return json.JSONEncoder.default(self, o) - def lambda_handler(event): + def handler(event): processed_events = {} - processed_events["successfullMessages"] = [] - processed_events["errorMessages"] = [] + processed_events["messages"] = [] + processed_events["failed_messages"] = [] for message in event["messages"]: try: payload = base64.b64decode(bytes(message['payload'], encoding='utf-8')) - processed_message, processed_headers = user_func(payload, message['headers']) + processed_message, processed_headers = event_handler(payload, message['headers']) if isinstance(processed_message, bytes) and isinstance(processed_headers, dict): - processed_events["successfullMessages"].append({ + processed_events["messages"].append({ "headers": processed_headers, "payload": processed_message }) + elif processed_message is None and processed_headers is None: # filter out empty messages + continue else: err_msg = "The returned processed_message or processed_headers were not in the right format. processed_message must be bytes and processed_headers, dict" raise Exception(err_msg) except Exception as e: - processed_events["errorMessages"].append({ + processed_events["failed_messages"].append({ "headers": message["headers"], "payload": message["payload"], "error": str(e) @@ -70,4 +70,4 @@ def lambda_handler(event): except Exception as e: return f"Returned message types from user function are not able to be converted into JSON: {e}" - return lambda_handler(event) + return handler(event) From 37b7e80c9fcf915bc0ccf81afda536b113df1e07 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:48:47 +0200 Subject: [PATCH 18/19] remove unused code --- memphis/types.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/memphis/types.py b/memphis/types.py index 7ab1b13..4e7a142 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -24,11 +24,4 @@ class Retention(Enum): class Storage(Enum): DISK = "file" - MEMORY = "memory" - -class Schema(Enum): - PROTOBUF: "protobuf" - AVRO: "avro" - JSON: "json" - NO_VALIDATION: None - \ No newline at end of file + MEMORY = "memory" \ No newline at end of file From faccb45dde0f51df3e0257189918dcfa4b8b3adc Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 12 Nov 2023 09:51:48 +0200 Subject: [PATCH 19/19] pylint --- memphis/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memphis/types.py b/memphis/types.py index 4e7a142..79fad6a 100644 --- a/memphis/types.py +++ b/memphis/types.py @@ -24,4 +24,4 @@ class Retention(Enum): class Storage(Enum): DISK = "file" - MEMORY = "memory" \ No newline at end of file + MEMORY = "memory"