-
Notifications
You must be signed in to change notification settings - Fork 1
/
convert_images.py
181 lines (143 loc) · 5.58 KB
/
convert_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import json
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from list_of_images_to_optimize import Image, images_to_optimize
# Registry/namespace prefix that every converted image name in this file
# starts with (see the converted_image_name properties below).
CONVERTED_IMAGES_PREFIX = "ghcr.io/stargz-containers"
# Worker count used by main(): with 1 the jobs run one after another in a
# plain loop; any other value runs them through a ThreadPoolExecutor with
# this many workers.
NUMBER_OF_THREADS = 1
# True when "--push" appears anywhere on the command line. It gates the
# "nerdctl push" step and whether the "-org" copy jobs are scheduled at all.
PUSH = "--push" in sys.argv
def run(args: list):
    """Echo a command line to stdout, then execute it.

    Args:
        args: The program followed by its arguments, one list item each.
            The items are joined with spaces for the echo, so they must
            all be strings.

    Raises:
        subprocess.CalledProcessError: If the command exits with a
            non-zero status.
    """
    command_line = " ".join(args)
    print("--> Executing: ", command_line)
    subprocess.check_call(args)
def get_normalized_image_name(docker_image_name: str) -> str:
    """Return the fully qualified registry reference for an image name.

    Docker Hub short names are expanded the way the Docker CLI does it:

    * ``python:3.9``          -> ``docker.io/library/python:3.9``
    * ``jupyter/base``        -> ``docker.io/jupyter/base``
    * ``docker.io/python``    -> ``docker.io/library/python``
    * ``ghcr.io/org/app:1``   -> unchanged (already names its registry)

    Args:
        docker_image_name: Image reference, optionally with a tag or digest.

    Returns:
        The reference with an explicit registry host.
    """
    first_component, slash, remainder = docker_image_name.partition("/")
    if not slash:
        # A bare name is an official image living under "library/".
        return "docker.io/library/" + docker_image_name

    # Docker treats the first path component as a registry host when it
    # contains a dot (domain), a colon (port) or is exactly "localhost".
    # Without this check "ghcr.io/org/app" would turn into the invalid
    # "docker.io/ghcr.io/org/app".
    names_a_registry = (
        "." in first_component
        or ":" in first_component
        or first_component == "localhost"
    )
    if not names_a_registry:
        return "docker.io/" + docker_image_name
    if first_component == "docker.io" and "/" not in remainder:
        # Explicit Docker Hub host but still missing the "library/" namespace.
        return "docker.io/library/" + remainder
    return docker_image_name
class ConversionJob:
    """Base class for a job that converts one source image.

    Subclasses must provide the ``converted_image_name`` property and the
    ``convert`` method; the base versions only raise ``NotImplementedError``.
    """

    def __init__(self, src_image: Image):
        self.src_image = src_image

    def ctr_remote_image_optimize(
        self, optimize: bool = True, zstdchunked: bool = False
    ):
        """Run ``ctr-remote image optimize`` from the source to the converted image.

        The command always carries ``--oci`` and ``--period=30``. The source
        image's entrypoint (JSON-encoded, only when it is not ``None``),
        bind mounts and environment variables are forwarded as options.

        Args:
            optimize: When false, ``--no-optimize`` is added.
            zstdchunked: When true, ``--zstdchunked`` is added.

        Raises:
            subprocess.CalledProcessError: If the command fails.
        """
        image = self.src_image
        source_reference = get_normalized_image_name(image.name)

        extra_flags = []
        if image.entrypoint is not None:
            extra_flags.extend(["--entrypoint", json.dumps(image.entrypoint)])

        # Mount sources are resolved against the directory holding this script.
        script_dir = Path(__file__).parent
        for relative_src, container_dst in image.mount:
            host_src = (script_dir / relative_src).absolute()
            extra_flags.extend(
                [
                    "--mount",
                    f"type=bind,src={host_src},dst={container_dst},options=rbind",
                ]
            )

        for variable, value in image.env.items():
            extra_flags.extend(["--env", f"{variable}={value}"])

        if not optimize:
            extra_flags.append("--no-optimize")
        if zstdchunked:
            extra_flags.append("--zstdchunked")

        command = [
            "ctr-remote",
            "image",
            "optimize",
            "--oci",
            "--period=30",
            *extra_flags,
            source_reference,
            self.converted_image_name,
        ]
        run(command)

    @property
    def converted_image_name(self) -> str:
        """Name of the converted image; must be overridden by subclasses."""
        raise NotImplementedError("You need to subclass and implement this method")

    def convert(self):
        """Perform the conversion; must be overridden by subclasses."""
        raise NotImplementedError("You need to subclass and implement this method")

    def job_was_already_done(self) -> bool:
        """Return whether ``crane digest`` succeeds for the converted image.

        A failing ``crane digest`` call is taken to mean the image is not
        there yet.
        """
        try:
            run(["crane", "digest", self.converted_image_name])
        except subprocess.CalledProcessError:
            return False
        return True

    def pull_convert_and_push_if_necessary(self):
        """Run the job unless the converted image already exists."""
        if not self.job_was_already_done():
            print(
                f"--> Image {self.converted_image_name} not in registry, converting..."
            )
            self.pull_convert_and_push()
        else:
            print(f"--> Image {self.converted_image_name} is already in the registry")

    def pull_convert_and_push(self):
        """Pull the source image, convert it, and push it when PUSH is set."""
        run(["nerdctl", "pull", "-q", get_normalized_image_name(self.src_image.name)])
        self.convert()
        if not PUSH:
            return
        # Pause before pushing: the push step was flaky in CI and waiting a
        # little for the image to become available was the attempted remedy.
        # It is not confirmed that the pause is actually required.
        time.sleep(5)
        run(["nerdctl", "push", self.converted_image_name])
        print(f"--> Pushed {self.converted_image_name} to registry")
