-
Notifications
You must be signed in to change notification settings - Fork 72
/
renderers.py
437 lines (415 loc) · 17.3 KB
/
renderers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import json
from copy import deepcopy
from ipaddress import ip_interface, ip_network
from ...utils import sorted_dict
from ..base import BaseRenderer
from .timezones import timezones
class NetworkRenderer(BaseRenderer):
    """
    Renders content importable with:
        uci import network
    """

    def _get_interfaces(self):
        """
        Converts the ``interfaces`` config list to UCI interface blocks.

        One block is emitted per address of each interface; an interface
        whose ``addresses`` is missing or empty still yields one block
        with ``proto`` set to ``none``. Additional blocks of the same
        interface get a ``_<counter>`` suffix appended to their name.

        :returns: list of sorted dicts, one per generated block
        """
        blocks = []
        for interface in self.config.get('interfaces', []):
            network = interface.get('network')
            # logical name: the explicit network when given, otherwise the
            # physical name; dots and dashes become underscores
            base_name = network if network else interface['name']
            base_name = base_name.replace('.', '_').replace('-', '_')
            declares_bridge = interface.get('type') == 'bridge'
            # only the first block generated for a bridge gets type "bridge"
            bridge_pending = declares_bridge
            if declares_bridge:
                bridge_members = ' '.join(interface['bridge_members'])
            # make sure the interface is not ignored entirely
            # when it carries no address at all
            addresses = interface.get('addresses') or [{'proto': 'none'}]
            for index, address in enumerate(addresses, start=1):
                # every block starts as a full copy of the interface
                block = deepcopy(interface)
                if network:
                    del block['network']
                if 'mac' in block:
                    del block['mac']
                    # wireless interfaces do not get "macaddr" here
                    if interface.get('type') != 'wireless':
                        block['macaddr'] = interface['mac']
                if 'autostart' in block:
                    del block['autostart']
                    block['auto'] = interface['autostart']
                # "disabled" is converted to "enabled" only when truthy
                if block.get('disabled'):
                    del block['disabled']
                    block['enabled'] = not interface['disabled']
                for unwanted in ('addresses', 'type', 'wireless'):
                    block.pop(unwanted, None)
                proto = self.__get_proto(block, address)
                # suffix the name when there is more than one config block
                if index == 1:
                    name = base_name
                else:
                    name = '{name}_{counter}'.format(name=base_name,
                                                     counter=index)
                # pick the UCI option name matching the address family
                family = address.get('family')
                address_key = None
                if family == 'ipv4':
                    address_key = 'ipaddr'
                elif family == 'ipv6':
                    address_key = 'ip6addr'
                    proto = proto.replace('dhcp', 'dhcpv6')
                address_value = None
                if address.get('address') and address.get('mask'):
                    address_value = '{address}/{mask}'.format(**address)
                # resolve DNS settings before the block is updated
                dns = self.__get_dns_servers(block)
                dns_search = self.__get_dns_search(block)
                block.update(
                    name=name,
                    ifname=interface['name'],
                    proto=proto,
                    dns=dns,
                    dns_search=dns_search,
                )
                if bridge_pending:
                    block['type'] = 'bridge'
                    if bridge_members:
                        # bridge members go in the ifname attribute
                        block['ifname'] = bridge_members
                    else:
                        # no members: this is an empty bridge
                        block['bridge_empty'] = True
                        del block['ifname']
                    bridge_pending = False
                elif declares_bridge:
                    # the bridge has already been defined; further blocks
                    # refer to it through the "br-" prefixed physical name
                    block['ifname'] = 'br-{0}'.format(interface['name'])
                # drop the bridge_members attribute unless it is None
                if block.get('bridge_members') is not None:
                    del block['bridge_members']
                # add address if any (with correct option name)
                if address_key and address_value:
                    block[address_key] = address_value
                # merge the remaining (non default) address fields
                extras = {
                    key: value for key, value in address.items()
                    if key not in ('address', 'mask', 'proto', 'family')
                }
                block.update(extras)
                blocks.append(sorted_dict(block))
        return blocks

    def __get_proto(self, interface, address):
        """
        Determines the interface "proto" option: the value set on the
        interface wins, then the one of the address, then ``static``.
        """
        if 'proto' in interface:
            return interface['proto']
        return address.get('proto', 'static')

    def _get_routes(self):
        """
        Converts the ``routes`` config list to UCI route blocks.

        Routes are named ``route1``, ``route2``, ... in config order.
        IPv4 destinations get a ``netmask`` and the host address as
        ``target``; other destinations use the network as ``target``.

        :returns: list of sorted dicts, one per route
        """
        uci_routes = []
        for number, route in enumerate(self.config.get('routes', []), start=1):
            uci_route = route.copy()
            # these keys are re-added below under their UCI names
            for key in ('device', 'next', 'destination', 'cost'):
                del uci_route[key]
            destination = ip_interface(route['destination'])
            is_ipv4 = destination.version == 4
            uci_route.update({
                'version': 'route' if is_ipv4 else 'route6',
                'name': 'route{0}'.format(number),
                'interface': route['device'],
                'target': str(destination.ip if is_ipv4
                              else destination.network),
                'gateway': route['next'],
                'metric': route['cost'],
                'source': route.get('source')
            })
            if is_ipv4:
                uci_route['netmask'] = str(destination.netmask)
            uci_routes.append(sorted_dict(uci_route))
        return uci_routes

    def _get_ip_rules(self):
        """
        Converts the ``ip_rules`` config list to UCI rule blocks.

        ``block_name`` is ``rule`` for IPv4 and ``rule6`` for IPv6; the
        family is taken from ``dest`` when present, else from ``src``,
        and defaults to 4 when neither is given.

        :returns: list of sorted dicts, one per rule
        """
        uci_rules = []
        for rule in self.config.get('ip_rules', []):
            uci_rule = rule.copy()
            src_net = ip_network(rule['src']) if rule.get('src') else None
            dest_net = ip_network(rule['dest']) if rule.get('dest') else None
            reference = dest_net or src_net
            family = reference.version if reference else 4
            uci_rule['block_name'] = 'rule{0}'.format(family).replace('4', '')
            uci_rules.append(sorted_dict(uci_rule))
        return uci_rules

    def __get_dns_servers(self, uci):
        """
        Returns the ``dns`` value of ``uci`` when present, otherwise the
        space separated general ``dns_servers`` setting (``None`` if unset).
        """
        # allow override
        if 'dns' in uci:
            return uci['dns']
        # general setting
        servers = self.config.get('dns_servers', None)
        return ' '.join(servers) if servers else None

    def __get_dns_search(self, uci):
        """
        Returns the ``dns_search`` value of ``uci`` when present, otherwise
        the space separated general ``dns_search`` setting (``None`` if unset).
        """
        # allow override
        if 'dns_search' in uci:
            return uci['dns_search']
        # general setting
        domains = self.config.get('dns_search', None)
        return ' '.join(domains) if domains else None

    def _get_switches(self):
        """
        Returns a sorted deep copy of every item in the ``switch`` config
        list, with each entry of its ``vlan`` list sorted as well.
        """
        switches = []
        for raw_switch in self.config.get('switch', []):
            switch = sorted_dict(deepcopy(raw_switch))
            switch['vlan'] = list(map(sorted_dict, switch['vlan']))
            switches.append(switch)
        return switches

    def _get_globals(self):
        """
        Returns ``{'ula_prefix': ...}`` when the general ``ula_prefix``
        setting is set, otherwise an empty dict.
        """
        ula_prefix = self.config.get('general', {}).get('ula_prefix', None)
        return {'ula_prefix': ula_prefix} if ula_prefix else {}
