In [4]:
# Web Tracker Analysis using Firefox

import os
import json
import time
import logging
import csv
import urllib.parse
from datetime import datetime
from typing import Dict, List, Set, Any, Tuple, Optional
from collections import defaultdict

# You'll need to install these packages if you don't have them:
# !pip install selenium webdriver-manager pandas tldextract

from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from webdriver_manager.firefox import GeckoDriverManager
import pandas as pd
import tldextract

# Configure the root logger so INFO-level progress messages emitted by the
# classes below show up in the notebook output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Tracker categories.
# Keys are the internal category identifiers stored in the tracker database
# ("category" field of each tracker entry); values are the human-readable
# labels used as keys of each result's "categories" dict and as column
# headers / metric names in the CSV output.
TRACKER_CATEGORIES = {
    'advertising': 'Advertising',
    'audio_video': 'Audio/Video Player',
    'customer_interaction': 'Customer Interaction',
    'hosting': 'Hosting Services',
    'consent': 'Consent Management',
    'analytics': 'Site Analytics',
    'misc': 'Miscellaneous',
    'utility': 'Utilities',
    'social': 'Social Media',
    'adult_advertising': 'Adult Advertising'
}

class Request:
    """A single network request observed while loading a page.

    Attributes:
        url: Full URL of the requested resource.
        resource_type: Initiator/resource type as reported by the caller.
        size: Transfer size in bytes as reported by the caller.
        domain: Domain derived from ``url`` via :meth:`extract_domain`.
        timestamp: ``time.time()`` at the moment this object was created
            (not the time the browser issued the request).
    """

    def __init__(self, url: str, resource_type: str, size: int):
        self.url = url
        self.resource_type = resource_type
        self.size = size  # Size in bytes
        self.domain = self.extract_domain(url)
        self.timestamp = time.time()

    @staticmethod
    def extract_domain(url: str) -> str:
        """Extract the root (registrable) domain from a URL.

        Uses ``tldextract`` when it works; otherwise falls back to the URL's
        hostname. Hosts without a public suffix (``localhost``, IP addresses)
        are returned as-is instead of with a dangling trailing dot.

        Args:
            url: Absolute URL of the resource.

        Returns:
            The root domain such as ``"example.com"``, the bare host when no
            suffix is known, or ``""`` when no host can be determined.
        """
        try:
            ext = tldextract.extract(url)
        except Exception:
            # Best effort: any tldextract failure drops to plain URL parsing.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            ext = None

        if ext is not None:
            if ext.domain and ext.suffix:
                return f"{ext.domain}.{ext.suffix}"
            if ext.domain:
                # No public suffix (e.g. "localhost" or an IP address).
                return ext.domain

        try:
            # ``hostname`` drops port and credentials, which ``netloc`` keeps
            # and which would never compare equal to a bare domain.
            return urllib.parse.urlparse(url).hostname or ""
        except ValueError:
            # Malformed authority section (e.g. an unbalanced IPv6 bracket).
            return ""

