# 1) Without using online resources for this question, describe how you would go about automating the detection of such domains. There are essentially 2 ways of doing this.


# 2) Find a couple of commercial products that cover monitoring of typosquatted domains


# 3) Similarly, find a couple of open source products.


# 4) Discuss the pros and cons of the 2 methods

# Prerequisite Python Packages

## Python Packages Used:
### pandas: for visualizing data in table
### selenium: for scraping data from website
### requests: for doing API calls and getting the response
### asyncio: for doing asynchronous actions in python
### aiohttp: asynchronous HTTP client/server framework built on top of asyncio
### Levenshtein: for calculating Levenshtein distance
### Flask: for creating the REST API for querying the domain registration date and country

In [166]:
pip install pandas selenium==3.141.0 requests asyncio aiohttp Levenshtein Flask tldextract idna

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.



# Data for homoglyph replacement mapping

In [158]:
# Homoglyph table: maps an ASCII character to the look-alike strings that may
# replace it in a domain label. Values are strings, not single characters, so
# multi-character look-alikes such as 'rn' for 'm' or 'vv' for 'w' are allowed.
# Each list is kept free of duplicates.
SIMILAR_CHAR = {
    '0': ['o'],
    '1': ['l', 'i', 'ı'],
    '2': ['ƻ'],
    '5': ['ƽ'],
    'a': ['à', 'á', 'â', 'ã', 'ä', 'å', 'ɑ', 'ạ', 'ǎ', 'ă', 'ȧ', 'ą', 'ə'],
    'b': ['d', 'ʙ', 'ɓ', 'ḃ', 'ḅ', 'ḇ', 'ƅ'],
    'c': ['e', 'ƈ', 'ċ', 'ć', 'ç', 'č', 'ĉ', 'ᴄ'],
    'd': ['b', 'cl', 'ɗ', 'đ', 'ď', 'ɖ', 'ḑ', 'ḋ', 'ḍ', 'ḏ', 'ḓ'],
    'e': ['c', 'é', 'è', 'ê', 'ë', 'ē', 'ĕ', 'ě', 'ė', 'ẹ', 'ę', 'ȩ', 'ɇ', 'ḛ'],
    'f': ['ƒ', 'ḟ'],
    'g': ['q', 'ɢ', 'ɡ', 'ġ', 'ğ', 'ǵ', 'ģ', 'ĝ', 'ǧ', 'ǥ'],
    'h': ['ĥ', 'ȟ', 'ħ', 'ɦ', 'ḧ', 'ḩ', 'ⱨ', 'ḣ', 'ḥ', 'ḫ', 'ẖ'],
    'i': ['1', 'l', 'í', 'ì', 'ï', 'ı', 'ɩ', 'ǐ', 'ĭ', 'ỉ', 'ị', 'ɨ', 'ȋ', 'ī', 'ɪ'],
    'j': ['ʝ', 'ǰ', 'ɉ', 'ĵ'],
    'k': ['lc', 'ḳ', 'ḵ', 'ⱪ', 'ķ', 'ᴋ'],
    'l': ['1', 'i', 'ɫ', 'ł', 'ı', 'ɩ'],
    'm': ['n', 'nn', 'rn', 'rr', 'ṁ', 'ṃ', 'ᴍ', 'ɱ', 'ḿ'],
    'n': ['m', 'r', 'ń', 'ṅ', 'ṇ', 'ṉ', 'ñ', 'ņ', 'ǹ', 'ň', 'ꞑ'],
    'o': ['0', 'ȯ', 'ọ', 'ỏ', 'ơ', 'ó', 'ö', 'ᴏ'],
    'p': ['ƿ', 'ƥ', 'ṕ', 'ṗ'],
    'q': ['g', 'ʠ'],
    'r': ['ʀ', 'ɼ', 'ɽ', 'ŕ', 'ŗ', 'ř', 'ɍ', 'ɾ', 'ȓ', 'ȑ', 'ṙ', 'ṛ', 'ṟ'],
    's': ['ʂ', 'ś', 'ṣ', 'ṡ', 'ș', 'ŝ', 'š', 'ꜱ'],
    't': ['ţ', 'ŧ', 'ṫ', 'ṭ', 'ț', 'ƫ'],
    'u': ['ᴜ', 'ǔ', 'ŭ', 'ü', 'ʉ', 'ù', 'ú', 'û', 'ũ', 'ū', 'ų', 'ư', 'ů', 'ű', 'ȕ', 'ȗ', 'ụ'],
    'v': ['ṿ', 'ⱱ', 'ᶌ', 'ṽ', 'ⱴ', 'ᴠ'],
    'w': ['vv', 'ŵ', 'ẁ', 'ẃ', 'ẅ', 'ⱳ', 'ẇ', 'ẉ', 'ẘ', 'ᴡ'],
    'x': ['ẋ', 'ẍ'],
    'y': ['ʏ', 'ý', 'ÿ', 'ŷ', 'ƴ', 'ȳ', 'ɏ', 'ỿ', 'ẏ', 'ỵ'],
    'z': ['ʐ', 'ż', 'ź', 'ᴢ', 'ƶ', 'ẓ', 'ẕ', 'ⱬ']
}

# Code for generating variants of a monitored domain using addition, omission, replacement, repetition, homoglyphs and bit squatting.

In [248]:
"""
    Module for generating variants of a monitored domain

"""

import tldextract


