Skip to content

Commit

Permalink
Fix Lambda async invocation queue namespace (#10831)
Browse files Browse the repository at this point in the history
  • Loading branch information
joe4dev committed May 24, 2024
1 parent 55051cd commit b8290ff
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 31 deletions.
6 changes: 3 additions & 3 deletions localstack/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,9 @@ def populate_edge_configuration(
INTERNAL_RESOURCE_ACCOUNT = os.environ.get("INTERNAL_RESOURCE_ACCOUNT") or "949334387222"

# Determine which implementation to use for the event rule / event filtering engine used by multiple services:
# EventBridge, EventBridge Pipes, Lambda Event Source Mapping, SNS
# Options: provider (default) | java
EVENT_RULE_ENGINE = os.environ.get("EVENT_RULE_ENGINE", "").strip()
# EventBridge, EventBridge Pipes, Lambda Event Source Mapping
# Options: python (default) | java (preview)
EVENT_RULE_ENGINE = os.environ.get("EVENT_RULE_ENGINE", "python").strip()

# -----
# SERVICE-SPECIFIC CONFIGS BELOW
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ def start(self, env_vars: dict[str, str]) -> None:
container_config.dns = config.LAMBDA_DOCKER_DNS
else:
if dns_server.is_server_running():
# Enable transparent endpoint injection by setting DNS to the embedded DNS re-writer in the Go init.
# Set the container DNS to LocalStack to resolve localhost.localstack.cloud and
# enable transparent endpoint injection (Pro image only).
container_config.dns = self.get_endpoint_from_executor()

lambda_hooks.start_docker_executor.run(container_config, self.function_version)
Expand Down
9 changes: 6 additions & 3 deletions localstack/services/lambda_/invocation/event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,14 @@ def start(self) -> None:
function_id = self.version_manager.function_version.id
# Truncate function name to ensure queue name limit of max 80 characters
function_name_short = function_id.function_name[:47]
queue_name = f"{function_name_short}-{md5(function_id.qualified_arn())}"
# The instance id MUST be unique to the function and a given LocalStack instance
queue_namespace = (
f"{function_id.qualified_arn()}-{self.version_manager.function.instance_id}"
)
queue_name = f"{function_name_short}-{md5(queue_namespace)}"
create_queue_response = sqs_client.create_queue(QueueName=queue_name)
self.event_queue_url = create_queue_response["QueueUrl"]
# Ensure no events are in new queues due to persistence and cloud pods
sqs_client.purge_queue(QueueUrl=self.event_queue_url)
# We don't need to purge the queue for persistence or cloud pods because the instance id is MUST be unique

self.poller = Poller(self.version_manager, self.event_queue_url)
self.poller_thread = FuncThread(
Expand Down
21 changes: 20 additions & 1 deletion localstack/services/lambda_/invocation/lambda_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.lambda_.api_utils import qualified_lambda_arn, unqualified_lambda_arn
from localstack.utils.archives import unzip
from localstack.utils.strings import long_uid
from localstack.utils.strings import long_uid, short_uid

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -601,6 +601,25 @@ class Function:
def latest(self) -> FunctionVersion:
return self.versions["$LATEST"]

# HACK to model a volatile variable that should be ignored for persistence
def __post_init__(self):
# Identifier unique to this function and LocalStack instance.
# A LocalStack restart or persistence load should create a new instance id.
# Used for retaining invoke queues across version updates for $LATEST, but separate unrelated instances.
self.instance_id = short_uid()

def __getstate__(self):
"""Ignore certain volatile fields for pickling.
# https://docs.python.org/3/library/pickle.html#handling-stateful-objects
"""
# Copy the object's state from self.__dict__ which contains
# all our instance attributes. Always use the dict.copy()
# method to avoid modifying the original state.
state = self.__dict__.copy()
# Remove the volatile entries.
del state["instance_id"]
return state


class ValidationException(CommonServiceException):
def __init__(self, message: str):
Expand Down
6 changes: 0 additions & 6 deletions localstack/services/lambda_/invocation/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import threading
import time
from concurrent.futures import Future
from typing import TYPE_CHECKING

from localstack import config
from localstack.aws.api.lambda_ import (
Expand Down Expand Up @@ -33,9 +32,6 @@
from localstack.utils.strings import long_uid, truncate
from localstack.utils.threads import FuncThread, start_thread

if TYPE_CHECKING:
from localstack.services.lambda_.invocation.lambda_service import LambdaService

LOG = logging.getLogger(__name__)


Expand All @@ -53,8 +49,6 @@ class LambdaVersionManager:
state: VersionState | None
provisioned_state: ProvisionedConcurrencyState | None # TODO: remove?
log_handler: LogHandler
# TODO not sure about this backlink, maybe a callback is better?
lambda_service: "LambdaService"
counting_service: CountingService
assignment_service: AssignmentService

Expand Down
2 changes: 2 additions & 0 deletions localstack/services/lambda_/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,8 @@ def delete_function(
destroy_code_if_not_used(code=version.config.code, function=function)
else:
# delete the whole function
# TODO: introduce locking for safe deletion: We could create a new version at the API layer before
# the old version gets cleaned up in the internal lambda service.
function = store.functions.pop(function_name)
for version in function.versions.values():
self.lambda_service.stop_version(qualified_arn=version.id.qualified_arn())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
import os
import time

import boto3

s3 = boto3.client("s3", endpoint_url=os.environ.get("AWS_ENDPOINT_URL"))
S3_BUCKET_NAME = os.environ.get("S3_BUCKET_NAME")
# Configurable identifier to test function updates
FUNCTION_VARIANT = os.environ.get("FUNCTION_VARIANT")


def handler(event, context):
sleep_duration = int(event.get("sleep_seconds", 0))
if sleep_duration > 0:
print(f"Sleeping for {sleep_duration} seconds ...")
time.sleep(sleep_duration)
print("... done sleeping")

request_prefix = event.get("request_prefix")
response = {
"function_version": context.function_version,
"request_id": context.aws_request_id,
"request_prefix": request_prefix,
"function_variant": FUNCTION_VARIANT,
}

# The side effect is required to test async invokes
if S3_BUCKET_NAME:
s3_key = f"{request_prefix}--{FUNCTION_VARIANT}"
response["s3_key"] = s3_key
s3.put_object(Bucket=S3_BUCKET_NAME, Key=s3_key, Body=json.dumps(response))

return response
201 changes: 197 additions & 4 deletions tests/aws/services/lambda_/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from localstack.utils.strings import short_uid, to_bytes, to_str
from localstack.utils.sync import retry, wait_until
from localstack.utils.testutil import create_lambda_archive
from tests.aws.services.lambda_.utils import get_s3_keys

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,6 +75,9 @@
TEST_LAMBDA_AWS_PROXY = os.path.join(THIS_FOLDER, "functions/lambda_aws_proxy.py")
TEST_LAMBDA_AWS_PROXY_FORMAT = os.path.join(THIS_FOLDER, "functions/lambda_aws_proxy_format.py")
TEST_LAMBDA_PYTHON_S3_INTEGRATION = os.path.join(THIS_FOLDER, "functions/lambda_s3_integration.py")
TEST_LAMBDA_PYTHON_S3_INTEGRATION_FUNCTION_VERSION = os.path.join(
THIS_FOLDER, "functions/lambda_s3_integration_function_version.py"
)
TEST_LAMBDA_INTEGRATION_NODEJS = os.path.join(THIS_FOLDER, "functions/lambda_integration.js")
TEST_LAMBDA_NODEJS = os.path.join(THIS_FOLDER, "functions/lambda_handler.js")
TEST_LAMBDA_NODEJS_ES6 = os.path.join(THIS_FOLDER, "functions/lambda_handler_es6.mjs")
Expand Down Expand Up @@ -540,12 +544,10 @@ def test_mixed_architecture(self, create_lambda_function, aws_client, snapshot):
payload = json.load(invoke_result_x86["Payload"])
assert payload.get("platform_machine") == "x86_64"

update_function_configuration_response = aws_client.lambda_.update_function_code(
update_function_code_response = aws_client.lambda_.update_function_code(
FunctionName=func_name, ZipFile=zip_file, Architectures=[Architecture.arm64]
)
snapshot.match(
"update_function_configuration_response", update_function_configuration_response
)
snapshot.match("update_function_code_response", update_function_code_response)
aws_client.lambda_.get_waiter(waiter_name="function_updated_v2").wait(
FunctionName=func_name
)
Expand Down Expand Up @@ -1778,6 +1780,49 @@ def _invoke_function():

assert not errored

@markers.aws.validated
def test_recreate_function(
self, aws_client, create_lambda_function, check_lambda_logs, snapshot
):
"""Recreating a function with the same name should not cause any resource cleanup issues or namespace collisions
Reproduces a GitHub issue: https://github.com/localstack/localstack/issues/10280"""
function_name = f"test-function-{short_uid()}"
create_function_response_one = create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_ECHO,
handler="lambda_echo.handler",
runtime=Runtime.python3_12,
)
snapshot.match("create_function_response_one", create_function_response_one)

aws_client.lambda_.delete_function(FunctionName=function_name)

# Immediately re-create the same function
create_function_response_two = create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_ECHO,
handler="lambda_echo.handler",
runtime=Runtime.python3_12,
)
snapshot.match("create_function_response_two", create_function_response_two)

# Validate that async invokes still work
invoke_response = aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
)
invoke_response = read_streams(invoke_response)
assert 202 == invoke_response["StatusCode"]

# Assert that the function gets invoked by checking the logs.
# This also ensures that we wait until the invocation is done before deleting the function.
expected = [".*{}"]

def check_logs():
check_lambda_logs(function_name, expected_lines=expected)

retry(check_logs, retries=15)


class TestLambdaMultiAccounts:
@pytest.fixture
Expand Down Expand Up @@ -2602,6 +2647,154 @@ def test_lambda_handler_update(self, aws_client, create_lambda_function, snapsho
"invoke_result_handler_two_postpublish", invoke_result_handler_two_postpublish
)

# TODO: Fix this test by not stopping running invokes for function updates of $LATEST
@pytest.mark.skip(
reason="""Fails with 'Internal error while executing lambda' because
the current implementation stops all running invokes upon update."""
)
@markers.aws.validated
def test_function_update_during_invoke(self, aws_client, create_lambda_function, snapshot):
function_name = f"test-function-{short_uid()}"
environment_v1 = {"Variables": {"FUNCTION_VARIANT": "variant-1"}}
create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_S3_INTEGRATION_FUNCTION_VERSION,
runtime=Runtime.python3_12,
Environment=environment_v1,
)

errored = False

def _update_function():
nonlocal errored
try:
# Make it very likely that the invocation is being processed because the incoming invocation acquires
# an invocation lease quickly.
time.sleep(5)

environment_v2 = environment_v1.copy()
environment_v2["Variables"]["FUNCTION_VARIANT"] = "variant-2"
aws_client.lambda_.update_function_configuration(
FunctionName=function_name, Environment=environment_v2
)
waiter = aws_client.lambda_.get_waiter("function_updated_v2")
waiter.wait(FunctionName=function_name)

payload = {"request_prefix": "2-post-update"}
invoke_response_after = aws_client.lambda_.invoke(
FunctionName=function_name,
Payload=json.dumps(payload),
)
assert invoke_response_after["StatusCode"] == 200
payload = json.load(invoke_response_after["Payload"])
assert payload["function_variant"] == "variant-2"
assert payload["function_version"] == "$LATEST"
except Exception:
LOG.exception(f"Updating lambda function {function_name} failed.")
errored = True

# Start thread with upcoming function update (slightly delayed)
thread = threading.Thread(target=_update_function)
thread.start()

# Start an invocation with a sleep
payload = {"request_prefix": "1-sleep", "sleep_seconds": 20}
invoke_response_before = aws_client.lambda_.invoke(
FunctionName=function_name,
Payload=json.dumps(payload),
)
snapshot.match("invoke_response_before", invoke_response_before)

thread.join()
assert not errored

# TODO: Fix first invoke getting retried and ending up being executed against the new variant because the
# update stops the running function version. We should let running executions finish for $LATEST in this case.
# MAYBE: consider validating whether a code update behaves differently than a configuration update
@markers.aws.validated
def test_async_invoke_queue_upon_function_update(
self, aws_client, create_lambda_function, s3_create_bucket, snapshot
):
"""Test what happens with queued async invokes (i.e., event invokes) when updating a function.
We are using a combination of reserved concurrency and sleeps to design this test case predictable.
Observation: If we don't wait after sending the first invoke, some queued invokes can still be handled by an
old variant in some non-deterministic way.
"""
# HACK: workaround to ignore `$..async_invoke_history_sorted[0]` because indices don't work in the ignore list
snapshot.add_transformer(
snapshot.transform.regex("01-sleep--variant-2", "01-sleep--variant-1")
)
bucket_name = f"lambda-target-bucket-{short_uid()}"
s3_create_bucket(Bucket=bucket_name)

function_name = f"test-function-{short_uid()}"
environment_v1 = {
"Variables": {"S3_BUCKET_NAME": bucket_name, "FUNCTION_VARIANT": "variant-1"}
}
create_lambda_function(
func_name=function_name,
handler_file=TEST_LAMBDA_PYTHON_S3_INTEGRATION_FUNCTION_VERSION,
runtime=Runtime.python3_12,
Environment=environment_v1,
)
# Add reserved concurrency limits the throughput and makes it easier to cause event invokes to queue up.
reserved_concurrency_response = aws_client.lambda_.put_function_concurrency(
FunctionName=function_name,
ReservedConcurrentExecutions=1,
)
assert reserved_concurrency_response["ResponseMetadata"]["HTTPStatusCode"] == 200

payload = {"request_prefix": f"{1:02}-sleep", "sleep_seconds": 22}
aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
)
# Make it very likely that the invocation is being processed because the Lambda poller should pick up queued
# async invokes quickly using long polling.
time.sleep(2)

# Send async invocation, which should queue up before we update the function
num_invocations_before = 9
for index in range(num_invocations_before):
payload = {"request_prefix": f"{index + 2:02}-before"}
aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
)

