Class to accumulate whois records and risk factors.
They will memoized to reduce requested services for more data.

1. Use a database to store the results for use in various programs.
2. Use an ipnetwork as key
3. Store company name with key
4. When a class instance is created, it will load db into dictionary.
5. The instance is normally readonly, but can be writeable. That means new addresses
will be added if they are not in the database and become permanent parts.



In [23]:
import sys
import pandas as pd
import numpy as np
import ipaddress
import dbm
import pickle
from sortedcontainers import SortedDict
import requests
from bs4 import BeautifulSoup
import json
import os
import traceback
import datetime

import pprint
pp = pprint.PrettyPrinter()

class Debug():
    """ Print debug messages if active """
    def __init__(self, set=1):
        self._set = set
    def prt(self, str):
        if self._set:
            sys.stderr.write(str)
    def set(self):
        self._set = 1
    def unset(self):
        self._set = 0
debug = Debug()


def get_risk(ip_string):
    # Return risk factors from scamalytics into a dict
    #     {"ip": ? , 
    #      "score": ?, 
    #      "risk": ?, 
    #      "risk_comment: ?"}

    # Fetch the complete record from scamalytics restful api
    # ip_string ... make request by ip address as a string

    html_text = ""
    url = "https://scamalytics.com/ip/" + ip_string
    html_text = requests.get(url).text

#     soup = BeautifulSoup(html_text, 'lxml')
    soup = BeautifulSoup(html_text, 'xml')
    
    # Tag=pre
    result = json.loads(soup.pre.string)
    
    # The comment is in the body of an unlabelled div. Used the css class to find.
    # Remove special UTF-8 character \U200b, a zero width space.
#     result["risk_comment"] = soup.find_all("div", class_="panel_body")[0].get_text().replace("\u200b","")
    result["risk_comment"] = ""
    
    return result

def parse_arin(html_text, ip_string):
    # Values that are not found are set to np.NaN
    
    fillna = lambda x: np.nan if x is None else x if isinstance(x, str) else x.string
    
    def get_streetaddress(soup):
        # More than one address line may be recorded
        tag = soup.org.streetaddress
        if tag is None:
            return np.nan
        address = []
        for line in tag:
            address.append(line.string)
        return address
    def get_postalcode(soup):
        return fillna(soup.org.postalcode)
    def get_city(soup):
        return fillna(soup.org.city)
    def get_handle(soup):
        return fillna(soup.org.handle)
    def get_state(soup):
        # The iso3166 tags are the international country codes
        # Ref: https://www.iso.org/glossary-for-iso-3166.html
        # BeautifulSoup does not parse tags contain "-" so
        # use find_all to locate the tags with a string search.
        tag = soup.org.find_all("iso3166-2")
        if tag is None:
            return np.nan
        for t in tag:
            x = fillna(t)
        return x
    def get_country(soup):
        tag = soup.org.find_all("iso3166-1")
        if tag is None:
            return np.nan
        for t in tag:
            x = fillna(t.find('name'))
        return x
    def get_countrycode(soup):
        tag = soup.org.find_all("iso3166-1")
        for t in tag:
            x = fillna(t.code2)
        return x
    def get_organization(soup, info):
        # There are 2 tag=name in the tag=org, one for country and one for organization. 
        # The country is part of iso3166-1 so it can be isolated. Look for organization
        # by looking at both tags and selecting the one that is not equal to country.
        for t in soup.org.find_all("name"):
            if t.string != info["country"]:
                x = fillna(t)
        return t
    def get_timestamp():
        return datetime.today()
    def get_cidr(soup, info):
        tag = soup.net.netblocks
        if tag is None:
            return None
        result = {}
        for netblock in tag:
            cidr_prefix = fillna(netblock.startaddress)
            cidr_length = fillna(netblock.cidrlength)
            if (cidr_prefix is None) or (cidr_length is None):
                continue
            cidr = cidr_prefix + "/" + cidr_length
            result[cidr] = info
        return result if len(result)>0 else None

    # Parse html into a hierarchy using BeautifulSoup 
    soup = BeautifulSoup(html_text, 'lxml')

    # ARIN reports a list of CIDR net_addresses. 
    # The database will be indexed by ipaddress.net_address.
    # A record will be written for each cidr and duplicate the ARIN info
    # Obtain the organization name from tag=net instead of the tag=org which
    # has more than one tag=name making it harder to isolate.
    try:
        info = {}

        # Obtain info from tag=org

        info["address"] = get_streetaddress(soup)
        info["postalcode"] = get_postalcode(soup)
        info["state"] = get_state(soup)
        info["country"] = get_country(soup)
        info["countrycode"] = get_countrycode(soup)
        info["organization"] = get_organization(soup, info)
        info["city"] = get_city(soup)
        info["handle"] = get_handle(soup)
        info["timestamp"] = get_timestamp()

        # Add the risk obtained from scamalytics
        info.update(get_risk(ip_string))
    
        # Parse into dict to return results, item by item
        result = get_cidr(soup)

        return result
    
    except:
        print(f"Error in parse_arin({ip_string=}")
        print(f"{html_text=}")
        return None

