Reviewer note: the retry loop now stops after the first successful Lambda
response (previously every call invoked the Lambda MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS
times). The post-image blob id on the first "index" line predates this revision.

diff --git a/rdl/data_sources/AWSLambdaDataSource.py b/rdl/data_sources/AWSLambdaDataSource.py
index 1c7254f..edd048d 100644
--- a/rdl/data_sources/AWSLambdaDataSource.py
+++ b/rdl/data_sources/AWSLambdaDataSource.py
@@ -5,7 +5,7 @@
 
 from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
 from rdl.data_sources.SourceTableInfo import SourceTableInfo
-from rdl.shared import Providers
+from rdl.shared import Providers, Constants
 from rdl.shared.Utils import prevent_senstive_data_logging
 
 
@@ -148,38 +148,54 @@ def __get_data_frame(self, data: [[]], column_names: []):
         return pandas.DataFrame(data=data, columns=column_names)
 
     def __invoke_lambda(self, pay_load):
-        self.logger.debug("\nRequest being sent to Lambda:")
-        self.logger.debug(pay_load)
-
-        lambda_response = self.aws_lambda_client.invoke(
-            FunctionName=self.connection_data["function"],
-            InvocationType="RequestResponse",
-            LogType="None",  # |'Tail', Set to Tail to include the execution log in the response
-            Payload=json.dumps(pay_load).encode(),
-        )
-
-        response_status_code = int(lambda_response["StatusCode"])
-        response_function_error = lambda_response.get("FunctionError")
-        self.logger.debug("\nResponse received from Lambda:")
-        self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
-        self.logger.debug(f'Response - FunctionError = "{response_function_error}"')
-
-        response_payload = json.loads(lambda_response["Payload"].read())
-
-        if response_status_code != 200 or response_function_error:
-            self.logger.error(
-                f'Error in response from aws lambda {self.connection_data["function"]}'
-            )
-            self.logger.error(f"Response - Status Code = {response_status_code}")
-            self.logger.error(f"Response - Error Function = {response_function_error}")
-            self.logger.error(f"Response - Error Details:")
-            # the below is risky as it may contain actual data if this line is reached in case of a successful result
-            # however, the same Payload field is used to return actual error details in case of real errors
-            # i.e. StatusCode is 200 (since AWS could invoke the lambda)
-            # BUT the lambda barfed with an error and therefore the FunctionError would not be None
-            self.logger.error(response_payload)
-            raise Exception(
-                "Error received when invoking AWS Lambda. See logs for further details."
+        """Invoke the configured AWS Lambda (RequestResponse), retrying on failure.
+
+        Makes up to Constants.MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS attempts and
+        returns the JSON-decoded payload of the first successful response.
+        Raises Exception if the final attempt still reports an error.
+        """
+        max_attempts = Constants.MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS
+        response_payload = None
+
+        for current_attempt in range(1, max_attempts + 1):
+            self.logger.debug(f"\nRequest being sent to Lambda, attempt {current_attempt} of {max_attempts}:")
+            self.logger.debug(pay_load)
+
+            lambda_response = self.aws_lambda_client.invoke(
+                FunctionName=self.connection_data["function"],
+                InvocationType="RequestResponse",
+                LogType="None",  # |'Tail', Set to Tail to include the execution log in the response
+                Payload=json.dumps(pay_load).encode(),
             )
 
+            response_status_code = int(lambda_response["StatusCode"])
+            response_function_error = lambda_response.get("FunctionError")
+            self.logger.debug(f"\nResponse received from Lambda, attempt {current_attempt} of {max_attempts}:")
+            self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
+            self.logger.debug(f'Response - FunctionError = "{response_function_error}"')
+
+            response_payload = json.loads(lambda_response["Payload"].read())
+
+            if response_status_code != 200 or response_function_error:
+                self.logger.error(
+                    f'Error in response from aws lambda \'{self.connection_data["function"]}\', '
+                    f'attempt {current_attempt} of {max_attempts}'
+                )
+                self.logger.error(f"Response - Status Code = {response_status_code}")
+                self.logger.error(f"Response - Error Function = {response_function_error}")
+                self.logger.error(f"Response - Error Details:")
+                # the below is risky as it may contain actual data if this line is reached in case of success
+                # however, the same Payload field is used to return actual error details in case of failure
+                # i.e. StatusCode is 200 (since AWS could invoke the lambda)
+                # BUT the lambda barfed with an error and therefore the FunctionError would not be None
+                self.logger.error(response_payload)
+
+                if current_attempt >= max_attempts:
+                    raise Exception(
+                        "Error received when invoking AWS Lambda. See logs for further details."
+                    )
+            else:
+                # Successful response: stop retrying so the Lambda is not invoked again.
+                break
+
         return response_payload
diff --git a/rdl/shared/Constants.py b/rdl/shared/Constants.py
index 7aac6b5..a8b1309 100644
--- a/rdl/shared/Constants.py
+++ b/rdl/shared/Constants.py
@@ -1,5 +1,6 @@
 APP_NAME = "Relational Data Loader"
 DATA_PIPELINE_EXECUTION_SCHEMA_NAME = "rdl"
+MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS = 3
 
 
 class FullRefreshReason: