-
Notifications
You must be signed in to change notification settings - Fork 95
/
purview.py
312 lines (278 loc) · 11.5 KB
/
purview.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
from ..util import AtlasBaseClient
class PurviewDiscoveryClient(AtlasBaseClient):
def __init__(self, endpoint_url, authentication, **kwargs):
    """
    Create a discovery client bound to one endpoint.

    :param str endpoint_url:
        Base URL that the request methods of this class append their
        paths to (for example ``/search/query`` or ``/browse``).
    :param authentication:
        Authentication object kept on the instance as
        ``self.authentication``; this class does not inspect it.
    :param kwargs:
        Passed through unchanged to the ``AtlasBaseClient`` initializer.
    """
    super().__init__(**kwargs)
    # Targets are assigned left to right: endpoint first, then auth.
    self.endpoint_url, self.authentication = endpoint_url, authentication
def autocomplete(
    self, keywords=None, filter=None, api_version="2022-03-01-preview", **kwargs
):
    """
    Run an autocomplete search by POSTing to
    ``<endpoint_url>/search/autocomplete``.

    :param dict body:
        Optional (keyword only). A fully formed json body. When present
        it is sent as-is and ``keywords``, ``filter`` and ``limit`` are
        ignored; only ``api_version`` still applies.
    :param str keywords:
        The keywords applied to all fields that support autocomplete.
        Required (non-empty) unless ``body`` is given.
    :param dict filter:
        A json object that includes and, not, or conditions and ultimately
        a dict that contains attributeName, operator, and attributeValue.
        Only added to the request when truthy.
    :param int limit:
        Optional (keyword only). The number of search results to return.
    :param str api_version: The Purview API version to use.
    :return: The ``body`` attribute of the ``_post_http`` response.
    :rtype: dict
    :raises RuntimeError: If neither ``body`` nor ``keywords`` is provided.
    """
    if "body" in kwargs:
        # Copy so the payload handed to the HTTP layer is a fresh dict.
        payload = dict(kwargs["body"])
    elif keywords:
        payload = {"keywords": keywords}
        if filter:
            payload["filter"] = filter
        # "limit" is the only extra property this endpoint forwards.
        if "limit" in kwargs:
            payload["limit"] = kwargs["limit"]
    else:
        raise RuntimeError(
            "Failed to execute autocomplete query. Please provide either a keywords or a well formed JSON body."
        )

    response = self._post_http(
        self.endpoint_url + "/search/autocomplete",
        json=payload,
        params={"api-version": api_version},
    )
    return response.body
# TODO: Having auth issues?
def browse(self, entityType=None, api_version="2022-03-01-preview", **kwargs):
    """
    Run a browse search by POSTing to ``<endpoint_url>/browse``.

    :param dict body:
        Optional (keyword only). A fully formed json body. When present
        it is sent as-is and all other params except ``api_version`` are
        ignored.
    :param str entityType:
        The entity type to browse as the root level entry point. This must
        be a valid Purview built-in or custom type. Required unless
        ``body`` is given.
    :param str path:
        Optional (keyword only). The path to browse the next level child
        entities.
    :param int limit:
        Optional (keyword only). The number of search results to return.
    :param int offset:
        Optional (keyword only). The number of search results to skip.
    :param str api_version: The Purview API version to use.
    :return: The ``body`` attribute of the ``_post_http`` response.
    :rtype: dict
    :raises RuntimeError: If neither ``body`` nor ``entityType`` is provided.
    """
    if "body" in kwargs:
        req_body = dict(kwargs["body"])
    elif entityType:
        req_body = {"entityType": entityType}
        # Additional properties; "path" is documented above and was
        # previously dropped silently.
        for prop in ("path", "limit", "offset"):
            if prop in kwargs:
                req_body[prop] = kwargs[prop]
    else:
        # The exception must actually be raised; otherwise an empty body
        # is posted to the service.
        raise RuntimeError(
            "Failed to execute browse query. Please provide either an entityType or a well formed JSON body."
        )

    atlas_endpoint = self.endpoint_url + "/browse"
    # TODO: Implement paging with offset and limit
    post_result = self._post_http(
        atlas_endpoint,
        json=req_body,
        params={"api-version": api_version},
    )
    return post_result.body