class OriginalConversionJob(ConversionJob):
    """Job that copies the source image as-is under the ``-org`` suffix."""

    @property
    def converted_image_name(self) -> str:
        """Target name: the shared prefix, the source name, then ``-org``."""
        source_name = self.src_image.name
        return f"{CONVERTED_IMAGES_PREFIX}/{source_name}-org"

    def pull_convert_and_push(self):
        """Copy the linux/amd64 image straight to the target with ``crane copy``.

        Nothing is pulled or converted locally; the copy itself writes to
        the target registry.
        """
        source_reference = get_normalized_image_name(self.src_image.name)
        copy_command = ["crane", "copy", "--platform", "linux/amd64"]
        copy_command += [source_reference, self.converted_image_name]
        run(copy_command)
        print(f"--> Pushed {self.converted_image_name} to registry")
class StargzConversionJob(ConversionJob):
    """Job that converts with ``--no-optimize``; result uses the ``-esgz-noopt`` suffix."""

    @property
    def converted_image_name(self) -> str:
        """Target name: the shared prefix, the source name, then ``-esgz-noopt``."""
        source_name = self.src_image.name
        return f"{CONVERTED_IMAGES_PREFIX}/{source_name}-esgz-noopt"

    def convert(self):
        """Convert the image with the optimization step switched off."""
        self.ctr_remote_image_optimize(optimize=False)
class EStargzConversionJob(ConversionJob):
    """Job that converts with the default options; result uses the ``-esgz`` suffix."""

    @property
    def converted_image_name(self) -> str:
        """Target name: the shared prefix, the source name, then ``-esgz``."""
        source_name = self.src_image.name
        return f"{CONVERTED_IMAGES_PREFIX}/{source_name}-esgz"

    def convert(self):
        """Convert the image using the default optimize settings."""
        self.ctr_remote_image_optimize()
class EStargzZstdchunkedConversionJob(ConversionJob):
    """Job that converts with ``--zstdchunked``; result uses the ``-zstdchunked`` suffix."""

    @property
    def converted_image_name(self) -> str:
        """Target name: the shared prefix, the source name, then ``-zstdchunked``."""
        source_name = self.src_image.name
        return f"{CONVERTED_IMAGES_PREFIX}/{source_name}-zstdchunked"

    def convert(self):
        """Convert the image with the zstd:chunked flag enabled."""
        self.ctr_remote_image_optimize(zstdchunked=True)
def main():
    """Build the conversion jobs for every listed image and run them.

    For each entry of ``images_to_optimize`` three conversion jobs are
    scheduled (stargz without optimization, eStargz, zstd:chunked). When
    PUSH is set, a job copying the original image is scheduled first.

    Jobs run sequentially when NUMBER_OF_THREADS is 1, otherwise in a
    thread pool of that size. In both modes a failing job raises out of
    this function.
    """
    conversion_jobs = []
    for image_and_args in images_to_optimize:
        if PUSH:
            # The original-image job only transfers layers to the target
            # registry, so it is pointless without permission to push.
            conversion_jobs.append(OriginalConversionJob(image_and_args))
        conversion_jobs += [
            StargzConversionJob(image_and_args),
            EStargzConversionJob(image_and_args),
            EStargzZstdchunkedConversionJob(image_and_args),
        ]

    if NUMBER_OF_THREADS == 1:
        for job in conversion_jobs:
            job.pull_convert_and_push_if_necessary()
    else:
        with ThreadPoolExecutor(max_workers=NUMBER_OF_THREADS) as pool:
            # Executor.map re-raises a worker's exception only when its
            # result is retrieved. Draining the iterator makes a failed job
            # fail the run instead of being silently dropped.
            list(
                pool.map(
                    ConversionJob.pull_convert_and_push_if_necessary,
                    conversion_jobs,
                )
            )
    print("--> All done!")


if __name__ == "__main__":
    main()