/
container.py
230 lines (187 loc) · 8.61 KB
/
container.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
"""Code related to managing kernels running in containers."""
import os
import signal
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Set
import urllib3 # docker ends up using this and it causes lots of noise, so turn off warnings
from jupyter_client import localinterfaces
from overrides import overrides
from traitlets import Unicode, default
from .remote_provisioner import RemoteProvisionerBase
urllib3.disable_warnings()

# Address of this host as seen by kernels. public_ips() can be empty on a host that only has
# loopback interfaces; fall back to the loopback address instead of failing at import time.
_public_ips = localinterfaces.public_ips()
local_ip = _public_ips[0] if _public_ips else localinterfaces.localhost()

default_kernel_uid = "1000"  # jovyan user is the default
default_kernel_gid = "100"  # users group is the default

# These could be enforced via a PodSecurityPolicy, but those affect
# all pods so the cluster admin would need to configure those for
# all applications.
# Entries are stripped so that "0, 1000" prohibits "1000" and not just " 1000".
prohibited_uids = [uid.strip() for uid in os.getenv("GP_PROHIBITED_UIDS", "0").split(",")]
prohibited_gids = [gid.strip() for gid in os.getenv("GP_PROHIBITED_GIDS", "0").split(",")]

# When enabled, KERNEL_WORKING_DIR is passed through to the container (see pre_launch).
mirror_working_dirs = os.getenv("GP_MIRROR_WORKING_DIRS", "false").lower() == "true"
class ContainerProvisionerBase(RemoteProvisionerBase):
    """Kernel provisioner for container-based kernels.

    Concrete subclasses supply the container-manager specifics by implementing
    ``get_initial_states``, ``get_container_status`` and ``terminate_container_resources``.
    """

    image_name_env = "GP_IMAGE_NAME"
    image_name = Unicode(
        None,
        config=True,
        allow_none=True,
        help="""The image name to use when launching container-based kernels.
(GP_IMAGE_NAME env var)""",
    )

    @default("image_name")
    def _image_name_default(self):
        """Default the image name from the GP_IMAGE_NAME environment variable (None if unset)."""
        return os.getenv(self.image_name_env)

    executor_image_name_env = "GP_EXECUTOR_IMAGE_NAME"
    executor_image_name = Unicode(
        None,
        config=True,
        allow_none=True,
        help="""The image name to use as the Spark executor image when launching
container-based kernels within Spark environments. (GP_EXECUTOR_IMAGE_NAME env var)""",
    )

    @default("executor_image_name")
    def _executor_image_name_default(self):
        """Default from GP_EXECUTOR_IMAGE_NAME, falling back to ``image_name`` when unset/empty."""
        return os.getenv(self.executor_image_name_env) or self.image_name

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Both are None until a container is known; presumably assigned by subclasses
        # during launch/startup - verify in the concrete provisioners.
        self.container_name = None
        self.assigned_node_ip = None

    @property
    @overrides
    def has_process(self) -> bool:
        """True once a container name has been assigned."""
        return self.container_name is not None

    @overrides
    async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
        """Prepare launch arguments, adding container-specific kernel environment variables.

        Resets the per-launch host/IP bookkeeping and delegates to the superclass. It then
        sets ``KERNEL_IMAGE`` and ``KERNEL_EXECUTOR_IMAGE`` in ``kwargs["env"]``, removes
        ``KERNEL_WORKING_DIR`` unless GP_MIRROR_WORKING_DIRS is "true", and validates the
        kernel's UID/GID.

        Returns:
            The launch kwargs as returned by the superclass, with ``env`` updated in place.

        Raises:
            PermissionError: passed to ``log_and_raise`` when the UID or GID is prohibited.
        """
        # Unset assigned_host, ip, and node_ip in pre-launch, otherwise, these screw up restarts
        self.assigned_host = ""
        self.assigned_ip = None
        self.assigned_node_ip = None

        kwargs = await super().pre_launch(**kwargs)
        env = kwargs["env"]
        env["KERNEL_IMAGE"] = self.image_name
        env["KERNEL_EXECUTOR_IMAGE"] = self.executor_image_name

        if not mirror_working_dirs:
            # If mirroring is not enabled, remove working directory if present.
            env.pop("KERNEL_WORKING_DIR", None)

        self._enforce_prohibited_ids(**kwargs)
        return kwargs

    @overrides
    def log_kernel_launch(self, cmd: List[str]) -> None:
        """Log the launched command together with the kernel image and kernel id."""
        self.log.info(
            f"{self.__class__.__name__}: kernel launched. Kernel image: {self.image_name}, "
            f"KernelID: {self.kernel_id}, cmd: '{cmd}'"
        )

    @staticmethod
    def _is_prohibited(value: Any, prohibited: List[str]) -> bool:
        """Return True if ``value`` matches an entry of ``prohibited``, ignoring surrounding whitespace.

        Both sides are normalized to stripped strings so a padded list entry (" 0") or a
        non-str env value (int 0) cannot bypass the check.
        """
        return str(value).strip() in {entry.strip() for entry in prohibited}

    def _enforce_prohibited_ids(self, **kwargs):
        """Determine UID and GID with which to launch container and ensure they are not prohibited.

        ``KERNEL_UID``/``KERNEL_GID`` are read from ``kwargs["env"]``, defaulting to the
        module-level ``default_kernel_uid``/``default_kernel_gid``. They are checked against
        GP_PROHIBITED_UIDS/GP_PROHIBITED_GIDS and then written back into ``kwargs["env"]``.

        Raises:
            PermissionError: passed to ``log_and_raise`` when the UID or GID is prohibited.
        """
        env = kwargs["env"]
        kernel_uid = env.get("KERNEL_UID", default_kernel_uid)
        kernel_gid = env.get("KERNEL_GID", default_kernel_gid)

        if self._is_prohibited(kernel_uid, prohibited_uids):
            error_message = (
                f"Kernel's UID value of '{kernel_uid}' has been denied via GP_PROHIBITED_UIDS!"
            )
            self.log_and_raise(PermissionError(error_message))
        if self._is_prohibited(kernel_gid, prohibited_gids):
            error_message = (
                f"Kernel's GID value of '{kernel_gid}' has been denied via GP_PROHIBITED_GIDS!"
            )
            self.log_and_raise(PermissionError(error_message))

        # Ensure the kernel's env has what it needs in case they came from defaults
        env["KERNEL_UID"] = kernel_uid
        env["KERNEL_GID"] = kernel_gid

    @overrides
    async def poll(self) -> Optional[int]:
        """Determines if container is still active.

        Submitting a new kernel to the container manager will take a while to be Running.
        Thus, the container will probably not report a running state immediately. Any status
        contained in ``get_initial_states()`` (starting states, including running) is
        therefore treated as active.

        Returns:
            None while the container's status is one of ``get_initial_states()``. Otherwise
            0, which includes the case where no status is available at all.
        """
        container_status = await self.get_container_status(None)
        # Do not check whether container_status is None
        # EG couldn't restart kernels although connections exists.
        # See https://github.com/jupyter/enterprise_gateway/issues/827
        if container_status in self.get_initial_states():
            return None
        return 0

    @overrides
    async def send_signal(self, signum: int) -> None:
        """Send signal `signum` to container.

        Signal 0 is answered with ``poll()`` and SIGKILL with ``kill()``. Any other signal is
        deferred to the superclass.
        """
        if signum == 0:
            return await self.poll()
        if signum == signal.SIGKILL:
            return await self.kill()
        # This is very likely an interrupt signal, so defer to the super class
        # which should use the communication port.
        return await super().send_signal(signum)

    @overrides
    async def kill(self, restart: bool = False) -> None:
        """Kills a containerized kernel.

        Returns whatever ``terminate_container_resources`` returns, or None when no container
        name is known (there is nothing to terminate).
        """
        if not self.container_name:
            return None
        return await self.terminate_container_resources(restart=restart)

    @overrides
    async def terminate(self, restart: bool = False) -> None:
        """Terminates a containerized kernel.

        This method defers to kill() since there's no distinction between the
        two in these environments.
        """
        return await self.kill(restart=restart)

    @overrides
    async def shutdown_listener(self, restart: bool) -> None:
        """Shut down the superclass listener, then terminate container resources if a name is known."""
        await super().shutdown_listener(restart)
        if self.container_name:  # We only have something to terminate if we have a name
            await self.terminate_container_resources(restart)

    @overrides
    async def confirm_remote_startup(self):
        """Confirms the container has started and returned necessary connection information.

        Loops until ``receive_connection_info()`` reports success. Each iteration does three
        things:

        - Calls ``handle_launch_timeout()``, which presumably enforces the launch timeout and
          paces the loop; confirm in RemoteProvisionerBase.
        - Queries ``get_container_status`` with the 1-based iteration number.
        - When no status is returned, calls ``detect_launch_failure()``.
        """
        self.start_time = RemoteProvisionerBase.get_current_time()
        i = 0
        ready_to_connect = False  # we're ready to connect when we have a connection file to use
        while not ready_to_connect:
            i += 1
            await self.handle_launch_timeout()

            container_status = await self.get_container_status(str(i))
            if container_status:
                # Connection info can only be received once a host has been assigned.
                if self.assigned_host != "":
                    ready_to_connect = await self.receive_connection_info()
                    # We won't send the process signals from container-based provisioners
                    self.pid = 0
                    self.pgid = 0
            else:
                self.detect_launch_failure()

    @overrides
    async def get_provisioner_info(self) -> Dict[str, Any]:
        """Captures the base information necessary for kernel persistence relative to containers."""
        provisioner_info = await super().get_provisioner_info()
        provisioner_info.update(
            {
                "assigned_node_ip": self.assigned_node_ip,
            }
        )
        return provisioner_info

    @overrides
    async def load_provisioner_info(self, provisioner_info: dict) -> None:
        """Loads the base information necessary for kernel persistence relative to containers."""
        await super().load_provisioner_info(provisioner_info)
        self.assigned_node_ip = provisioner_info.get("assigned_node_ip")

    @abstractmethod
    def get_initial_states(self) -> Set[str]:
        """Return the set of states indicating the container is starting (includes running)."""
        raise NotImplementedError

    @abstractmethod
    async def get_container_status(self, iteration: Optional[str]) -> str:
        """Return current container state.

        ``iteration`` is None when called from ``poll()``. During
        ``confirm_remote_startup()`` it is the 1-based attempt number as a string. A falsy
        result is treated as "no status available".
        """
        raise NotImplementedError

    @abstractmethod
    async def terminate_container_resources(self, restart: bool = False) -> Optional[bool]:
        """Terminate any artifacts created on behalf of the container's lifetime."""
        raise NotImplementedError