In [1]:
import csv
import logging
import os
import socket
import ssl
import statistics
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Dict, List, Tuple
from urllib.parse import urlparse

import dns.resolver
import pandas as pd
import requests

In [2]:
# Route every record at INFO level or above to speed_test.log, prefixing
# each entry with its timestamp and severity.
logging.basicConfig(
    filename="speed_test.log",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [3]:
class WebsiteSpeedTester:
    """Measure response timings for a list of websites.

    For every URL the tester checks reachability, times one DNS lookup and
    then issues ``num_requests`` GET requests, averaging the per-request
    timings. Each finished result (successful or failed) is appended to
    ``RESULTS_FILE`` immediately, and the full batch is kept in
    ``results_df``.
    """

    # Columns of the incremental results CSV, in output order.
    CSV_FIELDS = ['url', 'timestamp', 'dns_lookup', 'connection',
                  'ttfb', 'download', 'total', 'status', 'error_message']
    # Metric keys that hold timing samples and are averaged at the end.
    TIMING_KEYS = ['dns_lookup', 'connection', 'ttfb', 'download', 'total']
    RESULTS_FILE = 'speed_test_results.csv'
    USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    REQUEST_TIMEOUT = 10

    # run_test() executes _test_single_url in several threads that all append
    # to the same CSV file, so header creation and row writes are serialized.
    _csv_lock = threading.Lock()

    def __init__(self, urls: List[str], num_requests: int = 5):
        """Store the normalized URLs and the number of requests per URL."""
        self.urls = [self._normalize_url(url) for url in urls]
        self.num_requests = num_requests
        self.results_df = pd.DataFrame()
        self.unreachable_sites = []
        logging.info(f"Initialized tester with {len(urls)} URLs")

    def _normalize_url(self, url: str) -> str:
        """Return *url* stripped of whitespace, defaulting to https:// when no scheme is given."""
        url = url.strip()
        # Compare case-insensitively so "HTTP://host" is not prefixed again.
        if not url.lower().startswith(('http://', 'https://')):
            url = f'https://{url}'
        logging.debug(f"Normalized URL: {url}")
        return url

    @staticmethod
    def _extract_host(url: str) -> str:
        """Return the bare host name of *url* (no port or credentials)."""
        parsed = urlparse(url)
        # netloc would include ":port" / "user@", which is not resolvable.
        return parsed.hostname or parsed.netloc

    def is_website_reachable(self, url: str) -> bool:
        """Return True if *url* (or its http:// variant) answers with a 2xx/3xx status."""
        logging.info(f"Testing reachability for {url}")
        headers = {"User-Agent": self.USER_AGENT}

        urls_to_try = [url]
        if url.lower().startswith('https://'):
            # Fall back to plain HTTP when the HTTPS attempt fails.
            urls_to_try.append('http://' + url[len('https://'):])

        # The context manager releases the session's pooled connections.
        with requests.Session() as session:
            for test_url in urls_to_try:
                try:
                    logging.debug(f"Attempting connection to {test_url}")
                    response = session.get(
                        test_url,
                        headers=headers,
                        timeout=self.REQUEST_TIMEOUT,
                        allow_redirects=True,
                        stream=True
                    )
                    # Only the status is needed; skip downloading the body.
                    response.close()

                    if 200 <= response.status_code < 400:
                        logging.info(f"Successfully reached {test_url} (Status: {response.status_code})")
                        return True
                    logging.warning(f"Unexpected status from {test_url}: {response.status_code}")

                except (requests.RequestException, socket.error, ssl.SSLError) as e:
                    logging.warning(f"Failed to reach {test_url}: {str(e)}")
                    continue

        logging.error(f"Website unreachable: {url}")
        return False

    def _measure_dns(self, domain: str) -> Tuple[float, bool, str]:
        """Time an A-record lookup for *domain*.

        Returns:
            ``(seconds, True, "")`` on success, or
            ``(0.0, False, error_message)`` when the lookup raises.
        """
        logging.info(f"Performing DNS lookup for {domain}")
        try:
            start = time.perf_counter()
            dns.resolver.resolve(domain, 'A')
            duration = time.perf_counter() - start
            logging.info(f"DNS lookup successful for {domain} ({duration:.3f}s)")
            return duration, True, ""
        except Exception as e:
            logging.error(f"DNS lookup failed for {domain}: {str(e)}")
            return 0.0, False, f"DNS resolution failed: {str(e)}"

    def _collect_timings(self, url: str, metrics: Dict) -> str:
        """Fill the sample lists in *metrics* for *url*.

        Returns an empty string on success, or the error message that makes
        the whole URL count as failed (unreachable, DNS failure, or an error
        on the very first request).
        """
        if not self.is_website_reachable(url):
            return 'Website unreachable'

        dns_time, dns_success, dns_error = self._measure_dns(self._extract_host(url))
        if not dns_success:
            return dns_error
        # DNS is measured once per URL, so a single sample is recorded.
        metrics['dns_lookup'].append(dns_time)

        headers = {"User-Agent": self.USER_AGENT}
        logging.info(f"Starting {self.num_requests} requests for {url}")
        for i in range(self.num_requests):
            logging.info(f"Request {i+1}/{self.num_requests} for {url}")
            try:
                start_time = time.perf_counter()

                with requests.Session() as session:
                    conn_start = time.perf_counter()
                    # NOTE(review): verify=False disables TLS certificate
                    # checks so sites with bad certificates can still be
                    # timed; confirm this is acceptable for the target list.
                    response = session.get(
                        url,
                        headers=headers,
                        stream=True,
                        timeout=self.REQUEST_TIMEOUT,
                        verify=False,
                        allow_redirects=True
                    )
                    # With stream=True, get() returns once headers arrive.
                    conn_time = time.perf_counter() - conn_start
                    try:
                        if not (200 <= response.status_code < 400):
                            raise requests.RequestException(f"Invalid status code: {response.status_code}")
                        # Drain the body so the download is actually timed.
                        for _ in response.iter_content(chunk_size=65536):
                            pass
                    finally:
                        response.close()

                    total_time = time.perf_counter() - start_time
                    download_time = total_time - conn_time

                    metrics['connection'].append(conn_time)
                    metrics['download'].append(download_time)
                    metrics['total'].append(total_time)
                    metrics['ttfb'].append(response.elapsed.total_seconds())

                    logging.debug(f"Request {i+1} metrics for {url}:")
                    logging.debug(f"Connection: {conn_time:.3f}s")
                    logging.debug(f"Download: {download_time:.3f}s")
                    logging.debug(f"Total: {total_time:.3f}s")

                # Pause between requests; no need to wait after the last one.
                if i < self.num_requests - 1:
                    time.sleep(1)

            except Exception as e:
                logging.error(f"Error during request {i+1} for {url}: {str(e)}")
                # A failure on the first request fails the URL; later
                # failures only reduce the number of samples.
                if i == 0:
                    return str(e)

        return ''

    def _summarize(self, metrics: Dict) -> None:
        """Replace each timing sample list in *metrics* with its mean, or None if empty."""
        for key in self.TIMING_KEYS:
            samples = metrics[key]
            if samples:
                metrics[key] = statistics.mean(samples)
                logging.info(f"Average {key} for {metrics['url']}: {metrics[key]:.3f}s")
            else:
                metrics[key] = None

    def _append_result(self, metrics: Dict) -> None:
        """Append one result row to RESULTS_FILE, writing the header for a new file."""
        with self._csv_lock:
            write_header = not os.path.exists(self.RESULTS_FILE)
            with open(self.RESULTS_FILE, 'a', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.CSV_FIELDS)
                if write_header:
                    writer.writeheader()
                writer.writerow(metrics)

    def _test_single_url(self, url: str) -> Dict:
        """Run the full measurement for *url* and return its metrics dict.

        The returned dict has the keys in ``CSV_FIELDS``; timing fields are
        averaged floats, or None when no sample was collected. Failed URLs
        are also recorded in ``unreachable_sites``. The row is appended to
        the incremental CSV in every case, including failures.
        """
        logging.info(f"\n{'='*50}\nStarting test for {url}")

        metrics = {
            'url': url,
            'timestamp': datetime.now(),
            'dns_lookup': [],
            'connection': [],
            'ttfb': [],
            'download': [],
            'total': [],
            'status': 'success',
            'error_message': ''
        }

        error = self._collect_timings(url, metrics)
        if error:
            metrics['status'] = 'failed'
            metrics['error_message'] = error
            self.unreachable_sites.append({'url': url, 'error': error})

        self._summarize(metrics)
        logging.info(f"Completed testing {url}\n{'='*50}")

        self._append_result(metrics)
        return metrics

    def run_test(self, max_workers: int = 3) -> pd.DataFrame:
        """Test all URLs using *max_workers* threads and return the results DataFrame."""
        logging.info(f"Starting batch test with {max_workers} workers")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(self._test_single_url, self.urls))

        self.results_df = pd.DataFrame(results)
        logging.info("Batch test completed")
        return self.results_df

    def save_results(self, filename: str = 'speed_test_results.csv'):
        """Write the batch results to *filename* and failures to unreachable_sites.csv.

        With the default *filename* this overwrites the incrementally written
        results file with the rows of the current batch.
        """
        if not self.results_df.empty:
            self.results_df.to_csv(filename, index=False)
            logging.info(f"Results saved to {filename}")

        if self.unreachable_sites:
            unreachable_df = pd.DataFrame(self.unreachable_sites)
            unreachable_df.to_csv('unreachable_sites.csv', index=False)
            logging.info("Unreachable sites saved to unreachable_sites.csv")

