Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
60bc2ec
retry in case of error with edge
saartochner-lumigo Oct 4, 2020
e471345
split event and parsers
saartochner-lumigo Oct 4, 2020
151c500
Merge branch 'master' of github.com:lumigo-io/python_tracer into RD-3…
saartochner-lumigo Oct 5, 2020
8383ea1
split sync_hook to tracer and http hooks
saartochner-lumigo Oct 5, 2020
f56b826
refactor http hooks to be a wrapper
saartochner-lumigo Oct 5, 2020
613ef65
refactor http hooks to be a wrapper
saartochner-lumigo Oct 5, 2020
077f1be
refactor http hooks to be a wrapper
saartochner-lumigo Oct 5, 2020
1c0d3ec
increase coverage
saartochner-lumigo Oct 5, 2020
1764e73
refactor wrap once
saartochner-lumigo Oct 6, 2020
838e077
support pymongo
saartochner-lumigo Oct 8, 2020
feb6abd
Merge branch 'master' of github.com:lumigo-io/python_tracer into RD-3…
saartochner-lumigo Oct 11, 2020
846712f
Merge branch 'RD-3761-pymongo' of github.com:lumigo-io/python_tracer …
saartochner-lumigo Oct 11, 2020
fd8a27d
wip
saartochner-lumigo Oct 11, 2020
8f1114c
Merge branch 'RD-3761-pymongo' of github.com:lumigo-io/python_tracer …
saartochner-lumigo Oct 11, 2020
5a9c402
Merge branch 'master' into RD-3761-wrap-pymongo
saartochner-lumigo Oct 11, 2020
c5caf09
fix Dori's CR
saartochner-lumigo Oct 12, 2020
df61e65
fix Dori's CR
saartochner-lumigo Oct 12, 2020
865fcee
Merge branch 'RD-3761-wrap-pymongo' into RD-3690-redis-sqlalchemy-ela…
saartochner-lumigo Oct 12, 2020
2e83944
wip
saartochner-lumigo Oct 12, 2020
4d0a2fa
Merge branch 'RD-3761-wrap-pymongo' into RD-3690-redis-sqlalchemy-ela…
saartochner-lumigo Oct 12, 2020
3f0bf5c
Merge branch 'master' into RD-3761-wrap-pymongo
saartochner-lumigo Oct 12, 2020
f06a1c3
support redis
saartochner-lumigo Oct 12, 2020
188bf41
wip
saartochner-lumigo Oct 12, 2020
2809c3a
Merge branch 'RD-3761-wrap-pymongo' of github.com:lumigo-io/python_tr…
saartochner-lumigo Oct 12, 2020
4520a2e
wip
saartochner-lumigo Oct 12, 2020
03713d8
times
saartochner-lumigo Oct 13, 2020
b738fc5
Merge branch 'RD-3761-wrap-pymongo' of github.com:lumigo-io/python_tr…
saartochner-lumigo Oct 13, 2020
f837d38
IT
saartochner-lumigo Oct 13, 2020
a0f8ab0
Merge branch 'master' of github.com:lumigo-io/python_tracer into RD-3…
saartochner-lumigo Oct 19, 2020
1a81a51
wip
saartochner-lumigo Oct 19, 2020
dc34b79
wip
saartochner-lumigo Oct 19, 2020
3511adc
fix Nirhod's CR
saartochner-lumigo Oct 20, 2020
009afa1
fix Nirhod's CR
saartochner-lumigo Oct 20, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pytest-cov==2.6.1
capturer==2.4
attrs==19.1.0
requests==2.24.0
pymongo==3.11.0
pymongo==3.11.0
redispy==3.0.0
11 changes: 9 additions & 2 deletions src/lumigo_tracer/lumigo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ def is_aws_environment():
return bool(os.environ.get("AWS_LAMBDA_FUNCTION_VERSION"))


def get_current_ms_time() -> int:
"""
:return: the current time in milliseconds
"""
return int(time.time() * 1000)


def ensure_str(s: Union[str, bytes]) -> str:
return s if isinstance(s, str) else s.decode()

Expand Down Expand Up @@ -388,7 +395,7 @@ def create_step_function_span(message_id: str):
"messageId": message_id,
"httpInfo": {"host": "StepFunction", "request": {"method": "", "body": ""}},
},
"started": int(time.time() * 1000),
"started": get_current_ms_time(),
}


