Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 40 additions & 33 deletions rdl/data_sources/AWSLambdaDataSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
from rdl.data_sources.SourceTableInfo import SourceTableInfo
from rdl.shared import Providers
from rdl.shared import Providers, Constants
from rdl.shared.Utils import prevent_senstive_data_logging


Expand Down Expand Up @@ -148,38 +148,45 @@ def __get_data_frame(self, data: [[]], column_names: []):
return pandas.DataFrame(data=data, columns=column_names)

def __invoke_lambda(self, pay_load):
self.logger.debug("\nRequest being sent to Lambda:")
self.logger.debug(pay_load)

lambda_response = self.aws_lambda_client.invoke(
FunctionName=self.connection_data["function"],
InvocationType="RequestResponse",
LogType="None", # |'Tail', Set to Tail to include the execution log in the response
Payload=json.dumps(pay_load).encode(),
)

response_status_code = int(lambda_response["StatusCode"])
response_function_error = lambda_response.get("FunctionError")
self.logger.debug("\nResponse received from Lambda:")
self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
self.logger.debug(f'Response - FunctionError = "{response_function_error}"')

response_payload = json.loads(lambda_response["Payload"].read())

if response_status_code != 200 or response_function_error:
self.logger.error(
f'Error in response from aws lambda {self.connection_data["function"]}'
)
self.logger.error(f"Response - Status Code = {response_status_code}")
self.logger.error(f"Response - Error Function = {response_function_error}")
self.logger.error(f"Response - Error Details:")
# the below is risky as it may contain actual data if this line is reached in case of a successful result
# however, the same Payload field is used to return actual error details in case of real errors
# i.e. StatusCode is 200 (since AWS could invoke the lambda)
# BUT the lambda barfed with an error and therefore the FunctionError would not be None
self.logger.error(response_payload)
raise Exception(
"Error received when invoking AWS Lambda. See logs for further details."
max_attempts = Constants.MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS
response_payload = None

for current_attempt in list(range(1, max_attempts+1, 1)):
self.logger.debug(f"\nRequest being sent to Lambda, attempt {current_attempt} of {max_attempts}:")
self.logger.debug(pay_load)

lambda_response = self.aws_lambda_client.invoke(
FunctionName=self.connection_data["function"],
InvocationType="RequestResponse",
LogType="None", # |'Tail', Set to Tail to include the execution log in the response
Payload=json.dumps(pay_load).encode(),
)

response_status_code = int(lambda_response["StatusCode"])
response_function_error = lambda_response.get("FunctionError")
self.logger.debug(f"\nResponse received from Lambda, attempt {current_attempt} of {max_attempts}:")
self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
self.logger.debug(f'Response - FunctionError = "{response_function_error}"')

response_payload = json.loads(lambda_response["Payload"].read())

if response_status_code != 200 or response_function_error:
self.logger.error(
f'Error in response from aws lambda \'{self.connection_data["function"]}\', '
f'attempt {current_attempt} of {max_attempts}'
)
self.logger.error(f"Response - Status Code = {response_status_code}")
self.logger.error(f"Response - Error Function = {response_function_error}")
self.logger.error(f"Response - Error Details:")
# the below is risky as it may contain actual data if this line is reached in case of success
# however, the same Payload field is used to return actual error details in case of failure
# i.e. StatusCode is 200 (since AWS could invoke the lambda)
# BUT the lambda barfed with an error and therefore the FunctionError would not be None
self.logger.error(response_payload)

if current_attempt >= max_attempts:
raise Exception(
"Error received when invoking AWS Lambda. See logs for further details."
)

return response_payload
1 change: 1 addition & 0 deletions rdl/shared/Constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
APP_NAME = "Relational Data Loader"
DATA_PIPELINE_EXECUTION_SCHEMA_NAME = "rdl"
MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS = 3


class FullRefreshReason:
Expand Down