From 102cb323ad989c5df54437dbf86eea89a29cfb49 Mon Sep 17 00:00:00 2001 From: saartochner Date: Mon, 6 Jul 2020 19:02:45 +0300 Subject: [PATCH 1/6] faster http hooks --- .../parsers/http_data_classes.py | 6 ++-- src/lumigo_tracer/parsers/parser.py | 29 +++++++------------ src/lumigo_tracer/parsers/utils.py | 6 ++-- src/lumigo_tracer/spans_container.py | 5 ++-- src/lumigo_tracer/sync_http/sync_hook.py | 28 +++++++++++++++--- src/lumigo_tracer/utils.py | 10 ++++--- src/test/unit/parsers/test_parser.py | 26 +++++++---------- src/test/unit/parsers/test_utils.py | 2 +- src/test/unit/sync_http/test_sync_hook.py | 21 ++++++++++++-- 9 files changed, 79 insertions(+), 54 deletions(-) diff --git a/src/lumigo_tracer/parsers/http_data_classes.py b/src/lumigo_tracer/parsers/http_data_classes.py index 60a58775..49b92c0e 100644 --- a/src/lumigo_tracer/parsers/http_data_classes.py +++ b/src/lumigo_tracer/parsers/http_data_classes.py @@ -1,20 +1,18 @@ -from http import client from copy import deepcopy -from typing import Optional class HttpRequest: host: str method: str uri: str - headers: Optional[client.HTTPMessage] + headers: dict body: bytes def __init__(self, **kwargs): self.host = kwargs["host"] self.method = kwargs["method"] self.uri = kwargs["uri"] - self.headers = kwargs.get("headers") + self.headers = {k.lower(): v for k, v in (kwargs.get("headers") or {}).items()} self.body = kwargs.get("body") def clone(self, **kwargs): diff --git a/src/lumigo_tracer/parsers/parser.py b/src/lumigo_tracer/parsers/parser.py index 5ccf19f9..69665fd6 100644 --- a/src/lumigo_tracer/parsers/parser.py +++ b/src/lumigo_tracer/parsers/parser.py @@ -1,7 +1,6 @@ import uuid from typing import Type, Optional import time -import http.client from lumigo_tracer.parsers.utils import ( safe_split_get, @@ -35,9 +34,7 @@ class Parser: def parse_request(self, parse_params: HttpRequest) -> dict: if Configuration.verbose and parse_params and not should_scrub_domain(parse_params.host): additional_info = { - 
"headers": prepare_large_data( - dict(parse_params.headers.items() if parse_params.headers else {}) - ), + "headers": prepare_large_data(parse_params.headers), "body": prepare_large_data(parse_params.body), "method": parse_params.method, "uri": parse_params.uri, @@ -60,12 +57,10 @@ def parse_request(self, parse_params: HttpRequest) -> dict: "started": int(time.time() * 1000), } - def parse_response( - self, url: str, status_code: int, headers: Optional[http.client.HTTPMessage], body: bytes - ) -> dict: + def parse_response(self, url: str, status_code: int, headers: dict, body: bytes) -> dict: if Configuration.verbose and not should_scrub_domain(url): additional_info = { - "headers": prepare_large_data(dict(headers.items() if headers else {})), + "headers": prepare_large_data(headers), "body": prepare_large_data(body), "statusCode": status_code, } @@ -85,7 +80,7 @@ class ServerlessAWSParser(Parser): def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict: additional_info = {} - message_id = headers.get("x-amzn-RequestId") + message_id = headers.get("x-amzn-requestid") if message_id and self.should_add_message_id: additional_info["info"] = {"messageId": message_id} span_id = headers.get("x-amzn-requestid") or headers.get("x-amz-requestid") @@ -100,7 +95,7 @@ class DynamoParser(ServerlessAWSParser): should_add_message_id = False def parse_request(self, parse_params: HttpRequest) -> dict: - target: str = str(parse_params.headers.get("x-amz-target", "")) # type: ignore + target: str = parse_params.headers.get("x-amz-target", "") return recursive_json_join( { "info": { @@ -139,10 +134,8 @@ class LambdaParser(ServerlessAWSParser): def parse_request(self, parse_params: HttpRequest) -> dict: return recursive_json_join( { - "name": safe_split_get( - str(parse_params.headers.get("path", "")), "/", 3 # type: ignore - ), - "invocationType": parse_params.headers.get("x-amz-invocation-type"), # type: ignore + "name": 
safe_split_get(str(parse_params.headers.get("path", "")), "/", 3), + "invocationType": parse_params.headers.get("x-amz-invocation-type"), }, super().parse_request(parse_params), ) @@ -223,8 +216,8 @@ class ApiGatewayV2Parser(ServerlessAWSParser): # API-GW V1 covered by ServerlessAWSParser def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict: - aws_request_id = headers.get("x-amzn-RequestId") - apigw_request_id = headers.get("Apigw-Requestid") + aws_request_id = headers.get("x-amzn-requestid") + apigw_request_id = headers.get("apigw-requestid") message_id = aws_request_id or apigw_request_id return recursive_json_join( {"info": {"messageId": message_id}}, @@ -232,7 +225,7 @@ def parse_response(self, url: str, status_code: int, headers, body: bytes) -> di ) -def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> Type[Parser]: +def get_parser(url: str, headers: Optional[dict] = None) -> Type[Parser]: service = safe_split_get(url, ".", 0) if service == "dynamodb": return DynamoParser @@ -249,6 +242,6 @@ def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> T return SqsParser elif "execute-api" in url: return ApiGatewayV2Parser - elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-RequestId")): + elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-requestid")): return ServerlessAWSParser return Parser diff --git a/src/lumigo_tracer/parsers/utils.py b/src/lumigo_tracer/parsers/utils.py index 6ed52a21..f012b363 100644 --- a/src/lumigo_tracer/parsers/utils.py +++ b/src/lumigo_tracer/parsers/utils.py @@ -136,8 +136,10 @@ def recursive_json_join(d1: dict, d2: dict): * if key in d2 and is not dictionary, then the value is d2[key] * otherwise, join d1[key] and d2[key] """ + if d1 is None or d2 is None: + return d1 or d2 d = {} - for key in itertools.chain(d1.keys(), d2.keys()): + for key in set(itertools.chain(d1.keys(), d2.keys())): value = d1.get(key, 
d2.get(key)) if isinstance(value, dict): d[key] = recursive_json_join(d1.get(key, {}), d2.get(key, {})) @@ -293,7 +295,7 @@ def _parse_streams(event: dict) -> Dict[str, str]: def should_scrub_domain(url: str) -> bool: if url and Configuration.domains_scrubber: for regex in Configuration.domains_scrubber: - if re.match(regex, url, re.IGNORECASE): + if regex.match(url): return True return False diff --git a/src/lumigo_tracer/spans_container.py b/src/lumigo_tracer/spans_container.py index 41ded8cf..094e9eec 100644 --- a/src/lumigo_tracer/spans_container.py +++ b/src/lumigo_tracer/spans_container.py @@ -4,7 +4,6 @@ import uuid import signal import traceback -import http.client from typing import List, Dict, Tuple, Optional, Callable, Set from lumigo_tracer.parsers.event_parser import EventParser @@ -93,7 +92,7 @@ def __init__( }, self.base_msg, ) - self.previous_request: Tuple[Optional[http.client.HTTPMessage], bytes] = (None, b"") + self.previous_request: Tuple[Optional[dict], bytes] = (None, b"") self.previous_response_body: bytes = b"" self.http_span_ids_to_send: Set[str] = set() self.http_spans: List[Dict] = [] @@ -169,7 +168,7 @@ def update_event_end_time(self) -> None: self.http_spans[-1]["ended"] = int(time.time() * 1000) def update_event_response( - self, host: Optional[str], status_code: int, headers: http.client.HTTPMessage, body: bytes + self, host: Optional[str], status_code: int, headers: dict, body: bytes ) -> None: """ :param host: If None, use the host from the last span, otherwise this is the first chuck and we can empty diff --git a/src/lumigo_tracer/sync_http/sync_hook.py b/src/lumigo_tracer/sync_http/sync_hook.py index ddda25bb..f36cf728 100644 --- a/src/lumigo_tracer/sync_http/sync_hook.py +++ b/src/lumigo_tracer/sync_http/sync_hook.py @@ -27,6 +27,7 @@ CONTEXT_WRAPPED_BY_LUMIGO_KEY = "_wrapped_by_lumigo" MAX_READ_SIZE = 1024 already_wrapped = False +LUMIGO_HEADERS_HOOK_KEY = "_lumigo_headers_hook" def _request_wrapper(func, instance, args, 
kwargs): @@ -51,7 +52,12 @@ def _request_wrapper(func, instance, args, kwargs): with lumigo_safe_execute("parse request"): if isinstance(data, bytes) and _BODY_HEADER_SPLITTER in data: headers, body = data.split(_BODY_HEADER_SPLITTER, 1) - if _FLAGS_HEADER_SPLITTER in headers: + hooked_headers = getattr(instance, LUMIGO_HEADERS_HOOK_KEY, None) + if hooked_headers: + # we will get here only if _headers_reminder_wrapper ran first. remove its traces. + headers = dict(hooked_headers.items()) + setattr(instance, LUMIGO_HEADERS_HOOK_KEY, None) + elif _FLAGS_HEADER_SPLITTER in headers: request_info, headers = headers.split(_FLAGS_HEADER_SPLITTER, 1) headers = http.client.parse_headers(BytesIO(headers)) path_and_query_params = ( @@ -63,6 +69,8 @@ def _request_wrapper(func, instance, args, kwargs): ) uri = f"{host}{path_and_query_params}" host = host or headers.get("Host") + else: + headers = None with lumigo_safe_execute("add request event"): if headers: @@ -80,6 +88,15 @@ def _request_wrapper(func, instance, args, kwargs): return ret_val +def _headers_reminder_wrapper(func, instance, args, kwargs): + """ + This is the wrapper of the function `http.client.HTTPConnection.request` that gets the headers. + Remembering the headers helps us improve performance on requests that use this flow. + """ + setattr(instance, LUMIGO_HEADERS_HOOK_KEY, kwargs.get("headers")) + return func(*args, **kwargs) + + def _response_wrapper(func, instance, args, kwargs): """ This is the wrapper of the function that can be called only after that the http request was sent. 
@@ -87,7 +104,7 @@ def _response_wrapper(func, instance, args, kwargs): """ ret_val = func(*args, **kwargs) with lumigo_safe_execute("parse response"): - headers = ret_val.headers + headers = dict(ret_val.headers.items()) status_code = ret_val.code SpansContainer.get_span().update_event_response(instance.host, status_code, headers, b"") return ret_val @@ -101,7 +118,7 @@ def _read_wrapper(func, instance, args, kwargs): if ret_val: with lumigo_safe_execute("parse response.read"): SpansContainer.get_span().update_event_response( - None, instance.code, instance.headers, ret_val + None, instance.code, dict(instance.headers.items()), ret_val ) return ret_val @@ -115,7 +132,7 @@ def _read_stream_wrapper_generator(stream_generator, instance): for partial_response in stream_generator: with lumigo_safe_execute("parse response.read_chunked"): SpansContainer.get_span().update_event_response( - None, instance.status, instance.headers, partial_response + None, instance.status, dict(instance.headers.items()), partial_response ) yield partial_response @@ -281,6 +298,9 @@ def wrap_http_calls(): with lumigo_safe_execute("wrap http calls"): get_logger().debug("wrapping the http request") wrap_function_wrapper("http.client", "HTTPConnection.send", _request_wrapper) + wrap_function_wrapper( + "http.client", "HTTPConnection.request", _headers_reminder_wrapper + ) wrap_function_wrapper("botocore.awsrequest", "AWSRequest.__init__", _putheader_wrapper) wrap_function_wrapper("http.client", "HTTPConnection.getresponse", _response_wrapper) wrap_function_wrapper("http.client", "HTTPResponse.read", _read_wrapper) diff --git a/src/lumigo_tracer/utils.py b/src/lumigo_tracer/utils.py index 3c1c951d..3d032aa2 100644 --- a/src/lumigo_tracer/utils.py +++ b/src/lumigo_tracer/utils.py @@ -61,7 +61,7 @@ class Configuration: timeout_timer: bool = True timeout_timer_buffer: float = TIMEOUT_TIMER_BUFFER send_only_if_error: bool = False - domains_scrubber: Optional[List[str]] = None + domains_scrubber: 
Optional[List] = None def config( @@ -111,17 +111,19 @@ def config( Configuration.timeout_timer_buffer = TIMEOUT_TIMER_BUFFER Configuration.send_only_if_error = os.environ.get("SEND_ONLY_IF_ERROR", "").lower() == "true" if domains_scrubber: - Configuration.domains_scrubber = domains_scrubber + domains_scrubber_regex = domains_scrubber elif "LUMIGO_DOMAINS_SCRUBBER" in os.environ: try: - Configuration.domains_scrubber = json.loads(os.environ["LUMIGO_DOMAIN_SCRUBBER"]) + domains_scrubber_regex = json.loads(os.environ["LUMIGO_DOMAIN_SCRUBBER"]) except Exception: get_logger().critical( "Could not parse the specified domains scrubber, shutting down the reporter." ) Configuration.should_report = False + domains_scrubber_regex = [] else: - Configuration.domains_scrubber = DOMAIN_SCRUBBER_REGEXES + domains_scrubber_regex = DOMAIN_SCRUBBER_REGEXES + Configuration.domains_scrubber = [re.compile(r, re.IGNORECASE) for r in domains_scrubber_regex] def _is_span_has_error(span: dict) -> bool: diff --git a/src/test/unit/parsers/test_parser.py b/src/test/unit/parsers/test_parser.py index fc1af032..3ee1d2d2 100644 --- a/src/test/unit/parsers/test_parser.py +++ b/src/test/unit/parsers/test_parser.py @@ -1,11 +1,9 @@ from lumigo_tracer.parsers.parser import ServerlessAWSParser, Parser, get_parser, ApiGatewayV2Parser -import http.client def test_serverless_aws_parser_fallback_doesnt_change(): url = "https://kvpuorrsqb.execute-api.us-west-2.amazonaws.com" - headers = http.client.HTTPMessage() - headers.add_header("nothing", "relevant") + headers = {"nothing": "relevant"} serverless_parser = ServerlessAWSParser().parse_response(url, 200, headers=headers, body=b"") root_parser = Parser().parse_response(url, 200, headers=headers, body=b"") serverless_parser.pop("ended") @@ -15,21 +13,18 @@ def test_serverless_aws_parser_fallback_doesnt_change(): def test_get_parser_check_headers(): url = "api.rti.dev.toyota.com" - headers = http.client.HTTPMessage() - headers.add_header("x-amzn-requestid", 
"1234") + headers = {"x-amzn-requestid": "1234"} assert get_parser(url, headers) == ServerlessAWSParser def test_get_parser_apigw(): url = "https://ne3kjv28fh.execute-api.us-west-2.amazonaws.com/doriaviram" - headers = http.client.HTTPMessage() - assert get_parser(url, headers) == ApiGatewayV2Parser + assert get_parser(url, {}) == ApiGatewayV2Parser def test_apigw_parse_response(): parser = ApiGatewayV2Parser() - headers = http.client.HTTPMessage() - headers.add_header("Apigw-Requestid", "LY_66j0dPHcESCg=") + headers = {"apigw-requestid": "LY_66j0dPHcESCg="} result = parser.parse_response("dummy", 200, headers, body=b"") @@ -38,7 +33,7 @@ def test_apigw_parse_response(): "httpInfo": { "host": "dummy", "response": { - "headers": '{"Apigw-Requestid": "LY_66j0dPHcESCg="}', + "headers": '{"apigw-requestid": "LY_66j0dPHcESCg="}', "body": "", "statusCode": 200, }, @@ -48,18 +43,19 @@ def test_apigw_parse_response(): def test_apigw_parse_response_with_aws_request_id(): parser = ApiGatewayV2Parser() - headers = http.client.HTTPMessage() - headers.add_header("Apigw-Requestid", "LY_66j0dPHcESCg=") - headers.add_header("x-amzn-RequestId", "x-amzn-RequestId_LY_66j0dPHcESCg=") + headers = { + "apigw-requestid": "LY_66j0dPHcESCg=", + "x-amzn-requestid": "x-amzn-requestid_LY_66j0dPHcESCg=", + } result = parser.parse_response("dummy", 200, headers, body=b"") assert result["info"] == { - "messageId": "x-amzn-RequestId_LY_66j0dPHcESCg=", + "messageId": "x-amzn-requestid_LY_66j0dPHcESCg=", "httpInfo": { "host": "dummy", "response": { - "headers": '{"Apigw-Requestid": "LY_66j0dPHcESCg=", "x-amzn-RequestId": "x-amzn-RequestId_LY_66j0dPHcESCg="}', + "headers": '{"apigw-requestid": "LY_66j0dPHcESCg=", "x-amzn-requestid": "x-amzn-requestid_LY_66j0dPHcESCg="}', "body": "", "statusCode": 200, }, diff --git a/src/test/unit/parsers/test_utils.py b/src/test/unit/parsers/test_utils.py index 5206f06e..17337106 100644 --- a/src/test/unit/parsers/test_utils.py +++ 
b/src/test/unit/parsers/test_utils.py @@ -332,7 +332,7 @@ def test_safe_get(d, keys, result_value, default): [(["secret.*"], "lumigo.io", False), (["not-relevant", "secret.*"], "secret.aws.com", True)], ) def test_should_scrub_domain(regexes, url, expected): - Configuration.domains_scrubber = regexes + config(domains_scrubber=regexes) assert should_scrub_domain(url) == expected diff --git a/src/test/unit/sync_http/test_sync_hook.py b/src/test/unit/sync_http/test_sync_hook.py index 86334550..7ec9dc3e 100644 --- a/src/test/unit/sync_http/test_sync_hook.py +++ b/src/test/unit/sync_http/test_sync_hook.py @@ -95,7 +95,7 @@ def lambda_test_function(): assert "started" in http_spans[0] assert http_spans[0]["started"] > SpansContainer.get_span().function_span["started"] assert "ended" in http_spans[0] - assert "Content-Length" in http_spans[0]["info"]["httpInfo"]["request"]["headers"] + assert "content-length" in http_spans[0]["info"]["httpInfo"]["request"]["headers"] def test_lambda_wrapper_query_with_http_params(): @@ -141,7 +141,7 @@ def lambda_test_function(): http_spans = SpansContainer.get_span().http_spans assert http_spans assert http_spans[0]["info"]["httpInfo"]["request"]["body"] == "123456" - assert "Content-Length" in http_spans[0]["info"]["httpInfo"]["request"]["headers"] + assert "content-length" in http_spans[0]["info"]["httpInfo"]["request"]["headers"] def test_lambda_wrapper_no_headers(): @@ -278,7 +278,7 @@ def lambda_test_function(): assert lambda_test_function() == 1 http_events = SpansContainer.get_span().http_spans assert any( - '"Content-Type": "application/json"' + '"content-type": "application/json"' in event.get("info", {}).get("httpInfo", {}).get("request", {}).get("headers", "") for event in http_events ) @@ -548,6 +548,21 @@ def lambda_test_function(event, context): assert function_span["error"]["message"] == expected_message +def test_correct_headers_of_send_after_request(): + @lumigo_tracer() + def lambda_test_function(event, context): + d = 
{"a": "b", "myPassword": "123"} + conn = http.client.HTTPConnection("www.google.com") + conn.request("POST", "/", json.dumps(d), headers={"a": "b"}) + conn.send(b"GET\r\nc: d\r\n\r\nbody") + return {"lumigo": "rulz"} + + lambda_test_function({"key": "24"}, None) + spans = SpansContainer.get_span().http_spans + assert spans[0]["info"]["httpInfo"]["request"]["headers"] == json.dumps({"a": "b"}) + assert spans[1]["info"]["httpInfo"]["request"]["headers"] == json.dumps({"c": "d"}) + + def set_header_key(monkeypatch, header: str): monkeypatch.setattr(auto_tag_event, "AUTO_TAG_API_GW_HEADERS", [header]) From 8c9d8571641d757b936623a3364b3e3fe1077dfc Mon Sep 17 00:00:00 2001 From: saartochner Date: Mon, 6 Jul 2020 19:18:35 +0300 Subject: [PATCH 2/6] faster http hooks --- src/lumigo_tracer/spans_container.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lumigo_tracer/spans_container.py b/src/lumigo_tracer/spans_container.py index 094e9eec..25849704 100644 --- a/src/lumigo_tracer/spans_container.py +++ b/src/lumigo_tracer/spans_container.py @@ -182,6 +182,7 @@ def update_event_response( else: self.previous_response_body = b"" + headers = {k.lower(): v for k, v in headers.items()} parser = get_parser(host, headers)() # type: ignore if len(self.previous_response_body) < MAX_ENTRY_SIZE: self.previous_response_body += body From 70ce8cda63de5137c4828015489902fac2848c1e Mon Sep 17 00:00:00 2001 From: saartochner Date: Mon, 6 Jul 2020 19:22:55 +0300 Subject: [PATCH 3/6] faster http hooks --- src/lumigo_tracer/spans_container.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lumigo_tracer/spans_container.py b/src/lumigo_tracer/spans_container.py index 25849704..60add9cf 100644 --- a/src/lumigo_tracer/spans_container.py +++ b/src/lumigo_tracer/spans_container.py @@ -182,7 +182,7 @@ def update_event_response( else: self.previous_response_body = b"" - headers = {k.lower(): v for k, v in headers.items()} + headers = {k.lower(): v for k, v in 
headers.items()} if headers else {} parser = get_parser(host, headers)() # type: ignore if len(self.previous_response_body) < MAX_ENTRY_SIZE: self.previous_response_body += body From 60660d7ae8c0bc78eb180f4ffd7c13cef700ab94 Mon Sep 17 00:00:00 2001 From: saartochner Date: Tue, 7 Jul 2020 10:58:16 +0300 Subject: [PATCH 4/6] faster http hooks --- src/lumigo_tracer/sync_http/sync_hook.py | 3 ++- src/lumigo_tracer/utils.py | 4 ++++ src/test/unit/sync_http/test_sync_hook.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lumigo_tracer/sync_http/sync_hook.py b/src/lumigo_tracer/sync_http/sync_hook.py index f36cf728..12636afd 100644 --- a/src/lumigo_tracer/sync_http/sync_hook.py +++ b/src/lumigo_tracer/sync_http/sync_hook.py @@ -17,6 +17,7 @@ get_logger, lumigo_safe_execute, is_aws_environment, + ensure_str, ) from lumigo_tracer.spans_container import SpansContainer, TimeoutMechanism from lumigo_tracer.parsers.http_data_classes import HttpRequest @@ -55,7 +56,7 @@ def _request_wrapper(func, instance, args, kwargs): hooked_headers = getattr(instance, LUMIGO_HEADERS_HOOK_KEY, None) if hooked_headers: # we will get here only if _headers_reminder_wrapper ran first. remove its traces. 
- headers = dict(hooked_headers.items()) + headers = {ensure_str(k): ensure_str(v) for k, v in hooked_headers.items()} setattr(instance, LUMIGO_HEADERS_HOOK_KEY, None) elif _FLAGS_HEADER_SPLITTER in headers: request_info, headers = headers.split(_FLAGS_HEADER_SPLITTER, 1) diff --git a/src/lumigo_tracer/utils.py b/src/lumigo_tracer/utils.py index 3d032aa2..0dc34364 100644 --- a/src/lumigo_tracer/utils.py +++ b/src/lumigo_tracer/utils.py @@ -237,6 +237,10 @@ def is_aws_environment(): return bool(os.environ.get("LAMBDA_RUNTIME_DIR")) +def ensure_str(s: Union[str, bytes]): + return s if isinstance(s, str) else s.decode() + + def format_frames(frames_infos: List[inspect.FrameInfo]) -> List[dict]: free_space = MAX_VARS_SIZE frames: List[dict] = [] diff --git a/src/test/unit/sync_http/test_sync_hook.py b/src/test/unit/sync_http/test_sync_hook.py index 7ec9dc3e..9c40ba0c 100644 --- a/src/test/unit/sync_http/test_sync_hook.py +++ b/src/test/unit/sync_http/test_sync_hook.py @@ -553,7 +553,7 @@ def test_correct_headers_of_send_after_request(): def lambda_test_function(event, context): d = {"a": "b", "myPassword": "123"} conn = http.client.HTTPConnection("www.google.com") - conn.request("POST", "/", json.dumps(d), headers={"a": "b"}) + conn.request("POST", "/", json.dumps(d), headers={"a": b"b"}) conn.send(b"GET\r\nc: d\r\n\r\nbody") return {"lumigo": "rulz"} From 4da506e6f989a22cab04bdc39a08a478eb76f61a Mon Sep 17 00:00:00 2001 From: saartochner Date: Tue, 7 Jul 2020 10:58:35 +0300 Subject: [PATCH 5/6] faster http hooks --- src/lumigo_tracer/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lumigo_tracer/utils.py b/src/lumigo_tracer/utils.py index 0dc34364..0b272ef2 100644 --- a/src/lumigo_tracer/utils.py +++ b/src/lumigo_tracer/utils.py @@ -237,7 +237,7 @@ def is_aws_environment(): return bool(os.environ.get("LAMBDA_RUNTIME_DIR")) -def ensure_str(s: Union[str, bytes]): +def ensure_str(s: Union[str, bytes]) -> str: return s if isinstance(s, str) 
else s.decode() From 3675dc214c5555a7bda7621266ebb9583787ec81 Mon Sep 17 00:00:00 2001 From: saartochner Date: Tue, 7 Jul 2020 11:51:16 +0300 Subject: [PATCH 6/6] faster http hooks --- src/lumigo_tracer/spans_container.py | 12 ++++++------ src/lumigo_tracer/utils.py | 26 +++++++++++++++++--------- src/test/conftest.py | 2 +- src/test/unit/test_main_utils.py | 13 +++++++++++-- src/test/unit/test_spans_container.py | 4 +--- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/lumigo_tracer/spans_container.py b/src/lumigo_tracer/spans_container.py index 60add9cf..2002cbda 100644 --- a/src/lumigo_tracer/spans_container.py +++ b/src/lumigo_tracer/spans_container.py @@ -16,6 +16,7 @@ omit_keys, EXECUTION_TAGS_KEY, MAX_ENTRY_SIZE, + get_timeout_buffer, ) from lumigo_tracer import utils from lumigo_tracer.parsers.parser import get_parser, HTTP_TYPE, StepFunctionParser @@ -71,8 +72,6 @@ def __init__( "region": region, "parentId": request_id, "info": {"tracer": {"version": version}, "traceId": {"Root": trace_root}}, - "event": event, - "envs": envs, "token": Configuration.token, } self.function_span = recursive_json_join( @@ -81,6 +80,8 @@ def __init__( "type": "function", "name": name, "runtime": runtime, + "event": event, + "envs": envs, "memoryAllocated": memory_allocated, "readiness": "cold" if SpansContainer.is_cold else "warm", "info": { @@ -123,12 +124,11 @@ def start_timeout_timer(self, context=None) -> None: get_logger().info("Skip setting timeout timer - Could not get the remaining time.") return remaining_time = context.get_remaining_time_in_millis() / 1000 - if Configuration.timeout_timer_buffer >= remaining_time: + buffer = get_timeout_buffer(remaining_time) + if buffer >= remaining_time or remaining_time < 2: get_logger().debug("Skip setting timeout timer - Too short timeout.") return - TimeoutMechanism.start( - remaining_time - Configuration.timeout_timer_buffer, self.handle_timeout - ) + TimeoutMechanism.start(remaining_time - buffer, 
self.handle_timeout) def add_request_event(self, parse_params: HttpRequest): """ diff --git a/src/lumigo_tracer/utils.py b/src/lumigo_tracer/utils.py index 0b272ef2..82d6ac17 100644 --- a/src/lumigo_tracer/utils.py +++ b/src/lumigo_tracer/utils.py @@ -23,7 +23,7 @@ MAX_SIZE_FOR_REQUEST: int = int(os.environ.get("LUMIGO_MAX_SIZE_FOR_REQUEST", 900_000)) MAX_VARS_SIZE = 100_000 MAX_VAR_LEN = 200 -MAX_ENTRY_SIZE = 1024 +MAX_ENTRY_SIZE = 2048 FrameVariables = Dict[str, str] OMITTING_KEYS_REGEXES = [ ".*pass.*", @@ -45,7 +45,6 @@ LUMIGO_SECRET_MASKING_REGEX_BACKWARD_COMP = "LUMIGO_BLACKLIST_REGEX" LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX" WARN_CLIENT_PREFIX = "Lumigo Warning" -TIMEOUT_TIMER_BUFFER = 0.7 NUMBER_OF_SPANS_IN_REPORT_OPTIMIZATION = 200 _logger: Union[logging.Logger, None] = None @@ -59,7 +58,7 @@ class Configuration: enhanced_print: bool = False is_step_function: bool = False timeout_timer: bool = True - timeout_timer_buffer: float = TIMEOUT_TIMER_BUFFER + timeout_timer_buffer: Optional[float] = None send_only_if_error: bool = False domains_scrubber: Optional[List] = None @@ -72,7 +71,7 @@ def config( enhance_print: bool = False, step_function: bool = False, timeout_timer: bool = True, - timeout_timer_buffer: float = None, + timeout_timer_buffer: Optional[float] = None, domains_scrubber: Optional[List[str]] = None, ) -> None: """ @@ -85,7 +84,8 @@ def config( :param enhance_print: Should we add prefix to the print (so the logs will be in the platform). :param step_function: Is this function is a part of a step function? :param timeout_timer: Should we start a timer to send the traced data before timeout acceded. - :param timeout_timer_buffer: The buffer that we take before reaching timeout to send the traces to lumigo (seconds). + :param timeout_timer_buffer: The buffer (seconds) that we take before reaching timeout to send the traces to lumigo. 
+ The default is 10% of the remaining time of the lambda (with lower and upper bounds of 0.5 and 3 seconds). :param domains_scrubber: List of regexes. We will not collect data of requests with hosts that match it. """ if should_report is not None: @@ -103,12 +103,13 @@ def config( ) Configuration.timeout_timer = timeout_timer try: - Configuration.timeout_timer_buffer = float( # type: ignore - timeout_timer_buffer or os.environ.get("LUMIGO_TIMEOUT_BUFFER", TIMEOUT_TIMER_BUFFER) - ) + if "LUMIGO_TIMEOUT_BUFFER" in os.environ: + Configuration.timeout_timer_buffer = float(os.environ["LUMIGO_TIMEOUT_BUFFER"]) + else: + Configuration.timeout_timer_buffer = timeout_timer_buffer except Exception: warn_client("Could not configure LUMIGO_TIMEOUT_BUFFER. Using default value.") - Configuration.timeout_timer_buffer = TIMEOUT_TIMER_BUFFER + Configuration.timeout_timer_buffer = None Configuration.send_only_if_error = os.environ.get("SEND_ONLY_IF_ERROR", "").lower() == "true" if domains_scrubber: domains_scrubber_regex = domains_scrubber @@ -375,3 +376,10 @@ def is_api_gw_event(event: dict) -> bool: and event.get("requestContext", {}).get("domainName") # noqa and event.get("requestContext", {}).get("requestId") # noqa ) + + +def get_timeout_buffer(remaining_time: float): + buffer = Configuration.timeout_timer_buffer + if not buffer: + buffer = max(0.5, min(0.1 * remaining_time, 3)) + return buffer diff --git a/src/test/conftest.py b/src/test/conftest.py index f6f79947..0bb62711 100644 --- a/src/test/conftest.py +++ b/src/test/conftest.py @@ -71,4 +71,4 @@ def capture_all_logs(caplog): @pytest.fixture def context(): - return mock.Mock(get_remaining_time_in_millis=lambda: utils.TIMEOUT_TIMER_BUFFER * 1000 * 2) + return mock.Mock(get_remaining_time_in_millis=lambda: 1000 * 2) diff --git a/src/test/unit/test_main_utils.py b/src/test/unit/test_main_utils.py index c095e0cb..6f97239d 100644 --- a/src/test/unit/test_main_utils.py +++ b/src/test/unit/test_main_utils.py @@ -22,7 +22,7 @@ 
warn_client, WARN_CLIENT_PREFIX, SKIP_SCRUBBING_KEYS, - TIMEOUT_TIMER_BUFFER, + get_timeout_buffer, ) import json @@ -319,7 +319,7 @@ def test_config_enhanced_printstep_function_without_envs(monkeypatch, configurat def test_config_timeout_timer_buffer_with_exception(monkeypatch): monkeypatch.setenv("LUMIGO_TIMEOUT_BUFFER", "not float") config() - assert Configuration.timeout_timer_buffer == TIMEOUT_TIMER_BUFFER + assert Configuration.timeout_timer_buffer is None def test_warn_client_print(capsys): @@ -331,3 +331,12 @@ def test_warn_client_dont_print(capsys, monkeypatch): monkeypatch.setenv("LUMIGO_WARNINGS", "off") warn_client("message") assert capsys.readouterr().out == "" + + +@pytest.mark.parametrize( + "remaining_time, conf, expected", + ((3, 1, 1), (3, None, 0.5), (10, None, 1), (20, None, 2), (900, None, 3)), +) +def test_get_timeout_buffer(remaining_time, conf, expected): + Configuration.timeout_timer_buffer = conf + assert get_timeout_buffer(remaining_time) == expected diff --git a/src/test/unit/test_spans_container.py b/src/test/unit/test_spans_container.py index f6d7ea28..3cf49a2c 100644 --- a/src/test/unit/test_spans_container.py +++ b/src/test/unit/test_spans_container.py @@ -93,9 +93,7 @@ def test_timeout_mechanism_disabled_by_configuration(monkeypatch, context): def test_timeout_mechanism_too_short_time(monkeypatch, context): monkeypatch.setattr(Configuration, "timeout_timer", True) - monkeypatch.setattr( - context, "get_remaining_time_in_millis", lambda: utils.TIMEOUT_TIMER_BUFFER / 2 - ) + monkeypatch.setattr(context, "get_remaining_time_in_millis", lambda: 1000) SpansContainer.create_span() SpansContainer.get_span().start(context=context)