def query(
    self,
    keywords=None,
    filter=None,
    facets=None,
    taxonomySetting=None,
    api_version="2022-03-01-preview",
    **kwargs
):
    """
    Run a search query by POSTing to ``<endpoint_url>/search/query``.

    :param dict body:
        Optional (keyword only). A fully formed json body. When present
        it is sent as-is and all other params except ``api_version`` are
        ignored.
    :param str keywords:
        The keyword to search, e.g. a search string or '*' for wildcard.
        May be None when ``filter`` is given; it is then sent as null.
    :param dict filter:
        A json object that includes and, not, or conditions and ultimately
        a dict that contains attributeName, operator, and attributeValue.
        Always included in the request (null when not given).
    :param dict facets:
        The kind of aggregate count you want to retrieve. Should be a dict
        that contains fields: count, facet, and sort. Only sent when truthy.
    :param dict taxonomySetting: Undocumented. Only sent when truthy.
    :param int limit:
        Optional (keyword only). The number of search results to return.
    :param int offset:
        Optional (keyword only). The number of search results to skip.
    :param str api_version: The Purview API version to use.
    :return: The ``body`` attribute of the ``_post_http`` response.
    :rtype: dict
    :raises RuntimeError:
        If ``body`` is absent and both ``keywords`` and ``filter`` are
        falsy.
    """
    if "body" in kwargs:
        payload = dict(kwargs["body"])
    elif keywords or filter:
        payload = {"keywords": keywords, "filter": filter}
        if facets:
            payload["facets"] = facets
        if taxonomySetting:
            payload["taxonomySetting"] = taxonomySetting
        # Paging properties are forwarded only when the caller set them.
        payload.update(
            {name: kwargs[name] for name in ("limit", "offset") if name in kwargs}
        )
    else:
        raise RuntimeError(
            "Failed to execute search query. Please provide either a keyword or a well formed JSON body."
        )

    # TODO: Implement paging with offset and limit
    response = self._post_http(
        self.endpoint_url + "/search/query",
        json=payload,
        params={"api-version": api_version},
    )
    return response.body
def suggest(
    self, keywords=None, filter=None, api_version="2022-03-01-preview", **kwargs
):
    """
    Run a suggest search by POSTing to ``<endpoint_url>/search/suggest``.

    :param dict body:
        Optional (keyword only). A fully formed json body. When present
        it is sent as-is and ``keywords``, ``filter`` and ``limit`` are
        ignored; only ``api_version`` still applies.
    :param str keywords:
        The keywords applied to all fields that support the suggest
        operation. Required (non-empty) unless ``body`` is given.
    :param dict filter:
        A json object that includes and, not, or conditions and ultimately
        a dict that contains attributeName, operator, and attributeValue.
        Only added to the request when truthy.
    :param int limit:
        Optional (keyword only). The number of search results to return.
    :param str api_version: The Purview API version to use.
    :return: The ``body`` attribute of the ``_post_http`` response.
    :rtype: dict
    :raises RuntimeError: If neither ``body`` nor ``keywords`` is provided.
    """
    has_body = "body" in kwargs
    if not has_body and not keywords:
        raise RuntimeError(
            "Failed to execute suggest query. Please provide either a keywords or a well formed JSON body."
        )

    if has_body:
        # An explicit body wins over every other argument.
        payload = dict(kwargs["body"])
    else:
        payload = {"keywords": keywords}
        if filter:
            payload["filter"] = filter
        if "limit" in kwargs:
            payload["limit"] = kwargs["limit"]

    response = self._post_http(
        self.endpoint_url + "/search/suggest",
        json=payload,
        params={"api-version": api_version},
    )
    return response.body