def get_arin(ip_string):
    '''Return dict for the net_address that contains this ip_string
        {"cidr": ?,
         {"organization": ? ,
          "handle": ? ,
          "city": ? ,
          "address" : ? ,
          "postalcode": ? ,
          "countrycode": ? ,
          "state": ? ,
          "country": ? ,
          "timestamp": ?,
         }
    '''

    # Fetch the complete record from arin restful api
    # Ref: https://www.arin.net/resources/registry/whois/rws/api/#networks-and-asns
    # ip_string ... make request by ip address as a string
    # pft ......... get full record
    
    url = "http://whois.arin.net/rest/ip/" + ip_string + "/pft"
    html_text = ""
    try:
        html_text = requests.get(url).text
    except:
        return None
#     import pdb; pdb.set_trace()
    return parse_arin(html_text, ip_string)

class Risk():
    
    def __init__(self, filename, readonly=True):

        # Open database. Create as needed.
        
        self.readonly = readonly
        self.open_option = f'{"r" if self.readonly else "w"}'
        self.db_filename = filename
        self.hp = pickle.HIGHEST_PROTOCOL
         
        try:
            self.db = dbm.open(self.db_filename, self.open_option)
        except:
            if self.readonly:
                print(f"{self.db_filename} does not exist but will not be created when class is {readonly=}")
                return None
            else:
                self.db = dbm.open(self.db_filename, "c")
                
        # Read the data into dictionary:
        #   risk[ipaddress.ipv4network] = [organization, country, risk]
        #   
        
        self.risk = SortedDict()
        self.risk_count = 0

        for key in self.db.keys():
            self.risk[pickle.loads(key)] = pickle.loads(self.db[key])

        self.risk_count = len(self.risk)
        self.db.close()

        
    def find(self, ip_string):
        """ 
        risk[cidr] = {organization, handle, city, address, postalcode, countrycode, state, ...}
        creating one if needed and adding it to the database.
        Return:
        - None ........ when Risk.ip is None
                        when Risk.ip and not Risk.findarin then Risk.findarin was found 
                        at ARIN site but could be added to db
        - ARIN dict ... Risk.ip and Risk.searchresult (==Risk.ip)
                        or Risk.ip and Risk.findarin and Risk.addarin (==Risk.findarin)
        """

        try: 
            self.ip = ipaddress.ip_address(ip_string)
        except:
            self.ip = None
            print(f"Could not find IPv4Address for {ip_string}")
            return None
        
        # Find the address to insert
        self.searchresult = None
        self.findarin = None
        self.addarin = None
        
        self.searchresult = self.cidr_search(self.ip)
        if self.searchresult is None:
            self.findarin = get_arin(ip_string)
            if self.findarin is None:
                debug.prt(f"No arin results for {ip_string=}\n")
                return None
            self.addarin = self.add(self.findarin)
            if self.addarin is None:
                debug.prt(f"ARIN results could not be added for {ip_string=}\n")
                return None
            return self.findarin
        else:
            return self.searchresult
    

    def add(self, new_risks):
        '''
        Add the result of get_arin, a dict with cidr as key 
        to both the Risk.risk dict and the database.
        Return:
        None ... No risks to add.
                 A risk value could not be pickle'd
        True ... Risks added successfully
        '''
        
        if (new_risks is None) or (len(new_risks) == 0) or self.readonly:
            return None
        
        # Store in dictionary first.
        # There may be more than one cidr retrieved by get_arin
        # Each CIDR has to be type ip_network

        for new_cidr, new_risk in new_risks.items():
            netblock = ipaddress.ip_network(new_cidr)
            self.risk[netblock] = new_risk
        
        # Store in database next
        if not self.readonly:
            
            # Collect the pickle's of each netblock and risk
            additions = []
            for new_cidr, new_risk in new_risks.items():
                netblock = ipaddress.ip_network(new_cidr)
                # key and value have to be pickle'd before storing
                try:
                    pickled_netblock = pickle.dumps(netblock, protocol=self.hp)
                    # This is a hack that allows pickle to work 
                    new_risk_temp = f"{new_risk}"
                    pickled_risk     = pickle.dumps(new_risk_temp, protocol=self.hp)
                    additions.append([pickled_netblock, pickled_risk])
                except BaseException as ex:
                    print(ex)
                    debug.prt(f"Pickle error: {new_cidr=}\n{new_risk=}\n")
                    return None

            # Write into database making sure to close it
            with dbm.open(self.db_filename, self.open_option) as self.db:
                for addition in additions:
                    self.db[addition[0]] = addition[1]

        return True
    

    def len(self):
        return self.risk_count


    def cidr_search(self, target_ip):
        # risk.cidr_search(target_ip) is True when ip's network is in db
        # type(target_ip) is ipaddress.IPv4Address
        # Updates cidr_search_result property with risk[cidr of target_ip] else None
        sz = len(self.risk)
        if sz == 0:
            self.bisearch_result = None
            return False
        s = 0
        e = sz
        while s > e:
            m = (s + e)//2
            cidr = self.risk.peekitem(m)[0]
            if target_ip in cidr:
                self.cidr_search_result = self.risk[cidr]
                return True
            if target_ip > cidr:
                s = m + 1
            else:
                e = m - 1
        self.cidr_search_result = None
        return False


