In [None]:
!pip install requests

In [None]:
import requests
from ipaddress import ip_network
from concurrent.futures import ThreadPoolExecutor

# List of CIDR ranges
cidrs = [
    '151.101.0.0/16',
    '151.101.8.0/21',
    '151.101.16.0/21',
    '151.101.28.0/22',
    '151.101.36.0/22',
    '151.101.40.0/21',
    '151.101.48.0/21',
    '151.101.60.0/22',
    '151.101.64.0/21',
    '151.101.76.0/22',
    '151.101.80.0/20',
    '151.101.96.0/20',
    '151.101.116.0/22',
    '151.101.124.0/22',
    '151.101.128.0/19',
    '151.101.160.0/21',
    '151.101.172.0/22',
    '151.101.176.0/21',
    '151.101.192.0/21',
    '151.101.200.0/22',
    '151.101.208.0/20',
    '151.101.224.0/20',
    '151.101.240.0/21',
    '151.101.248.0/22',
    '157.5.64.0/18',
    '157.5.69.0/24',
    '157.5.70.0/23',
    '157.5.72.0/21',
    '157.5.80.0/22',
    '157.52.64.0/23',
    '157.52.66.0/24',
    '157.52.69.0/24',
    '157.52.70.0/23',
]

# Function to check each IP
def check_website(ip):
    try:
        response = requests.get(f"http://{ip}", timeout=10)  # Increased timeout for better handling
        if response.status_code == 200:
            return f"Active website at {ip}"
        else:
            return f"Server responded at {ip}, status code: {response.status_code}"
    except requests.RequestException:
        return f"No response from {ip}"

# Using ThreadPoolExecutor to manage multiple threads
def main():
    with ThreadPoolExecutor(max_workers=10) as executor:  # Adjust max_workers depending on your CPU
        for cidr in cidrs:
            network = ip_network(cidr)
            first_ip = next(network.hosts())  # Still using the first IP to demonstrate
            future = executor.submit(check_website, str(first_ip))
            print(future.result())

if __name__ == "__main__":
    main()

In [None]:
import ipaddress
import socket

def reverse_dns_lookup(ip_range):
    reverse_dns_entries = []
    # Create an IPv4 network object from the range
    network = ipaddress.ip_network(ip_range)
    # Iterate through each IP in the range
    for ip in network.hosts():  # .hosts() excludes network and broadcast addresses
        try:
            # Get the reverse DNS entry
            host, _, _ = socket.gethostbyaddr(str(ip))
            reverse_dns_entries.append((str(ip), host))
        except socket.herror:  # Handle cases where no DNS entry is found
            # reverse_dns_entries.append((str(ip), "No reverse DNS entry found"))
            continue
        except Exception as e:
            # reverse_dns_entries.append((str(ip), str(e)))
            continue
    return reverse_dns_entries

# Example usage
ip_range = '151.101.64.0/21'  # You can replace this with any other range
results = reverse_dns_lookup(ip_range)
for ip, host in results:
    print(ip, "->", host)

In [None]:
import socket
import ipaddress

def check_port(ip, port=443, timeout=3):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        # Attempt to connect to the IP and port
        s.connect((ip, port))
        s.close()
        return True
    except (socket.timeout, socket.error):
        return False

def list_open_ports(ip_range):
    open_ips = []
    # Create an IPv4 network object from the range
    network = ipaddress.ip_network(ip_range)
    # Iterate through each IP in the range
    for ip in network.hosts():  # .hosts() excludes network and broadcast addresses
        if check_port(str(ip)):
            open_ips.append(str(ip))
            print(f"{ip} on port 443 is open")
        else:
            print(f"{ip} on port 443 is not open")
    return open_ips

# Example usage
ip_range = '151.101.64.0/21'  # Replace this with any other range
open_ips = list_open_ports(ip_range)

In [None]:
import ipaddress
import socket
import ssl