# Update the function variant while still having invokes in the async invoke queue
environment_v2 = environment_v1.copy()
environment_v2["Variables"]["FUNCTION_VARIANT"] = "variant-2"
aws_client.lambda_.update_function_configuration(
FunctionName=function_name, Environment=environment_v2
)
waiter = aws_client.lambda_.get_waiter("function_updated_v2")
waiter.wait(FunctionName=function_name)

# Send further async invocations after the update succeeded
num_invocations_after = 5
for index in range(num_invocations_after):
payload = {"request_prefix": f"{index + num_invocations_before + 2:02}-after"}
aws_client.lambda_.invoke(
FunctionName=function_name,
InvocationType="Event",
Payload=json.dumps(payload),
)

# +1 for the first sleep invocation
total_invocations = 1 + num_invocations_before + num_invocations_after

def assert_s3_objects():
s3_keys_output = get_s3_keys(aws_client, bucket_name)
assert len(s3_keys_output) >= total_invocations
return s3_keys_output

s3_keys = retry(assert_s3_objects, retries=20, sleep=5)
s3_keys_sorted = sorted(s3_keys)
snapshot.match("async_invoke_history_sorted", s3_keys_sorted)


# TODO: test if routing is static for a single invocation:
# Do retries for an event invoke, take the same "path" for every retry?
Expand Down

0 comments on commit b8290ff

Please sign in to comment.