forked from getmoto/moto
-
Notifications
You must be signed in to change notification settings - Fork 1
/
models.py
218 lines (176 loc) · 6.58 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
from __future__ import unicode_literals
import functools
import inspect
import re
try:
import boto3
except ImportError:
boto3 = None
from httpretty import HTTPretty
from .responses import metadata_response
from .utils import convert_regex_to_flask_path
class MockAWS(object):
nested_count = 0
original_create_transfer_manager = None
def __init__(self, backends):
self.backends = backends
if self.__class__.nested_count == 0:
HTTPretty.reset()
def __call__(self, func, reset=True):
if inspect.isclass(func):
return self.decorate_class(func)
return self.decorate_callable(func, reset)
def __enter__(self):
self.start()
def __exit__(self, *args):
self.stop()
def start(self, reset=True):
self.__class__.nested_count += 1
if reset:
for backend in self.backends.values():
backend.reset()
if not HTTPretty.is_enabled():
HTTPretty.enable()
if boto3 and self.__class__.original_create_transfer_manager is None:
boto3.client('s3') # Ensure that boto.s3 exists
original_create_transfer_manager = boto3.s3.transfer.create_transfer_manager
self.__class__.original_create_transfer_manager = original_create_transfer_manager
def patched_create_transfer_manager(client, config, *args, **kwargs):
config.use_threads = False
return original_create_transfer_manager(client, config, *args, **kwargs)
boto3.s3.transfer.create_transfer_manager = patched_create_transfer_manager
for method in HTTPretty.METHODS:
backend = list(self.backends.values())[0]
for key, value in backend.urls.items():
HTTPretty.register_uri(
method=method,
uri=re.compile(key),
body=value,
)
# Mock out localhost instance metadata
HTTPretty.register_uri(
method=method,
uri=re.compile('http://169.254.169.254/latest/meta-data/.*'),
body=metadata_response
)
def stop(self):
self.__class__.nested_count -= 1
if self.__class__.nested_count < 0:
raise RuntimeError('Called stop() before start().')
if self.__class__.nested_count == 0:
HTTPretty.disable()
HTTPretty.reset()
if boto3:
boto3.s3.transfer.create_transfer_manager = self.__class__.original_create_transfer_manager
self.__class__.original_create_transfer_manager = None
def decorate_callable(self, func, reset):
def wrapper(*args, **kwargs):
self.start(reset=reset)
try:
result = func(*args, **kwargs)
finally:
self.stop()
return result
functools.update_wrapper(wrapper, func)
wrapper.__wrapped__ = func
return wrapper
def decorate_class(self, klass):
for attr in dir(klass):
if attr.startswith("_"):
continue
attr_value = getattr(klass, attr)
if not hasattr(attr_value, "__call__"):
continue
# Check if this is a classmethod. If so, skip patching
if inspect.ismethod(attr_value) and attr_value.__self__ is klass:
continue
try:
setattr(klass, attr, self(attr_value, reset=False))
except TypeError:
# Sometimes we can't set this for built-in types
continue
return klass
class Model(type):
def __new__(self, clsname, bases, namespace):
cls = super(Model, self).__new__(self, clsname, bases, namespace)
cls.__models__ = {}
for name, value in namespace.items():
model = getattr(value, "__returns_model__", False)
if model is not False:
cls.__models__[model] = name
for base in bases:
cls.__models__.update(getattr(base, "__models__", {}))
return cls
@staticmethod
def prop(model_name):
""" decorator to mark a class method as returning model values """
def dec(f):
f.__returns_model__ = model_name
return f
return dec
class BaseBackend(object):
def reset(self):
self.__dict__ = {}
self.__init__()
@property
def _url_module(self):
backend_module = self.__class__.__module__
backend_urls_module_name = backend_module.replace("models", "urls")
backend_urls_module = __import__(backend_urls_module_name, fromlist=['url_bases', 'url_paths'])
return backend_urls_module
@property
def urls(self):
"""
A dictionary of the urls to be mocked with this service and the handlers
that should be called in their place
"""
url_bases = self._url_module.url_bases
unformatted_paths = self._url_module.url_paths
urls = {}
for url_base in url_bases:
for url_path, handler in unformatted_paths.items():
url = url_path.format(url_base)
urls[url] = handler
return urls
@property
def url_paths(self):
"""
A dictionary of the paths of the urls to be mocked with this service and
the handlers that should be called in their place
"""
unformatted_paths = self._url_module.url_paths
paths = {}
for unformatted_path, handler in unformatted_paths.items():
path = unformatted_path.format("")
paths[path] = handler
return paths
@property
def url_bases(self):
"""
A list containing the url_bases extracted from urls.py
"""
return self._url_module.url_bases
@property
def flask_paths(self):
"""
The url paths that will be used for the flask server
"""
paths = {}
for url_path, handler in self.url_paths.items():
url_path = convert_regex_to_flask_path(url_path)
paths[url_path] = handler
return paths
def decorator(self, func=None):
if func:
return MockAWS({'global': self})(func)
else:
return MockAWS({'global': self})
class base_decorator(object):
mock_backend = MockAWS
def __init__(self, backends):
self.backends = backends
def __call__(self, func=None):
if func:
return self.mock_backend(self.backends)(func)
else:
return self.mock_backend(self.backends)