/
handlers.py
212 lines (162 loc) · 6.69 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from os_vif import objects as obj_vif
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes.cni.binding import base as b_base
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.handlers import dispatch as k_dis
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class CNIHandlerBase(k8s_base.ResourceEventHandler):
OBJECT_KIND = k_const.K8S_OBJ_POD
def __init__(self, cni, on_done):
self._cni = cni
self._callback = on_done
self._vifs = {}
def on_present(self, pod):
vifs = self._get_vifs(pod)
for ifname, vif in vifs.items():
self.on_vif(pod, vif, ifname)
if self.should_callback(pod, vifs):
self.callback()
@abc.abstractmethod
def should_callback(self, pod, vifs):
"""Called after all vifs have been processed
Should determine if the CNI is ready to call the callback
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
raise NotImplementedError()
@abc.abstractmethod
def callback(self):
"""Called if should_callback returns True"""
raise NotImplementedError()
@abc.abstractmethod
def on_vif(self, pod, vif, ifname):
raise NotImplementedError()
def _get_vifs(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
try:
annotations = pod['metadata']['annotations']
state_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
return {}
state_annotation = jsonutils.loads(state_annotation)
state = utils.extract_pod_annotation(state_annotation)
vifs_dict = state.vifs
LOG.debug("Got VIFs from annotation: %r", vifs_dict)
return vifs_dict
def _get_inst(self, pod):
return obj_vif.instance_info.InstanceInfo(
uuid=pod['metadata']['uid'], name=pod['metadata']['name'])
class AddHandler(CNIHandlerBase):
def __init__(self, cni, on_done):
LOG.debug("AddHandler called with CNI env: %r", cni)
super(AddHandler, self).__init__(cni, on_done)
def on_vif(self, pod, vif, ifname):
"""Called once for every vif of a Pod on every event.
If it is the first time we see this vif, plug it in.
:param pod: dict containing Kubernetes Pod object
:param vif: os_vif VIF object
:param ifname: string, name of the interfaces inside container
"""
if ifname not in self._vifs:
self._vifs[ifname] = vif
_vif = vif.obj_clone()
_vif.active = True
# set eth0's gateway as default
is_default_gateway = (ifname == self._cni.CNI_IFNAME)
b_base.connect(_vif, self._get_inst(pod),
ifname, self._cni.CNI_NETNS,
is_default_gateway=is_default_gateway,
container_id=self._cni.CNI_CONTAINERID)
def should_callback(self, pod, vifs):
"""Called after all vifs have been processed
Determines if CNI is ready to call the callback and stop watching for
more events. For AddHandler the callback should be called if there
is at least one VIF in the annotation and all the
VIFs recieved are marked active
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
all_vifs_active = vifs and all(vif.active for vif in vifs.values())
if all_vifs_active:
if self._cni.CNI_IFNAME in self._vifs:
self.callback_vif = self._vifs[self._cni.CNI_IFNAME]
else:
self.callback_vif = self._vifs.values()[0]
LOG.debug("All VIFs are active, exiting. Will return %s",
self.callback_vif)
return True
else:
LOG.debug("Waiting for all vifs to become active")
return False
def callback(self):
self._callback(self.callback_vif)
class DelHandler(CNIHandlerBase):
def on_vif(self, pod, vif, ifname):
b_base.disconnect(vif, self._get_inst(pod),
self._cni.CNI_IFNAME, self._cni.CNI_NETNS,
container_id=self._cni.CNI_CONTAINERID)
def should_callback(self, pod, vifs):
"""Called after all vifs have been processed
Calls callback if there was at least one vif in the Pod
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
if vifs:
return True
return False
def callback(self):
self._callback(None)
class CallbackHandler(CNIHandlerBase):
def __init__(self, on_vif, on_del=None):
super(CallbackHandler, self).__init__(None, on_vif)
self._del_callback = on_del
self._pod = None
self._callback_vifs = None
def on_vif(self, pod, vif, ifname):
pass
def should_callback(self, pod, vifs):
"""Called after all vifs have been processed
Calls callback if there was at least one vif in the Pod
:param pod: dict containing Kubernetes Pod object
:param vifs: dict containing os_vif VIF objects and ifnames
:returns True/False
"""
self._pod = pod
self._callback_vifs = vifs
if vifs:
return True
return False
def callback(self):
self._callback(self._pod, self._callback_vifs)
def on_deleted(self, pod):
LOG.debug("Got pod %s deletion event.", pod['metadata']['name'])
if self._del_callback:
self._del_callback(pod)
class CNIPipeline(k_dis.EventPipeline):
def _wrap_dispatcher(self, dispatcher):
return dispatcher
def _wrap_consumer(self, consumer):
return consumer