/
runtime_environment.py
299 lines (263 loc) · 12.2 KB
/
runtime_environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import binascii
import logging
import os
import random
import string
import time
from datetime import date, datetime
from enum import Enum, auto
from threading import RLock, Timer
from typing import TYPE_CHECKING, Dict, Literal, Optional
from localstack import config
from localstack.aws.api.lambda_ import TracingMode
from localstack.aws.connect import connect_to
from localstack.services.awslambda.invocation.executor_endpoint import ServiceEndpoint
from localstack.services.awslambda.invocation.lambda_models import Credentials, FunctionVersion
from localstack.services.awslambda.invocation.runtime_executor import (
RuntimeExecutor,
get_runtime_executor,
)
from localstack.utils.strings import to_str
if TYPE_CHECKING:
from localstack.services.awslambda.invocation.version_manager import QueuedInvocation
# Interval, in seconds, handed to the startup Timer of a runtime environment.
STARTUP_TIMEOUT_SEC = config.LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT

# The sixteen lowercase hexadecimal digits as single-character strings: "0".."9", "a".."f".
HEX_CHARS = list(string.hexdigits[:16])

# Module-level logger.
LOG = logging.getLogger(__name__)
class RuntimeStatus(Enum):
    """Lifecycle states a ``RuntimeEnvironment`` moves through."""

    # Initial status assigned in RuntimeEnvironment.__init__
    INACTIVE = auto()
    # Set by RuntimeEnvironment.start()
    STARTING = auto()
    # Set by RuntimeEnvironment.set_ready() and RuntimeEnvironment.invocation_done()
    READY = auto()
    # Set by RuntimeEnvironment.invoke()
    RUNNING = auto()
    # Set by RuntimeEnvironment.errored()
    FAILED = auto()
    # Set by RuntimeEnvironment.stop()
    STOPPED = auto()
# The two accepted values for RuntimeEnvironment.initialization_type; the value is
# exported to the runtime as AWS_LAMBDA_INITIALIZATION_TYPE.
InitializationType = Literal[
    "on-demand",
    "provisioned-concurrency",
]
class InvalidStatusException(Exception):
    """Raised when an operation is attempted in a runtime status that does not permit it."""

    def __init__(self, message: str):
        """
        :param message: human-readable description of the rejected status transition
        """
        super().__init__(message)
def generate_runtime_id() -> str:
    """
    Create a random identifier for a runtime environment.

    :return: string of 32 lowercase hexadecimal characters
    """
    # The first 16 characters of string.hexdigits are "0123456789abcdef" (already lowercase).
    alphabet = string.hexdigits[:16]
    picked = random.choices(alphabet, k=32)
    return "".join(picked)