class GenerateDomainName:
    """
    A class to generate domain names using
    different typosquatting methods

    ...

    Attributes
    ----------
    domain : str
        domain that needs to be searched for
        typosquatting related entries
    domain_prefix : str
        subdomain part with a trailing dot (e.g. "www."),
        or "" when the domain has no subdomain
    domain_name : str
        domain label without subdomain and tld (e.g. "google")
    domain_tld : str
        tld with a leading dot (e.g. ".com"), or "" when absent
    domain_list : list
        generated variants in generation order; never contains
        duplicates or the monitored domain itself

    Methods
    -------
    parse_domain():
        parse a domain into three parts domain_prefix,
        domain_without_tld and domain_tld

    append_prefix_suffix():
        appends domain_prefix and domain_tld to domain name

    get_letters_numbers():
        generates numbers [0-9] and letters [a-z]

    addition():
        adds single characters from [a-z] and [0-9]
        to the domain name

    omission():
        removes a character from the domain name

    repetition():
        doubles each character that already exists in the
        domain name one by one to create new domain names

    replacement():
        replaces a character in the domain name

    replace_char():
        replaces a character with multiple occurrences
        one at a time in a string with another string

    homoglyphs():
        replaces all possible homoglyphs using
        the mapping data

    bit_squatting():
        flips the jTh bit in the iTh character of the domain to
        generate multiple possible bit-squatting names

    main():
        main method that calls all the
        functions to generate domain names

    """

    def __init__(self, domain: str):
        self.domain = domain
        prefix, domain_name, tld = self.parse_domain()
        self.domain_prefix = prefix
        self.domain_name = domain_name
        self.domain_tld = tld
        self.domain_list = []
        # Mirror of domain_list used for O(1) duplicate checks.
        self._seen = set()

    def parse_domain(self) -> tuple:
        """
            This method splits domain to domain_prefix,
            domain_without_tld and domain_tld

            The separating dots are only added when the matching part
            is present, so "google.com" does not become ".google.com".

        Returns:
            tuple: domain_prefix, domain_without_tld, domain_tld
        """
        domain_extract = tldextract.extract(self.domain)
        subdomain = domain_extract.subdomain
        suffix = domain_extract.suffix
        domain_prefix = subdomain + "." if subdomain else ""
        domain_tld = "." + suffix if suffix else ""
        return domain_prefix, domain_extract.domain, domain_tld

    def append_prefix_suffix(self, name: str) -> str:
        """
            This method appends the domain prefix and tld
            to generated domain name

        Returns:
            str: domain name with prefix and tld
        """
        return self.domain_prefix + name + self.domain_tld

    def _record(self, name: str) -> None:
        """
            This method stores the full domain built from `name` in
            domain_list. Empty labels, the monitored name itself and
            already generated variants are ignored.
        """
        if not name or name == self.domain_name:
            return
        domain_variation = self.append_prefix_suffix(name)
        if domain_variation not in self._seen:
            self._seen.add(domain_variation)
            self.domain_list.append(domain_variation)

    @staticmethod
    def get_letters_numbers() -> list:
        """
            This method generates numbers [0-9] and letters [a-z]

        Returns:
            list: list of numbers [0-9] and letters [a-z]
        """
        number_strings = [str(i) for i in range(10)]
        letter_strings = [chr(ord('a') + i) for i in range(26)]
        return number_strings + letter_strings

    def addition(self) -> list:
        """
            This method adds single characters from [a-z] and [0-9]
            to the domain name to create possible domain names

            Each character is inserted at every position, including
            before the first and after the last character.

        Returns:
            list: list of typosquatted domains
        """
        if not self.domain_name:
            return self.domain_list
        for addition_string in self.get_letters_numbers():
            for j in range(len(self.domain_name) + 1):
                self._record(self.domain_name[:j] + addition_string
                             + self.domain_name[j:])
        return self.domain_list

    def omission(self) -> list:
        """
            This method removes a character from the domain name
            to create possible domain names

        Returns:
            list: list of typosquatted domains
        """
        for i in range(len(self.domain_name)):
            self._record(self.domain_name[:i] + self.domain_name[i + 1:])
        return self.domain_list

    def repetition(self) -> list:
        """
            This method doubles each character that already exists in
            the domain name one by one to create possible domain names

        Returns:
            list: list of typosquatted domains
        """
        for i, letter in enumerate(self.domain_name):
            self._record(self.domain_name[:i] + letter + self.domain_name[i:])
        return self.domain_list

    def replacement(self) -> list:
        """
            This method replaces a character in the domain name
            to create possible domain names

            Replacing a character with itself would reproduce the
            monitored domain; that result is not recorded.

        Returns:
            list: list of typosquatted domains
        """
        for replacement_string in self.get_letters_numbers():
            for j in range(len(self.domain_name)):
                self._record(self.domain_name[:j] + replacement_string
                             + self.domain_name[j + 1:])
        return self.domain_list

    @staticmethod
    def replace_char(original_string: str, target: str, replacement: str) -> list:
        """
            This method replaces a character with multiple occurrences
            one at a time in a string with another string

        Returns:
            list: one string per occurrence of `target`; empty when
            `target` does not occur in `original_string`
        """
        return [
            original_string[:i] + replacement + original_string[i + 1:]
            for i, char in enumerate(original_string)
            if char == target
        ]

    def homoglyphs(self) -> list:
        """
            This method replaces all possible homoglyphs using
            the mapping data to create possible domain names

        Returns:
            list: list of typosquatted domains
        """
        # dict.fromkeys keeps first-seen order while visiting every
        # distinct character only once.
        for item in dict.fromkeys(self.domain_name):
            for glyph in SIMILAR_CHAR.get(item, ()):
                for modified_domain in self.replace_char(self.domain_name,
                                                         item, glyph):
                    self._record(modified_domain)
        return self.domain_list

    def bit_squatting(self) -> list:
        """
            This method flips the jTh bit in the iTh character of the
            domain to generate multiple possible bit-squatting names

            Only flips that land on [a-z] or [0-9] are kept: an
            uppercase result is the same hostname as the original and
            a non-ASCII result is not a single-bit error of an ASCII
            hostname character.

        Returns:
            list: list of typosquatted domains
        """
        valid_chars = set(self.get_letters_numbers())
        for i, char in enumerate(self.domain_name):
            for j in range(8):
                bit_squatted_char = chr(ord(char) ^ (1 << j))
                if bit_squatted_char in valid_chars:
                    self._record(self.domain_name[:i] + bit_squatted_char
                                 + self.domain_name[i + 1:])
        return self.domain_list

    def main(self) -> list:
        """
            This method is the main method that calls all the
            functions to generate domain names

        Returns:
            list: list of typosquatted domains
        """
        self.addition()
        self.omission()
        self.repetition()
        self.replacement()
        self.homoglyphs()
        self.bit_squatting()
        return self.domain_list


# Demo run: generate every variant of a sample domain. `possible_data` is kept
# at module level because the next cell displays it.
if __name__ == '__main__':
    possible_data = GenerateDomainName("www.google.com").main()
    print(possible_data)


