Skip to content
This repository has been archived by the owner on Jun 29, 2022. It is now read-only.

Commit

Permalink
Backport validation improvements and resolve invalid range. Closes #404
Browse files Browse the repository at this point in the history
.  (#416)

* backport validation improvements and resolve invalid range. Closes #404.

* Add specific invalid range test.
  • Loading branch information
chambridge committed Oct 26, 2017
1 parent ec97364 commit dad3263
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 32 deletions.
134 changes: 102 additions & 32 deletions rho/utilities.py
Expand Up @@ -181,6 +181,7 @@ def _read_in_file(filename):

# Makes sure the hosts passed in are in a format Ansible
# understands.
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def read_ranges(ranges_or_path):
"""Process a range list from the user.
Expand All @@ -193,47 +194,116 @@ def read_ranges(ranges_or_path):
:returns: list of IP address ranges in Ansible format
"""
# pylint: disable=anomalous-backslash-in-string
regex_list = ['[0-9]*.[0-9]*.[0-9]*.\[[0-9]*:[0-9]*\]',
'[0-9]*.[0-9]*.\[[0-9]*:[0-9]*\].[0-9]*',
'[0-9]*.[0-9]*.\[[0-9]*:[0-9]*\].\[[0-9]*:[0-9]*\]',
'[a-zA-Z0-9-\.]+',
'[a-zA-Z0-9-\.]*\[[0-9]*:[0-9]*\]*[a-zA-Z0-9-\.]*',
'[a-zA-Z0-9-\.]*\[[a-zA-Z]*:[a-zA-Z]*\][a-zA-Z0-9-\.]*']

invalid_path = check_path_validity([ranges_or_path[0]])
if ranges_or_path and os.path.isfile(ranges_or_path[0]):
range_list = _read_in_file(ranges_or_path[0])
hosts = _read_in_file(ranges_or_path[0])
elif invalid_path == [] and len(ranges_or_path) == 1:
log.error(_("Couldn't interpret %s as host file because "
"no such file exists."), ranges_or_path[0])
sys.exit(1)
else:
range_list = ranges_or_path

normalized = []

for reg_item in range_list:
match_found = False
for reg in regex_list:
match = re.match(reg, reg_item)
if match and match.end() == len(reg_item):
normalized.append(reg_item)
match_found = True
break

if not match_found:
try:
normalized.append(cidr_to_ansible(reg_item))
match_found = True
except NotCIDRException:
pass

if not match_found:
log.error(_("Bad host name/range : '%s'" % (reg_item)))
sys.exit(1)
hosts = ranges_or_path

# Regex for octet, CIDR bit range, and check
# to see if it is like an IP/CIDR
octet_regex = r'(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])'
bit_range = r'(3[0-2]|[1-2][0-9]|[0-9])'
relaxed_ip_pattern = r'[0-9]*\.[0-9]*\.[0-9\[\]:]*\.[0-9\[\]:]*'
relaxed_cidr_pattern = r'[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*\/[0-9]*'
relaxed_invalid_ip_range = r'[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*-' \
r'[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*'

# type IP: 192.168.0.1
# type CIDR: 192.168.0.0/16
# type RANGE 1: 192.168.0.[1:15]
# type RANGE 2: 192.168.[2:18].1
# type RANGE 3: 192.168.[2:18].[4:46]
ip_regex_list = [
r'^{0}\.{0}\.{0}\.{0}$'.format(octet_regex),
r'^{0}\.{0}\.{0}\.{0}\/{1}$'.format(octet_regex, bit_range),
r'^{0}\.{0}\.{0}\.\[{0}:{0}\]$'.format(octet_regex),
r'^{0}\.{0}\.\[{0}:{0}\]\.{0}$'.format(octet_regex),
r'^{0}\.{0}\.\[{0}:{0}\]\.\[{0}:{0}\]$'.format(octet_regex)
]

# type HOST: abcd
# type HOST NUMERIC RANGE: abcd[2:4].foo.com
# type HOST ALPHA RANGE: abcd[a:f].foo.com
host_regex_list = [
r'[a-zA-Z0-9-\.]+',
r'[a-zA-Z0-9-\.]*\[[0-9]+:[0-9]+\]*[a-zA-Z0-9-\.]*',
r'[a-zA-Z0-9-\.]*\[[a-zA-Z]{1}:[a-zA-Z]{1}\][a-zA-Z0-9-\.]*']

normalized_hosts = []
host_errors = []

for host in hosts:
result = None
host_range = host

ip_match = re.match(relaxed_ip_pattern, host_range)
cidr_match = re.match(relaxed_cidr_pattern, host_range)
invalid_ip_range_match = re.match(relaxed_invalid_ip_range,
host_range)
is_likely_ip = ip_match and ip_match.end() == len(host_range)
is_likely_cidr = cidr_match and cidr_match.end() == len(host_range)
is_likely_invalid_ip_range = (invalid_ip_range_match and
invalid_ip_range_match.end() ==
len(host_range))

if is_likely_invalid_ip_range:
err_message = '{} is not a valid IP range format'
result = ValueError(err_message .format(host_range))

elif is_likely_ip or is_likely_cidr:
# This is formatted like an IP or CIDR
# (e.g. #.#.#.# or #.#.#.#/#)
for reg in ip_regex_list:
match = re.match(reg, host_range)
if match and match.end() == len(host_range):
result = host
break

if result is None or is_likely_cidr:
# Attempt to convert CIDR to ansible range
if is_likely_cidr:
try:
normalized_cidr = cidr_to_ansible(host_range)
result = normalized_cidr
except ValueError as validate_error:
result = validate_error
else:
err_message = '{} is not a valid IP or CIDR pattern'
result = ValueError(err_message.format(host_range))
else:
# Possibly a host addr
for reg in host_regex_list:
match = re.match(reg, host_range)
if match and match.end() == len(host_range):
result = host
break
if result is None:
result = ValueError(
'{} is invalid host'.format(host_range))

if isinstance(result, ValueError):
host_errors.append(result)
elif result is not None:
normalized_hosts.append(result)
else:
# This is an unexpected case. Allow/log for analysis
normalized_hosts.append(host)
logging.warning('%s did not match a pattern or produce error',
host_range)
if len(host_errors) is 0:
return normalized_hosts
else:
for host_error in host_errors:
print(host_error)
sys.exit(1)

return normalized
return normalized_hosts


class NotCIDRException(Exception):
Expand Down
5 changes: 5 additions & 0 deletions test/test_utilities.py
Expand Up @@ -97,6 +97,11 @@ def test_invalid_ranges(self):
with self.assertRaises(SystemExit):
utilities.read_ranges(range_list)

def test_issue404(self):
range_list = ['10.1.1.1-10.1.1.254']
with self.assertRaises(SystemExit):
utilities.read_ranges(range_list)

def test_cidr_range(self):
range_list = ['192.168.124.0/25']
self.assertEqual(
Expand Down

0 comments on commit dad3263

Please sign in to comment.