Import

In [3]:
import os
import time
import click
import socket
import platform
import requests
import paramiko
import warnings
import concurrent
import subprocess
import pandas as pd
from tqdm import tqdm
from bs4 import BeautifulSoup
from tabulate import tabulate
from urllib.parse import urlparse
from Wappalyzer import Wappalyzer, WebPage

Linode

In [1]:
def linode():
    """
    Prompts for SSH credentials, runs ``df`` on a Linode server and prints the output.

    The host, user name and password are read interactively with ``click.prompt``
    (the password is not echoed). The standard output of the remote command is
    decoded and printed.

    Returns:
        None
    """
    host = click.prompt("Enter the IP address of the Linode server to connect to")
    username = click.prompt("Enter the limited user account to use for connecting to the Linode server")
    password = click.prompt("Enter the password for the user account", hide_input=True)

    command = "df"
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts any unknown host key without verification,
    # which leaves the first connection open to man-in-the-middle attacks. Consider
    # loading known hosts and rejecting unknown keys instead.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(host, username=username, password=password)
        _stdin, stdout, _stderr = client.exec_command(command)
        print(stdout.read().decode())
    finally:
        # Always release the transport, even when connecting or running the command fails.
        client.close()

Enumerator

In [2]:
def passive_enumerator(domain):
    """
    Uses Subfinder to enumerate subdomains for a given domain.

    Runs ``subfinder -d <domain>`` with its standard error discarded and parses
    one subdomain per output line.

    Args:
        domain (str): The domain to enumerate subdomains for.

    Returns:
        set: A set of subdomains. Empty when Subfinder prints nothing.

    Raises:
        FileNotFoundError: If the ``subfinder`` executable cannot be found.
        subprocess.CalledProcessError: If ``subfinder`` exits with a non-zero status.
    """
    subfinder_output = subprocess.check_output(
        ['subfinder', '-d', domain],
        stderr=subprocess.DEVNULL,
    )

    # Skip blank lines so that empty output gives an empty set rather than {''};
    # splitlines() also copes with '\r\n' line endings.
    return {
        line.strip()
        for line in subfinder_output.decode('utf-8').splitlines()
        if line.strip()
    }

def active_enumerator(domain):
    """
    Probes candidate subdomains from word lists and keeps the ones that respond.

    Every ``.txt`` file in ``../lists/active_enumerator`` is read line by line. Each
    non-empty line is treated as a subdomain label, and ``https://<label>.<domain>``
    is probed with an HTTP HEAD request (3 second timeout). A label is kept when the
    response status code is below 400.

    Args:
        domain (str): The domain to enumerate subdomains for.

    Returns:
        set: The labels whose host answered; an empty set if none did.

    Raises:
        FileNotFoundError: If the word list directory does not exist.
    """
    dir_path = "../lists/active_enumerator"
    if not os.path.isdir(dir_path):
        raise FileNotFoundError(f"Subdomains directory '{dir_path}' not found")
    file_names = [
        file_name
        for file_name in os.listdir(dir_path)
        if os.path.isfile(os.path.join(dir_path, file_name)) and file_name.endswith(".txt")
    ]

    subdomains = set()
    for file_name in file_names:
        file_path = os.path.join(dir_path, file_name)
        with open(file_path, "r") as subdomain_file:
            for line in tqdm(subdomain_file, desc="Active enumeration", unit="Subdomains"):
                subdomain = line.strip()
                if not subdomain:
                    # A blank line would otherwise probe "https://.<domain>".
                    continue
                full_domain = subdomain + "." + domain
                try:
                    response = requests.head("https://" + full_domain, timeout=3)
                except (requests.exceptions.RequestException, OSError, UnicodeError):
                    # Unreachable or malformed host: best-effort scan, move on.
                    # KeyboardInterrupt is deliberately NOT caught so the scan can be aborted.
                    continue
                if response.status_code < 400:
                    # NOTE(review): only the label is stored, not the full host name —
                    # confirm that callers expect labels rather than "<label>.<domain>".
                    subdomains.add(subdomain)
    return subdomains

HTTP Response fetcher

