Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/lumigo_tracer/parsers/http_data_classes.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
from http import client
from copy import deepcopy
from typing import Optional


class HttpRequest:
host: str
method: str
uri: str
headers: Optional[client.HTTPMessage]
headers: dict
body: bytes

def __init__(self, **kwargs):
    """
    Build an HttpRequest from keyword arguments.

    Required kwargs: ``host``, ``method``, ``uri``.
    Optional kwargs: ``headers`` (mapping or None), ``body`` (bytes or None).
    """
    self.host = kwargs["host"]
    self.method = kwargs["method"]
    self.uri = kwargs["uri"]
    # Lower-case all header names once at construction time so every later
    # lookup can use a plain dict.get with a lower-case key (per the PR
    # comment, this replaced a per-lookup lowering getter and saved ~6s).
    self.headers = {k.lower(): v for k, v in (kwargs.get("headers") or {}).items()}
    self.body = kwargs.get("body")

def clone(self, **kwargs):
Expand Down
29 changes: 11 additions & 18 deletions src/lumigo_tracer/parsers/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import uuid
from typing import Type, Optional
import time
import http.client

from lumigo_tracer.parsers.utils import (
safe_split_get,
Expand Down Expand Up @@ -35,9 +34,7 @@ class Parser:
def parse_request(self, parse_params: HttpRequest) -> dict:
if Configuration.verbose and parse_params and not should_scrub_domain(parse_params.host):
additional_info = {
"headers": prepare_large_data(
dict(parse_params.headers.items() if parse_params.headers else {})
),
"headers": prepare_large_data(parse_params.headers),
"body": prepare_large_data(parse_params.body),
"method": parse_params.method,
"uri": parse_params.uri,
Expand All @@ -60,12 +57,10 @@ def parse_request(self, parse_params: HttpRequest) -> dict:
"started": int(time.time() * 1000),
}

def parse_response(
self, url: str, status_code: int, headers: Optional[http.client.HTTPMessage], body: bytes
) -> dict:
def parse_response(self, url: str, status_code: int, headers: dict, body: bytes) -> dict:
if Configuration.verbose and not should_scrub_domain(url):
additional_info = {
"headers": prepare_large_data(dict(headers.items() if headers else {})),
"headers": prepare_large_data(headers),
"body": prepare_large_data(body),
"statusCode": status_code,
}
Expand All @@ -85,7 +80,7 @@ class ServerlessAWSParser(Parser):

def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict:
additional_info = {}
message_id = headers.get("x-amzn-RequestId")
message_id = headers.get("x-amzn-requestid")
if message_id and self.should_add_message_id:
additional_info["info"] = {"messageId": message_id}
span_id = headers.get("x-amzn-requestid") or headers.get("x-amz-requestid")
Expand All @@ -100,7 +95,7 @@ class DynamoParser(ServerlessAWSParser):
should_add_message_id = False

def parse_request(self, parse_params: HttpRequest) -> dict:
target: str = str(parse_params.headers.get("x-amz-target", "")) # type: ignore
target: str = parse_params.headers.get("x-amz-target", "")
return recursive_json_join(
{
"info": {
Expand Down Expand Up @@ -139,10 +134,8 @@ class LambdaParser(ServerlessAWSParser):
def parse_request(self, parse_params: HttpRequest) -> dict:
    """
    Parse a Lambda-invoke HTTP request into span attributes.

    Extracts the invoked function name (third path segment of the request
    path) and the invocation type header, then merges them over the generic
    ServerlessAWSParser attributes.

    NOTE(review): header keys are lower-cased by HttpRequest, so plain
    dict lookups with lower-case keys are safe here.
    """
    return recursive_json_join(
        {
            # path looks like /2015-03-31/functions/<name>/invocations
            "name": safe_split_get(str(parse_params.headers.get("path", "")), "/", 3),
            "invocationType": parse_params.headers.get("x-amz-invocation-type"),
        },
        super().parse_request(parse_params),
    )
Expand Down Expand Up @@ -223,16 +216,16 @@ class ApiGatewayV2Parser(ServerlessAWSParser):
# API-GW V1 covered by ServerlessAWSParser

def parse_response(self, url: str, status_code: int, headers, body: bytes) -> dict:
    """
    Parse an API Gateway V2 HTTP response, attaching a message id.

    Prefers the generic AWS request id over the API-GW-specific one.
    Header keys are expected to be lower-cased by the caller
    (SpansContainer.update_event_response), so look up lower-case keys only.
    """
    aws_request_id = headers.get("x-amzn-requestid")
    apigw_request_id = headers.get("apigw-requestid")
    message_id = aws_request_id or apigw_request_id
    return recursive_json_join(
        {"info": {"messageId": message_id}},
        super().parse_response(url, status_code, headers, body),
    )


def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> Type[Parser]:
def get_parser(url: str, headers: Optional[dict] = None) -> Type[Parser]:
service = safe_split_get(url, ".", 0)
if service == "dynamodb":
return DynamoParser
Expand All @@ -249,6 +242,6 @@ def get_parser(url: str, headers: Optional[http.client.HTTPMessage] = None) -> T
return SqsParser
elif "execute-api" in url:
return ApiGatewayV2Parser
elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-RequestId")):
elif url.endswith("amazonaws.com") or (headers and headers.get("x-amzn-requestid")):
return ServerlessAWSParser
return Parser
6 changes: 4 additions & 2 deletions src/lumigo_tracer/parsers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ def recursive_json_join(d1: dict, d2: dict):
* if key in d2 and is not dictionary, then the value is d2[key]
* otherwise, join d1[key] and d2[key]
"""
if d1 is None or d2 is None:
return d1 or d2
d = {}
for key in itertools.chain(d1.keys(), d2.keys()):
for key in set(itertools.chain(d1.keys(), d2.keys())):
value = d1.get(key, d2.get(key))
if isinstance(value, dict):
d[key] = recursive_json_join(d1.get(key, {}), d2.get(key, {}))
Expand Down Expand Up @@ -293,7 +295,7 @@ def _parse_streams(event: dict) -> Dict[str, str]:
def should_scrub_domain(url: str) -> bool:
    """
    Return True if `url` matches any configured domain-scrubber pattern.

    Configuration.domains_scrubber is assumed to hold pre-compiled regex
    objects (compiled once at configuration time — per the PR comment this
    saved ~3s vs. calling re.match with flags on every invocation).
    Returns False for an empty/None url or when no scrubber is configured.
    """
    if url and Configuration.domains_scrubber:
        for regex in Configuration.domains_scrubber:
            if regex.match(url):
                return True
    return False

Expand Down
18 changes: 9 additions & 9 deletions src/lumigo_tracer/spans_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import uuid
import signal
import traceback
import http.client
from typing import List, Dict, Tuple, Optional, Callable, Set

from lumigo_tracer.parsers.event_parser import EventParser
Expand All @@ -17,6 +16,7 @@
omit_keys,
EXECUTION_TAGS_KEY,
MAX_ENTRY_SIZE,
get_timeout_buffer,
)
from lumigo_tracer import utils
from lumigo_tracer.parsers.parser import get_parser, HTTP_TYPE, StepFunctionParser
Expand Down Expand Up @@ -72,8 +72,6 @@ def __init__(
"region": region,
"parentId": request_id,
"info": {"tracer": {"version": version}, "traceId": {"Root": trace_root}},
"event": event,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send the event and envs only on function spans

"envs": envs,
"token": Configuration.token,
}
self.function_span = recursive_json_join(
Expand All @@ -82,6 +80,8 @@ def __init__(
"type": "function",
"name": name,
"runtime": runtime,
"event": event,
"envs": envs,
"memoryAllocated": memory_allocated,
"readiness": "cold" if SpansContainer.is_cold else "warm",
"info": {
Expand All @@ -93,7 +93,7 @@ def __init__(
},
self.base_msg,
)
self.previous_request: Tuple[Optional[http.client.HTTPMessage], bytes] = (None, b"")
self.previous_request: Tuple[Optional[dict], bytes] = (None, b"")
self.previous_response_body: bytes = b""
self.http_span_ids_to_send: Set[str] = set()
self.http_spans: List[Dict] = []
Expand Down Expand Up @@ -124,12 +124,11 @@ def start_timeout_timer(self, context=None) -> None:
get_logger().info("Skip setting timeout timer - Could not get the remaining time.")
return
remaining_time = context.get_remaining_time_in_millis() / 1000
if Configuration.timeout_timer_buffer >= remaining_time:
buffer = get_timeout_buffer(remaining_time)
if buffer >= remaining_time or remaining_time < 2:
get_logger().debug("Skip setting timeout timer - Too short timeout.")
return
TimeoutMechanism.start(
remaining_time - Configuration.timeout_timer_buffer, self.handle_timeout
)
TimeoutMechanism.start(remaining_time - buffer, self.handle_timeout)

def add_request_event(self, parse_params: HttpRequest):
"""
Expand Down Expand Up @@ -169,7 +168,7 @@ def update_event_end_time(self) -> None:
self.http_spans[-1]["ended"] = int(time.time() * 1000)

def update_event_response(
self, host: Optional[str], status_code: int, headers: http.client.HTTPMessage, body: bytes
self, host: Optional[str], status_code: int, headers: dict, body: bytes
) -> None:
"""
:param host: If None, use the host from the last span, otherwise this is the first chuck and we can empty
Expand All @@ -183,6 +182,7 @@ def update_event_response(
else:
self.previous_response_body = b""

headers = {k.lower(): v for k, v in headers.items()} if headers else {}
parser = get_parser(host, headers)() # type: ignore
if len(self.previous_response_body) < MAX_ENTRY_SIZE:
self.previous_response_body += body
Expand Down
29 changes: 25 additions & 4 deletions src/lumigo_tracer/sync_http/sync_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
get_logger,
lumigo_safe_execute,
is_aws_environment,
ensure_str,
)
from lumigo_tracer.spans_container import SpansContainer, TimeoutMechanism
from lumigo_tracer.parsers.http_data_classes import HttpRequest
Expand All @@ -27,6 +28,7 @@
CONTEXT_WRAPPED_BY_LUMIGO_KEY = "_wrapped_by_lumigo"
MAX_READ_SIZE = 1024
already_wrapped = False
LUMIGO_HEADERS_HOOK_KEY = "_lumigo_headers_hook"


def _request_wrapper(func, instance, args, kwargs):
Expand All @@ -51,7 +53,12 @@ def _request_wrapper(func, instance, args, kwargs):
with lumigo_safe_execute("parse request"):
if isinstance(data, bytes) and _BODY_HEADER_SPLITTER in data:
headers, body = data.split(_BODY_HEADER_SPLITTER, 1)
if _FLAGS_HEADER_SPLITTER in headers:
hooked_headers = getattr(instance, LUMIGO_HEADERS_HOOK_KEY, None)
if hooked_headers:
Copy link
Contributor Author

@saartochner-lumigo saartochner-lumigo Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of parsing the headers from the raw bytes, reuse the headers captured by the hook (saves ~20s)

# we will get here only if _headers_reminder_wrapper ran first. remove its traces.
headers = {ensure_str(k): ensure_str(v) for k, v in hooked_headers.items()}
setattr(instance, LUMIGO_HEADERS_HOOK_KEY, None)
elif _FLAGS_HEADER_SPLITTER in headers:
request_info, headers = headers.split(_FLAGS_HEADER_SPLITTER, 1)
headers = http.client.parse_headers(BytesIO(headers))
path_and_query_params = (
Expand All @@ -63,6 +70,8 @@ def _request_wrapper(func, instance, args, kwargs):
)
uri = f"{host}{path_and_query_params}"
host = host or headers.get("Host")
else:
headers = None

with lumigo_safe_execute("add request event"):
if headers:
Expand All @@ -80,14 +89,23 @@ def _request_wrapper(func, instance, args, kwargs):
return ret_val


def _headers_reminder_wrapper(func, instance, args, kwargs):
    """
    Wrapper around `http.client.HTTPConnection.request`, which receives the
    headers as a ready-made mapping. Stashing that mapping on the connection
    instance lets the lower-level send hook reuse it instead of re-parsing
    the raw header bytes — a performance shortcut for this request flow.
    """
    request_headers = kwargs.get("headers")
    setattr(instance, LUMIGO_HEADERS_HOOK_KEY, request_headers)
    return func(*args, **kwargs)


def _response_wrapper(func, instance, args, kwargs):
    """
    Wrapper for the function that is called only after the HTTP request was
    sent (HTTPConnection.getresponse).

    Records the response status and headers on the current span. We do not
    examine the response body here because doing so could change the original
    behaviour (e.g. ret_val.peek()).
    """
    ret_val = func(*args, **kwargs)
    with lumigo_safe_execute("parse response"):
        # Convert HTTPMessage to a plain dict up front; downstream code
        # (update_event_response) lower-cases the keys and parsers use
        # plain dict lookups.
        headers = dict(ret_val.headers.items())
        status_code = ret_val.code
        SpansContainer.get_span().update_event_response(instance.host, status_code, headers, b"")
    return ret_val
Expand All @@ -101,7 +119,7 @@ def _read_wrapper(func, instance, args, kwargs):
if ret_val:
with lumigo_safe_execute("parse response.read"):
SpansContainer.get_span().update_event_response(
None, instance.code, instance.headers, ret_val
None, instance.code, dict(instance.headers.items()), ret_val
)
return ret_val

Expand All @@ -115,7 +133,7 @@ def _read_stream_wrapper_generator(stream_generator, instance):
for partial_response in stream_generator:
with lumigo_safe_execute("parse response.read_chunked"):
SpansContainer.get_span().update_event_response(
None, instance.status, instance.headers, partial_response
None, instance.status, dict(instance.headers.items()), partial_response
)
yield partial_response

Expand Down Expand Up @@ -281,6 +299,9 @@ def wrap_http_calls():
with lumigo_safe_execute("wrap http calls"):
get_logger().debug("wrapping the http request")
wrap_function_wrapper("http.client", "HTTPConnection.send", _request_wrapper)
wrap_function_wrapper(
"http.client", "HTTPConnection.request", _headers_reminder_wrapper
)
wrap_function_wrapper("botocore.awsrequest", "AWSRequest.__init__", _putheader_wrapper)
wrap_function_wrapper("http.client", "HTTPConnection.getresponse", _response_wrapper)
wrap_function_wrapper("http.client", "HTTPResponse.read", _read_wrapper)
Expand Down
Loading