Permalink
Browse files

initial commit

  • Loading branch information...
0 parents commit d2e360712f15d16c6a6317ef70dcdf158fc2ce42 @arnehilmann arnehilmann committed Aug 1, 2012
No changes.
@@ -0,0 +1,58 @@
+from pythonbuilder.core import use_plugin, init, Author
+
+use_plugin("python.core")
+use_plugin("python.unittest")
+use_plugin("python.integrationtest")
+use_plugin("python.coverage")
+use_plugin("python.pychecker")
+use_plugin("python.pymetrics")
+use_plugin("python.pylint")
+use_plugin("python.distutils")
+use_plugin("python.pydev")
+
+use_plugin("copy_resources")
+use_plugin("filter_resources")
+
+default_task = ["analyze", "publish"]
+
+version = "1.0.1"
+summary = "A tool to expand hostnames based on a pattern language and DNS resolution"
+authors = [
+ Author("Arne Hilmann", "arne.hilmann@gmail.com"),
+ Author("Alexander Metzner", "alexander.metzner@gmail.com"),
+ Author("Udo Juettner", "udo.juettner@gmail.com")
+]
+
+url = "http://code.google.com/p/yadt"
+license = "GNU GPL v3"
+
+@init
+def set_properties (project):
+ project.depends_on("dnspython", "1.9.4")
+
+ project.set_property("coverage_break_build", True)
+
+ project.set_property("pychecker_break_build", True)
+
+ project.get_property("distutils_commands").append("bdist_rpm")
+
+ project.get_property("filter_resources_glob").append("**/hostexpand/__init__.py")
+
+ project.set_property("copy_resources_target", "$dir_dist")
+ project.get_property("copy_resources_glob").append("README")
+ project.get_property("copy_resources_glob").append("setup.cfg")
+
+ project.set_property('dir_dist_scripts', 'scripts')
+
+ project.get_property("distutils_commands").append("bdist_egg")
+ project.set_property("distutils_classifiers", [
+ 'Development Status :: 5 - Production/Stable',
+ 'Environment :: Console',
+ 'Intended Audience :: Developers',
+ 'Intended Audience :: System Administrators',
+ 'License :: OSI Approved :: GNU General Public License (GPL)',
+ 'Programming Language :: Python',
+ 'Topic :: System :: Networking',
+ 'Topic :: System :: Software Distribution',
+ 'Topic :: System :: Systems Administration'])
+
@@ -0,0 +1,4 @@
+[bdist_rpm]
+packager = arne.hilmann@gmail.com
+requires = python-dns
+release = 1%{?dist}
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+import sys
+import socket
+import dns.resolver
+
+class HostExpander(object):
+ IP = 'ip'
+ FQDN = 'fqdn'
+ SHORTNAME = 'shortname'
+
+ def __init__(self, nrformat=None, outputformat=None, start_nr=1, substract_prefix=None):
+ self.nrformat = nrformat
+ if not self.nrformat:
+ self.nrformat = '%02i'
+ self.outputformat = outputformat
+ if not self.outputformat:
+ self.outputformat = HostExpander.SHORTNAME
+ self.start_nr = start_nr
+ self.substract_prefix = substract_prefix if substract_prefix else set(['-', '!'])
+
+ def _expand_nr(self, name, start, end):
+ if not start:
+ start = self.start_nr
+ if not end:
+ end = sys.maxint
+ for i in xrange(int(start), int(end)):
+ nr = self.nrformat % i
+ yield name % nr
+
+ def _get_hostname(self, name):
+ if self.outputformat == HostExpander.IP:
+ return socket.gethostbyname(name)
+ elif self.outputformat == HostExpander.SHORTNAME:
+ return socket.getfqdn(name).split('.', 1)[0]
+ return socket.getfqdn(name)
+
+ def _get_hostnames(self, pattern, start=None, end=None):
+ for hostname in self._expand_nr(pattern, start, end):
+ try:
+ dns.resolver.query(hostname)
+ yield self._get_hostname(hostname)
+ except:
+ raise StopIteration()
+
+ def _expand_ranges(self, word):
+ word = word.replace('*', '[:]')
+ ranges_start = word.find('[')
+ if ranges_start < 0:
+ yield self._get_hostname(word)
+ else:
+ ranges_end = word.find(']', ranges_start)
+ prefix = word[:ranges_start]
+ suffix = word[ranges_end + 1:]
+ ranges = word[ranges_start + 1:ranges_end]
+ for r in ranges.split(','):
+ if r.find('..') >= 0:
+ pmin, pmax = r.split('..')
+ if pmax:
+ pmax = int(pmax) + 1
+ for name in self._get_hostnames('%s%%s%s' % (prefix, suffix), pmin, pmax):
+ yield name
+ elif r.find(':') >= 0:
+ pmin, pmax = r.split(':')
+ for name in self._get_hostnames('%s%%s%s' % (prefix, suffix), pmin, pmax):
+ yield name
+ else:
+ r = int(r)
+ for name in self._get_hostnames('%s%%s%s' % (prefix, suffix), r, r + 1):
+ yield name
+
+ def _expand_alternatives(self, word):
+ hosts = set()
+ alt_start = word.find('{')
+ if alt_start < 0:
+ hosts.update(self._expand_ranges(word))
+ else:
+ alt_end = word.find('}', alt_start)
+ prefix = word[:alt_start]
+ suffix = word[alt_end + 1:]
+ alternatives = word[alt_start + 1:alt_end]
+ for alternative in alternatives.split('|'):
+ hosts.update(self._expand_alternatives('%(prefix)s%(alternative)s%(suffix)s' % locals()))
+ return hosts
+
+ def expand(self, items):
+ if items is None:
+ raise ValueError("None value cannot be expanded.")
+ if type(items) == str:
+ items = [items]
+ results = set()
+ for item in items:
+ for word in item.split():
+ if word[0] in self.substract_prefix:
+ mode = '-'
+ word = word[1:]
+ else:
+ mode = '+'
+
+ hosts = set(self._expand_alternatives(word))
+
+ if mode == '+':
+ results.update(hosts)
+ else:
+ results.difference_update(hosts)
+ return sorted(results)
@@ -0,0 +1,2 @@
+VERSION="${version}"
+
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+
+import optparse
+
+from hostexpand import VERSION
+from hostexpand.HostExpander import HostExpander
+
+parser = optparse.OptionParser(version="hostexpand %s" % VERSION)
+parser.add_option('', '--nrformat', dest='nrformat', help='nr format')
+parser.add_option('', '--outputformat', dest='outputformat', help='output format')
+opts, args = parser.parse_args()
+
+expander = HostExpander(**vars(opts))
+
+print '\n'.join(expander.expand(args))
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+
+import unittest
+import dns.resolver
+import socket
+
+from mockito import unstub, when, verify, any as any_value
+
+from hostexpand.HostExpander import HostExpander
+
+class HostExpanderTestBase (unittest.TestCase):
+ def expand_and_assert (self, expander_expression, *expected_host_names):
+ actual = self.expander.expand(expander_expression)
+ self.assertEquals(list(expected_host_names), actual)
+
+ def tearDown(self):
+ unstub()
+
+
+class ShortnameHostExpanderTest (HostExpanderTestBase):
+ def setUp (self):
+ self.expander = HostExpander()
+
+ def test_should_raise_value_error_when_expanding_none (self):
+ self.assertRaises(ValueError, self.expander.expand, None)
+
+ def test_should_return_empty_set_when_expanding_empty_string (self):
+ self.expand_and_assert("")
+
+ def test_should_expand_constant_expression (self):
+ self.expand_and_assert("spam01", "spam01")
+
+ def test_should_expand_hostname_alternatives (self):
+ self.expand_and_assert("{spam|eggs}01", "eggs01", "spam01")
+
+ def test_should_expand_host_number_range_to_single_host (self):
+ when(dns.resolver).query("spam01").thenReturn(None)
+ self.expand_and_assert("spam[1:2]", "spam01")
+ verify(dns.resolver).query("spam01")
+
+ def test_should_expand_host_number_range_to_single_host_when_second_host_cannot_be_resolved (self):
+ when(dns.resolver).query("spam01").thenReturn(None)
+ when(dns.resolver).query("spam02").thenRaise(dns.resolver.NXDOMAIN("Caboom"))
+
+ self.expand_and_assert("spam[1:3]", "spam01")
+
+ verify(dns.resolver).query("spam01")
+ verify(dns.resolver).query("spam02")
+
+ def test_should_expand_host_number_range_with_open_starting_range (self):
+ when(dns.resolver).query(any_value()).thenReturn(None)
+
+ self.expand_and_assert("spam[..2]", "spam01", "spam02")
+
+ verify(dns.resolver, 0).query("spam00")
+ verify(dns.resolver).query("spam01")
+ verify(dns.resolver).query("spam02")
+
+ def test_should_expand_host_number_range_with_just_one_single_number_in_it (self):
+ when(dns.resolver).query('spam02').thenReturn(None)
+ self.expand_and_assert("spam[2]", "spam02")
+ verify(dns.resolver).query("spam02")
+
+ def test_should_expand_host_number_range_with_open_ending_range (self):
+ when(dns.resolver).query("spam02").thenReturn(None)
+ when(dns.resolver).query("spam03").thenReturn(None)
+ when(dns.resolver).query("spam04").thenRaise(dns.resolver.NXDOMAIN("Caboom"))
+
+ self.expand_and_assert("spam[2..]", "spam02", "spam03")
+
+ verify(dns.resolver).query("spam02")
+ verify(dns.resolver).query("spam03")
+ verify(dns.resolver).query("spam04")
+
+ def test_should_expand_host_number_range_with_multiple_exclusion_ranges (self):
+ when(dns.resolver).query(any_value()).thenReturn(None)
+
+ self.expand_and_assert("spam[1..7] -spam[2..3] !spam[5..6]", "spam01", "spam04", "spam07")
+
+ verify(dns.resolver).query("spam01")
+ verify(dns.resolver, 2).query("spam02")
+ verify(dns.resolver, 2).query("spam03")
+ verify(dns.resolver).query("spam04")
+ verify(dns.resolver, 2).query("spam05")
+ verify(dns.resolver, 2).query("spam06")
+ verify(dns.resolver).query("spam07")
+
+
+class FqdnHostExpanderTest (HostExpanderTestBase):
+ def setUp (self):
+ self.expander = HostExpander(outputformat=HostExpander.FQDN)
+
+ def test_should_expand_short_name_to_single_host_with_fqdn (self):
+ when(socket).getfqdn("spam01").thenReturn("spam01.long.domain.name")
+ self.expand_and_assert("spam01", "spam01.long.domain.name")
+ verify(socket).getfqdn("spam01")
+
+
+class IpHostExpanderTest (HostExpanderTestBase):
+ def setUp (self):
+ self.expander = HostExpander(outputformat=HostExpander.IP)
+
+ def test_should_expand_single_host_name_to_ip_address (self):
+ when(socket).gethostbyname("spam01").thenReturn("192.168.111.112")
+ self.expand_and_assert("spam01", "192.168.111.112")
+ verify(socket).gethostbyname("spam01")

0 comments on commit d2e3607

Please sign in to comment.