This repository has been archived by the owner on Jul 10, 2023. It is now read-only.
/
psycopg2.py
65 lines (50 loc) · 1.85 KB
/
psycopg2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import wrapt
import copy
from thundra import config
from thundra.integrations.postgre import PostgreIntegration
# Single module-level PostgreIntegration instance; the cursor wrapper's
# execute/callproc methods in this module hand their calls to its
# run_and_trace method.
postgre_integration = PostgreIntegration()
class PostgreCursorWrapper(wrapt.ObjectProxy):
    """Transparent proxy around a database cursor.

    ``execute`` and ``callproc`` are routed through
    ``postgre_integration.run_and_trace``; every other attribute is
    forwarded to the wrapped cursor by ``wrapt.ObjectProxy``.
    """

    def __init__(self, cursor, connection_wrapper):
        """Wrap *cursor* and remember the connection wrapper that created it.

        Args:
            cursor: The underlying cursor object to proxy.
            connection_wrapper: The connection wrapper this cursor belongs
                to; it is handed to ``run_and_trace`` on every call.
        """
        super(PostgreCursorWrapper, self).__init__(cursor)
        # wrapt stores attributes prefixed with ``_self_`` on the proxy itself
        # rather than setting them on the wrapped cursor.
        self._self_connection = connection_wrapper

    def execute(self, *args, **kwargs):
        """Run the wrapped cursor's ``execute`` via ``run_and_trace``.

        Returns whatever ``postgre_integration.run_and_trace`` returns.
        """
        return postgre_integration.run_and_trace(
            self.__wrapped__.execute,
            self._self_connection,
            args,
            kwargs,
        )

    def callproc(self, *args, **kwargs):
        """Run the wrapped cursor's ``callproc`` via ``run_and_trace``.

        Returns whatever ``postgre_integration.run_and_trace`` returns.
        """
        return postgre_integration.run_and_trace(
            self.__wrapped__.callproc,
            self._self_connection,
            args,
            kwargs,
        )

    def __enter__(self):
        """Enter the wrapped cursor's context and return this proxy.

        Returning ``self`` (instead of the wrapped cursor's own return value)
        keeps the ``with ... as cur`` target pointing at the proxy, so calls
        made inside the block still go through ``execute``/``callproc`` above.

        Raises:
            AttributeError: If the wrapped cursor does not support the
                context-manager API; the error is meant to reach the user.
        """
        # Actually invoke the wrapped ``__enter__`` rather than merely
        # referencing it: ``__exit__`` is forwarded to the wrapped cursor by
        # the proxy, so skipping the call would leave enter/exit unbalanced
        # and drop any setup a custom cursor performs on entry. The attribute
        # lookup still raises if the API is not supported.
        self.__wrapped__.__enter__()
        return self
class PostgreConnectionWrapper(wrapt.ObjectProxy):
    """Transparent proxy around a connection whose cursors are wrapped too.

    Only ``cursor`` is overridden; all other attributes are forwarded to the
    wrapped connection by ``wrapt.ObjectProxy``.
    """

    def cursor(self, *args, **kwargs):
        """Create a cursor on the wrapped connection and return it wrapped.

        All arguments are passed through unchanged to the wrapped
        connection's ``cursor``. The result is a ``PostgreCursorWrapper``
        bound to this connection wrapper.
        """
        return PostgreCursorWrapper(
            self.__wrapped__.cursor(*args, **kwargs),
            self,
        )
def _wrapper(wrapped, instance, args, kwargs):
    """wrapt wrapper that returns a proxied connection.

    Calls *wrapped* with the caller's positional and keyword arguments and
    wraps the result in ``PostgreConnectionWrapper``. *instance* is part of
    the wrapt wrapper signature and is not used here.
    """
    return PostgreConnectionWrapper(wrapped(*args, **kwargs))
def _wrapper_register_type(wrapped, instance, args, kwargs):
    """wrapt wrapper that unwraps a proxied scope argument before the call.

    When exactly two positional arguments are given and the second one is a
    ``PostgreConnectionWrapper`` or ``PostgreCursorWrapper``, it is replaced
    by the object it proxies. All other calls are forwarded untouched.
    *instance* is part of the wrapt wrapper signature and is not used here.

    Returns whatever *wrapped* returns.
    """
    # Work on a mutable copy so the caller's argument tuple is never modified.
    positional = list(args)
    if len(positional) == 2:
        scope = positional[1]
        # NOTE(review): presumably the wrapped function rejects proxy objects
        # and needs the real connection/cursor -- confirm against psycopg2.
        if isinstance(scope, (PostgreConnectionWrapper, PostgreCursorWrapper)):
            positional[1] = scope.__wrapped__
    return wrapped(*positional, **kwargs)
def patch():
    """Install the psycopg2 wrappers unless the RDB integration is disabled.

    When ``config.rdb_integration_disabled()`` is falsy, wraps
    ``psycopg2.extensions.register_type`` with ``_wrapper_register_type``
    and then ``psycopg2.connect`` with ``_wrapper``. Otherwise does nothing.
    """
    if config.rdb_integration_disabled():
        return

    # (module, attribute, wrapper) triples, applied in this order.
    targets = (
        ('psycopg2.extensions', 'register_type', _wrapper_register_type),
        ('psycopg2', 'connect', _wrapper),
    )
    for module_name, attribute_name, wrapper in targets:
        wrapt.wrap_function_wrapper(module_name, attribute_name, wrapper)