In [None]:
def main():
    """Load the domain list, run the speed test and print a summary.

    Any failure, including a missing or malformed input CSV, is logged with
    its traceback and reported on stdout instead of propagating.
    """
    try:
        logging.info("Starting speed test script")

        # Reading the input inside the try block keeps file errors on the
        # same logged error path as the rest of the run.
        websites = pd.read_csv("../data/us_gov_domain_list.csv", usecols=["Domain name"]).drop_duplicates()
        websites = websites["Domain name"].dropna().astype(str).tolist()
        if not websites:
            # An empty result frame has no 'status' column to summarize.
            logging.warning("No websites found in the input file")
            print("No websites to test.")
            return

        tester = WebsiteSpeedTester(websites)
        results_df = tester.run_test()

        succeeded = int((results_df['status'] == 'success').sum())
        failed = int((results_df['status'] == 'failed').sum())

        print("\nResults Summary:")
        print(f"Total sites tested: {len(results_df)}")
        print(f"Successful: {succeeded}")
        print(f"Failed: {failed}")

        # Show identifying columns first, then every metric except timestamp.
        leading = ['url', 'status', 'error_message']
        metric_cols = [col for col in results_df.columns
                       if col not in leading + ['timestamp']]
        print("\nDetailed Results (in seconds):")
        print(results_df[leading + metric_cols].round(3))

        tester.save_results()

    except Exception as e:
        # Top-level boundary: record the traceback and tell the user.
        logging.exception(f"Script error: {str(e)}")
        print(f"Error: {str(e)}")


if __name__ == "__main__":
    main()













































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































