Expand Down Expand Up @@ -487,7 +494,7 @@ def aws_dump(d: Any, decimal_safe=False, **kwargs) -> str:


def lumigo_dumps(
d: Union[bytes, str, dict, OrderedDict, list],
d: Union[bytes, str, dict, OrderedDict, list, None],
max_size: Optional[int] = None,
regexes: Optional[Pattern[str]] = None,
enforce_jsonify: bool = False,
Expand Down
9 changes: 5 additions & 4 deletions src/lumigo_tracer/spans_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
get_logger,
_is_span_has_error,
create_step_function_span,
get_current_ms_time,
)
from lumigo_tracer import lumigo_utils
from lumigo_tracer.parsing_utils import parse_trace_id, safe_split_get, recursive_json_join
Expand Down Expand Up @@ -158,7 +159,7 @@ def update_event_end_time(self) -> None:
This function assumes synchronous execution - we update the last http event.
"""
if self.spans:
self.spans[-1]["ended"] = int(time.time() * 1000)
self.spans[-1]["ended"] = get_current_ms_time()

def update_event_times(
self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
Expand Down Expand Up @@ -215,7 +216,7 @@ def end(self, ret_val=None, event: Optional[dict] = None, context=None) -> Optio
TimeoutMechanism.stop()
reported_rtt = None
self.previous_request = None
self.function_span.update({"ended": int(time.time() * 1000)})
self.function_span.update({"ended": get_current_ms_time()})
if Configuration.is_step_function:
self.add_step_end_event(ret_val)
parsed_ret_val = None
Expand Down Expand Up @@ -287,7 +288,7 @@ def create_span(cls, event=None, context=None, force=False) -> "SpansContainer":
trace_root, transaction_id, suffix = parse_trace_id(os.environ.get("_X_AMZN_TRACE_ID", ""))
remaining_time = getattr(context, "get_remaining_time_in_millis", lambda: MAX_LAMBDA_TIME)()
cls._span = SpansContainer(
started=int(time.time() * 1000),
started=get_current_ms_time(),
name=os.environ.get("AWS_LAMBDA_FUNCTION_NAME"),
runtime=os.environ.get("AWS_EXECUTION_ENV"),
region=os.environ.get("AWS_REGION"),
Expand All @@ -300,7 +301,7 @@ def create_span(cls, event=None, context=None, force=False) -> "SpansContainer":
request_id=getattr(context, "aws_request_id", ""),
account=safe_split_get(getattr(context, "invoked_function_arn", ""), ":", 4, ""),
trigger_by=parse_triggered_by(event),
max_finish_time=int(time.time() * 1000) + remaining_time,
max_finish_time=get_current_ms_time() + remaining_time,
**additional_info,
)
return cls._span
Expand Down
6 changes: 5 additions & 1 deletion src/lumigo_tracer/wrappers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from .http.sync_http_wrappers import wrap_http_calls
from .pymongo.pymongo_wrapper import wrap_pymongo
from .redis.redis_wrapper import wrap_redis


already_wrapped = False


def wrap():
def wrap(force: bool = False):
global already_wrapped
if not already_wrapped:
# Never wrap http calls twice - it will create duplicate body
wrap_http_calls()
if force or not already_wrapped:
wrap_pymongo()
wrap_redis()
already_wrapped = True
13 changes: 9 additions & 4 deletions src/lumigo_tracer/wrappers/http/http_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import uuid
from typing import Type, Optional
import time

from lumigo_tracer.parsing_utils import (
safe_split_get,
Expand All @@ -12,7 +11,13 @@
safe_get,
should_scrub_domain,
)
from lumigo_tracer.lumigo_utils import Configuration, lumigo_dumps, md5hash, get_logger
from lumigo_tracer.lumigo_utils import (
Configuration,
lumigo_dumps,
md5hash,
get_logger,
get_current_ms_time,
)
from lumigo_tracer.wrappers.http.http_data_classes import HttpRequest

HTTP_TYPE = "http"
Expand Down Expand Up @@ -55,7 +60,7 @@ def parse_request(self, parse_params: HttpRequest) -> dict:
"request": additional_info,
}
},
"started": int(time.time() * 1000),
"started": get_current_ms_time(),
}

def parse_response(self, url: str, status_code: int, headers: dict, body: bytes) -> dict:
Expand All @@ -71,7 +76,7 @@ def parse_response(self, url: str, status_code: int, headers: dict, body: bytes)
return {
"type": HTTP_TYPE,
"info": {"httpInfo": {"host": url, "response": additional_info}},
"ended": int(time.time() * 1000),
"ended": get_current_ms_time(),
}


Expand Down
2 changes: 1 addition & 1 deletion src/lumigo_tracer/wrappers/http/sync_http_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def _putheader_wrapper(func, instance, args, kwargs):

def wrap_http_calls():
with lumigo_safe_execute("wrap http calls"):
get_logger().debug("wrapping the http request")
get_logger().debug("wrapping http requests")
wrap_function_wrapper("http.client", "HTTPConnection.send", _http_send_wrapper)
wrap_function_wrapper("http.client", "HTTPConnection.request", _headers_reminder_wrapper)
if importlib.util.find_spec("botocore"):
Expand Down
11 changes: 7 additions & 4 deletions src/lumigo_tracer/wrappers/pymongo/pymongo_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Dict

import time
import uuid

from lumigo_tracer.lumigo_utils import lumigo_safe_execute, get_logger, lumigo_dumps
from lumigo_tracer.lumigo_utils import (
lumigo_safe_execute,
get_logger,
lumigo_dumps,
get_current_ms_time,
)
from lumigo_tracer.spans_container import SpansContainer

try:
Expand All @@ -27,7 +30,7 @@ def started(self, event):
{
"id": span_id,
"type": self.MONGO_SPAN,
"started": int(time.time() * 1000),
"started": get_current_ms_time(),
"databaseName": event.database_name,
"commandName": event.command_name,
"request": lumigo_dumps(event.command),
Expand Down
Empty file.
88 changes: 88 additions & 0 deletions src/lumigo_tracer/wrappers/redis/redis_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import importlib
from typing import Optional, Dict, List, Union
import uuid

from lumigo_tracer.libs.wrapt import wrap_function_wrapper
from lumigo_tracer.lumigo_utils import (
lumigo_safe_execute,
get_logger,
lumigo_dumps,
get_current_ms_time,
)
from lumigo_tracer.spans_container import SpansContainer


REDIS_SPAN = "redis"


def command_started(
command: str, request_args: Union[Dict, List[Dict]], connection_options: Optional[Dict]
):
span_id = str(uuid.uuid4())
host = (connection_options or {}).get("host")
port = (connection_options or {}).get("port")
SpansContainer.get_span().add_span(
{
"id": span_id,
"type": REDIS_SPAN,
"started": get_current_ms_time(),
"requestCommand": command,
"requestArgs": lumigo_dumps(request_args),
"connectionOptions": {"host": host, "port": port},
}
)


def command_finished(ret_val: Dict):
span = SpansContainer.get_span().get_last_span()
if not span:
get_logger().warning("Redis span ended without a record on its start")
return
span.update({"ended": get_current_ms_time(), "response": lumigo_dumps(ret_val)})


def command_failed(exception: Exception):
span = SpansContainer.get_span().get_last_span()
if not span:
get_logger().warning("Redis span ended without a record on its start")
return
span.update(
{"ended": get_current_ms_time(), "error": exception.args[0] if exception.args else None}
)


def execute_command_wrapper(func, instance, args, kwargs):
with lumigo_safe_execute("redis start"):
command = args[0] if args else None
request_args = args[1:] if args and len(args) > 1 else None
connection_options = instance.connection_pool.connection_kwargs
command_started(command, request_args, connection_options)
try:
ret_val = func(*args, **kwargs)
command_finished(ret_val)
except Exception as e:
command_failed(e)
raise


def execute_wrapper(func, instance, args, kwargs):
with lumigo_safe_execute("redis start"):
commands = instance.command_stack
command = [cmd[0] for cmd in commands if cmd] or None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here, the value here might be both list or None, is this ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here yes :-)

request_args = [cmd[1:] for cmd in commands if cmd and len(cmd) > 1]
connection_options = instance.connection_pool.connection_kwargs
command_started(lumigo_dumps(command), request_args, connection_options)
try:
ret_val = func(*args, **kwargs)
command_finished(ret_val)
except Exception as e:
command_failed(e)
raise


def wrap_redis():
with lumigo_safe_execute("wrap redis"):
if importlib.util.find_spec("redis"):
get_logger().debug("wrapping redis")
wrap_function_wrapper("redis.client", "Redis.execute_command", execute_command_wrapper)
wrap_function_wrapper("redis.client", "Pipeline.execute", execute_wrapper)
83 changes: 83 additions & 0 deletions src/test/unit/wrappers/redis/test_redis_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from types import SimpleNamespace

import pytest

from lumigo_tracer.spans_container import SpansContainer
from lumigo_tracer.wrappers.redis.redis_wrapper import execute_command_wrapper, execute_wrapper


@pytest.fixture
def instance():
return SimpleNamespace(
connection_pool=SimpleNamespace(connection_kwargs={"host": "lumigo"}), command_stack=None
)


def func(*args, **kwargs):
return True


def test_execute_command_wrapper_happy_flow(instance):
execute_command_wrapper(func, instance, ["SET", {"a": 1}, "b"], {})

spans = SpansContainer.get_span().spans
assert len(spans) == 1
assert spans[0]["requestCommand"] == "SET"
assert spans[0]["requestArgs"] == '[{"a": 1}, "b"]'
assert spans[0]["connectionOptions"] == {"host": "lumigo", "port": None}
assert spans[0]["ended"] >= spans[0]["started"]
assert spans[0]["response"] == "true"
assert "error" not in spans[0]


def test_execute_command_wrapper_failing_command(instance):
with pytest.raises(ZeroDivisionError):
execute_command_wrapper(lambda *args, **kwargs: 1 / 0, instance, ["SET", {"a": 1}], {})

spans = SpansContainer.get_span().spans
assert len(spans) == 1
assert spans[0]["requestCommand"] == "SET"
assert spans[0]["ended"] >= spans[0]["started"]
assert spans[0]["error"] == "division by zero"
assert "response" not in spans[0]


def test_execute_command_wrapper_unexpected_params(instance):
execute_command_wrapper(func, instance, {"not": "list"}, {})

spans = SpansContainer.get_span().spans
assert len(spans) == 0


def test_execute_wrapper_happy_flow(instance, monkeypatch):
monkeypatch.setattr(instance, "command_stack", [["SET", {"a": 1}], ["GET", "a"]])
execute_wrapper(func, instance, [], {})

spans = SpansContainer.get_span().spans
assert len(spans) == 1
assert spans[0]["requestCommand"] == '["SET", "GET"]'
assert spans[0]["requestArgs"] == '[[{"a": 1}], ["a"]]'
assert spans[0]["ended"] >= spans[0]["started"]
assert spans[0]["response"] == "true"
assert "error" not in spans[0]


def test_execute_wrapper_failing_command(instance, monkeypatch):
monkeypatch.setattr(instance, "command_stack", [["SET", {"a": 1}], ["GET", "a"]])
with pytest.raises(ZeroDivisionError):
execute_wrapper(lambda *args, **kwargs: 1 / 0, instance, [], {})

spans = SpansContainer.get_span().spans
assert len(spans) == 1
assert spans[0]["requestCommand"] == '["SET", "GET"]'
assert spans[0]["ended"] >= spans[0]["started"]
assert spans[0]["error"] == "division by zero"
assert "response" not in spans[0]


def test_execute_wrapper_unexpected_params(instance, monkeypatch):
monkeypatch.setattr(instance, "command_stack", [{"not": "list"}])
execute_wrapper(func, instance, [], {})

spans = SpansContainer.get_span().spans
assert len(spans) == 0
6 changes: 5 additions & 1 deletion src/test/unit/wrappers/test_no_wrapping_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ def test_wrapping_without_libraries(monkeypatch):
wrapper = importlib.reload(lumigo_tracer.wrappers.pymongo.pymongo_wrapper)
assert wrapper.LumigoMongoMonitoring is None

lumigo_tracer.wrappers.wrap() # should succeed
monkeypatch.setitem(sys.modules, "redis", None)
importlib.reload(lumigo_tracer.wrappers.redis.redis_wrapper)

lumigo_tracer.wrappers.wrap(force=True) # should succeed

monkeypatch.undo()
importlib.reload(lumigo_tracer.wrappers.pymongo.pymongo_wrapper)
importlib.reload(lumigo_tracer.wrappers.redis.redis_wrapper)