/
client.py
350 lines (290 loc) · 13 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
from typing import List, Dict, Any
import asyncio
import backoff # type: ignore
import aiohttp
import requests
import spectacles.utils as utils
from spectacles.logger import GLOBAL_LOGGER as logger
from spectacles.exceptions import SpectaclesException, ApiConnectionError
# Alias for a decoded JSON object: string keys mapped to arbitrary values.
JsonDict = Dict[str, Any]
class LookerClient:
    """Wraps some endpoints of the Looker API, issues requests and handles responses.

    Args:
        base_url: Base URL for the Looker instance, e.g. https://mycompany.looker.com.
        client_id: Looker API client ID.
        client_secret: Looker API client secret.
        port: Desired API port to use for requests.
        api_version: Desired API version to use for requests.

    Attributes:
        base_url: Base URL for the Looker instance, without trailing slashes.
        api_url: Combined URL used as a base for request building.
        session: Persistent session to avoid re-authenticating.

    Raises:
        SpectaclesException: If the requested API version is not supported.
        ApiConnectionError: If authenticating with the Looker API fails.

    """

    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        port: int = 19999,
        api_version: float = 3.1,
    ):
        supported_api_versions = [3.0, 3.1]
        if api_version not in supported_api_versions:
            raise SpectaclesException(
                f"API version {api_version} is not supported. "
                "Please use one of these supported versions instead: "
                f"{', '.join(str(ver) for ver in sorted(supported_api_versions))}"
            )

        self.base_url: str = base_url.rstrip("/")
        self.api_url: str = f"{self.base_url}:{port}/api/{api_version}/"
        self.session: requests.Session = requests.Session()

        # Authenticating on construction means every later request made through
        # the session already carries the access token
        self.authenticate(client_id, client_secret, api_version)

    @staticmethod
    def _raise_for_status(response: requests.Response, message: str = "") -> None:
        """Raises an ApiConnectionError if a response carries an HTTP error status.

        Args:
            response: Response received through the persistent session.
            message: Request-specific text placed in front of the shared error
                details. Callers supply their own trailing newline.

        Raises:
            ApiConnectionError: If `response.raise_for_status()` raises an HTTPError.

        """
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as error:
            details = utils.details_from_http_error(response)
            raise ApiConnectionError(
                f"{message}"
                f"Looker API error encountered: {error}\n"
                "Message received from Looker's API: "
                f'"{details}"'
            ) from error

    def authenticate(
        self, client_id: str, client_secret: str, api_version: float
    ) -> None:
        """Logs in to Looker's API using a client ID/secret pair and an API version.

        Args:
            client_id: Looker API client ID.
            client_secret: Looker API client secret.
            api_version: Desired API version to use for requests.

        Raises:
            ApiConnectionError: If the login request returns an HTTP error status.

        """
        logger.debug("Authenticating Looker API credentials")

        url = utils.compose_url(self.api_url, path=["login"])
        body = {"client_id": client_id, "client_secret": client_secret}
        # Credentials are passed via `data=`, unlike the `json=` bodies used by
        # the other requests in this class
        response = self.session.post(url=url, data=body)
        self._raise_for_status(
            response,
            f"Failed to authenticate to {url}\n"
            f"Attempted authentication with client ID {client_id}\n",
        )

        access_token = response.json()["access_token"]
        # This replaces the session's header mapping rather than updating it
        self.session.headers = {"Authorization": f"token {access_token}"}

        logger.info(f"Connected using Looker API {api_version}")

    def get_looker_release_version(self) -> str:
        """Gets the version number of connected Looker instance.

        Returns:
            str: Looker instance release version number (e.g. 6.22.12)

        Raises:
            ApiConnectionError: If the request returns an HTTP error status.

        """
        logger.debug("Checking Looker instance release version")

        url = utils.compose_url(self.api_url, path=["versions"])
        response = self.session.get(url=url)
        self._raise_for_status(
            response, "Failed to get Looker instance release version\n"
        )

        return response.json()["looker_release_version"]

    def validate_looker_release_version(self, required_version: str) -> bool:
        """Checks that the current Looker version meets a specified minimum.

        Both versions are split on "." and every component is parsed as an
        integer, so components that are not plain integers raise a ValueError.

        Args:
            required_version: Minimum instance version number (e.g. 6.22.12)

        Returns:
            bool: True if the current Looker version >= the required version

        """
        current_version = self.get_looker_release_version()
        logger.info(f"Looker instance version is {current_version}")

        def expand_version(version: str) -> List[int]:
            return [int(number) for number in version.split(".")]

        current = expand_version(current_version)
        required = expand_version(required_version)

        # Pad both versions with .0(s) to a common length of at least three parts,
        # e.g. 6.20 would become 6.20.0, 7 would become 7.0.0. Padding both sides
        # ensures no trailing component of either version is silently ignored.
        length = max(3, len(current), len(required))
        current.extend([0] * (length - len(current)))
        required.extend([0] * (length - len(required)))

        # Equal-length lists compare component by component from the left
        return current >= required

    def update_session(self, project: str, branch: str) -> None:
        """Switches to a development mode session and checks out the desired branch.

        Args:
            project: Name of the Looker project to use.
            branch: Name of the Git branch to check out.

        Raises:
            ApiConnectionError: If either the workspace update or the branch
                checkout returns an HTTP error status.

        """
        logger.debug("Updating session to use development workspace")
        url = utils.compose_url(self.api_url, path=["session"])
        body = {"workspace_id": "dev"}
        response = self.session.patch(url=url, json=body)
        self._raise_for_status(
            response, "Unable to update session to development workspace.\n"
        )

        logger.debug(f"Setting Git branch to {branch}")
        url = utils.compose_url(self.api_url, path=["projects", project, "git_branch"])
        body = {"name": branch}
        response = self.session.put(url=url, json=body)
        self._raise_for_status(
            response,
            f"Unable to checkout Git branch {branch}. "
            "If you have uncommitted changes on the current branch, "
            "please commit or revert them, then try again.\n\n",
        )

        logger.info(f"Checked out branch {branch}")

    def get_lookml_models(self) -> List[JsonDict]:
        """Gets all models and explores from the LookmlModel endpoint.

        Returns:
            List[JsonDict]: JSON response containing LookML models and explores.

        Raises:
            ApiConnectionError: If the request returns an HTTP error status.

        """
        logger.debug(f"Getting all models and explores from {self.base_url}")

        url = utils.compose_url(self.api_url, path=["lookml_models"])
        response = self.session.get(url=url)
        self._raise_for_status(response, "Unable to retrieve explores.\n")

        return response.json()

    def get_lookml_dimensions(self, model: str, explore: str) -> List[str]:
        """Gets all dimensions for an explore from the LookmlModel endpoint.

        Args:
            model: Name of LookML model to query.
            explore: Name of LookML explore to query.

        Returns:
            List[str]: Names of all the dimensions in the specified explore. Dimension
                names are returned in the format 'explore_name.dimension_name'.

        Raises:
            ApiConnectionError: If the request returns an HTTP error status.

        """
        logger.debug(f"Getting all dimensions from explore {explore}")

        url = utils.compose_url(
            self.api_url, path=["lookml_models", model, "explores", explore]
        )
        response = self.session.get(url=url)
        self._raise_for_status(
            response, f'Unable to get dimensions for explore "{explore}".\n'
        )

        # Returned as-is from the "fields" -> "dimensions" entry of the response
        return response.json()["fields"]["dimensions"]

    @backoff.on_exception(
        backoff.expo, (aiohttp.ClientError, asyncio.TimeoutError), max_tries=2
    )
    async def create_query(
        self,
        session: aiohttp.ClientSession,
        model: str,
        explore: str,
        dimensions: List[str],
    ) -> int:
        """Creates a Looker async query for one or more specified dimensions.

        The query created is a SELECT query, selecting all dimensions specified for a
        certain model and explore. Looker builds the query using the `sql` field in the
        LookML for each dimension.

        If a ClientError or TimeoutError is received, attempts to retry.

        Args:
            session: Existing asynchronous HTTP session.
            model: Name of LookML model to query.
            explore: Name of LookML explore to query.
            dimensions: Names of the LookML dimensions in the specified explore to
                query.

        Returns:
            int: ID for the created query.

        """
        # A single dimension is logged by name; any other count is logged as a
        # wildcard, which also keeps an empty list from raising an IndexError here
        label = dimensions[0] if len(dimensions) == 1 else "*"

        # Using old-style string formatting so that strings are formatted lazily
        logger.debug("Creating async query for %s/%s/%s", model, explore, label)
        body = {"model": model, "view": explore, "fields": dimensions, "limit": 1}
        url = utils.compose_url(self.api_url, path=["queries"])
        async with session.post(url=url, json=body) as response:
            # Check the status before decoding, so that an error response whose
            # body is not JSON surfaces as the HTTP error itself
            response.raise_for_status()
            result = await response.json()
        query_id = result["id"]
        logger.debug(
            "Query for %s/%s/%s created as query %d", model, explore, label, query_id
        )
        return query_id

    @backoff.on_exception(
        backoff.expo, (aiohttp.ClientError, asyncio.TimeoutError), max_tries=2
    )
    async def create_query_task(
        self, session: aiohttp.ClientSession, query_id: int
    ) -> str:
        """Runs a previously created query asynchronously and returns the query task ID.

        If a ClientError or TimeoutError is received, attempts to retry.

        Args:
            session: Existing asynchronous HTTP session.
            query_id: ID of a previously created query to run.

        Returns:
            str: ID for the query task, used to check on the status of the query, which
                is being run asynchronously.

        """
        # Using old-style string formatting so that strings are formatted lazily
        logger.debug("Starting query %d", query_id)
        body = {"query_id": query_id, "result_format": "json_detail"}
        url = utils.compose_url(self.api_url, path=["query_tasks"])
        async with session.post(
            url=url, json=body, params={"cache": "false"}
        ) as response:
            # Check the status before decoding, so that an error response whose
            # body is not JSON surfaces as the HTTP error itself
            response.raise_for_status()
            result = await response.json()
        query_task_id = result["id"]
        logger.debug("Query %d is running under query task %s", query_id, query_task_id)
        return query_task_id

    def get_query_task_multi_results(self, query_task_ids: List[str]) -> JsonDict:
        """Returns query task results.

        The request is made synchronously through the persistent session. Unlike
        the query creation methods, it is not retried on failure.

        Args:
            query_task_ids: IDs for the query tasks running asynchronously.

        Returns:
            JsonDict: JSON response from the query task multi-results endpoint.

        Raises:
            ApiConnectionError: If the request returns an HTTP error status.

        """
        # Using old-style string formatting so that strings are formatted lazily
        logger.debug(
            "Attempting to get results for %d query tasks", len(query_task_ids)
        )
        url = utils.compose_url(self.api_url, path=["query_tasks", "multi_results"])
        # All task IDs are sent in one comma-separated query parameter
        response = self.session.get(
            url=url, params={"query_task_ids": ",".join(query_task_ids)}
        )
        self._raise_for_status(response)

        return response.json()