-
Notifications
You must be signed in to change notification settings - Fork 0
/
delete-package-versions.py
executable file
·311 lines (272 loc) · 9.63 KB
/
delete-package-versions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/env python3
import os
import sys
import requests
import base64
import json
import urllib.parse
# GitHub PAT used for performing the API calls
TOKEN = os.environ.get("INPUT_TOKEN")
# GitHub organization / ghcr.io repo owner
PACKAGE_OWNER = os.environ.get("INPUT_PACKAGE-OWNER")
# Name of the package, must include relative path
PACKAGE_NAME = os.environ.get("INPUT_PACKAGE-NAME")
# Number of versions to keep for the package
KEEP_VERSIONS = int(os.environ.get("INPUT_KEEP-VERSIONS", 5))
# Delete versions which are not referenced by any tag
DELETE_ORPHANS = (os.environ.get("INPUT_DELETE-ORPHANS", 'false') == 'true')
# Don't perform DELETE API calls
DRY_RUN = (os.environ.get("INPUT_DRY-RUN", 'false') == 'true')
# API response caches
all_versions = []
all_versions_tagged = []
# Encode a string to base64
def base64_encode(value):
return base64.b64encode(str(value).encode()).decode()
# Urlencode a string
def urlencode(value):
return urllib.parse.quote(str(value), safe='')
# Pretty print response errors
def response_errors(response):
status_code = response.status_code
try:
data = response.json()
if 'errors' in data:
for error in data.get('errors', []):
print(f"-> {error['code']}: {error['message']} (HTTP Status {status_code})")
elif 'message' in data:
print(f"!! {data['message']} (HTTP Status {status_code})")
except json.decoder.JSONDecodeError as e:
print(f"!! Unknown error (HTTP Status {status_code})")
print(response.content.decode())
# Perform an API call to the ghcr.io registry requesting the
# multi-arch manifest for the given SHA digest to find out
# all versions which are related to it.
def get_children_versions(sha):
response = requests.get(
f"https://ghcr.io/v2/{PACKAGE_OWNER}/{PACKAGE_NAME}/manifests/{sha}",
# The response Content-Type header will change when the manifest
# contains OCI annotations so it's mandatory to send the correct
# Accept headers.
headers={
"Accept": ','.join([
'application/vnd.docker.distribution.manifest.v2+json',
'application/vnd.oci.image.index.v1+json'
]),
"Authorization": f"Bearer {base64_encode(TOKEN)}"
}
)
if response.status_code == 404:
print(f"WARNING: No children package versions found for {sha}")
response_errors(response)
return []
elif response.status_code != 200:
print(f"ERROR: Failed to get children package versions for {sha}")
response_errors(response)
sys.exit(1)
result = []
for manifest in response.json()['manifests']:
for version in get_all_versions():
if version['name'] == manifest['digest']:
child_version = version
# Add the platform name as an extra information
child_version['platform'] = f"%s/%s" % (
manifest['platform']['os'],
manifest['platform']['architecture']
)
result.append(child_version)
break
return result
# Retrieve all versions for a package and stores the
# result into the cache for later retrieval
def get_all_versions():
global all_versions
if len(all_versions) == 0:
package_name = urlencode(PACKAGE_NAME)
response = requests.get(
f"https://api.github.com/orgs/{PACKAGE_OWNER}/packages/container/{package_name}/versions",
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {TOKEN}"
}
)
if response.status_code == 404:
# Package not found
return []
elif response.status_code != 200:
print(f"ERROR: Failed to fetch all package versions")
response_errors(response)
sys.exit(1)
# Store the value into cache
all_versions = response.json()
return all_versions
# Parses the response from get_all_versions() to retrieve
# all tagged versions together with their children manifests.
# Stores the information into cache for later retrieval.
def get_all_versions_tagged():
global all_versions_tagged
if len(all_versions_tagged) == 0:
for version in get_all_versions():
tags = get_version_tag(version)
if len(tags) > 0:
tagged_version = version
tagged_version['children'] = get_children_versions( version['name'] )
all_versions_tagged.append(tagged_version)
# Store the value into cache
return all_versions_tagged
# Slice the get_all_versions_tagged() result to return a list
# containing the versions to keep and the versions to delete,
# based on the KEEP_VERSIONS parameter
def get_versions_to_delete():
versions = get_all_versions_tagged()
return [
versions[ 0:KEEP_VERSIONS ], # versions to keep
versions[ KEEP_VERSIONS: ] # versions to delete
]
# Extract and compares all version ids from get_all_version()
# and get_all_versions_tagged() to retrieve all versions which
# do not have any associated tag.
def get_orphan_versions():
diff_version_ids = list(
set(extract_version_ids(get_all_versions())) -
set(extract_version_ids(get_all_versions_tagged()))
)
result = []
for diff_version_id in diff_version_ids:
for version in get_all_versions():
if diff_version_id == version['id']:
result.append(version)
break
return result
# Extract all 'id' fields from the given data and return
# them as a list
def extract_version_ids(data):
ids = []
if isinstance(data, dict):
if "id" in data:
ids.append(data["id"])
for key, value in data.items():
ids.extend(extract_version_ids(value))
elif isinstance(data, list):
for item in data:
ids.extend(extract_version_ids(item))
return sorted(ids)
# Shortcut for getting a specific tag for a package
def get_version_tag(version):
return ", ".join(
version['metadata']['container']['tags']
)
# Perform the API call to delete a version
def delete_version(version_id):
# Make to return true if DRY_RUN
if DRY_RUN:
return True
package_name = urlencode(PACKAGE_NAME)
response = requests.delete(
f"https://api.github.com/orgs/{PACKAGE_OWNER}/packages/container/{package_name}/versions/{version_id}",
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {TOKEN}"
}
)
if response.status_code == 204:
return True
else:
print(f"!! Failed to delete version {version_id}")
response_errors(response)
return False
# Perform the API call to delete a package
def delete_package():
# Make sure to return true if DRY_RUN
if DRY_RUN:
return True
package_name = urlencode(PACKAGE_NAME)
response = requests.delete(
f"https://api.github.com/orgs/{PACKAGE_OWNER}/packages/container/{package_name}",
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {TOKEN}"
}
)
if response.status_code == 204:
return True
else:
print(f"!! Failed to delete package {PACKAGE_OWNER}/{PACKAGE_NAME}")
response_errors(response)
return False
# Main script logic
def main():
# Parameters validation
if not PACKAGE_OWNER or not PACKAGE_NAME or not TOKEN:
print("Please set at least PACKAGE_OWNER, PACKAGE_NAME and TOKEN environment variables.")
sys.exit(1)
i=0
if DRY_RUN:
print("WARNING: Running in DRY-RUN mode")
print(f"Performing API calls for package {PACKAGE_OWNER}/{PACKAGE_NAME} ...")
# Loop over the get_version_to_delete() dict and perform the deletion
keep_versions, delete_versions = get_versions_to_delete()
# Counters
num_keep_versions = len(keep_versions)
num_delete_versions = len(delete_versions)
# Process package versions to keep
if num_keep_versions > 0:
print(f"The following {num_keep_versions} package version(s) will be kept:")
for keep_version in keep_versions:
keep_version_tag = get_version_tag(keep_version)
print( f" %-10s : %s (%s)" % (
keep_version['id'],
keep_version['name'],
keep_version_tag,
))
else:
print("There are no package versions to keep")
# GitHub will prevent deleting all versions for a package, returning 400
# when attempting to remove the last version, so a full package removal
# is required.
if num_delete_versions > 0:
print(f"Deleting package {PACKAGE_NAME} with ALL its versions!")
delete_package()
# Nothing left to do since the package has been deleted.
# We can safely skip all the rest.
sys.exit(0)
# Process package versions to delete
if num_delete_versions > 0:
print(f"Deleting {num_delete_versions} package version(s) ...")
for parent_version in delete_versions:
parent_version_tag = get_version_tag(parent_version)
# Delete root version
print( f" %-10s : %s (%s, multi-arch)" % (
parent_version['id'],
parent_version['name'],
parent_version_tag,
))
delete_version( parent_version['id'] )
# Delete children versions if any
for child_version in parent_version['children']:
print( f" %-10s : %s (%s, %s)" % (
child_version['id'],
child_version['name'],
parent_version_tag,
child_version['platform'],
))
delete_version( child_version['id'] )
else:
print("There are no package versions to delete")
# Process orphan package versions to delete
if DELETE_ORPHANS:
orphan_versions = get_orphan_versions()
num_orphan_versions = len(orphan_versions)
if num_orphan_versions > 0:
print(f"Deleting {num_orphan_versions} orphan container package version(s) ...")
for orphan_version in orphan_versions:
if delete_version( orphan_version['id'] ):
print(f" %-10s : %s" % (
orphan_version['id'],
orphan_version['name']
))
else:
print("There are no orphan package versions to delete")
# Main
if __name__ == "__main__":
main()