In [None]:
pip install scapy

In [None]:
#obtaing an EXAMPLE target_ip_address

import socket

def get_ip_address(domain):
    try:
        ip = socket.gethostbyname(domain)
        return ip
    except socket.gaierror:
        return None

domains = ["example.com", "google.com"]
ip_addresses = {domain: get_ip_address(domain) for domain in domains}

for domain, ip in ip_addresses.items():
    if ip:
        print(f"{domain}: {ip}")
    else:
        print(f"{domain}: Could not resolve IP")

In [5]:
from scapy.all import *



In [6]:
import time

In [7]:
# Define target server details

# 18.136.119.119
target_ip = "18.136.119.119" 
target_port = 443 

In [None]:
# Function to load packets from a capture file

def load_packets_from_capture(file_path):
    return rdpcap(file_path)

# Function to identify and extract all IP addresses and ports from a capture file
def extract_ip_addresses_and_ports(packets):
    ip_addresses_ports = set()
    for packet in packets:
        if packet.haslayer(IP):
            ip_layer = packet[IP]
            if packet.haslayer(TCP):
                tcp_layer = packet[TCP]
                src_ip = ip_layer.src
                dst_ip = ip_layer.dst
                src_port = tcp_layer.sport
                dst_port = tcp_layer.dport
                ip_addresses_ports.add((src_ip, src_port))
                ip_addresses_ports.add((dst_ip, dst_port))
            elif packet.haslayer(UDP):
                udp_layer = packet[UDP]
                src_ip = ip_layer.src
                dst_ip = ip_layer.dst
                src_port = udp_layer.sport
                dst_port = udp_layer.dport
                ip_addresses_ports.add((src_ip, src_port))
                ip_addresses_ports.add((dst_ip, dst_port))
    return ip_addresses_ports

# Function to display IP addresses and ports
def display_ip_addresses_ports(ip_addresses_ports):
    print("Extracted IP Addresses and Ports:")
    for ip, port in ip_addresses_ports:
        print(f"IP: {ip}, Port: {port}")

In [8]:
# Defining the base-probe-1 , base-probe-2 , probe with invalid length to trigger an immediate drop ,probe to determine RST threshold

def create_probes(target_ip,target_port):
    probes = []
    # Base Probe 1: 16-byte OpenVPN Client Reset
    base_probe_1 = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\x38' * 16)
    probes.append(base_probe_1)

    # Base Probe 2: Similar to Base Probe 1 but with the last byte stripped off
    base_probe_2 = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\x38' * 15)
    probes.append(base_probe_2)

    # Additional probe with invalid length to trigger an immediate drop
    additional_probe_invalid_length = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\xFF\xFF' + b'\x00' * 14)
    probes.append(additional_probe_invalid_length)

    # Additional probe to determine RST threshold
    additional_probe_rst_threshold = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\x00' * 2000)
    probes.append(additional_probe_rst_threshold)

    return probes

In [9]:
# Function to send probes and capture responses

def send_probes(probes):
    responses = []
    for probe in probes:
        response = sr1(probe, timeout=2, verbose=0)
        responses.append((probe, response))
    return responses

In [16]:
# Function to analyze the responses from base-probe-1 , base-probe-2 , probe with invalid length to trigger an immediate drop ,probe to determine RST threshold

