/
database_commands.py
330 lines (301 loc) · 14.6 KB
/
database_commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# -*- coding: utf-8 -*- #
from pyravendb.data.indexes import IndexQuery
from pyravendb.store.session_query import QueryOperator
from pyravendb.data.patches import ScriptedPatchRequest
from pyravendb.data.indexes import IndexDefinition
from pyravendb.custom_exceptions import exceptions
from pyravendb.tools.utils import Utils
import collections
class DatabaseCommands(object):
def __init__(self, request_handler):
self._requests_handler = request_handler
self.admin_commands = self.Admin(self._requests_handler)
def change_database(self, database):
self._requests_handler.database = database
self._requests_handler.copy = True
@staticmethod
def run_async(func, func_parameter=()):
"""
@param func: The instance of the function you want to run
:type instancemethod
@param func_parameter: The function parameter
:type tuple
"""
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(func, func_parameter)
return async_result.get()
def get(self, key_or_keys, includes=None, metadata_only=False, force_read_from_master=False):
"""
@param key_or_keys: the key of the documents you want to retrieve (key can be a list of ids)
:type str or list
@param includes: array of paths in documents in which server should look for a 'referenced' document
:type list
@param metadata_only: specifies if only document metadata should be returned
:type bool
@return: A list of the id or ids we looked for (if they exists)
:rtype: dict
@param force_read_from_master: If True the reading also will be from the master
:type bool
"""
if key_or_keys is None:
raise ValueError("None Key is not valid")
path = "queries/?"
method = "GET"
data = None
if includes:
path += "".join("&include=" + Utils.quote_key(item) for item in includes)
# make get method handle a multi document requests in a single request
if isinstance(key_or_keys, list):
key_or_keys = collections.OrderedDict.fromkeys(key_or_keys)
if metadata_only:
path += "&metadata-only=True"
# If it is too big, we drop to POST (note that means that we can't use the HTTP cache any longer)
if (sum(len(x) for x in key_or_keys)) > 1024:
method = "POST"
data = list(key_or_keys)
else:
path += "".join("&id=" + Utils.quote_key(item) for item in key_or_keys)
else:
path += "&id={0}".format(Utils.quote_key(key_or_keys))
response = self._requests_handler.http_request_handler(path, method, data=data,
force_read_from_master=force_read_from_master)
if response.status_code == 200:
response = response.json()
return response
def delete(self, key, etag=None):
if key is None:
raise ValueError("None Key is not valid")
if not isinstance(key, str):
raise ValueError("key must be {0}".format(type("")))
headers = {}
if etag is not None:
headers["If-None-Match"] = etag
key = Utils.quote_key(key)
path = "docs/{0}".format(key)
response = self._requests_handler.http_request_handler(path, "DELETE", headers=headers)
if response.status_code != 204:
raise exceptions.ErrorResponseException(response.json()["Error"])
def put(self, key, document, metadata=None, etag=None):
"""
@param key: unique key under which document will be stored
:type str
@param document: document data
:type dict
@param metadata: document metadata
:type dict
@param etag: current document etag, used for concurrency checks (null to skip check)
:type str
@return: json file
:rtype: dict
"""
headers = None
if document is None:
document = {}
if metadata is None:
metadata = {}
if not isinstance(key, str):
raise ValueError("key must be {0}".format(type("")))
if not isinstance(document, dict) or not isinstance(metadata, dict):
raise ValueError("document and metadata must be dict")
data = [{"Key": key, "Document": document, "Metadata": metadata, "AdditionalData": None,
"Method": "PUT", "Etag": etag}]
if etag:
headers = {"if-None-Match": etag}
response = self._requests_handler.http_request_handler("bulk_docs", "POST", data=data, headers=headers).json()
if "Error" in response:
if "ActualEtag" in response:
raise exceptions.FetchConcurrencyException(response["Error"])
raise exceptions.ErrorResponseException(response["Error"][:85])
return response
def batch(self, commands_array):
data = []
for command in commands_array:
if not hasattr(command, 'command'):
raise ValueError("Not a valid command")
data.append(command.to_json())
response = self._requests_handler.http_request_handler("bulk_docs", "POST", data=data).json()
if "Error" in response:
raise ValueError(response["Error"])
return response
def put_index(self, index_name, index_def, overwrite=False):
"""
@param index_name:The name of the index
@param index_def: IndexDefinition class a definition of a RavenIndex
@param overwrite: if set to True overwrite
"""
if index_name is None:
raise ValueError("None index_name is not valid")
if not isinstance(index_def, IndexDefinition):
raise ValueError("index_def must be IndexDefinition type")
index_name = Utils.quote_key(index_name)
path = "indexes/{0}?definition=yes".format(index_name)
response = self._requests_handler.http_request_handler(path, "GET")
if not overwrite and response.status_code != 404:
raise exceptions.InvalidOperationException("Cannot put index:{0},index already exists".format(index_name))
data = index_def.to_json()
return self._requests_handler.http_request_handler(path, "PUT", data=data).json()
def get_index(self, index_name, force_read_from_master=False):
"""
@param index_name: Name of the index you like to get or delete
:type str
@return: json or None
:rtype: dict
@param force_read_from_master: If True the reading also will be from the master
:type bool
"""
path = "indexes/{0}?definition=yes".format(Utils.quote_key(index_name))
response = self._requests_handler.http_request_handler(path, "GET",
force_read_from_master=force_read_from_master)
if response.status_code != 200:
return None
return response.json()
def delete_index(self, index_name):
"""
@param index_name: Name of the index you like to get or delete
:type str
@return: json or None
:rtype: dict
"""
if not index_name:
raise ValueError("None or empty index_name is invalid")
path = "indexes/{0}".format(Utils.quote_key(index_name))
self._requests_handler.http_request_handler(path, "DELETE")
def update_by_index(self, index_name, query, scripted_patch=None, options=None):
"""
@param index_name: name of an index to perform a query on
:type str
@param query: query that will be performed
:type IndexQuery
@param options: various operation options e.g. AllowStale or MaxOpsPerSec
:type BulkOperationOptions
@param scripted_patch: JavaScript patch that will be executed on query results( Used only when update)
:type ScriptedPatchRequest
@return: json
:rtype: dict
"""
if not isinstance(query, IndexQuery):
raise ValueError("query must be IndexQuery Type")
path = Utils.build_path(index_name, query, options)
if scripted_patch:
if not isinstance(scripted_patch, ScriptedPatchRequest):
raise ValueError("scripted_patch must be ScriptedPatchRequest Type")
scripted_patch = scripted_patch.to_json()
response = self._requests_handler.http_request_handler(path, "EVAL", data=scripted_patch)
if response.status_code != 200 and response.status_code != 202:
raise response.raise_for_status()
return response.json()
def delete_by_index(self, index_name, query, options=None):
"""
@param index_name: name of an index to perform a query on
:type str
@param query: query that will be performed
:type IndexQuery
@param options: various operation options e.g. AllowStale or MaxOpsPerSec
:type BulkOperationOptions
@return: json
:rtype: dict
"""
path = Utils.build_path(index_name, query, options)
response = self._requests_handler.http_request_handler(path, "DELETE")
if response.status_code != 200 and response.status_code != 202:
try:
raise exceptions.ErrorResponseException(response.json()["Error"][:100])
except ValueError:
raise response.raise_for_status()
return response.json()
def patch(self, key, scripted_patch, etag=None, ignore_missing=True, default_metadata=None, patch_default=None):
from pyravendb.d_commands import commands_data
if default_metadata is None:
default_metadata = {}
batch_result = self.batch([
commands_data.ScriptedPatchCommandData(key, scripted_patch, etag, default_metadata, patch_default)])
if not ignore_missing and batch_result[0]["PatchResult"] == "DocumentDoesNotExists" and patch_default is None:
raise exceptions.DocumentDoesNotExistsException("Document with key {0} does not exist.".format(key))
return batch_result
def query(self, index_name, index_query, includes=None, metadata_only=False, index_entries_only=False,
force_read_from_master=False):
"""
@param index_name: A name of an index to query
@param force_read_from_master: If True the reading also will be from the master
:type bool
:type str
@param index_query: A query definition containing all information required to query a specified index.
:type IndexQuery
@param includes: An array of relative paths that specify related documents ids
which should be included in a query result.
:type list
@param metadata_only: True if returned documents should include only metadata without a document body.
:type bool
@param index_entries_only: True if query results should contain only index entries.
:type bool
@return:json
:rtype:dict
"""
if not index_name:
raise ValueError("index_name cannot be None or empty")
if index_query is None:
raise ValueError("None query is invalid")
if not isinstance(index_query, IndexQuery):
raise ValueError("query must be IndexQuery type")
path = "indexes/{0}?&pageSize={1}".format(Utils.quote_key(index_name), index_query.page_size)
if index_query.default_operator is QueryOperator.AND:
path += "&operator={0}".format(index_query.default_operator.value)
if index_query.query:
path += "&query={0}".format(Utils.quote_key(index_query.query))
if index_query.sort_hints:
for hint in index_query.sort_hints:
path += "&{0}".format(hint)
if index_query.sort_fields:
for field in index_query.sort_fields:
path += "&sort={0}".format(field)
if index_query.fetch:
for item in index_query.fetch:
path += "&fetch={0}".format(item)
if metadata_only:
path += "&metadata-only=true"
if index_entries_only:
path += "&debug=entries"
if includes and len(includes) > 0:
path += "".join("&include=" + item for item in includes)
response = self._requests_handler.http_request_handler(path, "GET",
force_read_from_master=force_read_from_master).json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"][:100])
return response
# For Admin use only (create or delete databases)
class Admin(object):
def __init__(self, requests_handler):
self.requests_handler = requests_handler
def create_database(self, database_document):
"""
Creates a database
@param database_document: has to be DatabaseDocument type
"""
if "Raven/DataDir" not in database_document.settings:
raise exceptions.InvalidOperationException("The Raven/DataDir setting is mandatory")
db_name = database_document.database_id.replace("Raven/Databases/", "")
Utils.name_validation(db_name)
path = "databases/{0}".format(Utils.quote_key(db_name))
response = self.requests_handler.http_request_handler(path, "PUT", database_document.to_json(),
admin=True)
if response.status_code == 502:
raise exceptions.ErrorResponseException(
"Connection failed please check your connection to {0}".format(self.requests_handler.url))
if response.status_code != 200:
raise exceptions.ErrorResponseException(
"Database with the name '{0}' already exists".format(database_document.database_id))
return response
def delete_database(self, db_name, hard_delete=False):
db_name = db_name.replace("Rave/Databases/", "")
path = "databases/{0}".format(Utils.quote_key(db_name))
if hard_delete:
path += "?hard-delete=true"
response = self.requests_handler.http_request_handler(path, "DELETE", admin=True)
if response.content != '' and response.content != b'':
raise exceptions.ErrorResponseException(response.content)
return response
def get_store_statistics(self):
response = self.requests_handler.http_request_handler("stats", "GET", admin=True)
if response.status_code == 200:
return response.json()