-
Notifications
You must be signed in to change notification settings - Fork 41
/
update-crownlabs-image-list.py
346 lines (274 loc) · 12.4 KB
/
update-crownlabs-image-list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python3
import argparse
import json
import logging
import sched
import time
import grequests
import kubernetes
import requests
class ImageListUpdater:
"""
This class periodically requests of the list of images from the Docker registy
and saves the obtained information as a Kubernetes object.
"""
def __init__(self, image_list_requestor, image_list_saver, registry_adv_name):
"""
Initializes the object.
:param image_list_requestor: the handler used to request the list of images from the Docker registry.
:param image_list_saver: the handler used to save the retrieved information as a Kubernetes object.
:param registry_adv_name: the Docker registry host name advertised in the ImageList object.
"""
self.image_list_requestor = image_list_requestor
self.image_list_saver = image_list_saver
self.registry_adv_name = registry_adv_name
self.scheduler = sched.scheduler(time.time, time.sleep)
def run_update_process(self, interval):
"""
Starts the scheduler loop to request and save the image list.
:param interval: The interval (in seconds) between one execution and the following.
"""
self._run_periodically(interval, self.update)
self.scheduler.run()
def update(self):
"""
Performs the actual update process.
"""
start = time.time()
logger.debug("Starting the update process")
try:
# Obtain the list of images from the Docker registry
images = self.image_list_requestor.get_image_list()
except (requests.exceptions.RequestException, ValueError):
logger.exception("Failed to retrieve data from upstream")
return
try:
self.image_list_saver.update_image_list({
"registryName": self.registry_adv_name,
"images": ImageListUpdater.__process_image_list(images),
})
except kubernetes.client.rest.ApiException:
logger.exception("Failed to save data as ImageList")
return
logger.info(f"Update process correctly completed in {time.time() - start:.2f} seconds")
def _run_periodically(self, interval, action, *args, **kwargs):
"""
Runs a given action periodically.
:param interval: the interval between multiple executions (in seconds).
:param action: the function to be executed periodically.
:param kwargs: keyworded arguments passed to the action function.
"""
logger.debug(f"Executing '{action.__name__}': next scheduled in {interval} seconds")
# Schedule the next executen
self.scheduler.enter(
delay=interval, priority=1, action=self._run_periodically,
argument=(interval, action), kwargs=kwargs)
# Execute the action
action(**kwargs)
@staticmethod
def __process_image_list(images):
"""
Processes the list of images returned from upstream to remove the "latest" tags
and converts it to the correct format expected by Kubernetes.
:param images: the list of images retrieved from the Docker registry.
:return: a list of images, suitable to be saved as ImageList.
"""
converted_images = []
for image in images:
# Remove the "latest" tags
try:
image.get("tags", []).remove("latest")
except ValueError:
pass
# Are there still any tags?
if image.get("tags", []):
converted_images.append({
"name": image.get("name"),
"versions": image.get("tags"),
})
return converted_images
class ImageListRequestor:
"""
This class interacts with the docker registry to get the list of images currently available.
"""
def __init__(self, url, username=None, password=None):
"""
Initializes the object.
:param url: the url used to contact the Docker registry.
:param username: the username used to access the Docker registry (optional).
:param password: the password used to access the Docker registry (optional).
"""
self.url = url
self.auth = (username, password)
def get_image_list(self):
"""
Performs the requests upstream to retrieve the information about the images.
:return: an object representing the information retrieved.
"""
logger.debug("Requesting the registry catalog upstream")
repositories = self._do_single_get(
ImageListRequestor.__get_catalog_path()
).get("repositories", [])
logger.debug("Requesting the image details upstream")
images = self._do_parallel_gets(
ImageListRequestor.__map_repositories_to_paths(repositories)
)
return images
def _do_single_get(self, path):
"""
Performs a GET to the target path and returns the result.
:param path: the path to be retrieved.
:return: the json object extracted from the response.
"""
return requests.get(url=f"{self.url}{path}", auth=self.auth).json()
def _do_parallel_gets(self, paths):
"""
Performs a set of parallel GETs to the target paths and returns the results.
:param paths: the paths to be retrieved.
:return: the set of json object extracted from the response.
"""
requests = (grequests.get(f"{self.url}{path}", auth=self.auth) for path in paths)
return (response.json() for response in grequests.imap(requests))
@staticmethod
def __get_catalog_path():
"""
Returns the URL path corresponding to the catalog.
:return: the URL path corresponding to the catalog.
"""
return "/v2/_catalog"
@staticmethod
def __map_repositories_to_paths(repositories):
"""
Returns the URL paths to obtain detailed information about the repositories.
:param repositories: the set of repositories of interest.
:return: the URL paths corresponding to the input repositories.
"""
return (f"/v2/{repo}/tags/list" for repo in repositories)
class ImageListSaver:
"""
Saves the list of images retrieved from the Docker registry as a Kubernetes object.
"""
def __init__(self, name):
"""
Initializes the object and loads the Kubernetes configuration.
:param name: The name assigned to the ImageList resource.
"""
self.name = name
try:
# Configuration loaded from a kube config
kubernetes.config.load_kube_config()
except kubernetes.config.config_exception.ConfigException:
# Configuration loaded from within a pod
kubernetes.config.load_incluster_config()
def update_image_list(self, image_list_spec):
"""
Updates the content or creates a new ImageList object.
:param image_list_spec: the content of the ImageList object to be updated.
"""
resource_version = self._get_image_list_version()
if resource_version:
self._update_image_list(image_list_spec, resource_version)
else:
self._create_image_list(image_list_spec)
def _get_image_list_version(self):
"""
Gets the current version of the ImageList.
:returns: the current version of the image list or None
"""
api_instance = kubernetes.client.CustomObjectsApi()
try:
data = api_instance.get_cluster_custom_object(
**ImageListSaver.__get_imagelist_args(), name=self.name
)
except kubernetes.client.rest.ApiException:
return None
resource_version = data.get("metadata", {}).get("resourceVersion", None)
logger.debug(f"Retrieved ImageList resource version: '{resource_version}'")
return resource_version
def _create_image_list(self, image_list_spec):
"""
Creates a new ImageList object.
:param image_list_spec: the content of the ImageList object to be created.
"""
api_instance = kubernetes.client.CustomObjectsApi()
api_instance.create_cluster_custom_object(
**ImageListSaver.__get_imagelist_args(),
body=self._create_image_list_object(image_list_spec)
)
logger.debug(f"ImageList '{self.name}' correctly created'")
def _update_image_list(self, image_list_spec, resource_version):
"""
Updates an existing ImageList object.
:param image_list_spec: the content of the ImageList object to be updated.
:param resource_version: the version of the resource to be updated.
"""
api_instance = kubernetes.client.CustomObjectsApi()
api_instance.replace_cluster_custom_object(
**ImageListSaver.__get_imagelist_args(), name=self.name,
body=self._create_image_list_object(image_list_spec, resource_version)
)
logger.debug(f"ImageList '{self.name}' correctly updated'")
def _create_image_list_object(self, image_list_spec, resource_version=None):
"""
Creates a new ImageList object, given the spec body.
:param image_list_spec: the content of the ImageList object to be created.
:param resource_version: the version of the resource to be updated.
:returns: the ImageList json representation.
"""
return {
"apiVersion": "crownlabs.polito.it/v1alpha1",
"kind": "ImageList",
"metadata": {
"name": self.name,
"resourceVersion": resource_version,
},
"spec": image_list_spec,
}
@staticmethod
def __get_imagelist_args():
"""
Returns the parameters describing the ImageList API.
"""
return {"group": "crownlabs.polito.it", "version": "v1alpha1", "plural": "imagelists"}
# Initialize the logger object
logger = logging.getLogger("list-crownlabs-images")
def configure_logger():
"""
Configures the logger object with the required configuration
"""
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)
logger.handlers.clear()
logger.addHandler(console_handler)
if __name__ == "__main__":
# Configure the logger
configure_logger()
# Parse the command line arguments
parser = argparse.ArgumentParser(description=\
"Periodically requests the list of images from a Docker registry and stores it as a Kubernetes CR")
parser.add_argument("--advertised-registry-name", required=True,
help="the host name of the Docker registry where the images can be retrieved")
parser.add_argument("--image-list-name", required=True,
help="the name assigned to the resulting ImageList object")
parser.add_argument("--registry-url", required=True, help="the URL used to contact the Docker registry")
parser.add_argument("--registry-username", help="the username used to access the Docker registry")
parser.add_argument("--registry-password", help="the password used to access the Docker registry")
parser.add_argument("--update-interval", required=True, type=int,
help="the interval (in seconds) between one update and the following")
args = parser.parse_args()
# Create the object reading the list of the images from the Docker registry
logger.info(f"Upstream Docker registry: '{args.registry_url}' - Username: '{args.registry_username}'")
image_list_requestor = ImageListRequestor(args.registry_url, args.registry_username, args.registry_password)
# Create the object saving the retrieved information as a Kubernetes object
logger.info(f"Target ImageList object: '{args.image_list_name}'")
image_list_saver = ImageListSaver(args.image_list_name)
# Create the object periodically perforing the update process
image_list_updater = ImageListUpdater(image_list_requestor, image_list_saver, args.advertised_registry_name)
logger.info(f"Starting the update process")
try:
image_list_updater.run_update_process(args.update_interval)
except KeyboardInterrupt:
logger.info("Received stop signal. Exiting")