def analyze_responses(responses):
    print("-----------------------Probe Analysis-------------------------------------------")
    openvpn_server_detected = False
    probe_behavior = {
        'base_probe_1': False,
        'base_probe_2': False,
        'flag': False,
        'invalid_length_probe': False,
        'rst_threshold_probe': False
    }
    
    response_times = {
        'base_probe_1': None,
        'base_probe_2': None
    }
    
    hmac_detection = False

    for probe, response in responses:
        probe_time = time.time()
        print(f"Probe: {probe.summary()}")
        
        if response:
            response_time = time.time()
            response_delay = response_time - probe_time
            print(f"Response: {response.summary()} (Delay: {response_delay}s)")
            
            if response.haslayer(TCP):
                tcp_layer = response.getlayer(TCP)
                if tcp_layer.flags & 0x04:  # RST flag
                    print("Received RST: Potential OpenVPN server with specific behavior")
                    openvpn_server_detected = True
                    if probe.haslayer(Raw) and len(probe[Raw].load) == 2000:
                        probe_behavior['rst_threshold_probe'] = True
                else:
                    print("Received other TCP response: Likely not an OpenVPN server")
            else:
                print("Response received but not a TCP response: Likely not an OpenVPN server")
        else:
            print("No response or silent drop")
            
            if probe.haslayer(Raw):
                raw_layer = probe.getlayer(Raw)
                payload_length = len(raw_layer.load)
                
                if payload_length == 16:
                    print("Silent drop for first probe: Possible OpenVPN server with HMAC (dropped at decryption routine)")
                    probe_behavior['base_probe_1'] = True
                    response_times['base_probe_1'] = time.time() - probe_time
                
                elif payload_length == 15:
                    print("Silent drop for second probe: Possible OpenVPN server with HMAC (dropped at packet reassembly routine)")
                    probe_behavior['base_probe_2'] = True
                    response_times['base_probe_2'] = time.time() - probe_time
                
                elif payload_length > 2:
                    first_two_bytes = int.from_bytes(raw_layer.load[:2], 'big')
                    
                    if first_two_bytes > 1627:
                        print("Silent drop for probe with invalid packet length: Likely an OpenVPN server")
                        probe_behavior['invalid_length_probe'] = True
                    
                    elif payload_length == 2000:
                        print("Silent drop for large probe: Possible OpenVPN server with RST behavior")
                        probe_behavior['rst_threshold_probe'] = True
                    
                    else:
                        print("Silent drop for other probe: Likely not an OpenVPN server")
                
                else:
                    print("Silent drop for other probe: Likely not an OpenVPN server")

    # Analyzing response times for HMAC detection
    if response_times['base_probe_1'] and response_times['base_probe_2']:
        print(f"Response time for base_probe_1: {response_times['base_probe_1']:.5f}")
        print(f"Response time for base_probe_2: {response_times['base_probe_2']:.5f}")
        
    if round(response_times['base_probe_1'], 3) != round(response_times['base_probe_2'], 3):
        print("Possible HMAC detection: Faster RST for the first probe and delayed drop for the second probe")
        hmac_detection = True
        probe_behavior['flag'] = True
    else:
        print("No significant HMAC detection difference")
        hmac_detection = False
    
    print("-------------------------------------------------------------------------------------------")
    print(probe_behavior)
    
    if all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag'], probe_behavior['rst_threshold_probe']]) and hmac_detection:
        print("The server is likely an OpenVPN server with HMAC detection.")
        openvpn_server_detected = True
    elif all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag'], probe_behavior['rst_threshold_probe']]):
        print("The server is likely an OpenVPN server.")
        openvpn_server_detected = True
    elif all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag'], probe_behavior['invalid_length_probe']]) and hmac_detection:
        print("The server is likely an OpenVPN server with HMAC detection.")
        openvpn_server_detected = True 
    elif all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag'], probe_behavior['invalid_length_probe']]):
        print("The server is likely an OpenVPN server.")
        openvpn_server_detected = True
    elif all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag']]) and hmac_detection:
        print("The server is likely an OpenVPN server.")
        openvpn_server_detected = True
    elif all([probe_behavior['base_probe_1'], probe_behavior['base_probe_2'], probe_behavior['flag']]):
        print("The server is likely an OpenVPN server.")
        openvpn_server_detected = True 
    else:
        print("The server is not likely an OpenVPN server.")
        openvpn_server_detected = False
    
    print("--------------------------------------------------------------------------------------------")

In [11]:
# Define additional probes for port-sharing detection

def create_port_sharing_probes(target_ip, target_port):
    probes = []
    
    # Base probes to detect OpenVPN
    openvpn_probe = IP(dst=target_ip)/TCP(dport=target_port, flags="S")
    probes.append(openvpn_probe)
    
    # HTTP probe
    http_probe = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load="GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
    probes.append(http_probe)

    # TLS Client Hello probe
    tls_probe = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\x16\x03\x01\x00\x2e\x01\x00\x00\x2a\x03\x03' + b'\x00' * 40)
    probes.append(tls_probe)

    # SSH probe
    ssh_probe = IP(dst=target_ip)/TCP(dport=target_port, flags="PA")/Raw(load=b'\x53\x53\x48\x2d\x32\x2e\x30\x2d\x4f\x70\x65\x6e\x53\x53\x48\x5f\x37\x2e\x34\x20\x44\x65\x62\x69\x61\x2d\x33\x2e\x30\x2e\x31\x30')
    probes.append(ssh_probe)

    return probes

In [12]:
def send_probe(probe):
    response = sr1(probe, timeout=2, verbose=False)
    return response

def send_probes(probes):
    responses = [(probe, send_probe(probe)) for probe in probes]
    return responses

In [13]:
# Function to analyze co-location and port-sharing responses

