VPN Analysis Notebook to extract and enhance VPN endpoint data for further analysis of the upstream networks that host and facilitate VPN providers.

I wrote this to test empirically the anecdotal claim that the overwhelming majority of VPN providers rely on M247, which would make M247 the single biggest point of failure in a hypothetical attack by a global passive adversary.

Acknowledgements:
* qdm12 (Quentin McGaw) <https://github.com/qdm12> for Gluetun
* Frank Denis <https://iptoasn.com> for IPToASN



In [3]:
'''
Using the Gluetun <https://github.com/qdm12/gluetun> project's
servers.json list of VPN endpoints, extract the servers and write
them to a CSV file for further processing.
'''

import json
import csv
import requests

# URL of Gluetun servers.json file
json_url = 'https://raw.githubusercontent.com/qdm12/gluetun/refs/heads/master/internal/storage/servers.json'

# Fetch the JSON server data from the Gluetun project.
# The timeout stops the cell from hanging indefinitely on a stalled
# connection, and raise_for_status() makes an HTTP error response fail here,
# rather than later when the body is parsed as JSON.
response = requests.get(json_url, timeout=30)
response.raise_for_status()

data = response.json()  # Parse the JSON data

# Define the CSV file where the data will be saved
csv_file_path = 'vpn_servers_all_providers.csv'

# Define the headers for the CSV file
headers = ['provider', 'vpn', 'country', 'region', 'city', 'server_name', 'hostname', 'wgpubkey', 'ips', 'tcp', 'udp']

# Open the CSV file for writing
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
    # Create a CSV writer object
    writer = csv.DictWriter(csvfile, fieldnames=headers)

    # Write the headers to the CSV file
    writer.writeheader()

    # Iterate over each provider in the JSON data
    for provider_name, provider_data in data.items():
        # Skip top-level entries that are not provider objects with a server list
        if not isinstance(provider_data, dict) or 'servers' not in provider_data:
            continue

        # Iterate over the servers for each provider to write to CSV
        for server in provider_data['servers']:
            # Copy every known column from the server record; missing keys become ''
            row = {header: server.get(header, '') for header in headers if header != 'provider'}
            row['provider'] = provider_name  # Add provider name to row
            # `or []` also covers an explicit null value, which would otherwise
            # make join() raise a TypeError.
            row['ips'] = ', '.join(server.get('ips') or [])
            row['tcp'] = str(server.get('tcp', ''))
            row['udp'] = str(server.get('udp', ''))
            writer.writerow(row)

# Output the created CSV
print(f"CSV file saved: {csv_file_path}")



CSV file saved: vpn_servers_all_providers.csv


In [4]:
'''
Download and extract the IP to ASN dataset.
'''

import requests
import gzip
import shutil

url = 'https://iptoasn.com/data/ip2asn-combined.tsv.gz'
gz_file_path = 'ip2asn-combined.tsv.gz'