def _search_generator(self, **kwargs):
    """
    Generator that pages through ``self.query`` results one entity at a time.

    Recognised kwargs: ``keywords``, ``filter``, ``facets``,
    ``taxonomySetting`` (all forwarded to ``query``), ``api_version``
    (defaults to "2022-03-01-preview"), ``limit`` (page size, defaults to
    1000) and ``starting_offset`` (defaults to 0). ``self._requests_args``
    is also passed to every ``query`` call.

    Paging stops when a page comes back empty, or when the running offset
    has reached the ``@search.count`` reported with the page just yielded.

    :return: Each element of the ``value`` list of every page, in order.
    :rtype: Iterator(dict)
    """
    offset = kwargs.get("starting_offset", 0)

    while True:
        results = self.query(
            keywords=kwargs.get("keywords"),
            filter=kwargs.get("filter"),
            facets=kwargs.get("facets"),
            taxonomySetting=kwargs.get("taxonomySetting"),
            api_version=kwargs.get("api_version", "2022-03-01-preview"),
            limit=kwargs.get("limit", 1000),
            offset=offset,
            **self._requests_args
        )
        page = results["value"]
        if not page:
            return

        # Hand out what was fetched before deciding whether to stop, so a
        # page is never discarded because of the reported total.
        yield from page

        offset += len(page)
        # Once the offset has reached the reported total there is nothing
        # left to fetch; stopping here saves one round trip that would
        # only return an empty page. A missing count falls back to the
        # empty-page check above.
        total = results.get("@search.count")
        if total is not None and offset >= total:
            return
def search_entities(
    self,
    query,
    limit=50,
    search_filter=None,
    starting_offset=0,
    api_version="2022-03-01-preview",
    **kwargs
):
    """
    Search entities based on a query and automatically handle limits and
    offsets to page through results.

    The limit provides how many records are returned in each batch with a
    maximum of 1,000 entries per page.

    :param str query: The search query to be executed.
    :param int limit:
        How many entities to request for each page of the search results;
        must be between 1 and 1,000 inclusive.
    :param dict search_filter:
        A json object that includes and, not, or conditions and ultimately
        a dict that contains attributeName, operator, and attributeValue.
    :param int starting_offset: The number of search results to skip.
    :param str api_version: The Purview API version to use.

    Kwargs:
        :kwarg dict facets:
            The kind of aggregate count you want to retrieve. Should be a
            dict that contains fields: count, facet, and sort.
        :kwarg dict taxonomySetting: Undocumented.
        :kwarg dict body: An optional fully formed json body. If provided
            query/keywords, limit, search_filter/filter, and
            starting_offset/offset will be updated using the values found
            in the body dictionary. Any additional keys provided in `body`
            will be passed along as additional kwargs. The dictionary
            passed in is not modified.

    :return: The results of your search as a generator.
    :rtype: Iterator(dict)
    :raises ValueError: If the effective limit is below 1 or above 1,000.
    """
    if "body" in kwargs:
        # Work on a copy: popping from the caller's dict would leave it
        # stripped of keywords/limit/filter/offset after this call.
        req_body = dict(kwargs.pop("body"))
        query = req_body.pop("keywords", query)
        limit = req_body.pop("limit", limit)
        search_filter = req_body.pop("filter", search_filter)
        starting_offset = req_body.pop("offset", starting_offset)
        kwargs.update(req_body)

    # Validate eagerly so the error surfaces at call time rather than on
    # the first iteration of the returned generator.
    if limit > 1000 or limit < 1:
        raise ValueError(
            "The limit parameter must be between 1 and 1,000 (inclusive)."
        )

    return self._search_generator(
        keywords=query,
        filter=search_filter,
        limit=limit,
        starting_offset=starting_offset,
        api_version=api_version,
        **kwargs
    )