Permalink
Browse files

Merge pull request #54 from kloesing/pep8

Make code look more like PEP-8.
  • Loading branch information...
2 parents 6790fce + 8a36c9b commit 72571972d2df38b9645ddc78e4785fce48e5b72f @ioerror committed Dec 6, 2012
Showing with 68 additions and 60 deletions.
  1. +60 −56 blockfinder
  2. +8 −4 blockfindertest.py
View
@@ -26,6 +26,7 @@ try:
except ImportError:
antigravity = None
+
class DatabaseCache:
def __init__(self, cache_dir, verbose=False):
self.cache_dir = cache_dir
@@ -54,9 +55,9 @@ class DatabaseCache:
if not cache_version:
cache_version = "0.0.1"
if cache_version != self.db_version:
- print "The existing database cache uses version %s, " \
- "not the expected %s." % (cache_version, \
- self.db_version)
+ print("The existing database cache uses version %s, "
+ "not the expected %s." % (cache_version,
+ self.db_version))
return False
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
@@ -128,7 +129,7 @@ class DatabaseCache:
self.cursor.execute(sql, (source_type, ))
self.conn.commit()
- def insert_assignment(self, start_num, end_num, num_type, \
+ def insert_assignment(self, start_num, end_num, num_type,
country_code, source_type, source_name):
""" Insert an assignment into the database cache, without
commiting after the insertion. """
@@ -175,7 +176,7 @@ class DatabaseCache:
lookup_hex = '%033x' % long(lookup_num)
else:
lookup_hex = '%09x' % long(lookup_num)
- self.cursor.execute(sql, (num_type, source_type, lookup_hex, \
+ self.cursor.execute(sql, (num_type, source_type, lookup_hex,
lookup_hex))
row = self.cursor.fetchone()
if row:
@@ -209,8 +210,9 @@ class DatabaseCache:
long(row[5], 16) - 1, str(row[6]), str(row[7])))
return result
+
class DownloaderParser:
- def __init__(self, cache_dir, database_cache, user_agent, \
+ def __init__(self, cache_dir, database_cache, user_agent,
verbose=False):
self.cache_dir = cache_dir
self.database_cache = database_cache
@@ -277,8 +279,8 @@ class DownloaderParser:
expected_bytes = -1
if length_header:
expected_bytes = int(length_header)
- print "Fetching %d kilobytes" % \
- round(float(expected_bytes / 1024), 2)
+ print("Fetching %d kilobytes" %
+ round(float(expected_bytes / 1024), 2))
download_started = time.time()
output_file = open(os.path.join(self.cache_dir, filename), "wb")
received_bytes, seconds_elapsed = 0, 0
@@ -290,8 +292,8 @@ class DownloaderParser:
chunk = fetcher.read(1024)
if len(chunk) == 0:
if expected_bytes >= 0 and received_bytes != expected_bytes:
- print "Expected %s bytes, only received %s" % \
- (expected_bytes, received_bytes)
+ print("Expected %s bytes, only received %s" %
+ (expected_bytes, received_bytes))
print ""
break
received_bytes += len(chunk)
@@ -351,9 +353,9 @@ class DownloaderParser:
rir_file.close()
computed_checksum = str(hashlib.md5(rir_data).hexdigest())
if expected_checksum != computed_checksum:
- print "The computed md5 checksum of %s, %s, does *not* " \
- "match the provided checksum %s!" % \
- (rir_path, computed_checksum, expected_checksum)
+ print("The computed md5 checksum of %s, %s, does *not* "
+ "match the provided checksum %s!" %
+ (rir_path, computed_checksum, expected_checksum))
def parse_maxmind_files(self, maxmind_urls=None):
""" Parse locally cached MaxMind files and insert assignments to the
@@ -497,10 +499,11 @@ class DownloaderParser:
print repr(e), line
elif entry and "country:" in line:
country_code = line.replace("country:", "").strip()
- self.database_cache.insert_assignment(start_num, \
+ self.database_cache.insert_assignment(start_num,
end_num, num_type, country_code, 'lir', 'ripencc')
self.database_cache.commit_changes()
+
class Lookup:
def __init__(self, cache_dir, database_cache, verbose=False):
self.cache_dir = cache_dir
@@ -548,7 +551,7 @@ class Lookup:
def lookup_ipv6_address(self, lookup_ipaddr):
print "Reverse lookup for: " + str(lookup_ipaddr)
for source_type in ['maxmind', 'rir', 'lir']:
- cc = self.database_cache.fetch_country_code('ipv6', \
+ cc = self.database_cache.fetch_country_code('ipv6',
source_type, int(lookup_ipaddr))
if cc:
print source_type.upper(), "country code:", cc
@@ -565,7 +568,7 @@ class Lookup:
maxmind_cn = self.get_name_from_country_code(maxmind_cc)
if maxmind_cn:
print 'MaxMind country name:', maxmind_cn
- rir_cc = self.database_cache.fetch_country_code('ipv4', 'rir', \
+ rir_cc = self.database_cache.fetch_country_code('ipv4', 'rir',
int(lookup_ipaddr))
if rir_cc:
print 'RIR country code:', rir_cc
@@ -574,17 +577,17 @@ class Lookup:
print 'RIR country name:', rir_cn
else:
print 'Not found in RIR db'
- lir_cc = self.database_cache.fetch_country_code('ipv4', 'lir', \
+ lir_cc = self.database_cache.fetch_country_code('ipv4', 'lir',
int(lookup_ipaddr))
if lir_cc:
print 'LIR country code:', lir_cc
lir_cn = self.get_name_from_country_code(lir_cc)
if lir_cn:
print 'LIR country name:', lir_cn
if maxmind_cc and maxmind_cc != rir_cc:
- print "It appears that the RIR data conflicts with MaxMind's " \
- "data. MaxMind's data is likely closer to being " \
- "correct due to sub-delegation issues with LIR databases."
+ print("It appears that the RIR data conflicts with MaxMind's "
+ "data. MaxMind's data is likely closer to being "
+ "correct due to sub-delegation issues with LIR databases.")
def lookup_ip_address(self, lookup_str):
""" Return the country code and name for a given ip address. """
@@ -595,8 +598,8 @@ class Lookup:
elif isinstance(lookup_ipaddr, ipaddr.IPv6Address):
self.lookup_ipv6_address(lookup_ipaddr)
else:
- print "Did not recognize '%s' as either IPv4 or IPv6 " \
- "address." % lookup_str
+ print("Did not recognize '%s' as either IPv4 or IPv6 "
+ "address." % lookup_str)
except ValueError, e:
print "'%s' is not a valid IP address." % lookup_str
@@ -617,8 +620,8 @@ class Lookup:
if request == "ipv4" or request == "ipv6":
start_ipaddr = ipaddr.IPAddress(start_num)
end_ipaddr = ipaddr.IPAddress(end_num)
- result += [str(x) for x in \
- ipaddr.summarize_address_range( \
+ result += [str(x) for x in
+ ipaddr.summarize_address_range(
start_ipaddr, end_ipaddr)]
else:
result.append(str(start_num))
@@ -629,7 +632,7 @@ class Lookup:
look up to which country code(s) the same number ranges are
assigned in other source types. Print out the result showing
similarities and differences. """
- print ("\nLegend:\n"
+ print("\nLegend:\n"
" '<' = found assignment range with country code '%s'\n"
" '>' = overlapping assignment range with same country code\n"
" '*' = overlapping assignment range, first conflict\n"
@@ -681,22 +684,22 @@ class Lookup:
def main():
""" Where the magic starts. """
- usage = "Usage: %prog [options]\n\n" \
- "Example: %prog -v -t mm"
+ usage = ("Usage: %prog [options]\n\n"
+ "Example: %prog -v -t mm")
parser = optparse.OptionParser(usage)
- parser.add_option("-v", "--verbose", action="store_true", \
+ parser.add_option("-v", "--verbose", action="store_true",
dest="verbose", help = "be verbose", default=False)
- parser.add_option("-c", "--cache-dir", action="store", dest="dir", \
- help="set cache directory [default: %default]", \
+ parser.add_option("-c", "--cache-dir", action="store", dest="dir",
+ help="set cache directory [default: %default]",
default=str(os.path.expanduser('~')) + "/.blockfinder/")
- parser.add_option("--user-agent", action="store", dest="ua", \
+ parser.add_option("--user-agent", action="store", dest="ua",
help=('provide a User-Agent which will be used when '
- 'fetching delegation files [default: "%default"]'), \
+ 'fetching delegation files [default: "%default"]'),
default="Mozilla/5.0")
- parser.add_option("-x", "--hack-the-internet", action="store_true", \
+ parser.add_option("-x", "--hack-the-internet", action="store_true",
dest="hack_the_internet", help=optparse.SUPPRESS_HELP)
group = optparse.OptionGroup(parser, "Cache modes",
- "Pick at most one of these modes to initialize or update " \
+ "Pick at most one of these modes to initialize or update "
"the local cache. May not be combined with lookup modes.")
group.add_option("-m", "--init-maxmind", action="store_true",
dest="init_maxmind",
@@ -709,18 +712,18 @@ def main():
help=("import the specified MaxMind GeoIP database file into "
"the database cache using its file name as source "
"name"))
- group.add_option("-i", "--init-rir", \
- action="store_true", dest="init_del", \
+ group.add_option("-i", "--init-rir",
+ action="store_true", dest="init_del",
help="initialize or update delegation information")
- group.add_option("-d", "--reload-rir", action="store_true", \
- dest="reload_del", \
+ group.add_option("-d", "--reload-rir", action="store_true",
+ dest="reload_del",
help="use existing delegation files to update the database")
- group.add_option("-l", "--init-lir", action="store_true", \
+ group.add_option("-l", "--init-lir", action="store_true",
dest="init_lir",
help=("initialize or update lir information; can take up to "
"5 minutes"))
group.add_option("-z", "--reload-lir", action="store_true",
- dest="reload_lir", \
+ dest="reload_lir",
help=("use existing lir files to update the database; can "
"take up to 5 minutes"))
group.add_option("-o", "--download-cc", action="store_true",
@@ -729,20 +732,20 @@ def main():
dest="erase_cache", help="erase the local database cache")
parser.add_option_group(group)
group = optparse.OptionGroup(parser, "Lookup modes",
- "Pick at most one of these modes to look up data in the " \
+ "Pick at most one of these modes to look up data in the "
"local cache. May not be combined with cache modes.")
- group.add_option("-4", "--ipv4", action="store", dest="ipv4", \
+ group.add_option("-4", "--ipv4", action="store", dest="ipv4",
help=("look up country code and name for the specified IPv4 "
"address"))
- group.add_option("-6", "--ipv6", action="store", dest="ipv6", \
+ group.add_option("-6", "--ipv6", action="store", dest="ipv6",
help=("look up country code and name for the specified IPv6 "
"address"))
- group.add_option("-a", "--asn", action="store", dest="asn", \
+ group.add_option("-a", "--asn", action="store", dest="asn",
help="look up country code and name for the specified ASN")
- group.add_option("-t", "--code", action="store", dest="cc", \
+ group.add_option("-t", "--code", action="store", dest="cc",
help=("look up all allocations in the delegation cache for "
"the specified two-letter country code"))
- group.add_option("-n", "--name", action="store", dest="cn", \
+ group.add_option("-n", "--name", action="store", dest="cn",
help=("look up all allocations in the delegation cache for "
"the specified full country name"))
group.add_option("-p", "--compare", action="store", dest="compare",
@@ -774,18 +777,18 @@ def main():
sys.exit(0)
if not database_cache.connect_to_database():
print "Could not connect to database."
- print "You may need to erase it using -e and then reload it " \
- "using -d/-z. Exiting."
+ print("You may need to erase it using -e and then reload it "
+ "using -d/-z. Exiting.")
sys.exit(1)
database_cache.set_db_version()
- downloader_parser = DownloaderParser(options.dir, database_cache, \
+ downloader_parser = DownloaderParser(options.dir, database_cache,
options.ua)
lookup = Lookup(options.dir, database_cache)
if options.ipv4 or options.ipv6 or options.asn or options.cc \
or options.cn or options.compare:
if downloader_parser.check_rir_file_mtimes():
- print "Your cached RIR files are older than 24 hours; you " \
- "probably want to update them."
+ print("Your cached RIR files are older than 24 hours; you "
+ "probably want to update them.")
if options.asn:
lookup.asn_lookup(options.asn)
elif options.ipv4:
@@ -797,18 +800,18 @@ def main():
if options.cc:
country = options.cc.upper()
elif not lookup.knows_country_names():
- print "Need to download country codes first before looking " \
- "up countries by name."
+ print("Need to download country codes first before looking "
+ "up countries by name.")
else:
country = lookup.get_country_code_from_name(options.cn)
if not country:
print "It appears your search did not match a country."
if country:
for request in ["ipv4", "ipv6", "asn"]:
- print "\n".join(lookup.fetch_rir_blocks_by_country(\
- request, country))
+ print("\n".join(lookup.fetch_rir_blocks_by_country(
+ request, country)))
elif options.compare:
- print ("Comparing assignments with overlapping assignments in other "
+ print("Comparing assignments with overlapping assignments in other "
"data sources...")
lookup.lookup_countries_in_different_source(options.compare)
elif options.init_maxmind or options.reload_maxmind:
@@ -839,6 +842,7 @@ def main():
downloader_parser.download_country_code_file()
database_cache.commit_and_close_database()
+
if __name__ == "__main__":
main()
View
@@ -6,6 +6,7 @@
import tempfile
import ipaddr
+
class BaseBlockfinderTest(unittest.TestCase):
def setUp(self):
self.base_test_dir = tempfile.mkdtemp()
@@ -24,6 +25,7 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.base_test_dir, True)
+
class CheckReverseLookup(BaseBlockfinderTest):
def test_rir_ipv4_lookup(self):
self.assertEqual(self.database_cache.fetch_country_code('ipv4',
@@ -57,27 +59,29 @@ def test_lir_ipv6_lookup(self):
self.assertEqual(self.database_cache.fetch_country_code('ipv6',
'lir', int(ipaddr.IPv6Address('2001:670:0085::'))), 'FI')
+
class CheckBlockFinder(BaseBlockfinderTest):
def test_ipv4_bf(self):
known_ipv4_assignments = (
('MM', ['203.81.64.0/19', '203.81.160.0/20']),
('KP', ['175.45.176.0/22']))
for cc, values in known_ipv4_assignments:
- expected = [(int(ipaddr.IPv4Network(network_str).network), \
- int(ipaddr.IPv4Network(network_str).broadcast)) \
+ expected = [(int(ipaddr.IPv4Network(network_str).network),
+ int(ipaddr.IPv4Network(network_str).broadcast))
for network_str in values]
result = self.database_cache.fetch_assignments('ipv4', cc)
self.assertEqual(result, expected)
def test_ipv6_bf(self):
known_ipv6_assignments = ['2001:200::/35', '2001:200:2000::/35',
'2001:200:4000::/34', '2001:200:8000::/33']
- expected = [(int(ipaddr.IPv6Network(network_str).network), \
- int(ipaddr.IPv6Network(network_str).broadcast)) \
+ expected = [(int(ipaddr.IPv6Network(network_str).network),
+ int(ipaddr.IPv6Network(network_str).broadcast))
for network_str in known_ipv6_assignments]
result = self.database_cache.fetch_assignments('ipv6', 'JP')
self.assertEqual(result, expected)
+
if __name__ == '__main__':
for test_class in [CheckReverseLookup, CheckBlockFinder]:
test_suite = unittest.makeSuite(test_class)

0 comments on commit 7257197

Please sign in to comment.