['www.0google.com', 'www.g0oogle.com', 'www.go0ogle.com', 'www.goo0gle.com', 'www.goog0le.com', 'www.googl0e.com', 'www.google0.com', 'www.1google.com', 'www.g1oogle.com', 'www.go1ogle.com', 'www.goo1gle.com', 'www.goog1le.com', 'www.googl1e.com', 'www.google1.com', 'www.2google.com', 'www.g2oogle.com', 'www.go2ogle.com', 'www.goo2gle.com', 'www.goog2le.com', 'www.googl2e.com', 'www.google2.com', 'www.3google.com', 'www.g3oogle.com', 'www.go3ogle.com', 'www.goo3gle.com', 'www.goog3le.com', 'www.googl3e.com', 'www.google3.com', 'www.4google.com', 'www.g4oogle.com', 'www.go4ogle.com', 'www.goo4gle.com', 'www.goog4le.com', 'www.googl4e.com', 'www.google4.com', 'www.5google.com', 'www.g5oogle.com', 'www.go5ogle.com', 'www.goo5gle.com', 'www.goog5le.com', 'www.googl5e.com', 'www.google5.com', 'www.6google.com', 'www.g6oogle.com', 'www.go6ogle.com', 'www.goo6gle.com', 'www.goog6le.com', 'www.googl6e.com', 'www.google6.com', 'www.7google.com', 'www.g7oogle.com', 'www.go7ogle.com', 'www.goo7gl

In [249]:
# List of generated domains using addition, omission, replacement, repetition, homoglyphs and bit squatting
possible_data


['www.0google.com',
 'www.g0oogle.com',
 'www.go0ogle.com',
 'www.goo0gle.com',
 'www.goog0le.com',
 'www.googl0e.com',
 'www.google0.com',
 'www.1google.com',
 'www.g1oogle.com',
 'www.go1ogle.com',
 'www.goo1gle.com',
 'www.goog1le.com',
 'www.googl1e.com',
 'www.google1.com',
 'www.2google.com',
 'www.g2oogle.com',
 'www.go2ogle.com',
 'www.goo2gle.com',
 'www.goog2le.com',
 'www.googl2e.com',
 'www.google2.com',
 'www.3google.com',
 'www.g3oogle.com',
 'www.go3ogle.com',
 'www.goo3gle.com',
 'www.goog3le.com',
 'www.googl3e.com',
 'www.google3.com',
 'www.4google.com',
 'www.g4oogle.com',
 'www.go4ogle.com',
 'www.goo4gle.com',
 'www.goog4le.com',
 'www.googl4e.com',
 'www.google4.com',
 'www.5google.com',
 'www.g5oogle.com',
 'www.go5ogle.com',
 'www.goo5gle.com',
 'www.goog5le.com',
 'www.googl5e.com',
 'www.google5.com',
 'www.6google.com',
 'www.g6oogle.com',
 'www.go6ogle.com',
 'www.goo6gle.com',
 'www.goog6le.com',
 'www.googl6e.com',
 'www.google6.com',
 'www.7google.com',


# Web Scraping Code

### 1) This website takes some time to load the data based on the domain that we are trying to check.
### 2) Once the data is loaded, data is scraped from the site based on class which is "domain-row resolved"
### 3) Domain with punycode is also pulled and puny coded domain details are also attached in the final data.
### 4) Data is processed and returned as a list of dictionaries, together with the status and a message describing the outcome of the execution

## Note:
### This code uses selenium to scrape data from the dnstwister.report website and has been tested with the Chrome browser.
### Code is tested with Chrome Version 123.0.6312.122 and Chrome webdriver version 123.

In [232]:
"""
    Module for getting report of a domain of your choice
    from https://dnstwister.report and scrape the required 
    details.

"""
import re
import idna
import json
import binascii
import ipaddress
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

DNSTWISTER_ENDPOINT = "https://dnstwister.report"
SUPPORTED_BROWSERS = ["chrome", "firefox", "edge"]


class ScrapeData:
    """
    A class to scrape data from the dnstwister website

    ...

    Attributes
    ----------
    domain : str
        domain url that needs to be searched for
        typosquatting related entries
    browser : str
        name of the browser that is supported.
        supported browsers are:
        1) firefox
        2) chrome
        3) edge

    Methods
    -------
    get_driver():
        get the driver for supported browser

    encode_domain_name():
        encodes the domain name

    scrape_domain_data():
        scrape domain data of the domain that is passed
    """

    def __init__(self, domain: str, browser: str):
        self.domain = domain
        self.browser = browser

    def get_driver(self):
        """
            This method checks for the driver input and
            returns the driver if it is supported

        Returns:
            Returns a status(boolean) and
            driver details if browser is supported
            or message if it is not supported
        """
        if self.browser in SUPPORTED_BROWSERS:
            if self.browser == "chrome":
                return True, webdriver.Chrome()
            if self.browser == "edge":
                return True, webdriver.Edge()
            if self.browser == "firefox":
                return True, webdriver.Firefox()
        return False, f"{self.browser} browser is not supported"

    def encode_domain_name(self) -> str:
        """
            This method returns the hex encoded domain name
            that is used in the search URL

        """
        return binascii.hexlify(self.domain.encode()).decode()

    @staticmethod
    def _split_row(row_text: str) -> list:
        """
            This method drops parenthesised notes from the text of a
            result row and splits the rest into non-empty fields
        """
        cleaned = re.sub(r'\([^)]*\)', '', row_text).replace('\n', ' ')
        return [field for field in cleaned.split(" ") if field]

    def scrape_domain_data(self) -> dict:
        """
            This method connects to browser and get domain
            related data from dnstwister website

            The browser is always closed before returning, also when
            the page load or the parsing fails. Rows that do not
            contain both a domain and an ip are skipped.

        Returns:
            Returns a dictionary with status(boolean),
            a list of dictionary containing domain, punycode
            domain and ip, and message regarding the status

            Eg:
                {"status": True,
                "data":[{'domain': 'www.google.com',
                         'puny_code_domain': '',
                         'ip': '172.217.16.228'}],
                "message": "Scraping completed successfully"}

            Eg:

                if browser not supported:

                {"status": False, "data":[],"message":"safari browser is not supported"}

                if some exception occurs:

                {"status": False, "data":[], "message":"Exception: <exception reason>"}
        """
        driver = None
        try:
            encoded_domain = self.encode_domain_name()
            url = f"{DNSTWISTER_ENDPOINT}/search?ed={encoded_domain}"
            status, driver_or_message = self.get_driver()
            if not status:
                return {"status": False, "data": [], "message": driver_or_message}
            driver = driver_or_message
            driver.get(url)
            # The "export csv" link only appears once the report is loaded.
            WebDriverWait(driver, 60).until(
                EC.presence_of_element_located((By.LINK_TEXT, "export csv")))
            rows = driver.find_elements(By.XPATH, '//tr[@class="domain-row resolved"]')
            processed_data = []
            for row in rows:
                fields = self._split_row(row.text)
                if len(fields) < 2:
                    continue
                puny_code = idna.encode(fields[0]).decode()
                processed_data.append(
                    {
                        "domain": fields[0],
                        # Only filled when the punycode form differs.
                        "puny_code_domain": puny_code if puny_code != fields[0] else "",
                        "ip": fields[1]
                    }
                )
            return {"status": True, "data": processed_data,
                    "message": "Scraping completed successfully"}
        except Exception as e:
            # Boundary of the scraper: callers expect a status dict, not an exception.
            return {"status": False, "data": [], "message": f"Exception: {str(e)}"}
        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception:
                    # Best-effort cleanup; the result is already decided.
                    pass

# Demo run: scrape the dnstwister report for a sample domain with Chrome.
# `output` is kept at module level because the next cell reads output['data'].
if __name__ == "__main__":
    output = ScrapeData("www.google.com", "chrome").scrape_domain_data()
    print(output)

{'status': True, 'data': [{'domain': 'www.google.org', 'puny_code_domain': '', 'ip': '216.239.32.27'}, {'domain': 'ww.google.com', 'puny_code_domain': '', 'ip': '142.250.200.46'}, {'domain': 'www.google.co', 'puny_code_domain': '', 'ip': '142.250.187.206'}, {'domain': 'www.google.net', 'puny_code_domain': '', 'ip': '142.250.200.36'}, {'domain': 'www.google.de', 'puny_code_domain': '', 'ip': '172.217.16.227'}, {'domain': 'www.google.tk', 'puny_code_domain': '', 'ip': '142.250.200.3'}, {'domain': 'wwwngoogle.com', 'puny_code_domain': '', 'ip': '199.59.243.225'}, {'domain': 'wwwgoogle.com', 'puny_code_domain': '', 'ip': '216.58.212.228'}, {'domain': 'www.go0ogle.com', 'puny_code_domain': '', 'ip': '95.211.189.138'}, {'domain': 'www.gookgle.com', 'puny_code_domain': '', 'ip': '103.224.182.226'}, {'domain': 'wwwg.oogle.com', 'puny_code_domain': '', 'ip': '104.21.19.57'}, {'domain': 'www.googlwe.com', 'puny_code_domain': '', 'ip': '95.211.189.137'}, {'domain': 'www.googloe.com', 'puny_code_d

In [233]:
import copy

# Work on a deep copy: later cells update these dictionaries in place, and the
# scraped `output` should stay untouched.
domain_data = copy.deepcopy(output['data'])
domain_data

[{'domain': 'www.google.org', 'puny_code_domain': '', 'ip': '216.239.32.27'},
 {'domain': 'ww.google.com', 'puny_code_domain': '', 'ip': '142.250.200.46'},
 {'domain': 'www.google.co', 'puny_code_domain': '', 'ip': '142.250.187.206'},
 {'domain': 'www.google.net', 'puny_code_domain': '', 'ip': '142.250.200.36'},
 {'domain': 'www.google.de', 'puny_code_domain': '', 'ip': '172.217.16.227'},
 {'domain': 'www.google.tk', 'puny_code_domain': '', 'ip': '142.250.200.3'},
 {'domain': 'wwwngoogle.com', 'puny_code_domain': '', 'ip': '199.59.243.225'},
 {'domain': 'wwwgoogle.com', 'puny_code_domain': '', 'ip': '216.58.212.228'},
 {'domain': 'www.go0ogle.com', 'puny_code_domain': '', 'ip': '95.211.189.138'},
 {'domain': 'www.gookgle.com',
  'puny_code_domain': '',
  'ip': '103.224.182.226'},
 {'domain': 'wwwg.oogle.com', 'puny_code_domain': '', 'ip': '104.21.19.57'},
 {'domain': 'www.googlwe.com', 'puny_code_domain': '', 'ip': '95.211.189.137'},
 {'domain': 'www.googloe.com', 'puny_code_domain': '

# Code for REST API for fetching domain registration date and country using Flask and whois cli

## Note: The trial APIs available on the internet caused problems with inconsistent data formats and query limits, so I created a REST API that uses the whois CLI to query the registration date and the country. The Async and Sync API call code below extracts the domain registration date and country through this API. The API is running in an Azure Windows 11 VM.


In [None]:
from flask import Flask
import subprocess
import re
import idna

app = Flask(__name__)

@app.get('/get_domain_details/<domain>')
def get_domain_details(domain):
    """
        Look up the creation date and registrant country of a domain
        with the whois command line tool.

        The domain comes straight from the URL, so it is validated and
        handed to whois as a single argument without a shell.

    Args:
        domain (str): domain name, either unicode or punycode

    Returns:
        dict with "domain" (idna-decoded when possible), "reg_date"
        and "country"; the last two are "" when whois does not report
        them or cannot be run. For a malformed domain an error dict
        with HTTP status 400 is returned.
    """
    creation_date = ""
    registrant_country = ""
    # Must start with a letter or digit so the value can never be read as a
    # whois command line option; only hostname-like characters may follow.
    if len(domain) > 253 or not re.fullmatch(r"[^\W_][\w.-]*", domain):
        return {"error": "invalid domain name"}, 400
    try:
        completed = subprocess.run(["whois", domain], capture_output=True,
                                   timeout=60, check=False)
        # whois servers do not guarantee UTF-8; never fail on odd bytes.
        output_str = completed.stdout.decode("utf-8", errors="replace")
    except (OSError, subprocess.TimeoutExpired):
        # Best effort: report empty details when whois is missing or hangs.
        output_str = ""
    creation_date_pattern = r"Creation Date: (\d{4}-\d{2}-\d{2})"
    registrant_country_pattern = r"Registrant Country: (\w+)"
    creation_date_match = re.search(creation_date_pattern, output_str)
    if creation_date_match:
        creation_date = creation_date_match.group(1)
    registrant_country_match = re.search(registrant_country_pattern, output_str)
    if registrant_country_match:
        registrant_country = registrant_country_match.group(1)
    try:
        decoded_domain = idna.decode(domain)
    except idna.IDNAError:
        # Fall back to the name as received instead of failing the request.
        decoded_domain = domain
    return {"domain": decoded_domain, "reg_date": creation_date, "country": registrant_country}

if __name__ == "__main__":
    # The service listens on all interfaces so remote clients can reach it;
    # debug mode must therefore stay off, because the Werkzeug debugger allows
    # arbitrary code execution for anyone who can reach the port.
    app.run(host="0.0.0.0", port=8000, debug=False)

# Code to get the Domain Registration Date and Country through Async API calls

## Note: Due to some limitation of jupyter notebook, Async API calls are very difficult to implement. But the same code will work standalone. Synchronous API call code is also attached below this code which is used for the time being to get the data.


In [234]:
"""
    Module to get the Domain registration date 
    and country using Asynchronous API call

"""

import aiohttp
import asyncio

API_ENDPOINT = "http://20.67.244.20:8000"

class AsyncAPICall:
    """
        This class does Asynchronous API Calls and returns
        the response of all the requests in a list

        Attribute:
        domain_list: list
                    all typosquatted scraped domain data

    """

    def __init__(self, domain_list: list):
        self.domain_list = domain_list

    @staticmethod
    def _lookup_name(domain: dict) -> str:
        """
            This method returns the name to query for a scraped entry:
            the punycode form when one is present, else the domain.
            This matches the name selection of the synchronous client.
        """
        return domain.get("puny_code_domain") or domain["domain"]

    async def fetch_domain_data(self, session, url: str):
        """
            This method uses session.get() to make an asynchronous HTTP GET request
            and returns the API response in json format

        """
        async with session.get(url) as response:
            return await response.json()

    async def main(self) -> list:
        """
            This method is the main function which creates ClientSession for
            making asynchronous HTTP requests

            'asyncio.gather()' runs the tasks concurrently and gather their results

        Returns:
            list: one API response per entry of domain_list,
            in the same order
        """
        results = []
        async with aiohttp.ClientSession() as session:
            url = f"{API_ENDPOINT}/get_domain_details"
            tasks = [
                self.fetch_domain_data(session, f"{url}/{self._lookup_name(domain)}")
                for domain in self.domain_list
            ]
            results = await asyncio.gather(*tasks)
        return results

    def merge_domain_details(self, data_async: list) -> list:
        """
            This function merges the async api output list
            with the input domain list

            Entries are matched on their 'domain' value. Matching
            input dictionaries are updated in place; api entries
            without a match are appended as they are.

        Args:
            data_async (list): async api output

        Returns:
            list: merged input list of domain dict with the
            country and registration date
        """
        merged = {entry['domain']: entry for entry in self.domain_list}
        for item in data_async:
            known = merged.get(item['domain'])
            if known is None:
                merged[item['domain']] = item
            else:
                known.update(item)
        return list(merged.values())

# Demo run of the asynchronous client on the scraped data.
if __name__ == "__main__":
    async_obj = AsyncAPICall(domain_data)
    # NOTE: when this cell ran in the notebook, asyncio.run() failed with
    # "RuntimeError: asyncio.run() cannot be called from a running event loop";
    # the notebook notes say the same code works when run standalone.
    data = asyncio.run(async_obj.main())
    final_output = async_obj.merge_domain_details(data)
    print(final_output)

RuntimeError: asyncio.run() cannot be called from a running event loop

# Code to get the Domain Registration Date and Country through Synchronous API calls

In [235]:
"""
    Module to get the Domain registration date 
    and country using Synchronous API call

"""

import requests

API_ENDPOINT = "http://20.67.244.20:8000"

class APICall:
    """
     This class does Sync API calls to get
     domain registration date and country
     through rest api

     Attribute:
        domain_list: list
                    all typosquatted scraped domain data

    """
    def __init__(self, domain_list: list):
        self.domain_list = domain_list

    def get_api_call(self, domain_name: str) -> tuple:
        """
            This method does a GET API call with up to three attempts.
            An attempt fails on a non-200 status, on a network error
            or on a response body that is not valid json.

        Returns:
            tuple: (True, decoded json response) on success,
            (False, error text of the last attempt) otherwise
        """
        # Imported here because this module only imports `requests`.
        import time

        base_url = f"{API_ENDPOINT}/get_domain_details"
        url = f"{base_url}/{domain_name}"
        attempts = 3
        last_error = ""
        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(url=url, timeout=60)
                if response.status_code == 200:
                    return True, response.json()
                last_error = str(response.content)
            except (requests.RequestException, ValueError) as error:
                last_error = str(error)
            # No point in waiting once the last attempt has failed.
            if attempt < attempts:
                time.sleep(5)
        return False, last_error

    def main(self) -> tuple:
        """
            This is the main method. It queries the API for every
            entry of domain_list, using the punycode name when the
            entry has one, and adds the response to the entry in place.

        Returns:
            tuple: (True, domain_list) when every lookup succeeded,
            (False, error message) as soon as one lookup fails
        """
        for domain in self.domain_list:
            domain_name = domain.get('puny_code_domain') or domain["domain"]
            status, d_data = self.get_api_call(domain_name)
            if status:
                domain.update(d_data)
            else:
                return False, f"Failed to get data from API. Error: {d_data}"
        return True, self.domain_list
    
# Demo run of the synchronous client on the scraped data. `out` is kept at
# module level because a later cell copies it.
if __name__ == "__main__":
    status, out = APICall(domain_data).main()  
    print(out)

[{'domain': 'www.google.org', 'puny_code_domain': '', 'ip': '216.239.32.27', 'country': 'US', 'reg_date': '1998-10-21'}, {'domain': 'ww.google.com', 'puny_code_domain': '', 'ip': '142.250.200.46', 'country': 'US', 'reg_date': '1997-09-15'}, {'domain': 'www.google.co', 'puny_code_domain': '', 'ip': '142.250.187.206', 'country': '', 'reg_date': ''}, {'domain': 'www.google.net', 'puny_code_domain': '', 'ip': '142.250.200.36', 'country': 'US', 'reg_date': '1999-03-15'}, {'domain': 'www.google.de', 'puny_code_domain': '', 'ip': '172.217.16.227', 'country': '', 'reg_date': ''}, {'domain': 'www.google.tk', 'puny_code_domain': '', 'ip': '142.250.200.3', 'country': '', 'reg_date': ''}, {'domain': 'wwwngoogle.com', 'puny_code_domain': '', 'ip': '199.59.243.225', 'country': '', 'reg_date': '2011-05-16'}, {'domain': 'wwwgoogle.com', 'puny_code_domain': '', 'ip': '216.58.212.228', 'country': 'US', 'reg_date': '1999-03-03'}, {'domain': 'www.go0ogle.com', 'puny_code_domain': '', 'ip': '95.211.189.138

# Observation:
## The Async API call executed approximately 9 to 10 times faster than the Sync API call

In [236]:
# Keep an independent deep copy of the enriched domain data for the following steps.
new_output = copy.deepcopy(out)
new_output

[{'domain': 'www.google.org',
  'puny_code_domain': '',
  'ip': '216.239.32.27',
  'country': 'US',
  'reg_date': '1998-10-21'},
 {'domain': 'ww.google.com',
  'puny_code_domain': '',
  'ip': '142.250.200.46',
  'country': 'US',
  'reg_date': '1997-09-15'},
 {'domain': 'www.google.co',
  'puny_code_domain': '',
  'ip': '142.250.187.206',
  'country': '',
  'reg_date': ''},
 {'domain': 'www.google.net',
  'puny_code_domain': '',
  'ip': '142.250.200.36',
  'country': 'US',
  'reg_date': '1999-03-15'},
 {'domain': 'www.google.de',
  'puny_code_domain': '',
  'ip': '172.217.16.227',
  'country': '',
  'reg_date': ''},
 {'domain': 'www.google.tk',
  'puny_code_domain': '',
  'ip': '142.250.200.3',
  'country': '',
  'reg_date': ''},
 {'domain': 'wwwngoogle.com',
  'puny_code_domain': '',
  'ip': '199.59.243.225',
  'country': '',
  'reg_date': '2011-05-16'},
 {'domain': 'wwwgoogle.com',
  'puny_code_domain': '',
  'ip': '216.58.212.228',
  'country': 'US',
  'reg_date': '1999-03-03'},
 {'d

# VirusTotal Report Generation Code

### Note: For the few domains whose country and registration date were not available in the whois command output, these details are pulled from the "whois" data available in the VirusTotal API response.

In [237]:
"""
Virus Total API integration module

"""
import re
import json
import base64
import requests


VT_ENDPOINT = "https://www.virustotal.com"

class VTReport:
    """
        This class looks up an IP address on VirusTotal.

    Attribute:
        ip: str
            ip address

        api_key: str
                 base64 encoded VirusTotal API key; main() replaces it
                 with the decoded key before querying the API
    """
    # Counters of last_analysis_stats that are left out of the report.
    _IGNORED_STATS = ("timeout", "undetected")

    def __init__(self, ip: str, api_key: str):
        self.ip = ip
        self.api_key = api_key

    def decode_api_key(self) -> str:
        """
            This method decodes the base64 encoded api_key

        Returns:
            str: decoded api_key
        """
        # b64decode yields bytes; convert to text so the key really is a str.
        return base64.b64decode(self.api_key).decode("utf-8")

    def update_api_key(self):
        """
            This method replaces the api_key with the decoded api_key.
            It must be called only once per instance, while api_key still
            holds the encoded value.
        """
        self.api_key = self.decode_api_key()

    def get_domain_report(self) -> tuple:
        """
            This method gets the VirusTotal report for the IP

        Returns:
            (True, report) on success, where report looks like
            {
                "last_analysis_stats": {
                                "malicious": 5,
                                "suspicious": 0,
                                "harmless": 60
                            },
                "country": "US",
                "reg_date": "2012-05-24"
            }
            (False, message) when the request fails, the API answers with
            a non-200 status, or the response lacks the analysis stats.
        """
        url = f"{VT_ENDPOINT}/api/v3/ip_addresses/{self.ip}"
        headers = {
            "accept": "application/json",
            "x-apikey": self.api_key
        }
        try:
            response = requests.get(url, headers=headers, timeout=60)
            # Check the status before parsing: an error body is not
            # guaranteed to be JSON and must not mask the HTTP failure.
            if response.status_code != 200:
                return False, str(response.content)
            return self._extract_report(json.loads(response.text))
        except Exception as e:
            # Deliberate catch-all: callers rely on a (status, data) tuple
            # and never expect this method to raise.
            return False, f"Exception: {str(e)}"

    def _extract_report(self, vt_data) -> tuple:
        """
            Build the (status, data) result from a decoded API response.
        """
        data = vt_data.get("data") if isinstance(vt_data, dict) else None
        attributes = data.get("attributes") if isinstance(data, dict) else None
        if not isinstance(attributes, dict) or \
                "last_analysis_stats" not in attributes:
            return False, "Required details not found in the api response"
        # Filter into a new dict so that a missing counter cannot raise
        # KeyError and the decoded response is left unmodified.
        last_analysis_stats = {
            name: count
            for name, count in attributes["last_analysis_stats"].items()
            if name not in self._IGNORED_STATS
        }
        # "whois" data is optional; its absence yields empty
        # country/reg_date instead of failing the whole report.
        country, reg_date = self.process_whois(attributes.get("whois"))
        return True, {"last_analysis_stats": last_analysis_stats,
                      "country": country, "reg_date": reg_date}

    def process_whois(self, whois: str) -> tuple:
        """
            This method extracts the country and the registration date
            from a whois text

        Args:
            whois: whois text; None or "" is treated as "no data"

        Returns:
            tuple: (country, reg_date); each is "" when no
            "Country: <word>" / "RegDate: YYYY-MM-DD" entry is found
        """
        whois = whois or ""
        country_match = re.search(r"Country: (\w+)", whois)
        regdate_match = re.search(r"RegDate: (\d{4}-\d{2}-\d{2})", whois)
        country = country_match.group(1) if country_match else ""
        reg_date = regdate_match.group(1) if regdate_match else ""
        return country, reg_date

    def main(self) -> tuple:
        """
            Main function to generate the VT report: decodes the api_key
            and returns the result of get_domain_report()
        """
        self.update_api_key()
        return self.get_domain_report()
    
if __name__ == '__main__':
    import os

    # Base64 encoded VirusTotal API key, read once for the whole run.
    # Set VT_API_KEY_B64 to supply your own key.
    # NOTE(security): the fallback below is a credential committed to the
    # source; it is kept only so the notebook still runs unchanged. Rotate
    # that key and drop the fallback.
    encoded_api_key = os.environ.get(
        "VT_API_KEY_B64",
        "ZjkzNzIxOGJjOTE0YmIxMGE4YWM1ZDkzMTRkMGZjNjIxZWRkYjVmNDZlMmQ5NGUwOGE1ZTQzMGNjNzJjNDg4YQ==",
    )
    for item in new_output:
        status, data = VTReport(item['ip'], encoded_api_key).main()
        if not status:
            # Lookup failed: the record keeps its original fields only.
            continue
        item.update(data['last_analysis_stats'])
        # Fill country / registration date from VirusTotal only where the
        # earlier whois lookup left them empty.
        if item["country"] == "":
            item["country"] = data["country"]
        if item["reg_date"] == "":
            item["reg_date"] = data["reg_date"]
    print(new_output)

[{'domain': 'www.google.org', 'puny_code_domain': '', 'ip': '216.239.32.27', 'country': 'US', 'reg_date': '1998-10-21', 'malicious': 0, 'suspicious': 0, 'harmless': 66}, {'domain': 'ww.google.com', 'puny_code_domain': '', 'ip': '142.250.200.46', 'country': 'US', 'reg_date': '1997-09-15', 'malicious': 0, 'suspicious': 0, 'harmless': 67}, {'domain': 'www.google.co', 'puny_code_domain': '', 'ip': '142.250.187.206', 'country': 'US', 'reg_date': '2012-05-24', 'malicious': 0, 'suspicious': 0, 'harmless': 66}, {'domain': 'www.google.net', 'puny_code_domain': '', 'ip': '142.250.200.36', 'country': 'US', 'reg_date': '1999-03-15', 'malicious': 0, 'suspicious': 0, 'harmless': 66}, {'domain': 'www.google.de', 'puny_code_domain': '', 'ip': '172.217.16.227', 'country': 'US', 'reg_date': '2012-04-16', 'malicious': 0, 'suspicious': 0, 'harmless': 66}, {'domain': 'www.google.tk', 'puny_code_domain': '', 'ip': '142.250.200.3', 'country': 'US', 'reg_date': '2012-05-24', 'malicious': 0, 'suspicious': 0, '

In [238]:
# Snapshot of the records after the VirusTotal enrichment step.
vt_report = copy.deepcopy(new_output)
vt_report  # bare expression so the notebook displays the records

[{'domain': 'www.google.org',
  'puny_code_domain': '',
  'ip': '216.239.32.27',
  'country': 'US',
  'reg_date': '1998-10-21',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'ww.google.com',
  'puny_code_domain': '',
  'ip': '142.250.200.46',
  'country': 'US',
  'reg_date': '1997-09-15',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 67},
 {'domain': 'www.google.co',
  'puny_code_domain': '',
  'ip': '142.250.187.206',
  'country': 'US',
  'reg_date': '2012-05-24',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.net',
  'puny_code_domain': '',
  'ip': '142.250.200.36',
  'country': 'US',
  'reg_date': '1999-03-15',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.de',
  'puny_code_domain': '',
  'ip': '172.217.16.227',
  'country': 'US',
  'reg_date': '2012-04-16',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.tk',
  'puny_code_domain': '',
  'ip': '142.250.200.3'

In [239]:
import pandas as pd

# Tabular view of the enriched records: one row per dict in vt_report.
vt_df = pd.DataFrame(vt_report)
vt_df  # bare expression so the notebook renders the table

Unnamed: 0,domain,puny_code_domain,ip,country,reg_date,malicious,suspicious,harmless
0,www.google.org,,216.239.32.27,US,1998-10-21,0,0,66
1,ww.google.com,,142.250.200.46,US,1997-09-15,0,0,67
2,www.google.co,,142.250.187.206,US,2012-05-24,0,0,66
3,www.google.net,,142.250.200.36,US,1999-03-15,0,0,66
4,www.google.de,,172.217.16.227,US,2012-04-16,0,0,66
...,...,...,...,...,...,...,...,...
129,login-google.com,,20.42.95.243,US,2016-10-18,0,0,0
130,www.google.cn,,216.58.201.99,US,2012-01-27,0,0,67
131,www.google.info,,142.250.179.227,US,2001-07-31,0,0,64
132,auth-google.com,,128.199.247.177,US,2019-06-13,0,0,61


In [240]:
# Independent copy used as input of the Levenshtein step, so that
# vt_report itself is left untouched.
l_d_input = copy.deepcopy(vt_report)
l_d_input  # bare expression so the notebook displays the records

[{'domain': 'www.google.org',
  'puny_code_domain': '',
  'ip': '216.239.32.27',
  'country': 'US',
  'reg_date': '1998-10-21',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'ww.google.com',
  'puny_code_domain': '',
  'ip': '142.250.200.46',
  'country': 'US',
  'reg_date': '1997-09-15',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 67},
 {'domain': 'www.google.co',
  'puny_code_domain': '',
  'ip': '142.250.187.206',
  'country': 'US',
  'reg_date': '2012-05-24',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.net',
  'puny_code_domain': '',
  'ip': '142.250.200.36',
  'country': 'US',
  'reg_date': '1999-03-15',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.de',
  'puny_code_domain': '',
  'ip': '172.217.16.227',
  'country': 'US',
  'reg_date': '2012-04-16',
  'malicious': 0,
  'suspicious': 0,
  'harmless': 66},
 {'domain': 'www.google.tk',
  'puny_code_domain': '',
  'ip': '142.250.200.3'

# Code for Levenshtein Distance

In [243]:
import Levenshtein

# Largest Levenshtein distance to the genuine domain for which a candidate
# domain is kept by LevenshteinDistance.domain_levenshtein_sort.
LEVENSHTEIN_THRESHOLD = 4

class LevenshteinDistance:
    """
        This class calculates the Levenshtein distance of every scraped
        domain to the genuine domain and sorts the domain list by it

        Attribute:
        domain: str
                genuine domain that the typosquatting
                candidates are compared against

        domain_data: list
                    all typosquatted scraped domain data (list of dicts,
                    each with at least a "domain" key)

        threshold: int or None
                    largest distance that is kept; None (the default)
                    means LEVENSHTEIN_THRESHOLD
    """
    def __init__(self, domain_data, domain, threshold=None):
        self.domain_data = domain_data
        self.domain = domain
        self.threshold = threshold

    def domain_levenshtein_sort(self):
        """
            This method calculates the Levenshtein distance of each domain
            to the genuine domain, drops the genuine domain itself and every
            domain whose distance is greater than the threshold, and sorts
            the rest in ascending order of distance.

        Returns:
            list: copies of the kept records, each with an added
            "levenshtein_distance" key. The records in domain_data are
            not modified.
        """
        # Resolve the default at call time so that a later change of the
        # module constant is honoured.
        limit = LEVENSHTEIN_THRESHOLD if self.threshold is None else self.threshold
        scored = []
        for item in self.domain_data:
            candidate = item["domain"]
            if candidate == self.domain:
                continue
            distance = Levenshtein.distance(self.domain, candidate)
            if distance <= limit:
                # Copy the record instead of writing into the caller's dict.
                scored.append({**item, 'levenshtein_distance': distance})
        # sorted() is stable: equal distances keep their input order.
        return sorted(scored, key=lambda x: x['levenshtein_distance'])
    
if __name__ == '__main__':
    # Rank the scraped domains by their distance to the genuine domain.
    data = LevenshteinDistance(
        domain_data=l_d_input,
        domain='www.google.com',
    ).domain_levenshtein_sort()
    print(data)

[{'domain': 'ww.google.com', 'puny_code_domain': '', 'ip': '142.250.200.46', 'country': 'US', 'reg_date': '1997-09-15', 'malicious': 0, 'suspicious': 0, 'harmless': 67, 'levenshtein_distance': 1}, {'domain': 'www.google.co', 'puny_code_domain': '', 'ip': '142.250.187.206', 'country': 'US', 'reg_date': '2012-05-24', 'malicious': 0, 'suspicious': 0, 'harmless': 66, 'levenshtein_distance': 1}, {'domain': 'wwwngoogle.com', 'puny_code_domain': '', 'ip': '199.59.243.225', 'country': '', 'reg_date': '2011-05-16', 'malicious': 9, 'suspicious': 0, 'harmless': 58, 'levenshtein_distance': 1}, {'domain': 'wwwgoogle.com', 'puny_code_domain': '', 'ip': '216.58.212.228', 'country': 'US', 'reg_date': '1999-03-03', 'malicious': 2, 'suspicious': 0, 'harmless': 64, 'levenshtein_distance': 1}, {'domain': 'www.go0ogle.com', 'puny_code_domain': '', 'ip': '95.211.189.138', 'country': 'PA', 'reg_date': '2002-09-01', 'malicious': 1, 'suspicious': 0, 'harmless': 64, 'levenshtein_distance': 1}, {'domain': 'www.g

In [244]:
# Tabular view of the distance-sorted records.
ld_df = pd.DataFrame(data)
ld_df  # bare expression so the notebook renders the table

Unnamed: 0,domain,puny_code_domain,ip,country,reg_date,malicious,suspicious,harmless,levenshtein_distance
0,ww.google.com,,142.250.200.46,US,1997-09-15,0,0,67,1
1,www.google.co,,142.250.187.206,US,2012-05-24,0,0,66,1
2,wwwngoogle.com,,199.59.243.225,,2011-05-16,9,0,58,1
3,wwwgoogle.com,,216.58.212.228,US,1999-03-03,2,0,64,1
4,www.go0ogle.com,,95.211.189.138,PA,2002-09-01,1,0,64,1
...,...,...,...,...,...,...,...,...,...
124,www.google.ru,,142.250.178.3,US,2012-05-24,0,0,65,3
125,www.google.ml,,142.250.187.227,US,2012-05-24,0,0,65,3
126,www.googlecom.com,,172.217.16.227,US,2000-04-28,0,0,66,3
127,www.google.ga,,216.58.212.195,US,2012-01-27,1,0,66,3


# Code to categorise the domains into benign, suspicious and malicious according to a grading of your own choosing, given the VT report per domain, the registration details and Levenshtein distance to genuine domain. 

## Logic:
### 1) Malicious: Domains that have at least one malicious report in the VT report are categorised as malicious.
### 2) Suspicious: Four criteria are considered; if any of these conditions matches, the domain is considered suspicious.
#### a) If there is at least one suspicious report in the VT report.
#### b) If the Levenshtein distance is less than or equal to 2.
#### c) If the registration date of the typosquatted domain is after the threshold registration date. The threshold date is chosen on the basis of when the original domain became public and profitable.
#### d) If the typosquatted domain's location belongs to a suspicious country.
### 3) Benign: The rest of the domains are assumed to be benign.


In [245]:
from datetime import datetime

# Country values that CategoriseDomain.check_registered_country treats as
# suspicious (compared against each record's "country" field).
SUSPICIOUS_COUNTRIES = ["KR"]

class CategoriseDomain:
    """
        This class categorises the domains into malicious,
        suspicious and benign

        Attribute:
        reg_date_threshold: str
                            date ('YYYY-MM-DD') on which the genuine
                            domain became popular and profitable
        domain_data: list
                    all typosquatted scraped domain data (list of dicts)
        suspicious_countries: list or None
                    countries treated as suspicious; None (the default)
                    means SUSPICIOUS_COUNTRIES
    """
    def __init__(self, domain_data, reg_date_threshold, suspicious_countries=None):
        self.domain_data = domain_data
        self.reg_date_threshold = datetime.strptime(reg_date_threshold, '%Y-%m-%d')
        self.suspicious_countries = suspicious_countries

    def categorise_domain(self) -> list:
        """
            This method categorises every domain based on multiple conditions

        Returns:
            list: one {"domain": ..., "category": ...} dict per record,
            in input order, with category "malicious", "suspicious"
            or "benign"
        """
        return [
            {"domain": item["domain"], "category": self._category_for(item)}
            for item in self.domain_data
        ]

    def _category_for(self, item) -> str:
        """
            Return the category of a single domain record
        """
        # A record has no VT counters when its VirusTotal lookup failed;
        # such a record is judged on the remaining criteria (counter = 0).
        if item.get("malicious", 0) > 0:
            return "malicious"
        if (
            item.get("suspicious", 0) > 0
            or item["levenshtein_distance"] <= 2
            or self.check_registration_date(item["reg_date"])
            or self.check_registered_country(item["country"])
        ):
            return "suspicious"
        return "benign"

    def check_registration_date(self, registration_date) -> bool:
        """
            This method checks if the domain's registration date
            ('YYYY-MM-DD') is after the threshold registration date.
            An unknown date ('' or None) is never flagged.
        """
        if not registration_date:
            return False
        registered_on = datetime.strptime(registration_date, '%Y-%m-%d')
        return registered_on > self.reg_date_threshold

    def check_registered_country(self, country) -> bool:
        """
            This method checks if the domain's registration
            country belongs to the suspicious countries list
        """
        # Resolve the default at call time so that a later change of the
        # module constant is honoured.
        if self.suspicious_countries is None:
            return country in SUSPICIOUS_COUNTRIES
        return country in self.suspicious_countries

if __name__ == "__main__":
    # www.google.com's company went public in 2004 and became profitable
    # in the same year, hence 2004-01-01 is used as the threshold date here.
    reg_date_threshold = '2004-01-01'
    category_data = CategoriseDomain(
        domain_data=data,
        reg_date_threshold=reg_date_threshold,
    ).categorise_domain()
    print(category_data)

[{'domain': 'ww.google.com', 'category': 'suspicious'}, {'domain': 'www.google.co', 'category': 'suspicious'}, {'domain': 'wwwngoogle.com', 'category': 'malicious'}, {'domain': 'wwwgoogle.com', 'category': 'malicious'}, {'domain': 'www.go0ogle.com', 'category': 'malicious'}, {'domain': 'www.gookgle.com', 'category': 'malicious'}, {'domain': 'www.googlwe.com', 'category': 'suspicious'}, {'domain': 'www.googloe.com', 'category': 'suspicious'}, {'domain': 'www.goozle.com', 'category': 'suspicious'}, {'domain': 'www.goohle.com', 'category': 'malicious'}, {'domain': 'www.googl.com', 'category': 'suspicious'}, {'domain': 'www.gooyle.com', 'category': 'suspicious'}, {'domain': 'www.googlde.com', 'category': 'malicious'}, {'domain': 'www.googlr.com', 'category': 'malicious'}, {'domain': 'www.gootle.com', 'category': 'suspicious'}, {'domain': 'www.googl4.com', 'category': 'suspicious'}, {'domain': 'www.googli.com', 'category': 'suspicious'}, {'domain': 'www.googlè.com', 'category': 'suspicious'

In [246]:
c_df = pd.DataFrame(category_data)
# Total number of domains in each category
c_df["category"].value_counts()

category
malicious     73
suspicious    52
benign         4
Name: count, dtype: int64

In [247]:
c_df  # bare expression so the notebook renders the domain/category table

Unnamed: 0,domain,category
0,ww.google.com,suspicious
1,www.google.co,suspicious
2,wwwngoogle.com,malicious
3,wwwgoogle.com,malicious
4,www.go0ogle.com,malicious
...,...,...
124,www.google.ru,suspicious
125,www.google.ml,suspicious
126,www.googlecom.com,benign
127,www.google.ga,malicious
