diff --git a/iptc/easy.py b/iptc/easy.py index 3eb056c..7f17a4c 100644 --- a/iptc/easy.py +++ b/iptc/easy.py @@ -300,7 +300,7 @@ def encode_iptc_rule(rule_d, ipv6=False): # Avoid issues with matches that require basic parameters to be configured first for name in rule_attr: if name in rule_d: - _iptc_setrule(iptc_rule, name, rule_d[name]) + setattr(iptc_rule, name.replace('-', '_'), rule_d[name]) for name, value in rule_d.items(): try: if name in rule_attr: @@ -388,18 +388,6 @@ def _iptc_getchain(table, chain, ipv6=False, raise_exc=True): except Exception as e: if raise_exc: raise -def _iptc_setattr(object, name, value): - # Translate attribute name - name = name.replace('-', '_') - setattr(object, name, value) - -def _iptc_setattr_d(object, value_d): - for name, value in value_d.items(): - _iptc_setattr(object, name, value) - -def _iptc_setrule(iptc_rule, name, value): - _iptc_setattr(iptc_rule, name, value) - def _iptc_setmatch(iptc_rule, name, value): # Iterate list/tuple recursively if isinstance(value, list) or isinstance(value, tuple): @@ -408,21 +396,21 @@ def _iptc_setmatch(iptc_rule, name, value): # Assign dictionary value elif isinstance(value, dict): iptc_match = iptc_rule.create_match(name) - _iptc_setattr_d(iptc_match, value) + [iptc_match.set_parameter(k, v) for k, v in value.items()] # Assign value directly else: iptc_match = iptc_rule.create_match(name) - _iptc_setattr(iptc_match, name, value) + iptc_match.set_parameter(name, value) def _iptc_settarget(iptc_rule, value): - # Target is dictionary - Use only 1 pair key/value + # Target is dictionary - Use only 1st pair key/value if isinstance(value, dict): t_name, t_value = next(iter(value.items())) if t_name == 'goto': iptc_target = iptc_rule.create_target(t_value, goto=True) else: iptc_target = iptc_rule.create_target(t_name) - _iptc_setattr_d(iptc_target, t_value) + [iptc_target.set_parameter(k, v) for k, v in t_value.items()] # Simple target else: iptc_target = iptc_rule.create_target(value) diff --git a/tests/test_matches.py b/tests/test_matches.py index 063cfb4..2093ad2 100755 --- a/tests/test_matches.py +++ b/tests/test_matches.py @@ -471,6 +471,35 @@ def test_hashlimit(self): self.assertEqual(m.hashlimit_upto, "200/sec") self.assertEqual(m.hashlimit_burst, "5") +class TestRecentMatch(unittest.TestCase): + def setUp(self): + self.table = 'filter' + self.chain = 'iptc_test_recent' + iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) + iptc.easy.add_chain(self.table, self.chain, ipv6=False, raise_exc=True) + + def tearDown(self): + iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False) + + def test_recent(self): + rule_d = { + 'protocol': 'udp', + 'recent': { + 'mask': '255.255.255.255', + 'update': '', + 'seconds': '60', + 'rsource': '', + 'name': 'UDP-PORTSCAN', + }, + 'target': { + 'REJECT':{ + 'reject-with': 'icmp-port-unreachable' + } + } + } + iptc.easy.add_rule(self.table, self.chain, rule_d) + rule2_d = iptc.easy.get_rule(self.table, self.chain, -1) + self.assertEqual(rule_d, rule2_d) def suite(): suite_match = unittest.TestLoader().loadTestsFromTestCase(TestMatch) @@ -488,6 +517,8 @@ def suite(): TestXTConntrackMatch) suite_hashlimit = unittest.TestLoader().loadTestsFromTestCase( TestHashlimitMatch) + suite_recent = unittest.TestLoader().loadTestsFromTestCase( + TestRecentMatch) extra_suites = [] if is_table6_available(iptc.Table6.FILTER): extra_suites += unittest.TestLoader().loadTestsFromTestCase( @@ -496,7 +527,7 @@ def suite(): return unittest.TestSuite([suite_match, suite_udp, suite_mark, suite_limit, suite_mport, suite_comment, suite_iprange, suite_state, suite_conntrack, - suite_hashlimit] + extra_suites) + suite_hashlimit, suite_recent] + extra_suites) def run_tests():