# Routine to read a clean set ip addresses from the sample data
# and icorporate them into the risk database

db_filename = "mywhois"
sample_filename = "clean_test_data.csv"

# Open the database and load the current data
risk = Risk(db_filename, readonly=False)

# Read the clean set of sample data set
clean_ip = pd.read_csv(sample_filename)
                       
# range over the unique ip addresses
new = 0
old = 0
for n, ip in enumerate(clean_ip.ip.drop_duplicates()[:10]):
    before = risk.len()
    risk.find(ip)
    after  = risk.len()
    if before == after:
        old += 1
    else:
        new += after - before
        
print(f"{n=} {new=} {old=} {risk.len()=}")


n=9 new=0 old=10 risk.len()=0


In [None]:
def api_arin(ip_string):
    url       = "http://whois.arin.net/rest/ip/" + ip_string + "/pft"
    html_text = ""
    try:
        html_text = requests.get(url).text
    except:
        return None
    # Parse html into a hierarchy using BeautifulSoup 
#     soup = BeautifulSoup(html_text, 'lxml')
    soup = BeautifulSoup(html_text, 'xml')
    return soup

In [None]:
soup = api_arin("24.228.215.103")
print(f"{len(soup)=}")

In [None]:
print(soup.netblocks)

In [None]:
# Routine to read a clean set ip addresses from the sample data
# and icorporate them into the resk database

db_filename = "mywhois"
sample_filename = "clean_test_data.csv"

# Open the database and load the current data
risk = Risk(db_filename, readonly=False)

# Read the clean set of sample data set
clean_ip = pd.read_csv(sample_filename)
                       
# range over the unique ip addresses
new = 0
old = 0
for n, ip in enumerate(clean_ip.ip.drop_duplicates()[2:3]):
    before = risk.len()
    risk.find(ip)
    after  = risk.len()
    if before == after:
        old += 1
    else:
        new += 1
print(f"{n=} {new=} {old=} {risk.len()=}")

    



In [None]:
risk=Risk("mywhois")

In [12]:
for ip in clean_ip.ip.drop_duplicates()[:10]:
    print(ip)

142.255.122.114
172.58.230.193
72.68.212.63
172.100.125.174
24.228.215.103
68.199.195.147
108.29.95.66
32.208.115.88
24.189.68.18
172.58.235.43


In [None]:
work={'organization': 'MCI Communications Services, Inc. d/b/a Verizon Business', 'handle': 'MCICS', 'asn': '', 'city': 'Ashburn', 'address': ['22001 Loudoun County Pkwy'], 'postalcode': '20147', 'state': 'VA', 'country': 'United States', 'ip': '72.68.212.63', 'score': '21', 'risk': 'medium', 'risk_comment': 'IP address 72.68.212.63 is operated by Verizon Communications whose web traffic we consider to present a potentially medium fraud risk. This IP address is owned by MCI Communications Services, Inc. d/b/a Verizon Business whose web traffic we also consider to present a potentially medium fraud risk. In both cases, non-web traffic may present a different risk or no risk at all. Scamalytics see low levels of traffic from Verizon Communications across our global network, little of which we suspect to be potentially fraudulent. We have no visibility into the web traffic directly from 72.68.212.63, and therefore apply a risk score of 21/100 based on the overall risk from Verizon Communications’s IP addresses where we do have visibility.'}
pickle.dumps(f"{work}")


In [None]:
pp.pprint(soup)

In [None]:
for k in risk.risk:
    print(k)

In [11]:
# t=requests.get("http://whois.arin.net/rest/ip/" + ip_string + "/pft").text

# s = BeautifulSoup(t)

# s.org

# for x in s.org.find_all("iso3166-1"):
#     country = x.find('name').string
# print(country)

# for x in s.org.find_all("name"):
#     if x.string != country:x
#         organization = x.string


# print(organization)

NameError: name 's' is not defined