class TrackerDatabase:
    """Database of known trackers and their categories.

    Attributes:
        trackers: Maps a tracker domain to its metadata dict (the built-in
            entries carry ``"name"`` and ``"category"``).
        categories: Maps a category key to the list of tracker domains that
            declare that category. Rebuilt from scratch on every load.
    """

    def __init__(self, database_file: Optional[str] = None):
        """Load trackers from ``database_file`` if it exists, else the defaults."""
        self.trackers = {}
        self.categories = defaultdict(list)

        # Load from file if provided
        if database_file and os.path.exists(database_file):
            self.load_database(database_file)
        else:
            self.load_default_database()

    @staticmethod
    def _build_category_index(trackers: Dict[str, Any]) -> defaultdict:
        """Return a fresh ``category -> [domains]`` index for ``trackers``."""
        index = defaultdict(list)
        for domain, info in trackers.items():
            if 'category' in info:
                index[info['category']].append(domain)
        return index

    def _find_tracker_domain(self, domain: str) -> Optional[str]:
        """Return the most specific known tracker domain covering ``domain``.

        Tries ``domain`` itself, then each parent obtained by stripping the
        leftmost label, so ``a.b.example.com`` is checked as
        ``a.b.example.com``, ``b.example.com``, ``example.com``, ``com``.
        Walking from most to least specific makes the result independent of
        the insertion order of ``self.trackers``.
        """
        candidate = domain
        while candidate:
            if candidate in self.trackers:
                return candidate
            _, _, candidate = candidate.partition('.')
        return None

    def load_database(self, database_file: str) -> None:
        """Load tracker database from a JSON file.

        The file is expected to hold an object with a ``"trackers"`` mapping.
        On any error the failure is logged and the built-in defaults are
        loaded instead; no partially loaded state is kept.

        Args:
            database_file: Path to the JSON database file.
        """
        try:
            with open(database_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            trackers = data.get('trackers', {})
            # Build the index before touching instance state so a malformed
            # entry cannot leave trackers and categories out of sync.
            categories = self._build_category_index(trackers)
        except Exception as e:
            logging.error(f"Error loading tracker database: {e}")
            self.load_default_database()
            return

        self.trackers = trackers
        self.categories = categories
        logging.info(f"Loaded {len(self.trackers)} trackers from {database_file}")

    def load_default_database(self) -> None:
        """Load a default set of common trackers."""
        # This is a minimal set - in practice you would want a comprehensive database
        # NOTE(review): keys that are subdomains (e.g. "cdn.cookielaw.org")
        # only match when callers pass a full hostname; Request.extract_domain
        # reduces URLs to the root domain first - confirm this is intended.
        default_trackers = {
            "google-analytics.com": {"name": "Google Analytics", "category": "analytics"},
            "doubleclick.net": {"name": "DoubleClick", "category": "advertising"},
            "facebook.net": {"name": "Facebook", "category": "social"},
            "fonts.googleapis.com": {"name": "Google Fonts", "category": "utility"},
            "hotjar.com": {"name": "Hotjar", "category": "analytics"},
            "youtube.com": {"name": "YouTube", "category": "audio_video"},
            "criteo.com": {"name": "Criteo", "category": "advertising"},
            "taboola.com": {"name": "Taboola", "category": "advertising"},
            "outbrain.com": {"name": "Outbrain", "category": "advertising"},
            "cloudflare.com": {"name": "Cloudflare", "category": "hosting"},
            "cdn.cookielaw.org": {"name": "OneTrust", "category": "consent"},
            "intercom.io": {"name": "Intercom", "category": "customer_interaction"},
            "stripe.com": {"name": "Stripe", "category": "utility"},
            "twitter.com": {"name": "Twitter", "category": "social"},
            "googleapis.com": {"name": "Google APIs", "category": "utility"},
            "gstatic.com": {"name": "Google Static", "category": "utility"},
            "amazon-adsystem.com": {"name": "Amazon Ads", "category": "advertising"},
            "googletagmanager.com": {"name": "Google Tag Manager", "category": "utility"},
            "adnxs.com": {"name": "AppNexus", "category": "advertising"},
            "cookiebot.com": {"name": "Cookiebot", "category": "consent"},
            "fastly.net": {"name": "Fastly", "category": "hosting"},
            "akamaihd.net": {"name": "Akamai", "category": "hosting"},
            "optimizely.com": {"name": "Optimizely", "category": "analytics"},
            "segment.io": {"name": "Segment", "category": "analytics"},
            "fontawesome.com": {"name": "Font Awesome", "category": "utility"},
            "piwik.pro": {"name": "Piwik", "category": "analytics"},
            "tiqcdn.com": {"name": "Tealium", "category": "analytics"},
            "mixpanel.com": {"name": "Mixpanel", "category": "analytics"},
            "jsdelivr.net": {"name": "jsDelivr", "category": "hosting"},
            "unpkg.com": {"name": "Unpkg", "category": "hosting"}
        }

        self.trackers = default_trackers
        # Replace (rather than extend) the index so repeated loads or a
        # fallback after a failed file load never leave duplicate entries.
        self.categories = self._build_category_index(self.trackers)

        logging.info(f"Loaded {len(self.trackers)} default trackers")

    def is_tracker(self, domain: str) -> bool:
        """Check if a domain, or any parent domain of it, is a known tracker."""
        return self._find_tracker_domain(domain) is not None

    def get_tracker_info(self, domain: str) -> Optional[Dict[str, Any]]:
        """Get information about a tracker.

        Returns:
            The metadata of the most specific matching tracker entry, or
            ``None`` when the domain is not covered by any entry.
        """
        matched = self._find_tracker_domain(domain)
        if matched is None:
            return None
        return self.trackers[matched]

    def get_category(self, domain: str) -> str:
        """Get the category key of a tracker, or ``'misc'`` if unknown."""
        info = self.get_tracker_info(domain)
        if info and 'category' in info:
            return info['category']
        return 'misc'  # Default category for unknown trackers

class WebsiteAnalyzer:
    """Analyzes a website for trackers using Firefox.

    A Selenium-driven Firefox instance loads the page, the browser's
    Resource Timing entries are read back, and every third-party request
    whose domain is known to the tracker database is counted.

    Attributes:
        tracker_db: Database used to recognise and categorise tracker domains.
        headless: Whether Firefox is started with ``--headless``.
        driver: The Selenium WebDriver, or ``None`` once closed.
        requests: ``Request`` objects collected for the most recent page.
    """

    def __init__(self, tracker_db: TrackerDatabase, headless: bool = True):
        """Store configuration and start the browser immediately."""
        self.tracker_db = tracker_db
        self.headless = headless
        self.driver = None
        self.requests = []
        self.setup_driver()

    def setup_driver(self) -> None:
        """Set up the Selenium WebDriver.

        Raises:
            Exception: Whatever Selenium / webdriver-manager raised while
                starting Firefox; it is logged and re-raised.
        """
        options = Options()
        if self.headless:
            options.add_argument("--headless")

        options.add_argument("--width=1920")
        options.add_argument("--height=1080")

        # Enable network request interception via DevTools
        options.set_preference("devtools.netmonitor.enabled", True)
        options.set_preference("devtools.netmonitor.har.enableAutoExportToFile", False)
        options.set_preference("devtools.netmonitor.har.defaultLogDir", os.getcwd())
        options.set_preference("devtools.netmonitor.har.defaultFileName", "network.har")
        options.log.level = "trace"

        try:
            self.driver = webdriver.Firefox(
                service=Service(GeckoDriverManager().install()),
                options=options
            )
            logging.info("Firefox WebDriver initialized")
        except Exception as e:
            logging.error(f"Failed to initialize Firefox WebDriver: {e}")
            raise

    @staticmethod
    def _error_result(url: str, error: Exception) -> Dict[str, Any]:
        """Build the result dict reported for a site whose analysis failed.

        It carries the same top-level keys as a successful result (plus
        ``"error"``) so code that indexes ``result['domain']`` or
        ``result['timestamp']`` keeps working for failed sites.
        """
        try:
            domain = Request.extract_domain(url)
        except Exception:
            # Never let domain extraction mask the original failure.
            domain = ""
        return {
            "url": url,
            "domain": domain,
            "timestamp": datetime.now().isoformat(),
            "error": str(error),
            "trackers": 0,
            "tracking_requests": 0,
            "data_transferred": 0,
            "categories": {}
        }

    def analyze_website(self, url: str, timeout: int = 30) -> Dict[str, Any]:
        """
        Visit a website and analyze trackers.

        Args:
            url: Website URL to analyze
            timeout: Seconds to wait after navigation before the loaded
                resources are collected. This is a fixed sleep, not an
                upper bound on page load time.

        Returns:
            Dictionary with tracker analysis results. If anything fails, the
            dictionary has zero counts, an empty ``"categories"`` mapping and
            an additional ``"error"`` message.
        """
        logging.info(f"Analyzing website: {url}")

        if not self.driver:
            self.setup_driver()

        # Reset requests
        self.requests = []

        try:
            # Navigate to the URL
            self.driver.get(url)

            # Give late-loading third-party scripts time to fire.
            time.sleep(timeout)

            # Get all network requests
            # In Firefox, we need to extract this information differently
            # We'll use the page source and resource timing API to get a list of loaded resources
            # NOTE(review): browsers cap the resource timing buffer and may
            # report transferSize 0 for some cross-origin resources, so the
            # counts and byte totals are likely lower bounds - confirm.
            resources = self.driver.execute_script("""
                var resources = [];
                var entries = performance.getEntriesByType('resource');
                for (var i = 0; i < entries.length; i++) {
                    var entry = entries[i];
                    resources.push({
                        url: entry.name,
                        type: entry.initiatorType,
                        size: entry.transferSize
                    });
                }
                return resources;
            """)

            # Process resources
            for resource in resources or []:
                self.requests.append(Request(
                    url=resource.get('url', ''),
                    resource_type=resource.get('type', 'unknown'),
                    size=resource.get('size', 0)
                ))

            # Analyze requests
            return self.analyze_requests(url)

        except Exception as e:
            logging.error(f"Error analyzing {url}: {e}")
            return self._error_result(url, e)

    def analyze_requests(self, base_url: str) -> Dict[str, Any]:
        """Analyze collected requests to identify trackers.

        Requests to the page's own root domain are ignored; every other
        request whose domain the tracker database recognises is counted.

        Args:
            base_url: URL of the analysed page; its root domain defines
                what counts as first-party.

        Returns:
            Dictionary with ``url``, ``domain``, ``timestamp``, the number of
            distinct tracker domains (``trackers``), the number of tracking
            requests, the summed transfer size, and a ``categories`` mapping
            from category label to request count (all known labels present).
        """
        base_domain = Request.extract_domain(base_url)

        tracker_domains = set()
        tracking_request_count = 0
        data_transferred = 0

        # Tracking requests per internal category key
        categories = defaultdict(int)

        for request in self.requests:
            # Skip requests to the same domain
            if request.domain == base_domain:
                continue

            if not self.tracker_db.is_tracker(request.domain):
                continue

            tracker_domains.add(request.domain)
            tracking_request_count += 1
            # A missing size must not break the running total.
            data_transferred += request.size or 0
            categories[self.tracker_db.get_category(request.domain)] += 1

        # Prepare results
        results = {
            "url": base_url,
            "domain": base_domain,
            "timestamp": datetime.now().isoformat(),
            "trackers": len(tracker_domains),
            "tracking_requests": tracking_request_count,
            "data_transferred": data_transferred,
            "categories": {}
        }

        # Format categories for output (internal key -> display label)
        for category, count in categories.items():
            results["categories"][TRACKER_CATEGORIES.get(category, category)] = count

        # Ensure all categories are represented
        for label in TRACKER_CATEGORIES.values():
            results["categories"].setdefault(label, 0)

        logging.info(f"Analysis complete: {results['trackers']} trackers, {results['tracking_requests']} tracking requests")
        return results

    def close(self) -> None:
        """Close the WebDriver. Safe to call more than once."""
        if self.driver:
            self.driver.quit()
            self.driver = None
            logging.info("WebDriver closed")

class AnalysisManager:
    """Manages the analysis of multiple websites.

    Attributes:
        tracker_db: Tracker database shared by all analyses.
        output_dir: Directory that receives interim and final result files.
        analyzer: The ``WebsiteAnalyzer`` used by the latest
            :meth:`analyze_urls` call, or ``None`` before the first call.
    """

    def __init__(self, tracker_db_file: Optional[str] = None, output_dir: str = "results"):
        """Load the tracker database and make sure ``output_dir`` exists."""
        self.tracker_db = TrackerDatabase(tracker_db_file)
        self.output_dir = output_dir
        self.analyzer = None

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

    def analyze_urls(self, urls: List[str], timeout: int = 30) -> List[Dict[str, Any]]:
        """
        Analyze a list of websites.

        After every site the accumulated results are written to
        ``interim_results_<n>.json`` so a crash does not lose finished work.

        Args:
            urls: List of URLs to analyze
            timeout: Seconds passed to ``WebsiteAnalyzer.analyze_website``
                for each page

        Returns:
            List of analysis results
        """
        results = []
        self.analyzer = WebsiteAnalyzer(self.tracker_db)

        try:
            for position, url in enumerate(urls, start=1):
                logging.info(f"Analyzing ({position}/{len(urls)}): {url}")
                try:
                    results.append(self.analyzer.analyze_website(url, timeout))

                    # Save interim results after each site
                    interim_filename = os.path.join(
                        self.output_dir, f"interim_results_{position}.json"
                    )
                    with open(interim_filename, 'w', encoding='utf-8') as f:
                        json.dump(results, f, indent=2)
                    logging.info(f"Saved interim results to {interim_filename}")

                except Exception as e:
                    logging.error(f"Error analyzing {url}: {e}")
        finally:
            # Always release the browser, even when the loop is interrupted.
            if self.analyzer:
                self.analyzer.close()

        # Save final results
        if results:
            self.save_results(results)

        return results

    def save_results(self, results: List[Dict[str, Any]]) -> Dict[str, str]:
        """Save analysis results to files.

        Writes the raw JSON, a per-site summary CSV and a statistics CSV,
        all sharing one timestamp in their file names.

        Returns:
            Paths of the written files under the keys ``'json'``,
            ``'summary'`` and ``'stats'``.
        """
        # Create a timestamp for filenames
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

        # Save raw JSON
        json_file = os.path.join(self.output_dir, f"tracker_analysis_{timestamp}.json")
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        # Save summary CSV
        csv_file = os.path.join(self.output_dir, f"tracker_summary_{timestamp}.csv")
        self.save_summary_csv(results, csv_file)

        # Save detailed stats
        stats_file = os.path.join(self.output_dir, f"tracker_stats_{timestamp}.csv")
        self.save_detailed_stats(results, stats_file)

        logging.info(f"Results saved to {self.output_dir}")

        # Return the paths to the files
        return {
            'json': json_file,
            'summary': csv_file,
            'stats': stats_file
        }

    def save_summary_csv(self, results: List[Dict[str, Any]], filename: str) -> None:
        """Save a summary CSV of the results, one row per analysed site.

        Category columns are ordered by internal category key. Fields missing
        from a result are written as empty / zero instead of raising.
        """
        # Computed once so the header and every row share the same order.
        category_labels = [label for _, label in sorted(TRACKER_CATEGORIES.items())]

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            # Write header
            writer.writerow([
                'URL', 'Domain', 'Timestamp', 'Trackers',
                'Tracking Requests', 'Data Transferred (bytes)',
                *category_labels
            ])

            # Write data
            for result in results:
                category_counts = result.get('categories') or {}
                writer.writerow([
                    result.get('url', ''),
                    result.get('domain', ''),
                    result.get('timestamp', ''),
                    result.get('trackers', 0),
                    result.get('tracking_requests', 0),
                    result.get('data_transferred', 0),
                    *[category_counts.get(label, 0) for label in category_labels]
                ])

    def save_detailed_stats(self, results: List[Dict[str, Any]], filename: str) -> Optional[pd.DataFrame]:
        """Calculate and save detailed statistics.

        For each numeric metric (tracker counts, bytes, one column per
        category label) the count, mean, standard deviation, minimum,
        quartiles and maximum across all results are written to ``filename``.

        Returns:
            The written statistics re-read as a DataFrame, or ``None`` when
            ``results`` is empty (no file is written in that case).
        """
        if not results:
            return None

        # Prepare dataframe
        df = pd.DataFrame(results)

        # Extract category columns
        for category_label in TRACKER_CATEGORIES.values():
            df[category_label] = df['categories'].apply(
                # Non-dict cells (e.g. NaN for a missing key) count as zero.
                lambda cats: cats.get(category_label, 0) if isinstance(cats, dict) else 0
            )

        # Drop the categories column
        df = df.drop(columns=['categories'])

        numeric_columns = [
            'trackers', 'tracking_requests', 'data_transferred',
            *TRACKER_CATEGORIES.values()
        ]

        # Save stats to CSV
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            # Write header and data
            writer.writerow(['Metric', 'Count', 'Mean', 'Std', 'Min', '25%', 'Median', '75%', 'Max'])

            for col in numeric_columns:
                if col not in df.columns:
                    continue
                series = df[col]
                writer.writerow([
                    col,
                    series.count(),
                    round(series.mean(), 2),
                    round(series.std(), 2),
                    series.min(),
                    round(series.quantile(0.25), 2),
                    round(series.median(), 2),
                    round(series.quantile(0.75), 2),
                    series.max()
                ])

        # Return the stats dataframe for display
        return pd.read_csv(filename)

# Example usage in Jupyter notebook
# First install the required packages
# !pip install selenium webdriver-manager pandas tldextract

# URLs to analyze; each one is loaded in Firefox by manager.analyze_urls()
urls_to_analyze = [
    "https://example.com",
    "https://nytimes.com",
    # Add more URLs here
]

# Name of the directory that will hold result files (the directory itself is
# created by AnalysisManager.__init__ if it does not exist yet)
output_directory = "tracker_results"

# Initialize the analysis manager; with no tracker_db_file argument it loads
# the built-in default tracker list
manager = AnalysisManager(output_dir=output_directory)

# Run the analysis
# results = manager.analyze_urls(urls_to_analyze)

# Display the results (if you've run the analysis)
# import pandas as pd
# stats_files = [f for f in os.listdir(output_directory) if f.startswith("tracker_stats_")]
# if stats_files:
#     latest_stats_file = sorted(stats_files)[-1]
#     stats_df = pd.read_csv(f"{output_directory}/{latest_stats_file}")
#     display(stats_df)

2025-03-15 18:55:49,103 - INFO - Loaded 30 default trackers


In [6]:

# Imports used by this cell (repeated here so the cell also works on its own)
import json
import os

import pandas as pd

# Run the analysis: launches Firefox and visits every URL in urls_to_analyze
results = manager.analyze_urls(urls_to_analyze)

# Display the results.
# Prefer the detailed-stats CSV; file names embed a YYYYmmddHHMMSS timestamp,
# so the lexicographically last name is the most recent run.
stats_files = [f for f in os.listdir(output_directory) if f.startswith("tracker_stats_")]

if stats_files:
    # Sort to get the latest one
    latest_stats_file = sorted(stats_files)[-1]
    latest_stats_path = os.path.join(output_directory, latest_stats_file)

    try:
        # Try to read the CSV with explicit encoding
        stats_df = pd.read_csv(latest_stats_path, encoding='utf-8')
        display(stats_df)
    except Exception as e:
        print(f"Error reading CSV: {e}")

        # Let's inspect the file contents. Undecodable bytes are replaced so
        # this diagnostic cannot fail on the very problem it is meant to show.
        with open(latest_stats_path, 'r', encoding='utf-8', errors='replace') as f:
            print("First 10 lines of the file:")
            for i, line in enumerate(f):
                if i >= 10:
                    break
                print(f"Line {i+1}: {repr(line)}")
else:
    # If no stats files, look at what files are actually there
    print("Files in output directory:")
    for f in os.listdir(output_directory):
        print(f)

    # Let's look at the JSON results instead
    json_files = [f for f in os.listdir(output_directory) if f.endswith(".json")]

    if json_files:
        latest_json = sorted(json_files)[-1]
        with open(os.path.join(output_directory, latest_json), 'r', encoding='utf-8') as f:
            data = json.load(f)
        print(f"\nFound {len(data)} website results in {latest_json}")

        # Create a simple summary DataFrame
        summary = [
            {
                'url': site['url'],
                'trackers': site.get('trackers', 0),
                'tracking_requests': site.get('tracking_requests', 0),
                'data_transferred': site.get('data_transferred', 0)
            }
            for site in data
        ]

        if summary:
            summary_df = pd.DataFrame(summary)
            display(summary_df)

2025-03-15 18:59:10,988 - INFO - Get LATEST geckodriver version for 135.0 firefox
2025-03-15 18:59:11,592 - INFO - Driver [/Users/soodoku/.wdm/drivers/geckodriver/macos/0.36/geckodriver] found in cache
2025-03-15 18:59:15,461 - INFO - Firefox WebDriver initialized
2025-03-15 18:59:15,463 - INFO - Analyzing (1/2): https://example.com
2025-03-15 18:59:15,464 - INFO - Analyzing website: https://example.com
2025-03-15 18:59:46,168 - INFO - Analysis complete: 0 trackers, 0 tracking requests
2025-03-15 18:59:46,171 - INFO - Saved interim results to tracker_results/interim_results_1.json
2025-03-15 18:59:46,172 - INFO - Analyzing (2/2): https://nytimes.com
2025-03-15 18:59:46,173 - INFO - Analyzing website: https://nytimes.com
2025-03-15 19:00:19,684 - INFO - Analysis complete: 5 trackers, 22 tracking requests
2025-03-15 19:00:19,693 - INFO - Saved interim results to tracker_results/interim_results_2.json
2025-03-15 19:00:20,652 - INFO - WebDriver closed
2025-03-15 19:00:20,676 - INFO - Results saved to tracker_results

Unnamed: 0,Metric,Count,Mean,Std,Min,25%,Median,75%,Max
0,trackers,2,2.5,3.54,0,1.25,2.5,3.75,5
1,tracking_requests,2,11.0,15.56,0,5.5,11.0,16.5,22
2,data_transferred,2,23087.0,32649.95,0,11543.5,23087.0,34630.5,46174
3,Advertising,2,9.5,13.44,0,4.75,9.5,14.25,19
4,Audio/Video Player,2,0.0,0.0,0,0.0,0.0,0.0,0
5,Customer Interaction,2,0.0,0.0,0,0.0,0.0,0.0,0
6,Hosting Services,2,0.0,0.0,0,0.0,0.0,0.0,0
7,Consent Management,2,0.0,0.0,0,0.0,0.0,0.0,0
8,Site Analytics,2,0.0,0.0,0,0.0,0.0,0.0,0
9,Miscellaneous,2,0.0,0.0,0,0.0,0.0,0.0,0
