Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

* Added basic unit tests

* Fixed some bugs in the rir and lir function

* Found a way to speed up the reverse ip lookup function

* Refactored some code to get it to cooperate with unit tests
  • Loading branch information...
commit 23b0c21f160fdc6c0e5e7eefdadc6922feac462f 1 parent c56992e
@hellais hellais authored
Showing with 141 additions and 28 deletions.
  1. +90 −28 blockfinder
  2. +1 −0  blockfinder.py
  3. +50 −0 blockfindertest.py
View
118 blockfinder
@@ -168,8 +168,7 @@ def get_possible_match_entries(cc,cache_dir):
cursor.close()
return count
-def use_sql_database(request, cc, cache_dir):
-
+def use_sql_database_call(request, cc, cache_dir):
""" Use the sqlite database that is created after fetching delegations
to output information for a given request """
conn = sqlite3.connect(cache_dir + "sqlitedb")
@@ -179,18 +178,25 @@ def use_sql_database(request, cc, cache_dir):
text ="""select start,value from """ + request + """ where cc=?"""
cc = (cc,)
cursor.execute(text,cc)
+ result = []
for row in cursor:
if request == "ipv4":
- print str(row[0]) + "/" + str(calculate_ipv4_subnet(int(row[1])))
+ result.append(str(row[0]) + "/" + str(calculate_ipv4_subnet(int(row[1]))))
elif request == "ipv6":
- print str(row[0]) + "/" + str(int(row[1]))
+ result.append(str(row[0]) + "/" + str(int(row[1])))
else:
- print str(int(row[0]))
+ result.append(str(int(row[0])))
if verbose:
print "We found %d possible entries in our delegation cache." % get_possible_match_entries(cc, cache_dir)
cursor.execute("""select count(*) from """ + request + """ where cc=?""", cc )
print "We found %d matching entries in our delegation cache." % int (cursor.fetchone()[0] )
cursor.close()
+ return result
+
+def use_sql_database(request, cc, cache_dir):
+ result = use_sql_database_call(request, cc, cache_dir)
+ for i in result:
+ print i
def get_md5_from_delegation_md5_file(cache_dir, delegation_file):
""" Returns the md5sum from the delegation md5 file
@@ -374,9 +380,16 @@ def ip_address_to_dec(ip_addr):
decimal = int(total,16)
return decimal
-
-
-
+def ip_address_valid(ip_addr):
+ ipv4arr = ip_addr.split('.')
+ if len(ipv4arr) == 4:
+ for items in ipv4arr:
+ if int(items) > 255:
+ return False
+ return True
+ else:
+ return False
+
def geoip_lookup(cache_dir, ip_addr):
if len(ip_addr.split('.')) != 4:
# This would work with the CVS version of the GeoIP code
@@ -394,9 +407,63 @@ def geoip_lookup(cache_dir, ip_addr):
cc_name = gi.country_name_by_addr(ip_addr)
return cc,cc_name
+def rir_lookup(ip_addr, cache_dir):
+
+ ipv4arr = ip_addr.split('.')
+ result = []
+
+ conn = sqlite3.connect(cache_dir +"sqlitedb")
+ cursor = conn.cursor()
+
+ cursor.execute('select * from ipv4 WHERE start LIKE ?', ( ipv4arr[0] + "." + ipv4arr[1] + ".%",))
+ row = cursor.fetchone()
+ if row is None:
+ cursor.execute('select * from ipv4 WHERE start LIKE ?', ( ipv4arr[0] + ".%",))
+ row = cursor.fetchone()
+
+ while(row is not None):
+ if(ip_address_to_dec(row[2]) <= ip_address_to_dec(ip_addr) < (ip_address_to_dec(row[2])+row[
+3])):
+ rir_cc = row[1]
+ rir_cc_name = get_name_from_country_code(cache_dir, row[1])
+ result.append(rir_cc)
+ result.append(rir_cc_name)
+ return result
+ row = cursor.fetchone()
+ cursor.close()
+ return 1
+
+def lir_lookup(ip_addr, cache_dir):
+ conn = sqlite3.connect(cache_dir +"sqlitedb")
+ cursor = conn.cursor()
+
+ ipv4arr = ip_addr.split('.')
+ result = []
+
+ cursor.execute('select * from lir_record WHERE start LIKE ? and type=4', (ipv4arr[0] + "." + ipv4arr[1] + ".%",))
+ row = cursor.fetchone()
+ if row is None:
+ cursor.execute('select * from lir_record WHERE start LIKE ? and type=4', (ipv4arr[0] + ".%",))
+ row = cursor.fetchone()
+
+
+ while(row is not None):
+ if (ip_address_to_dec(row[1]) <= ip_address_to_dec(ip_addr) <= (ip_address_to_dec(row[1]) + row[2])):
+ result.append(row[0])
+ result.append(get_name_from_country_code(cache_dir, row[0]))
+ return result
+ row = cursor.fetchone()
+ cursor.close()
+ return 1
+
def lookup_ip_address(ip_addr, cache_dir):
""" Return the country code and name for a given ip address. Attempts to
use GeoIP if available."""
+
+ if ip_address_valid(ip_addr) == False:
+ print 'Invalid ip address!'
+ return 1
+
ip_addr = socket.gethostbyname(ip_addr)
rir_cc = ""
print "Reverse lookup for: " + ip_addr
@@ -404,36 +471,31 @@ def lookup_ip_address(ip_addr, cache_dir):
geoip_cc, geoip_cc_name = geoip_lookup(cache_dir, ip_addr)
print "GeoIP country code: " + str(geoip_cc)
print "GeoIP country name: " + str(geoip_cc_name)
- ipv4arr = ip_addr.split('.')
+
+ ipv4arr = ip_addr.split('.')
if len(ipv4arr) != 4:
print """doesn't look like an ipv4 address.."""
sys.exit(5)
+ rir = rir_lookup(ip_addr, cache_dir)
+
+ if(rir != 1):
+ print 'RIR country code: ' + rir[0]
+ print 'RIR country: ' + rir[1]
+ else:
+ print 'Not found in RIR db'
- conn = sqlite3.connect(cache_dir +"sqlitedb")
- cursor = conn.cursor()
- cursor.execute('select * from ipv4 WHERE start LIKE ?', ( ipv4arr[0] + ".%",))
- for row in cursor:
- if(ip_address_to_dec(row[2]) <= ip_address_to_dec(ip_addr) <= (ip_address_to_dec(row[2])+row[3])):
- rir_cc = row[1]
- rir_cc_name = get_name_from_country_code(cache_dir, row[1])
- print 'RIR country code: ' + rir_cc
- print 'RIR country: ' + rir_cc_name
- break
-
- cursor.execute('select * from lir_record WHERE start LIKE ? and type=4', (ipv4arr[0] + ".%",))
- for row in cursor:
- if (ip_address_to_dec(row[1]) <= ip_address_to_dec(ip_addr) <= (ip_address_to_dec(row[1]) + row[2])):
- print 'LIR country code: ' + row[0]
- print 'LIR country: ', get_name_from_country_code(cache_dir, row[0])
- break
-
- cursor.close()
+ lir = lir_lookup(ip_addr, cache_dir)
+ if(lir != 1):
+ print 'LIR country code: ' + lir[0]
+ print 'LIR country code: ' + lir[1]
+
if GeoIP:
if geoip_cc != rir_cc:
print "It appears that the RIR data conflicts with the GeoIP data"
print "The GeoIP data is likely closer to being correct due to " \
"sub-delegation issues with LIR databases"
+
def return_first_ip_and_number_in_inetnum(line):
start_ip = line.split("-")[0].strip()
end_ip = line.split("-")[1].strip()
View
1  blockfinder.py
View
50 blockfindertest.py
@@ -0,0 +1,50 @@
+#!/usr/bin/python
+import blockfinder
+import unittest
+import os
+
+class CheckReverseLookup(unittest.TestCase):
+
+ ipValues = ( (3229318011, '192.123.123.123'),
+ (3463778365, '206.117.16.61'),
+ (4278190202, '255.0.0.122'),
+ (3654084623, '217.204.232.15'),
+ (134217728, '8.0.0.0'))
+
+ rirValues = ( ('217.204.232.15', 'GB'),
+ ('188.72.225.100', 'DE'),
+ ('8.8.8.1', 'US'))
+
+ cache_dir = str(os.path.expanduser('~')) + "/.blockfinder/"
+
+ def test_rir_lookup(self):
+ for ip, cc in self.rirValues:
+ result = blockfinder.rir_lookup(ip, self.cache_dir)
+ self.assertEqual(result[0], cc)
+
+ def test_ip_address_to_dec(self):
+ for dec, ip in self.ipValues:
+ result = blockfinder.ip_address_to_dec(ip)
+ self.assertEqual(result, dec)
+
+class CheckBlockFinder(unittest.TestCase):
+ cache_dir = str(os.path.expanduser('~')) + "/.blockfinder/"
+
+
+ # You can add known blocks to the tuple as a list
+ # they will be looked up and checked
+ knownResults = ( ('mm', ['203.81.64.0/19',
+ '203.81.160.0/20']),
+ ('kp', ['175.45.176.0/22']))
+
+ def test_ipv4_bf(self):
+ blockfinder.verbose = 0
+ for cc, values in self.knownResults:
+ self.result = blockfinder.use_sql_database_call("ipv4", cc.upper(), self.cache_dir)
+ self.assertEqual(self.result, values)
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.