In [17]:
def parse_arin(html_text, ip_string):
    # Values that are not found are set to np.NaN
    
    fillna = lambda x: np.nan if x is None else x if isinstance(x, str) else x.string
    
    def get_streetaddress(soup):
        # More than one address line may be recorded
        tag = soup.org.streetaddress
        if tag is None:
            return np.nan
        address = []
        for line in tag:
            address.append(line.string)
        return address
    def get_postalcode(soup):
        return fillna(soup.org.postalcode)
    def get_city(soup):
        return fillna(soup.org.city)
    def get_handle(soup):
        return fillna(soup.org.handle)
    def get_state(soup):
        # The iso3166 tags are the international country codes
        # Ref: https://www.iso.org/glossary-for-iso-3166.html
        # BeautifulSoup does not parse tags contain "-" so
        # use find_all to locate the tags with a string search.
        tag = soup.org.find_all("iso3166-2")
        if tag is None:
            return np.nan
        for t in tag:
            x = fillna(t)
        return x
    def get_country(soup):
        tag = soup.org.find_all("iso3166-1")
        if tag is None:
            return np.nan
        for t in tag:
            x = fillna(t.find('name'))
        return x
    def get_countrycode(soup):
        tag = soup.org.find_all("iso3166-1")
        for t in tag:
            x = fillna(t.code2)
        return x
    def get_organization(soup, info):
        # There are 2 tag=name in the tag=org, one for country and one for organization. 
        # The country is part of iso3166-1 so it can be isolated. Look for organization
        # by looking at both tags and selecting the one that is not equal to country.
        for t in soup.org.find_all("name"):
            if t.string != info["country"]:
                x = fillna(t)
        return t
    
    # Parse into dict to return results, item by item
    result = {}

    # Parse html into a hierarchy using BeautifulSoup 
    soup = BeautifulSoup(html_text, 'lxml')

    # ARIN reports a list of CIDR net_addresses. 
    # The database will be indexed by ipaddress.net_address.
    # A record will be written for each cidr and duplicate the ARIN info
    # Obtain the organization name from tag=net instead of the tag=org which
    # has more than one tag=name making it harder to isolate.
    try:
        info = {}

        # Obtain info from tag=org

        info["address"] = get_streetaddress(soup)
        info["postalcode"] = get_postalcode(soup)
        info["state"] = get_state(soup)
        info["country"] = get_country(soup)
        info["countrycode"] = get_countrycode(soup)
        info["organization"] = get_organization(soup, info)
        info["city"] = get_city(soup)
        info["handle"] = get_handle(soup)

        # Add the risk obtained from scamalytics
#         info.update(get_risk(ip_string))

        # The netblocks scope contains a list of netblock sections
        for netblock in soup.net.netblocks:
            cidr = netblock.startaddress.string + "/" + netblock.cidrlength.string      
            result[cidr] = info
        return result
    
    except:
        print(f"Error in parse_arin({ip_string=}")
        print(f"{html_text=}")
        return None

def xget_arin(ip_string):
    '''Return dict for the net_address that contains this ip_string
        {"cidr": ?,
         {"organization": ? ,
          "handle": ? ,
          "city": ? ,
          "address" : ? ,
          "postalcode": ? ,
          "countrycode": ? ,
          "state": ? ,
          "country": ? ,
         }
    '''

    # Fetch the complete record from arin restful api
    # Ref: https://www.arin.net/resources/registry/whois/rws/api/#networks-and-asns
    # ip_string ... make request by ip address as a string
    # pft ......... get full record
    
    url = "http://whois.arin.net/rest/ip/" + ip_string + "/pft"
    html_text = ""
    try:
        html_text = requests.get(url).text
    except:
        return None
#     import pdb; pdb.set_trace()
    return parse_arin(html_text, ip_string)


In [19]:
ip_string = "72.68.212.63"
pp.pprint(xget_arin(ip_string))


{'72.65.128.0/17': {'address': ['22001 Loudoun County Pkwy'],
                    'city': 'Ashburn',
                    'country': 'United States',
                    'countrycode': 'US',
                    'handle': 'MCICS',
                    'organization': <name>MCI Communications Services, Inc. d/b/a Verizon Business</name>,
                    'postalcode': '20147',
                    'state': 'VA'},
 '72.66.0.0/15': {'address': ['22001 Loudoun County Pkwy'],
                  'city': 'Ashburn',
                  'country': 'United States',
                  'countrycode': 'US',
                  'handle': 'MCICS',
                  'organization': <name>MCI Communications Services, Inc. d/b/a Verizon Business</name>,
                  'postalcode': '20147',
                  'state': 'VA'},
 '72.68.0.0/15': {'address': ['22001 Loudoun County Pkwy'],
                  'city': 'Ashburn',
                  'country': 'United States',
                  'countrycode': 'US',
    

In [22]:
ip = ipaddress.ip_address("256.0.0.0")

ValueError: '256.0.0.0' does not appear to be an IPv4 or IPv6 address