# TODO: add status callback
class RuntimeEnvironment:
    """
    One Lambda runtime environment: its executor, its status and its timers.

    Status transitions performed by the methods of this class:

    * ``start``: INACTIVE -> STARTING
    * ``set_ready``: STARTING -> READY
    * ``invoke``: READY -> RUNNING
    * ``invocation_done``: RUNNING -> READY
    * ``errored`` (also reached through ``timed_out``): STARTING -> FAILED
    * ``stop`` (also reached through ``keepalive_passed``): any status except
      INACTIVE/STOPPED -> STOPPED

    Every transition is checked and applied while holding ``status_lock``.
    """

    # Executor instance that starts, stops and invokes the runtime
    runtime_executor: RuntimeExecutor
    # Re-entrant lock guarding `status` (re-entrant because keepalive_passed() calls stop())
    status_lock: RLock
    status: RuntimeStatus
    initialization_type: InitializationType
    # Time at which invocation_done() was last called (datetime.min until then)
    last_returned: datetime
    # Timer that calls timed_out() if the environment stays in STARTING for too long
    startup_timer: Optional[Timer]
    # Timer that calls keepalive_passed() after an idle period (on-demand environments only)
    keepalive_timer: Optional[Timer]

    def __init__(
        self,
        function_version: FunctionVersion,
        initialization_type: InitializationType,
        service_endpoint: ServiceEndpoint,
    ):
        """
        Create an inactive runtime environment with a freshly generated id.

        :param function_version: function version this environment executes
        :param initialization_type: "on-demand" or "provisioned-concurrency"
        :param service_endpoint: passed through to the runtime executor
        """
        self.id = generate_runtime_id()
        self.status = RuntimeStatus.INACTIVE
        self.status_lock = RLock()
        self.function_version = function_version
        self.initialization_type = initialization_type
        self.runtime_executor = get_runtime_executor()(
            self.id, function_version, service_endpoint=service_endpoint
        )
        self.last_returned = datetime.min
        self.startup_timer = None
        # No-op placeholder so that keepalive_timer.cancel() is always safe to call
        self.keepalive_timer = Timer(0, lambda *args, **kwargs: None)

    def get_log_group_name(self) -> str:
        """
        :return: log group name of the form ``/aws/lambda/<function name>``
        """
        return f"/aws/lambda/{self.function_version.id.function_name}"

    def get_log_stream_name(self) -> str:
        """
        :return: log stream name of the form ``YYYY/MM/DD/[<qualifier>]<runtime id>``,
            using today's date as returned by ``date.today()``
        """
        return f"{date.today():%Y/%m/%d}/[{self.function_version.id.qualifier}]{self.id}"

    def get_environment_variables(self) -> Dict[str, str]:
        """
        Returns the environment variable set for the runtime container

        Numeric settings (memory size, timeout, post-invoke wait) are converted to strings so
        that the result matches the declared ``Dict[str, str]`` type.

        :return: Dict of environment variables
        """
        credentials = self.get_credentials()
        # Resolve the endpoint once so LOCALSTACK_HOSTNAME and AWS_ENDPOINT_URL always agree
        endpoint = self.runtime_executor.get_endpoint_from_executor()
        env_vars = {
            # 1) Public AWS defined runtime environment variables (in same order):
            # https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html
            # a) Reserved environment variables
            # _HANDLER conditionally added below
            # TODO: _X_AMZN_TRACE_ID
            "AWS_DEFAULT_REGION": self.function_version.id.region,
            "AWS_REGION": self.function_version.id.region,
            # AWS_EXECUTION_ENV conditionally added below
            "AWS_LAMBDA_FUNCTION_NAME": self.function_version.id.function_name,
            "AWS_LAMBDA_FUNCTION_MEMORY_SIZE": str(self.function_version.config.memory_size),
            "AWS_LAMBDA_FUNCTION_VERSION": self.function_version.id.qualifier,
            "AWS_LAMBDA_INITIALIZATION_TYPE": self.initialization_type,
            "AWS_LAMBDA_LOG_GROUP_NAME": self.get_log_group_name(),
            "AWS_LAMBDA_LOG_STREAM_NAME": self.get_log_stream_name(),
            # Access IDs for role
            "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
            "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
            "AWS_SESSION_TOKEN": credentials["SessionToken"],
            # AWS_LAMBDA_RUNTIME_API is set in the runtime interface emulator (RIE)
            "LAMBDA_TASK_ROOT": "/var/task",
            "LAMBDA_RUNTIME_DIR": "/var/runtime",
            # b) Unreserved environment variables
            # LANG
            # LD_LIBRARY_PATH
            # NODE_PATH
            # PYTHONPATH
            # GEM_PATH
            "AWS_XRAY_CONTEXT_MISSING": "LOG_ERROR",
            # TODO: allow configuration of xray address
            "AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1:2000",
            # not 100% sure who sets these two
            # extensions are not supposed to have them in their envs => TODO: test if init removes them
            "_AWS_XRAY_DAEMON_PORT": "2000",
            "_AWS_XRAY_DAEMON_ADDRESS": "127.0.0.1",
            # AWS_LAMBDA_DOTNET_PREJIT
            "TZ": ":UTC",
            # 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
            "AWS_LAMBDA_FUNCTION_TIMEOUT": str(self.function_version.config.timeout),
            # 3) Public LocalStack endpoint
            "LOCALSTACK_HOSTNAME": endpoint,
            "EDGE_PORT": str(config.EDGE_PORT),
            "AWS_ENDPOINT_URL": f"http://{endpoint}:{config.EDGE_PORT}",
            # 4) Internal LocalStack runtime API
            "LOCALSTACK_RUNTIME_ID": self.id,
            "LOCALSTACK_RUNTIME_ENDPOINT": self.runtime_executor.get_runtime_endpoint(),
            # used by the init to spawn the x-ray daemon
            # LOCALSTACK_USER conditionally added below
        }
        # Conditionally added environment variables
        # config.handler is None for image lambdas and will be populated at runtime (e.g., by RIE)
        if self.function_version.config.handler:
            env_vars["_HANDLER"] = self.function_version.config.handler
        # Not defined for custom runtimes (e.g., provided, provided.al2)
        if self.function_version.config.runtime:
            env_vars["AWS_EXECUTION_ENV"] = f"Aws_Lambda_{self.function_version.config.runtime}"
        # Function-level environment overrides everything defined above
        if self.function_version.config.environment:
            env_vars.update(self.function_version.config.environment)
        if config.LAMBDA_INIT_DEBUG:
            # Disable dropping privileges because it breaks debugging
            env_vars["LOCALSTACK_USER"] = ""
        # An explicitly configured user takes precedence over the debug default above
        if config.LAMBDA_INIT_USER:
            env_vars["LOCALSTACK_USER"] = config.LAMBDA_INIT_USER
        if config.DEBUG:
            env_vars["LOCALSTACK_INIT_LOG_LEVEL"] = "debug"
        if config.LAMBDA_INIT_POST_INVOKE_WAIT_MS:
            # int() keeps the validation of the configured value, str() keeps the dict homogeneous
            env_vars["LOCALSTACK_POST_INVOKE_WAIT_MS"] = str(
                int(config.LAMBDA_INIT_POST_INVOKE_WAIT_MS)
            )
        return env_vars

    # Lifecycle methods
    def start(self) -> None:
        """
        Starting the runtime environment

        Starts the executor with the environment variables of this environment and arms the
        startup timer. If the executor fails to start, the environment is marked as failed
        and the exception is re-raised.

        :raises InvalidStatusException: if the environment is not INACTIVE
        """
        with self.status_lock:
            if self.status != RuntimeStatus.INACTIVE:
                raise InvalidStatusException("Runtime Handler can only be started when inactive")
            self.status = RuntimeStatus.STARTING
            try:
                self.runtime_executor.start(self.get_environment_variables())
            except Exception:
                self.errored()
                raise
            self.startup_timer = Timer(STARTUP_TIMEOUT_SEC, self.timed_out)
            self.startup_timer.start()

    def stop(self) -> None:
        """
        Stopping the runtime environment

        Stops the executor, marks the environment as STOPPED and cancels pending timers.

        :raises InvalidStatusException: if the environment is INACTIVE or already STOPPED
        """
        with self.status_lock:
            if self.status in [RuntimeStatus.INACTIVE, RuntimeStatus.STOPPED]:
                raise InvalidStatusException("Runtime Handler cannot be shutdown before started")
            self.runtime_executor.stop()
            self.status = RuntimeStatus.STOPPED
            self.keepalive_timer.cancel()
            # A startup timer may still be pending if we are stopped while STARTING. Left
            # running, it would later call timed_out() -> errored(), which raises because the
            # status is no longer STARTING.
            if self.startup_timer:
                self.startup_timer.cancel()
                self.startup_timer = None

    # Status methods
    def set_ready(self) -> None:
        """
        Mark a starting environment as READY and cancel the startup timer.

        :raises InvalidStatusException: if the environment is not STARTING
        """
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException(
                    f"Runtime Handler can only be set active while starting. Current status: {self.status}"
                )
            self.status = RuntimeStatus.READY
            if self.startup_timer:
                self.startup_timer.cancel()
                self.startup_timer = None

    def invocation_done(self) -> None:
        """
        Mark a running environment as READY again after an invocation.

        For "on-demand" environments a keepalive timer is started which stops the environment
        after ``config.LAMBDA_KEEPALIVE_MS`` milliseconds.

        :raises InvalidStatusException: if the environment is not RUNNING
        """
        self.last_returned = datetime.now()
        with self.status_lock:
            if self.status != RuntimeStatus.RUNNING:
                raise InvalidStatusException("Runtime Handler can only be set ready while running")
            self.status = RuntimeStatus.READY
            if self.initialization_type == "on-demand":
                self.keepalive_timer = Timer(
                    config.LAMBDA_KEEPALIVE_MS / 1000, self.keepalive_passed
                )
                self.keepalive_timer.start()

    def keepalive_passed(self) -> None:
        """
        Keepalive timer callback: stop the environment if it is still idle (READY).

        Timer.cancel() has no effect once the callback has begun executing, so this callback
        can run after invoke() or stop() already changed the status. In that case nothing is
        stopped: a running invocation must not be interrupted, and stopping an already stopped
        environment would raise in the timer thread.
        """
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                LOG.debug(
                    "Keepalive timer of executor %s fired in status %s. Not stopping.",
                    self.id,
                    self.status,
                )
                return
            LOG.debug(
                "Executor %s for function %s hasn't received any invocations in a while. Stopping.",
                self.id,
                self.function_version.qualified_arn,
            )
            self.stop()

    def timed_out(self) -> None:
        """
        Startup timer callback: log the timeout and mark the environment as failed.
        """
        LOG.warning(
            "Executor %s for function %s timed out during startup",
            self.id,
            self.function_version.qualified_arn,
        )
        # The timer already fired, so there is nothing left for errored() to cancel
        self.startup_timer = None
        self.errored()

    def errored(self) -> None:
        """
        Mark a starting environment as FAILED and try to stop its executor.

        Failures while stopping the executor are only logged.

        :raises InvalidStatusException: if the environment is not STARTING
        """
        with self.status_lock:
            if self.status != RuntimeStatus.STARTING:
                raise InvalidStatusException("Runtime Handler can only error while starting")
            self.status = RuntimeStatus.FAILED
            if self.startup_timer:
                self.startup_timer.cancel()
        # Best-effort cleanup, performed outside the lock
        try:
            self.runtime_executor.stop()
        except Exception:
            LOG.debug("Unable to shutdown runtime handler '%s'", self.id)

    def invoke(self, invocation_event: "QueuedInvocation") -> None:
        """
        Send an invocation to the runtime executor.

        The environment is switched to RUNNING and the keepalive timer is cancelled before
        the payload is handed to the executor.

        :param invocation_event: queued invocation whose request id, invoked ARN and payload
            are forwarded
        :raises InvalidStatusException: if the environment is not READY
        """
        with self.status_lock:
            if self.status != RuntimeStatus.READY:
                raise InvalidStatusException("Invoke can only happen if status is ready")
            self.status = RuntimeStatus.RUNNING
            self.keepalive_timer.cancel()
        invoke_payload = {
            "invoke-id": invocation_event.invocation.request_id,  # TODO: rename to request-id
            "invoked-function-arn": invocation_event.invocation.invoked_arn,
            "payload": to_str(invocation_event.invocation.payload),
            "trace-id": self._generate_trace_header(),
        }
        self.runtime_executor.invoke(payload=invoke_payload)

    def get_credentials(self) -> Credentials:
        """
        Assume the function's role through STS and return the resulting credentials.

        :return: the "Credentials" entry of the assume-role response
        """
        sts_client = connect_to().sts.request_metadata(service_principal="lambda")
        # TODO we should probably set a maximum alive duration for environments, due to the session expiration
        return sts_client.assume_role(
            RoleArn=self.function_version.config.role,
            RoleSessionName=self.function_version.id.function_name,
            DurationSeconds=43200,
        )["Credentials"]

    def _generate_trace_id(self):
        """
        Build a trace id of the form ``1-<epoch seconds in hex>-<24 random hex characters>``.

        https://docs.aws.amazon.com/xray/latest/devguide/xray-api-sendingdata.html#xray-api-traceids
        """
        # TODO: add test for start time
        original_request_epoch = int(time.time())
        timestamp_hex = f"{original_request_epoch:x}"
        version_number = "1"
        # 12 random bytes -> 24 hexadecimal characters
        unique_id = binascii.hexlify(os.urandom(12)).decode("utf-8")
        return f"{version_number}-{timestamp_hex}-{unique_id}"

    def _generate_trace_header(self):
        """
        https://docs.aws.amazon.com/lambda/latest/dg/services-xray.html
        "The sampling rate is 1 request per second and 5 percent of additional requests."
        Currently we implement a simpler, more predictable strategy.
        If TracingMode is "Active", we always sample the request. (Sampled=1)
        TODO: implement passive tracing
        TODO: use xray sdk here

        :return: header value of the form ``Root=<trace id>;Parent=<16 hex chars>;Sampled=<0|1>``
        """
        if self.function_version.config.tracing_config_mode == TracingMode.Active:
            sampled = "1"
        else:
            sampled = "0"
        root_trace_id = self._generate_trace_id()
        # TODO: segment doesn't actually exist at the moment
        parent = binascii.b2a_hex(os.urandom(8)).decode("utf-8")
        return f"Root={root_trace_id};Parent={parent};Sampled={sampled}"