def analyze_port_sharing_responses(responses):
    print("-----------------------Co-Location Analysis-------------------------------------------")
    co_location_detected = False
    probe_results = []

    for probe, response in responses:
        if response:
            probe_results.append((probe.summary(), response.summary()))
            if response.haslayer(Raw):
                raw_data = bytes(response[Raw])
                if b"HTTP" in raw_data:
                    print("Response indicates port-sharing with an HTTP service")
                    co_location_detected = True
                    break
                elif b"SSH" in raw_data:
                    print("Response indicates port-sharing with an SSH service")
                    co_location_detected = True
                    break
                elif b"\x16\x03" in raw_data:
                    print("Response indicates port-sharing with a TLS service")
                    co_location_detected = True
                    break
                else:
                    print("Response indicates a service other than OpenVPN, HTTP, SSH, or TLS")
            else:
                print("Response received but does not contain raw data: Likely not an OpenVPN server")
        else:
            probe_results.append((probe.summary(), "No response or silent drop (possible OpenVPN server)"))

    # Additional probing for HTTP, TLS, and SSH protocols
    if not co_location_detected:
        target_ip = responses[0][0][IP].dst
        target_port = responses[0][0][TCP].dport
        additional_probes = create_port_sharing_probes(target_ip, target_port)
        additional_responses = send_probes(additional_probes)
        
        for additional_probe, additional_response in additional_responses:
            if additional_response and additional_response.haslayer(Raw):
                raw_data = bytes(additional_response[Raw])
                if b"HTTP" in raw_data:
                    print("Additional probing indicates port-sharing with an HTTP service")
                    co_location_detected = True
                    break
                elif b"SSH" in raw_data:
                    print("Additional probing indicates port-sharing with an SSH service")
                    co_location_detected = True
                    break
                elif b"\x16\x03" in raw_data:
                    print("Additional probing indicates port-sharing with a TLS service")
                    co_location_detected = True
                    break
                else:
                    print(f"Additional probing for {additional_probe.summary()} indicates a service other than OpenVPN, HTTP, SSH, or TLS")
            else:
                print(f"Additional probing for {additional_probe.summary()} did not receive a response or was silently dropped")

    print("------------------------------------------------------------------------------------------")
    for probe_result in probe_results:
        print(f"Port-sharing Probe: {probe_result[0]}")
        print(f"Response: {probe_result[1]}")
    
    print("--------------------------------------------------------------------------------------------")
    if co_location_detected:
        print("The server is likely co-located with another service (HTTP, SSH, or TLS).")
    else:
        print("The server does not appear to be co-located with HTTP, SSH, or TLS services.")
    print("---------------------------------------------------------------------------------------------")

In [None]:
# Main function

def main():
    
    file_path = r'C:\Users\sathw\OneDrive\Desktop\testPCAP1.pcapng'
    packets = load_packets_from_capture(file_path)
    
    # Extract VPN IP addresses and ports
    ip_addresses_ports = extract_ip_addresses_and_ports(packets)
    
    # Display VPN IP addresses and ports
    display_ip_addresses_ports(ip_addresses_ports)
    
    # Perform probing on each extracted IP address and port
    for ip, port in ip_addresses_ports:
        print("--------------------------------------------------------------")
        print(f"Probing IP: {ip}, Port: {port}")
        
        base_probes = create_probes(ip, port)
        base_responses = send_probes(base_probes)
        analyze_responses(base_responses)

        port_sharing_probes = create_port_sharing_probes(ip, port)
        port_sharing_responses = send_probes(port_sharing_probes)
        analyze_port_sharing_responses(port_sharing_responses)
        print("---------------------------------------------------------------")

if __name__ == "__main__":
    main()

In [18]:
# Main function without loading .pcapng file

def main():
    print(f"Probing IP: {target_ip}, Port: {target_port}")
    base_probes = create_probes(target_ip,target_port)
    base_responses = send_probes(base_probes)
    analyze_responses(base_responses)

    port_sharing_probes = create_port_sharing_probes(target_ip,target_port)
    port_sharing_responses = send_probes(port_sharing_probes)
    analyze_port_sharing_responses(port_sharing_responses)

if __name__ == "__main__":
    main()

Probing IP: 18.136.119.119, Port: 443
-----------------------Probe Analysis-------------------------------------------
Probe: IP / TCP 192.168.1.2:ftp_data > 18.136.119.119:https PA / Raw
No response or silent drop
Silent drop for first probe: Possible OpenVPN server with HMAC (dropped at decryption routine)
Probe: IP / TCP 192.168.1.2:ftp_data > 18.136.119.119:https PA / Raw
No response or silent drop
Silent drop for second probe: Possible OpenVPN server with HMAC (dropped at packet reassembly routine)
Probe: IP / TCP 192.168.1.2:ftp_data > 18.136.119.119:https PA / Raw
No response or silent drop
Silent drop for first probe: Possible OpenVPN server with HMAC (dropped at decryption routine)
Probe: IP / TCP 192.168.1.2:ftp_data > 18.136.119.119:https PA / Raw
No response or silent drop
Silent drop for large probe: Possible OpenVPN server with RST behavior
Response time for base_probe_1: 0.00197
Response time for base_probe_2: 0.00303
Possible HMAC detection: Faster RST for the first pro