Skip to content
This repository was archived by the owner on Aug 15, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/vyos/vpp/nat/nat44.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ def delete_outside_interface(self, interface_out):
self.interface_out = interface_out
self.delete_nat44_interface_outside()

def add_nat_address(self, address):
self.translation_pool = address
self.add_nat44_address_range()

def delete_nat_address(self, address):
self.translation_pool = address
self.delete_nat44_address_range()

def add_nat44_static_mapping(
self, local_ip, external_ip, local_port, external_port, protocol
):
Expand Down
66 changes: 48 additions & 18 deletions src/conf_mode/vpp_nat_static.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,32 @@ def get_config(config=None) -> dict:
)
diff = get_config_diff(conf)

for rule in changed_rules:
base_rule = base + ['rule', rule]
tmp = node_changed(
conf,
base_rule,
key_mangling=('-', '_'),
recursive=True,
expand_nodes=Diff.DELETE | Diff.ADD,
)

if 'inside_interface' in tmp:
new, old = diff.get_value_diff(base_rule + ['inside-interface'])
in_iface_add.append(new) if new else None
in_iface_del.append(old) if old else None
if 'outside_interface' in tmp:
new, old = diff.get_value_diff(base_rule + ['outside-interface'])
out_iface_add.append(new) if new else None
out_iface_del.append(old) if old else None
if changed_rules:
for rule in changed_rules:
base_rule = base + ['rule', rule]
tmp = node_changed(
conf,
base_rule,
key_mangling=('-', '_'),
recursive=True,
expand_nodes=Diff.DELETE | Diff.ADD,
)

if 'inside_interface' in tmp:
new, old = diff.get_value_diff(base_rule + ['inside-interface'])
in_iface_add.append(new) if new else None
in_iface_del.append(old) if old else None
if 'outside_interface' in tmp:
new, old = diff.get_value_diff(base_rule + ['outside-interface'])
out_iface_add.append(new) if new else None
out_iface_del.append(old) if old else None

else:
changed_rules = list(config['rule'].keys())

for rule_config in config.get('rule', {}).values():
in_iface_add.append(rule_config['inside_interface'])
out_iface_add.append(rule_config['outside_interface'])

final_in_iface_add = list(set(in_iface_add) - set(in_iface_del))
final_in_iface_del = list(set(in_iface_del) - set(in_iface_add))
Expand All @@ -120,6 +128,9 @@ def verify(config):
if 'remove' in config:
return None

addresses_with_ports = set()
addresses_without_ports = set()

required_keys = {'inside_interface', 'outside_interface'}
for rule, rule_config in config['rule'].items():
missing_keys = required_keys - rule_config.keys()
Expand All @@ -142,6 +153,21 @@ def verify(config):
'Source and destination ports must either both be specified, or neither must be specified'
)

address = rule_config['external']['address']
port = rule_config['external'].get('port')
error_msg = f'Configuration error in rule {rule}: external address/port is already in use!'
if port:
pair = (address, port)
if pair in addresses_with_ports or address in addresses_without_ports:
raise ConfigError(error_msg)
addresses_with_ports.add(pair)
else:
if address in addresses_without_ports or any(
addr == address for addr, _ in addresses_with_ports
):
raise ConfigError(error_msg)
addresses_without_ports.add(address)


def generate(config):
pass
Expand All @@ -167,6 +193,8 @@ def apply(config):
external_port=int(rule_config.get('external', {}).get('port', 0)),
protocol=protocol_map[rule_config.get('protocol', 'all')],
)
if rule_config.get('external', {}).get('port'):
n.delete_nat_address(rule_config.get('external').get('address'))

if 'remove' in config:
return None
Expand All @@ -187,6 +215,8 @@ def apply(config):
external_port=int(rule_config.get('external', {}).get('port', 0)),
protocol=protocol_map[rule_config.get('protocol', 'all')],
)
if rule_config.get('external').get('port'):
n.add_nat_address(rule_config.get('external').get('address'))


if __name__ == '__main__':
Expand Down