# Stream the archive to disk chunk by chunk. Reading `response.content`
# would pull the whole download into memory first, which defeats
# stream=True. The `with` block releases the connection when done, and
# raise_for_status() prevents an HTTP error page from being saved as the archive.
with requests.get(url, stream=True, timeout=60) as response:
    response.raise_for_status()
    with open(gz_file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

# Decompress the downloaded archive next to it
tsv_file_path = 'ip2asn-combined.tsv'
with gzip.open(gz_file_path, 'rb') as f_in:
    with open(tsv_file_path, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

In [1]:
'''
Enhance the data to include ASN based on IP2ASN dataset using range-based integer lookups.
'''

import pandas as pd
import ipaddress
import socket
from tqdm import tqdm

csv_file_path = 'vpn_servers_all_providers.csv'
tsv_file_path = 'ip2asn-combined.tsv'

# Convert an IP address to its integer value so ranges can be compared numerically
def ip_to_int(ip):
    """Return the integer representation of an IP address.

    Args:
        ip: An address accepted by ``ipaddress.ip_address`` (for example an
            IPv4 or IPv6 string).

    Returns:
        The address as a plain ``int``.

    Raises:
        ValueError: If ``ip`` is not a valid IP address.
    """
    parsed_address = ipaddress.ip_address(ip)
    return int(parsed_address)

# Parse the IP2ASN TSV file into a list of tuples
def load_tsv(tsv_path):
    """Load an IP2ASN TSV file into a list of range tuples.

    Each non-blank line must contain five tab-separated fields: range start,
    range end, ASN, a fourth field that is ignored, and a description.

    Args:
        tsv_path: Path to the decompressed TSV file.

    Returns:
        A list of ``(start_int, end_int, asn, description)`` tuples in file
        order, where the range bounds are integers and ``asn`` is kept as the
        string read from the file.

    Raises:
        ValueError: If a non-blank line does not have five fields or a range
            bound is not a valid IP address.
    """
    ip_ranges = []
    # Decode explicitly as UTF-8 so the result does not depend on the platform
    # default encoding; undecodable bytes are replaced rather than aborting.
    with open(tsv_path, 'r', encoding='utf-8', errors='replace') as file:
        for line_no, line in enumerate(file, start=1):
            # Remove only leading whitespace and the line terminator. A full
            # strip() would also eat a trailing tab and thereby drop an empty
            # final field.
            record = line.lstrip().rstrip('\r\n')
            if not record.strip():
                continue  # blank line: nothing to parse
            fields = record.split('\t', 4)
            if len(fields) != 5:
                raise ValueError(
                    f"{tsv_path}:{line_no}: expected 5 tab-separated fields, got {len(fields)}"
                )
            start, end, asn, _, description = fields
            ip_ranges.append((ip_to_int(start), ip_to_int(end), asn, description.rstrip()))
    return ip_ranges

# Function to perform a forward DNS lookup and resolve a hostname to an IPv4 address
def resolve_hostname(hostname):
    """Resolve a hostname to its first IPv4 address.

    Args:
        hostname: The hostname to resolve. Anything that is not a non-empty
            string is treated as "no hostname".

    Returns:
        The first IPv4 address returned by ``socket.gethostbyname_ex`` as a
        string, or ``None`` if there is no usable hostname, the lookup fails,
        or no IPv4 address is returned.
    """
    if not isinstance(hostname, str):  # Check if hostname is a string
        return None  # If not, return None without attempting DNS lookup
    hostname = hostname.strip()
    if not hostname:
        return None  # An empty name is not worth a resolver round-trip

    try:
        ip_list = socket.gethostbyname_ex(hostname)[2]
    except (OSError, UnicodeError):
        # OSError covers socket.gaierror, socket.herror and timeouts.
        # UnicodeError is raised for names that cannot be IDNA-encoded
        # (for example an empty or over-long label). One bad hostname
        # should not abort a long batch run.
        return None

    for ip in ip_list:
        if ipaddress.ip_address(ip).version == 4:
            return ip
    return None

# Look up the ASN and its description for an integer IP address
def find_asn_and_description(ip_int, ip_ranges):
    """Return the ASN and description of the first range containing ``ip_int``.

    Args:
        ip_int: The IP address as an integer.
        ip_ranges: Iterable of ``(start, end, asn, description)`` tuples with
            inclusive integer bounds.

    Returns:
        An ``(asn, description)`` tuple from the first matching range in
        iteration order, or ``('ASN not found', 'Description not found')``
        when no range contains the address.
    """
    fallback = ('ASN not found', 'Description not found')
    hits = (
        (asn, description)
        for lower, upper, asn, description in ip_ranges
        if lower <= ip_int <= upper
    )
    # next() stops at the first hit, mirroring an early return from a loop
    return next(hits, fallback)

# Initial load of the IP ranges from the TSV file
ip_ranges = load_tsv(tsv_file_path)

# Load the VPN endpoints CSV
df = pd.read_csv(csv_file_path)

# Add new columns for the ASN and ASN description - initialized to None
df['ASN'] = None
df['ASN_Description'] = None

# Update the DataFrame with the ASN and its description for each server row.
# The 'ips' cell may be empty/NaN, hold one address, or hold several
# comma-separated addresses. The first listed address that parses is used
# directly. A DNS lookup of the hostname is only a fallback when the row
# lists no usable address, so rows with several IPs are neither skipped nor
# made dependent on the network.
for index, row in tqdm(df.iterrows(), total=df.shape[0], desc="Processing IPs"):
    hostname = row['hostname'] if isinstance(row['hostname'], str) else None

    ip_int = None
    ips_field = row['ips']
    if isinstance(ips_field, str):  # NaN (empty cell) is a float, not a str
        for candidate in ips_field.split(','):
            try:
                ip_int = ip_to_int(candidate.strip())
                break
            except ValueError:
                continue  # Empty or malformed entry: try the next one

    if ip_int is None:
        # No usable listed address: attempt a forward DNS lookup of the hostname
        ip_address = resolve_hostname(hostname)
        if ip_address is None:
            continue
        try:
            ip_int = ip_to_int(ip_address)
        except ValueError:
            continue  # Best effort: leave ASN columns empty for this row

    asn, description = find_asn_and_description(ip_int, ip_ranges)
    df.at[index, 'ASN'] = asn
    df.at[index, 'ASN_Description'] = description

# Save the enhanced DataFrame to a new CSV file
updated_csv_file_path = 'vpn_servers_all_providers_with_asn_and_description.csv'
df.to_csv(updated_csv_file_path, index=False)

print(f"Updated CSV saved to {updated_csv_file_path}")

Processing IPs: 100%|██████████| 20776/20776 [36:58<00:00,  9.37it/s]  


Updated CSV saved to vpn_servers_all_providers_with_asn_and_description.csv


In [16]:
#Summarize the # of distinct VPN Providers using an ASN

import pandas as pd

file_path = 'vpn_servers_all_providers_with_asn_and_description.csv'
data = pd.read_csv(file_path)

# One row per distinct (ASN, description) pair, with the ASN as a string so it
# can be joined against the stringified group keys below.
asn_descriptions = (
    data[['ASN', 'ASN_Description']]
    .drop_duplicates()
    .astype({'ASN': str})
)

# Count how many different providers appear under each ASN, then attach the
# ASN description with a left join on the stringified ASN.
distinct_providers_per_asn = (
    data.groupby('ASN')['provider']
    .nunique()
    .reset_index(name='Distinct_Providers')
    .astype({'ASN': str})
    .merge(asn_descriptions, on='ASN', how='left')
)

distinct_providers_per_asn.to_csv('distinct_providers_per_asn_with_description.csv', index=False)