/
node_state_manager.py
194 lines (154 loc) · 6.56 KB
/
node_state_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
from typing import Dict, NamedTuple
import netifaces
from google.protobuf.timestamp_pb2 import ( # pylint: disable=no-name-in-module
Timestamp,
)
from lte.protos.session_manager_pb2 import (
UPFAssociationState,
UPFFeatureSet,
UPFNodeState,
UserPlaneIPResourceSchema,
)
from magma.pipelined.set_interface_client import (
send_node_state_association_request,
)
from ryu.lib import hub
# Base of the exponential backoff used while retrying the SMF association:
# the association monitor sleeps EXP_BASE ** retry_count between attempts.
EXP_BASE = 3
class NodeStateManager:
    """
    This controller manages node state information
    and reports to SMF.

    It builds UPF association setup/release messages, sends them through
    the sessiond set-interface stub and keeps track of the association
    state and of the state version that was last acknowledged.
    """
    # Values advertised in UserPlaneIPResourceSchema; see _get_teid_pool_range.
    TEID_RANGE_INDICATION = 0
    TEID_RANGE_VALUE = 0
    # Number of consecutive failed attempts after which the retry counter
    # (and therefore the backoff delay) is reset.
    ASSOC_MAX_RETRIES = 40

    class LocalNodeConfig(NamedTuple):
        # IPv4 address advertised in the IP resource schema.
        downlink_ip: str
        # Identifier used as ``upf_id`` in every UPFNodeState message.
        node_identifier: str

    def __init__(self, loop, sessiond_setinterface, config):
        """
        Args:
            loop: event loop handle; stored on the instance.
            sessiond_setinterface: stub used to send node association
                messages to sessiond.
            config: configuration dict; the keys 'downlink_ip_address',
                'enodeb_iface' and 'upf_node_identifier' are read from it.
        """
        self.config = self._get_config(config)
        self._loop = loop

        # setinterface for sending node association message
        self._sessiond_setinterface = sessiond_setinterface

        # Local counters and state information
        self._smf_assoc_version = 1
        self._assoc_message_count = 0
        self._smf_assoc_state = UPFAssociationState.STARTED

        # Fill the node ID for sending association message
        self._node_id = self.config.node_identifier

        self._assoc_mon_thread = None
        self._recovery_timestamp = Timestamp()

        logging.info(" NGServicer : Node state manager launched ")

    def _get_teid_pool_range(self):
        """Return the ``(teid_range_indication, teid_range)`` pair."""
        # TEID_RANGE_INDICATION = 0 shows wild card as per 15.8, section 8.2.82
        return self.TEID_RANGE_INDICATION, self.TEID_RANGE_VALUE

    def _get_config(self, config_dict: Dict) -> "NodeStateManager.LocalNodeConfig":
        """
        Derive the local node configuration from the service config.

        The downlink IP is the configured 'downlink_ip_address' when present
        and non-empty, otherwise the first IPv4 address of the interface
        named by 'enodeb_iface'. The node identifier is the configured
        'upf_node_identifier' when truthy, otherwise that interface address.

        Raises:
            KeyError: if 'upf_node_identifier' is missing, or if
                'enodeb_iface' is missing when the interface lookup is needed.
        """
        def get_enodeb_if_ip(ng_params):
            if ng_params and ng_params.get('downlink_ip_address', None):
                return ng_params['downlink_ip_address']

            enode_if_ip = netifaces.ifaddresses(config_dict['enodeb_iface'])
            return enode_if_ip[netifaces.AF_INET][0]['addr']

        def get_node_identifier(ng_params):
            if ng_params:
                return ng_params
            # ng_params is falsy here, so this always resolves the
            # interface address rather than a configured downlink IP.
            return get_enodeb_if_ip(ng_params)

        return self.LocalNodeConfig(
            downlink_ip=get_enodeb_if_ip(config_dict),
            node_identifier=get_node_identifier(config_dict['upf_node_identifier']),
        )

    def _send_messsage_wrapper(self, node_message):
        """Send ``node_message`` through the sessiond stub; return its result."""
        return send_node_state_association_request(
            node_message,
            self._sessiond_setinterface,
        )

    def _refresh_recovery_timestamp(self):
        """
        Set the stored recovery timestamp to the current time and return it.

        ``Timestamp.GetCurrentTime()`` updates the message in place and
        returns None, so its return value must not be used as the field
        value; the Timestamp object itself is returned instead.
        """
        self._recovery_timestamp.GetCurrentTime()
        return self._recovery_timestamp

    def _build_setup_association_message(self):
        """
        Build the ESTABLISHED UPFAssociationState describing this node.

        Shared by send_association_setup_message and get_node_assoc_message
        so both always advertise the same content.
        """
        teid_range_indicate, teid_range_value = self._get_teid_pool_range()

        return UPFAssociationState(
            state_version=self._smf_assoc_version,
            assoc_state=UPFAssociationState.ESTABLISHED,
            feature_set=UPFFeatureSet(f_teid=True),
            recovery_time_stamp=self._refresh_recovery_timestamp(),
            ip_resource_schema=[
                UserPlaneIPResourceSchema(
                    ipv4_address=self.config.downlink_ip,
                    teid_range_indication=teid_range_indicate,
                    teid_range=teid_range_value,
                ),
            ],
        )

    def _send_association_request_message(self, assoc_message):
        """
        Wrap ``assoc_message`` in a UPFNodeState and send it.

        On success the local association state is updated, the state version
        is bumped (except for RELEASE) and the message counter incremented.

        Returns:
            True if the message was sent successfully, False otherwise.
        """
        # Build the message
        node_message = UPFNodeState(upf_id=self._node_id)
        node_message.associaton_state.CopyFrom(assoc_message)

        if not self._send_messsage_wrapper(node_message):
            return False

        self._smf_assoc_state = assoc_message.assoc_state
        if self._smf_assoc_state != UPFAssociationState.RELEASE:
            self._smf_assoc_version += 1
        self._assoc_message_count += 1
        return True

    def send_association_setup_message(self):
        """Spawn a green thread that retries the association setup until it succeeds."""
        # Create Node association setup message
        assoc_message = self._build_setup_association_message()
        self._assoc_mon_thread = hub.spawn(self._monitor_association, assoc_message)

    def _monitor_association(self, assoc_message: "UPFAssociationState", poll_interval: int = 3):
        """
        Polling to establish smf association

        Keeps sending ``assoc_message`` until it is accepted, sleeping
        EXP_BASE ** retry_count between failed attempts. The retry counter
        is reset once it reaches ASSOC_MAX_RETRIES.

        Args:
            assoc_message: association message to (re)send.
            poll_interval: kept for interface compatibility; the value passed
                in is overwritten by the computed backoff before it is used.
        """
        # NOTE(review): the backoff is uncapped, so the delay reaches
        # EXP_BASE ** (ASSOC_MAX_RETRIES - 1) before the counter resets --
        # confirm whether an upper bound on the delay is wanted.
        retry_count = 0
        while not self._send_association_request_message(assoc_message):
            retry_count += 1
            if retry_count == self.ASSOC_MAX_RETRIES:
                logging.info(" Max Attempt to SMF connection failed. Reattempting..")
                retry_count = 0

            poll_interval = pow(EXP_BASE, retry_count)
            hub.sleep(poll_interval)

    def send_association_release_message(self):
        """
        Send an association RELEASE, if an association is established.

        The local state is moved to RELEASE and the state version reset to 0
        whether or not the send succeeds.
        """
        # If setup is not established no need to release
        if self._smf_assoc_state != UPFAssociationState.ESTABLISHED:
            return

        # Create Node association release message
        assoc_message = UPFAssociationState(
            state_version=self._smf_assoc_version + 1,
            assoc_state=UPFAssociationState.RELEASE,
        )

        self._send_association_request_message(assoc_message)
        self._smf_assoc_state = UPFAssociationState.RELEASE
        self._smf_assoc_version = 0

    # In case of restarts
    def get_node_assoc_message(self):
        """
        Build (without sending) the UPFNodeState carrying the setup message.

        Returns:
            UPFNodeState with ``upf_id`` set to this node's identifier and an
            ESTABLISHED association state at the current state version.
        """
        node_message = UPFNodeState(upf_id=self._node_id)
        node_message.associaton_state.CopyFrom(
            self._build_setup_association_message(),
        )
        return node_message