-
Notifications
You must be signed in to change notification settings - Fork 10
RD-3763 - sqlalchemy #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RD-3763 - sqlalchemy #159
Changes from all commits
60bc2ec
e471345
151c500
8383ea1
f56b826
613ef65
077f1be
1c0d3ec
1764e73
838e077
feb6abd
846712f
fd8a27d
8f1114c
5a9c402
c5caf09
df61e65
865fcee
2e83944
4d0a2fa
3f0bf5c
f06a1c3
188bf41
2809c3a
4520a2e
03713d8
b738fc5
f837d38
64f8b1d
578eabf
a0f8ab0
1a81a51
dc34b79
c88874d
145b19d
8cc3c64
8b6430f
7ecd115
f773f26
c9d82e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,4 @@ attrs==19.1.0 | |
requests==2.24.0 | ||
pymongo==3.11.0 | ||
redispy==3.0.0 | ||
sqlalchemy==1.3.20 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import importlib | ||
import uuid | ||
|
||
from lumigo_tracer.libs.wrapt import wrap_function_wrapper | ||
from lumigo_tracer.lumigo_utils import ( | ||
lumigo_safe_execute, | ||
get_logger, | ||
lumigo_dumps, | ||
get_current_ms_time, | ||
) | ||
from lumigo_tracer.spans_container import SpansContainer | ||
|
||
try: | ||
from sqlalchemy.event import listen | ||
except Exception: | ||
listen = None | ||
|
||
|
||
SQL_SPAN = "mySql" | ||
|
||
|
||
def _before_cursor_execute(conn, cursor, statement, parameters, context, executemany): | ||
with lumigo_safe_execute("handle sqlalchemy before execute"): | ||
SpansContainer.get_span().add_span( | ||
{ | ||
"id": str(uuid.uuid4()), | ||
"type": SQL_SPAN, | ||
"started": get_current_ms_time(), | ||
"connectionParameters": { | ||
"host": conn.engine.url.host or conn.engine.url.database, | ||
"port": conn.engine.url.port, | ||
"database": conn.engine.url.database, | ||
"user": conn.engine.url.username, | ||
}, | ||
"query": lumigo_dumps(statement), | ||
"values": lumigo_dumps(parameters), | ||
} | ||
) | ||
|
||
|
||
def _after_cursor_execute(conn, cursor, statement, parameters, context, executemany): | ||
with lumigo_safe_execute("handle sqlalchemy after execute"): | ||
span = SpansContainer.get_span().get_last_span() | ||
if not span: | ||
get_logger().warning("Redis span ended without a record on its start") | ||
return | ||
span.update({"ended": get_current_ms_time(), "response": ""}) | ||
|
||
|
||
def _handle_error(context): | ||
with lumigo_safe_execute("handle sqlalchemy error"): | ||
span = SpansContainer.get_span().get_last_span() | ||
if not span: | ||
get_logger().warning("Redis span ended without a record on its start") | ||
return | ||
span.update( | ||
{ | ||
"ended": get_current_ms_time(), | ||
"error": lumigo_dumps( | ||
{ | ||
"type": context.original_exception.__class__.__name__, | ||
"args": context.original_exception.args, | ||
} | ||
), | ||
} | ||
) | ||
|
||
|
||
def execute_wrapper(func, instance, args, kwargs): | ||
result = func(*args, **kwargs) | ||
with lumigo_safe_execute("sqlalchemy: listen to engine"): | ||
listen(result, "before_cursor_execute", _before_cursor_execute) | ||
listen(result, "after_cursor_execute", _after_cursor_execute) | ||
listen(result, "handle_error", _handle_error) | ||
Comment on lines
+71
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you forgot to commit this change 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did! checkout line 80 |
||
return result | ||
|
||
|
||
def wrap_sqlalchemy(): | ||
with lumigo_safe_execute("wrap sqlalchemy"): | ||
if importlib.util.find_spec("sqlalchemy") and listen: | ||
get_logger().debug("wrapping sqlalchemy") | ||
wrap_function_wrapper( | ||
"sqlalchemy.engine.strategies", "DefaultEngineStrategy.create", execute_wrapper | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import uuid | ||
|
||
import pytest | ||
|
||
from sqlalchemy.exc import OperationalError | ||
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData | ||
from sqlalchemy.sql import select | ||
|
||
from lumigo_tracer.lumigo_utils import DEFAULT_MAX_ENTRY_SIZE | ||
from lumigo_tracer.spans_container import SpansContainer | ||
from lumigo_tracer.tracer import lumigo_tracer | ||
|
||
|
||
md = MetaData() | ||
Users = Table("users", md, Column("id", Integer, primary_key=True), Column("name", String)) | ||
|
||
|
||
@pytest.fixture | ||
def db(tmp_path): | ||
path = tmp_path / "file.db" | ||
engine = create_engine(fr"sqlite:///{path}") | ||
md.create_all(engine) | ||
yield f"sqlite:///{path}" | ||
|
||
|
||
def test_happy_flow(context, db): | ||
@lumigo_tracer(token="123") | ||
def lambda_test_function(event, context): | ||
engine = create_engine(db) | ||
conn = engine.connect() | ||
conn.execute(Users.insert().values(name="saart")) | ||
result = conn.execute(select([Users])) | ||
return result.fetchone() | ||
|
||
assert lambda_test_function({}, context) == (1, "saart") | ||
http_spans = SpansContainer.get_span().spans | ||
|
||
assert len(http_spans) == 2 | ||
assert http_spans[0]["query"] == '"INSERT INTO users (name) VALUES (?)"' | ||
assert http_spans[0]["values"] == '["saart"]' | ||
assert http_spans[0]["ended"] >= http_spans[0]["started"] | ||
|
||
assert http_spans[1]["query"] == '"SELECT users.id, users.name \\nFROM users"' | ||
assert http_spans[1]["values"] == "[]" | ||
assert http_spans[0]["ended"] >= http_spans[0]["started"] | ||
|
||
|
||
def test_non_existing_table(context, db): | ||
@lumigo_tracer(token="123") | ||
def lambda_test_function(event, context): | ||
others = Table("others", md, Column("id", Integer, primary_key=True)) | ||
engine = create_engine(db) | ||
conn = engine.connect() | ||
result = conn.execute(select([others])) | ||
return result.fetchone() | ||
|
||
with pytest.raises(OperationalError): | ||
lambda_test_function({}, context) | ||
|
||
http_spans = SpansContainer.get_span().spans | ||
|
||
assert len(http_spans) == 1 | ||
assert http_spans[0]["query"] == '"SELECT others.id \\nFROM others"' | ||
assert ( | ||
http_spans[0]["error"] == '{"type": "OperationalError", "args": ["no such table: others"]}' | ||
) | ||
assert http_spans[0]["ended"] >= http_spans[0]["started"] | ||
|
||
|
||
def test_pruning_long_strings(context, db): | ||
@lumigo_tracer(token="123") | ||
def lambda_test_function(event, context): | ||
engine = create_engine(db) | ||
conn = engine.connect() | ||
conn.execute(Users.insert().values(name="a" * (DEFAULT_MAX_ENTRY_SIZE * 5))) | ||
|
||
lambda_test_function({}, context) | ||
http_spans = SpansContainer.get_span().spans | ||
|
||
assert len(http_spans) == 1 | ||
assert len(http_spans[0]["values"]) <= DEFAULT_MAX_ENTRY_SIZE * 2 | ||
|
||
|
||
def test_exception_in_wrapper(context, db, monkeypatch): | ||
@lumigo_tracer(token="123") | ||
def lambda_test_function(event, context): | ||
engine = create_engine(db) | ||
conn = engine.connect() | ||
conn.execute(Users.insert().values(name="a" * (DEFAULT_MAX_ENTRY_SIZE * 5))) | ||
|
||
monkeypatch.setattr(uuid, "uuid4", lambda: 1 / 0) | ||
|
||
lambda_test_function({}, context) # No exception | ||
|
||
assert not SpansContainer.get_span().spans |
Uh oh!
There was an error while loading. Please reload this page.