class SystemRenderer(BaseRenderer):
    """
    Renders content importable with:
        uci import system
    """

    def _get_system(self):
        """
        Returns the sorted ``general`` settings without ``ula_prefix``.

        When any setting is left, ``hostname`` defaults to ``OpenWRT``
        and ``timezone`` (default ``UTC``) is replaced by its entry in
        the ``timezones`` table.
        """
        general = self.config.get('general', {}).copy()
        # ula_prefix is not related to system
        general.pop('ula_prefix', None)
        if not general:
            return sorted_dict(general)
        # resolve the timezone first so a failed lookup happens up front
        timezone_value = timezones[general.get('timezone', 'UTC')]
        general.setdefault('hostname', 'OpenWRT')
        general['timezone'] = timezone_value
        return sorted_dict(general)

    def _get_ntp(self):
        """
        Returns the sorted ``ntp`` settings (empty when not configured).
        """
        ntp = self.config.get('ntp', {})
        return sorted_dict(ntp)

    def _get_leds(self):
        """
        Returns every item of the ``led`` config list as a sorted dict.
        """
        return [sorted_dict(led) for led in self.config.get('led', [])]
class WirelessRenderer(BaseRenderer):
    """
    Renders content importable with:
        uci import wireless
    """

    def _get_radios(self):
        """
        Converts the ``radios`` config list to UCI wifi-device blocks.

        For each radio ``tx_power`` is renamed to ``txpower``, ``driver``
        to ``type``, ``protocol`` is replaced by the derived ``hwmode``,
        channel ``0`` becomes ``'auto'`` and ``country`` is uppercased.
        With the ``mac80211`` driver ``channel_width`` is replaced by
        the derived ``htmode``.

        :returns: list of sorted dicts, one per radio
        :raises KeyError: if a radio lacks ``driver``, ``protocol`` or
                          ``channel`` (or ``channel_width`` with mac80211)
        """
        radios = self.config.get('radios', [])
        uci_radios = []
        for radio in radios:
            uci_radio = radio.copy()
            # rename tx_power to txpower
            if 'tx_power' in radio:
                uci_radio['txpower'] = radio['tx_power']
                del uci_radio['tx_power']
            # rename driver to type
            uci_radio['type'] = radio['driver']
            del uci_radio['driver']
            # determine hwmode option
            uci_radio['hwmode'] = self.__get_hwmode(radio)
            del uci_radio['protocol']
            # channel 0 means "auto"; compare by value, identity checks
            # against int literals depend on interpreter caching
            if uci_radio['channel'] == 0:
                uci_radio['channel'] = 'auto'
            # determine channel width
            if radio['driver'] == 'mac80211':
                uci_radio['htmode'] = self.__get_htmode(radio)
                del uci_radio['channel_width']
            # ensure country is uppercase
            if uci_radio.get('country'):
                uci_radio['country'] = uci_radio['country'].upper()
            # append sorted dict
            uci_radios.append(sorted_dict(uci_radio))
        return uci_radios

    def __get_hwmode(self, radio):
        """
        Determines the ``hwmode`` option of a radio.

        802.11a/b/g map directly to 11a/11b/11g. For any other protocol
        the channel decides: channels up to 13 give ``11g``, higher ones
        ``11a``; with channel 0 the radio's own ``hwmode`` value is
        returned (``None`` when it is not set).
        """
        protocol = radio['protocol']
        if protocol in ['802.11a', '802.11b', '802.11g']:
            # return 11a, 11b or 11g
            return protocol[4:]
        # determine hwmode depending on channel used
        if radio['channel'] == 0:
            # when using automatic channel selection, we need an
            # additional parameter to determine the frequency band
            return radio.get('hwmode')
        elif radio['channel'] <= 13:
            return '11g'
        else:
            return '11a'

    def __get_htmode(self, radio):
        """
        Determines the ``htmode`` option (only for mac80211 driver):
        ``HT<width>`` for 802.11n, ``VHT<width>`` for 802.11ac and
        ``NONE`` for any other protocol.
        """
        if radio['protocol'] == '802.11n':
            return 'HT{0}'.format(radio['channel_width'])
        elif radio['protocol'] == '802.11ac':
            return 'VHT{0}'.format(radio['channel_width'])
        # disables n
        return 'NONE'

    def _get_wifi_interfaces(self):
        """
        Converts interfaces having a ``wireless`` key to UCI wifi-iface
        blocks.

        Each block starts as a copy of the ``wireless`` dict, then gets
        ``disabled``, ``ifname`` and (if set) ``macaddr`` from the
        enclosing interface, ``radio`` renamed to ``device``, the mode
        and advanced 802.11 options mapped to their UCI names, the
        encryption settings flattened and ``network`` rendered as a
        space separated string with dots and dashes replaced by
        underscores.

        :returns: list of sorted dicts, one per wireless interface
        :raises KeyError: on an unknown ``mode`` or a missing ``radio``,
                          ``mode`` or interface ``name``
        """
        # select interfaces that have a "wireless" key
        wifi_interfaces = [i for i in self.config.get('interfaces', [])
                           if 'wireless' in i]
        # map netjson wifi modes to uci wifi modes
        modes = {
            'access_point': 'ap',
            'station': 'sta',
            'adhoc': 'adhoc',
            'monitor': 'monitor',
            '802.11s': 'mesh'
        }
        # map advanced 802.11 netjson attributes to UCI
        wifi_options = {
            'ack_distance': 'distance',
            'rts_threshold': 'rts',
            'frag_threshold': 'frag'
        }
        # results container
        uci_wifi_ifaces = []
        for wifi_interface in wifi_interfaces:
            wireless = wifi_interface['wireless']
            # prepare UCI wifi-iface directive
            uci_wifi = wireless.copy()
            # inherit "disabled" attribute (None when not present)
            uci_wifi['disabled'] = wifi_interface.get('disabled')
            # add ifname
            uci_wifi['ifname'] = wifi_interface['name']
            # rename radio to device
            uci_wifi['device'] = wireless['radio']
            del uci_wifi['radio']
            # mac address override
            if 'mac' in wifi_interface:
                uci_wifi['macaddr'] = wifi_interface['mac']
            uci_wifi['mode'] = modes[wireless['mode']]
            for netjson_key, uci_key in wifi_options.items():
                value = wireless.get(netjson_key)
                if value is not None:
                    # ignore if 0 (autogenerated UIs might use 0 as default value)
                    if value > 0:
                        uci_wifi[uci_key] = value
                    del uci_wifi[netjson_key]
            # determine encryption for wifi
            if uci_wifi.get('encryption'):
                del uci_wifi['encryption']
                uci_encryption = self.__get_encryption(wireless)
                uci_wifi.update(uci_encryption)
            # attached networks (openwrt specific)
            # by default the wifi interface is attached
            # to its defining interface
            # but this behaviour can be overridden
            if not uci_wifi.get('network'):
                # get network, default to ifname
                network = wifi_interface.get('network', wifi_interface['name'])
                uci_wifi['network'] = [network]
            uci_wifi['network'] = ' '.join(uci_wifi['network'])\
                                     .replace('.', '_')\
                                     .replace('-', '_')
            uci_wifi_ifaces.append(sorted_dict(uci_wifi))
        return uci_wifi_ifaces

    def __get_encryption(self, wireless):
        """
        Flattens the ``encryption`` settings of a wireless dict into UCI
        options.

        Returns an empty dict when encryption is missing, empty or
        flagged ``disabled``. Otherwise ``encryption`` holds the mapped
        protocol (suffixed with ``+<cipher>`` for each configured
        cipher); WEP protocols store the key in ``key1`` with ``key``
        set to ``'1'``, all other protocols store it in ``key``.
        """
        encryption = wireless.get('encryption', {})
        disabled = encryption.get('disabled', False)
        uci = {}
        encryption_map = {
            'wep_open': 'wep-open',
            'wep_shared': 'wep-shared',
            'wpa_personal': 'psk',
            'wpa2_personal': 'psk2',
            'wpa_personal_mixed': 'psk-mixed',
            'wpa_enterprise': 'wpa',
            'wpa2_enterprise': 'wpa2',
            'wpa_enterprise_mixed': 'wpa-mixed',
            'wps': 'psk'
        }
        # if encryption disabled return empty dict
        if not encryption or disabled:
            return uci
        # otherwise configure encryption
        protocol = encryption['protocol']
        # default to protocol raw value in order
        # to allow customization by child classes
        uci['encryption'] = encryption_map.get(protocol, protocol)
        if protocol.startswith('wep'):
            uci['key'] = '1'
            uci['key1'] = encryption['key']
            # tell hostapd/wpa_supplicant key is not hex format
            if protocol == 'wep_open':
                uci['key1'] = 's:{0}'.format(uci['key1'])
        else:
            uci['key'] = encryption['key']
        # add ciphers
        if encryption.get('ciphers'):
            uci['encryption'] += '+{0}'.format('+'.join(encryption['ciphers']))
        return uci
class DefaultRenderer(BaseRenderer):
    """
    Default OpenWrt Renderer
    Allows great flexibility in defining UCI configuration in JSON format
    """

    def _get_custom_packages(self):
        """
        Collects config keys that are neither schema properties nor
        packages reported by the backend.

        Only keys whose value is a list are kept. Within such a list
        every dict containing ``config_name`` is sorted and retained;
        any other item is skipped and reported on standard output.

        :returns: sorted dict mapping each custom key to its block list
        """
        # config keys that must not be treated as custom packages
        reserved = list(self.backend.schema['properties'].keys())
        reserved.extend(self.backend.get_packages())
        packages = {}
        for package, blocks in self.config.items():
            # anything reserved or not shaped as a list is ignored
            if package in reserved or not isinstance(blocks, list):
                continue
            accepted = []
            for block in blocks:
                # a config block must be a dict with a "config_name" key
                if isinstance(block, dict) and 'config_name' in block:
                    accepted.append(sorted_dict(block))
                    continue
                # otherwise it is skipped with a warning
                print('Unrecognized config block was skipped:\n\n'
                      '{0}\n\n'.format(json.dumps(block, indent=4)))
            packages[package] = accepted
        # sort custom packages
        return sorted_dict(packages)