def check_tls_connection(ip, port=443, timeout=3):
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    try:
        with socket.create_connection((ip, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=ip) as ssock:
                # Try to complete the TLS handshake
                ssock.do_handshake()
                return True
    except (ssl.SSLCertVerificationError, ssl.SSLError, socket.timeout):
        # Handle SSL errors, timeouts, and certificate mismatches gracefully
        return False
    except (ConnectionResetError, OSError):
        # Handle errors such as connection reset by peer or other OS-related network errors
        return False
    except Exception as e:
        # General exception handling to catch anything unexpected
        print(f"An unexpected error occurred: {e}")
        return False

def list_successful_tls_ips(ip_range):
    successful_ips = []
    network = ipaddress.ip_network(ip_range)
    for ip in network.hosts():  # Exclude network and broadcast addresses
        if check_tls_connection(str(ip)):
            successful_ips.append(str(ip))
            print(f"Successful TLS connection at {ip}")
        # else:
        #     print(f"Failed TLS connection at {ip}")
    return successful_ips

# Example usage
ip_range = '151.101.64.0/21'  # Replace with your specific IP range
successful_ips = list_successful_tls_ips(ip_range)

In [None]:
!rm vmessping_amd64_linux.zip vmessping_amd64_linux.zip*
!wget https://github.com/v2fly/vmessping/releases/download/v0.3.4/vmessping_amd64_linux.zip
!unzip vmessping_amd64_linux.zip
!mv vmessping_amd64_linux vmessping
!ls -al

In [None]:
!vmessping

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [2]:
!cp drive/MyDrive/go/vmessping_amd64_linux vmessping_amd64_linux

In [None]:
!chmod 765 /content/vmessping_amd64_linux
!ls -l /content/vmessping_amd64_linux

In [None]:
import subprocess
import json
import base64
import ipaddress

def parse_vmess_uri(vmess_uri):
    base64_string = vmess_uri[8:]
    json_string = base64.urlsafe_b64decode(base64_string + '==').decode('utf-8')
    return json.loads(json_string)

def update_parameter(json_data, key, value):
    json_data[key] = value
    return json_data


def update_proxy_address(config_data, new_address):
    """
    Update the address parameter in the proxy configuration.

    Args:
    config_data (dict): The full configuration data as a Python dictionary.
    new_address (str): The new proxy address to set.

    Returns:
    dict: Updated configuration data.
    """
    # Navigate through the outbounds list
    for outbound in config_data.get("outbounds", []):
        # Check if the outbound tag is 'proxy' and protocol is 'vmess'
        if outbound.get("tag") == "proxy" and outbound.get("protocol") == "vmess":
            # Access the 'vnext' list inside the 'settings' dict
            vnext_list = outbound.get("settings", {}).get("vnext", [])
            for vnext in vnext_list:
                # Update the address
                vnext["address"] = new_address
    return config_data

def generate_vmess_uri(json_data):
    json_string = json.dumps(json_data)
    base64_string = base64.urlsafe_b64encode(json_string.encode('utf-8')).decode('utf-8').rstrip('=')
    return f"vmess://{base64_string}"

def check_vmess_ping(vmess_uri):
    # Use the correct command format as per your example
    command = f"./vmessping_amd64_linux -c 1 -allow-insecure {vmess_uri}"
    try:
        # Run the command and capture the output
        result = subprocess.run(command, shell=True, text=True, capture_output=True, check=False)
        # print(result.stdout)  # Display the full output for debugging or information
        if "0 success" in result.stdout:
            return False
        return True
    except subprocess.CalledProcessError as e:
        # print("Failed to run vmessping:", e)
        return False

# Example usage
vmess_uri = "vmess://..."
ip_range = '140.248.88.0/22'  # Replace with your specific IP range
# ip_range = '151.101.64.0/21'  # Replace with your specific IP range
# Create an IPv4 network object from the range
network = ipaddress.ip_network(ip_range)
# Iterate through each IP in the range
for ip in network.hosts():  # .hosts() excludes network and broadcast addresses
    data = parse_vmess_uri(vmess_uri)
    updated_data = update_parameter(data, 'add', str(ip))
    # updated_data = update_proxy_address(data, '151.101.64.81')
    new_vmess_uri = generate_vmess_uri(updated_data)

    # Check if the VMess URI is pingable
    is_pingable = check_vmess_ping(new_vmess_uri)
    if is_pingable:
        print(str(ip))
        # print("Is VMess URI pingable?", is_pingable)

In [None]:
!sudo /content/vmessping_amd64_linux -c 1 -allow-insecure vmess://eyJhZGQiOiJ0b3Quc3Bpa2VzZXJ2aWNlLm9ubGluZSIsImFpZCI6IjAiLCJhbHBuIjoiIiwiZnAiOiJyYW5kb21pemVkIiwiaG9zdCI6ImRvbS5nYW5nby5vbmxpbmUiLCJpZCI6ImEzZmVmYWEyLTJkM2YtNDc5Zi05NmViLWM2OTc0ZGFkMGRkOSIsIm5ldCI6IndzIiwicGF0aCI6Ii92bWVzcyIsInBvcnQiOiI0NDMiLCJwcyI6ImF0YWdlbGRpIiwic2N5IjoiYXV0byIsInNuaSI6Ind3dy5nb29nbGUuY29tIiwidGxzIjoidGxzIiwidHlwZSI6IiIsInYiOiIyIn0=