In [None]:
def fetch_response(subdomains, technology, timeout=10):
    """
    Requests each subdomain over HTTP and collects the response status.

    Args:
        subdomains (iterable of str): Host names to request as ``http://<host>``.
        technology (bool): When truthy, ``fetch_tech`` is called for each host that
            answered and its result is stored in the last column of the row.
        timeout (float, optional): Seconds to wait for each HTTP request before
            giving up. Defaults to 10.

    Returns:
        list: One ``[subdomain, status_code, reason, technologies]`` row per host
        that answered. ``technologies`` is ``''`` unless ``technology`` is truthy.
        Hosts whose request failed are reported on standard output and skipped.
    """
    response_table = []
    for subdomain in tqdm(subdomains, desc='Testing subdomains', unit='subdomain', leave=False):
        try:
            # Without a timeout a single unresponsive host would block the scan forever.
            response = requests.get(f'http://{subdomain}', timeout=timeout)
            row = [subdomain, response.status_code, response.reason, '']
            response_table.append(row)
            if technology:
                row[-1] = fetch_tech(subdomain)
            time.sleep(0.5)  # Add a delay to avoid overloading the target website
        except (requests.exceptions.RequestException, OSError) as e:
            # socket.gaierror and socket.error are both OSError, so one clause covers
            # the request errors and the low-level socket errors alike.
            print(f"An error occurred while trying to connect to '{subdomain}': {str(e)}")
    return response_table

Tech fetcher

In [6]:
def fetch_tech(url):
    """
    Detects the technologies used by a web page with Wappalyzer.

    The page is downloaded and analysed up to five times, waiting five seconds
    between attempts. Each failure is reported on standard output.

    Args:
        url (str): The URL or bare host name to analyse. ``https://`` is prepended
            when no ``http://`` or ``https://`` scheme is present.

    Returns:
        list or None: The detected technologies, or None if every attempt failed.
    """
    # Check the full scheme prefix: a bare host such as "httpbin.org" also starts with "http".
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    wappalyzer = Wappalyzer.latest()

    max_retries = 5
    for attempt in range(1, max_retries + 1):
        try:
            # The download is the step that can fail transiently, so it must sit
            # inside the retry loop together with the analysis.
            webpage = WebPage.new_from_url(url)
            return list(wappalyzer.analyze(webpage))
        except Exception as e:
            # Deliberately broad: this is a best-effort lookup and any failure
            # is reported and retried rather than aborting the caller.
            print(f"Error fetching technologies for {url}: {e}")
            if attempt < max_retries:
                time.sleep(5)
    print(f"Max retries reached for {url}")
    return None

In [7]:
Intruder

NameError: name 'Intruder' is not defined

In [None]:
def intruder(url, num_threats=6, num_requests=200, num_retries=4, pause_before_retry=3000, timeout=10):
    """
    Sends repeated HTTP GET requests to a URL in several sequential rounds.

    The rounds ("threats") run one after another, not in parallel. Within a round,
    requests are sent until one returns status 200 or ``num_requests`` requests have
    been made, pausing between consecutive requests.

    Args:
        url (str): The URL to send the requests to (i.e. subdomain.domain.com).
            ``https://`` is prepended when no scheme is given.
        num_threats (int, optional): The number of rounds to run. Defaults to 6.
        num_requests (int, optional): The maximum number of requests per round.
            Defaults to 200.
        num_retries (int, optional): Passed as ``max_retries`` to the HTTP adapter
            mounted on the session. Defaults to 4.
        pause_before_retry (int, optional): The number of milliseconds to wait after
            a non-200 response before sending the next request. Defaults to 3000.
        timeout (float, optional): Seconds to wait for each request before giving up.
            Defaults to 10.

    Returns:
        list: One list per round containing the responses received in that round.
    """
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    prepared_request = requests.Request('GET', url).prepare()
    results = []
    with requests.Session() as session:
        adapter = requests.adapters.HTTPAdapter(max_retries=num_retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        for i in range(num_threats):
            threat_results = []
            with tqdm(total=num_requests, desc=f'Threat {i+1}') as pbar:
                for j in range(num_requests):
                    # Without a timeout an unresponsive server would block forever.
                    response = session.send(prepared_request, timeout=timeout)
                    threat_results.append(response)
                    # Count the request as soon as it completes, including the one
                    # that ends the round.
                    pbar.update(1)
                    if response.status_code == 200:
                        break
                    if j < num_requests - 1:
                        # Only pause when another request will actually follow.
                        time.sleep(pause_before_retry / 1000)
            results.append(threat_results)
    return results