In [13]:
import shutil
import subprocess
import time

import psutil

def ensure_tor_is_running(tor_path=None, startup_wait=10):
    """Make sure a local Tor process exists, launching one if none is found.

    Args:
        tor_path: Path of the Tor executable to launch. When omitted, the
            first ``tor`` found on ``PATH`` is used, falling back to the
            Homebrew location ``/opt/homebrew/opt/tor/bin/tor``.
        startup_wait: Seconds to wait after launching Tor before checking
            that the launched process is still alive.

    Returns:
        bool: True if a Tor process was already present, or one was launched
        and is still alive after ``startup_wait`` seconds. False if the
        executable could not be started or exited during the wait.
    """
    tor_names = {'tor', 'tor.exe'}
    for proc in psutil.process_iter(['name']):
        # The reported name may be None when it cannot be read, so
        # normalise it to a string before comparing.
        if (proc.info['name'] or '').lower() in tor_names:
            print("Tor is already running.")
            return True

    executable = tor_path or shutil.which("tor") or "/opt/homebrew/opt/tor/bin/tor"
    print("Tor is not running. Starting Tor...")
    try:
        launched = subprocess.Popen([executable])
    except OSError as exc:
        # Covers a missing or non-executable binary instead of crashing the cell.
        print(f"Could not start Tor from {executable!r}: {exc}")
        return False

    time.sleep(startup_wait)  # Wait for Tor to start up
    if launched.poll() is not None:
        # The process already terminated, so it cannot be serving connections.
        print(f"Tor exited early with code {launched.returncode}.")
        return False

    print("Tor should now be running.")
    return True

In [15]:
# Run the Tor check only when this code executes as the main program
# (not when it is imported as a module).
if __name__ == "__main__":
    ensure_tor_is_running()


Tor is already running.


In [32]:
import requests
from bs4 import BeautifulSoup
import time
import random
import logging
import csv
from urllib.parse import quote
import socks
import socket
from fake_useragent import UserAgent
import cloudscraper
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# Configure root logging: records at INFO level and above, each rendered as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class TorSession(requests.Session):
    """A ``requests.Session`` whose HTTP and HTTPS traffic is sent to the
    SOCKS proxy at ``localhost:9050``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The same proxy endpoint serves both URL schemes.
        tor_proxy = 'socks5h://localhost:9050'
        self.proxies = {scheme: tor_proxy for scheme in ('http', 'https')}

def get_tor_session():
    """Build a ``TorSession`` with automatic retries.

    Returns:
        TorSession: A session with one ``HTTPAdapter`` mounted for both
        ``http://`` and ``https://``. Its retry policy allows 5 retries in
        total, uses a backoff factor of 0.1, and retries on status codes
        500, 502, 503 and 504.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504],
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    tor_session = TorSession()
    for url_prefix in ('http://', 'https://'):
        tor_session.mount(url_prefix, retrying_adapter)
    return tor_session

def check_tor_connection():
    """Check whether outgoing requests are actually routed through Tor.

    Fetches ``https://check.torproject.org/`` with a session from
    ``get_tor_session()`` and looks for the page's confirmation sentence.

    Returns:
        bool: True if the confirmation sentence is present in the response
        body. False if it is absent or the request raised any exception
        (the error is logged, not propagated).
    """
    # The context manager closes the session when the check is done, so its
    # pooled connections are released instead of lingering in the kernel.
    with get_tor_session() as session:
        try:
            response = session.get('https://check.torproject.org/', timeout=30)
            if 'Congratulations. This browser is configured to use Tor.' in response.text:
                logging.info("Successfully connected through Tor.")
                return True
            logging.warning("Connected to the internet, but not through Tor.")
            return False
        except Exception as e:
            logging.error(f"Error checking Tor connection: {str(e)}")
            return False

class AmazonScraper:
    """Scrapes Amazon UK search-result pages through the local SOCKS proxy.

    Instances hold a ``fake_useragent.UserAgent`` (``self.ua``) used to pick a
    User-Agent per request, and a cloudscraper session (``self.session``) that
    is replaced after failed requests and between result pages.
    """

    # SOCKS proxy endpoint used for both HTTP and HTTPS traffic.
    PROXY_URL = 'socks5h://localhost:9050'
    # (min, max) number of seconds slept before each search-page request.
    THROTTLE_SECONDS = (45, 90)

    def __init__(self):
        self.ua = UserAgent()
        self.session = self.create_scraper()

    def create_scraper(self):
        """Create a new cloudscraper session configured to use the proxy.

        Returns:
            The scraper object returned by ``cloudscraper.create_scraper``,
            with its ``proxies`` pointing at ``PROXY_URL``.
        """
        scraper = cloudscraper.create_scraper(
            browser={
                'browser': 'chrome',
                'platform': 'windows',
                'desktop': True
            },
            delay=10,
            interpreter='nodejs'
        )
        scraper.proxies = {
            'http': self.PROXY_URL,
            'https': self.PROXY_URL
        }
        return scraper

    def _renew_session(self):
        """Swap in a fresh scraper session and close the one it replaces.

        Closing the old session releases its pooled connections; without
        this, every renewal would leave sockets open until garbage collection.
        """
        # NOTE(review): a new session opens new connections to the proxy;
        # whether Tor then uses a different circuit is not established by
        # this code — confirm against the Tor configuration.
        stale_session = self.session
        self.session = self.create_scraper()
        try:
            stale_session.close()
        except Exception as e:
            # Cleanup is best-effort; a failed close must not abort scraping.
            logging.debug(f"Ignoring error while closing old session: {str(e)}")

    def get_headers(self):
        """Return browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': self.ua.random,
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': 'https://www.amazon.co.uk/',
            'DNT': '1',
            'Upgrade-Insecure-Requests': '1',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
        }

    def throttle_request(self):
        """Sleep for a random duration within ``THROTTLE_SECONDS``."""
        time.sleep(random.uniform(*self.THROTTLE_SECONDS))

    def make_request(self, url, max_retries=5):
        """GET ``url``, retrying with a fresh session after each failure.

        Args:
            url: The URL to fetch.
            max_retries: Maximum number of attempts.

        Returns:
            The response object on success, or None when every attempt
            raised ``requests.exceptions.RequestException`` (including HTTP
            error statuses via ``raise_for_status``) or when ``max_retries``
            is not positive.
        """
        for attempt in range(max_retries):
            try:
                self.session.cookies.clear()
                headers = self.get_headers()
                response = self.session.get(url, headers=headers, timeout=60)
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                logging.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt == max_retries - 1:
                    logging.error(f"Max retries reached. Unable to fetch {url}")
                    return None
                self._renew_session()
                time.sleep((2 ** attempt) + random.random() * 30)  # Exponential backoff with added randomness
        # Reached only when max_retries <= 0 and no attempt was made.
        return None

    def scrape_category(self, category, max_products=100):
        """Collect up to ``max_products`` search results for ``category``.

        Pages through the search results, throttling before each request,
        and stops when the limit is reached, a request fails, a page has no
        product cards, or there is no next-page link.

        Args:
            category: Search phrase; it is URL-quoted into the ``k`` parameter.
            max_products: Upper bound on the number of products returned.

        Returns:
            list[dict]: One dict per product, as built by
            ``extract_product_data``.
        """
        all_products = []
        page = 1
        encoded_category = quote(category)
        base_url = f"https://www.amazon.co.uk/s?k={encoded_category}&ref=nb_sb_noss_1"

        while len(all_products) < max_products:
            url = f"{base_url}&page={page}"
            self.throttle_request()

            response = self.make_request(url)
            if not response:
                break

            soup = BeautifulSoup(response.content, 'html.parser')
            product_cards = soup.select('div[data-component-type="s-search-result"]')

            if not product_cards:
                logging.warning(f"No product cards found on page {page} for {category}. This might be a captcha page.")
                break

            for card in product_cards:
                if len(all_products) >= max_products:
                    break

                product_data = self.extract_product_data(card, category)
                all_products.append(product_data)
                logging.info(f"Scraped product {len(all_products)} for {category}: {product_data['title']}")

            # Check if there's a next page
            next_page = soup.select_one('a.s-pagination-next')
            if not next_page:
                break

            page += 1
            # Start the next page on a fresh session, closing the old one.
            self._renew_session()

        logging.info(f"Scraped {len(all_products)} products from the category {category}.")
        return all_products

    def extract_product_data(self, card, category):
        """Extract the product fields from one search-result card.

        Args:
            card: Parsed element exposing ``select_one(css_selector)``.
            category: Value stored under the ``'category'`` key.

        Returns:
            dict: Keys ``category``, ``title``, ``price``, ``rating`` and
            ``review_count``. A field is None when its element is missing;
            ``rating`` is also None when its element has no text.
        """
        product_data = {'category': category}

        # Extract title
        title_elem = card.select_one('h2 a.a-link-normal')
        product_data['title'] = title_elem.text.strip() if title_elem else None

        # Extract price
        price_elem = card.select_one('span.a-price-whole')
        product_data['price'] = price_elem.text.strip() if price_elem else None

        # Extract rating: the first whitespace-separated token of the text.
        # A blank element yields no tokens, so guard before indexing.
        rating_elem = card.select_one('span.a-icon-alt')
        rating_tokens = rating_elem.text.split() if rating_elem else []
        product_data['rating'] = rating_tokens[0] if rating_tokens else None

        # Extract review count
        review_count_elem = card.select_one('span.a-size-base.s-underline-text')
        product_data['review_count'] = review_count_elem.text.strip() if review_count_elem else None

        return product_data

def _search_terms(category):
    """Expand one category entry into the list of search phrases to scrape."""
    if isinstance(category, (list, tuple)):
        if not category:
            return []
        parent, *children = category
        # The first element is the parent; pairing it with itself would
        # produce a duplicated "Parent Parent" query, so only the remaining
        # elements are combined with it. A group with no children falls back
        # to searching the parent alone.
        return [f"{parent} {child}" for child in children] or [parent]
    return [category]


def scrape_all_categories(categories):
    """Scrape every category with a single ``AmazonScraper``.

    Args:
        categories: Iterable of entries. A plain entry is used as the search
            phrase. A list or tuple is treated as ``[parent, sub1, sub2, ...]``
            and scraped as ``"parent sub1"``, ``"parent sub2"``, and so on.

    Returns:
        list: All product dicts returned by ``scrape_category``, in scrape
        order, with at most 100 products requested per search phrase.
    """
    scraper = AmazonScraper()
    all_data = []
    # Materialise so the last entry can be detected even for generators.
    categories = list(categories)
    last_index = len(categories) - 1

    for index, category in enumerate(categories):
        for term in _search_terms(category):
            products = scraper.scrape_category(term, max_products=100)
            all_data.extend(products)

        # Pause between categories only; sleeping after the final one would
        # just delay returning the results.
        if index < last_index:
            time.sleep(random.uniform(180, 300))

    return all_data

def save_to_csv(data, filename):
    """Write a list of row dicts to ``filename`` as UTF-8 CSV.

    The header is the ordered union of the keys of all rows, so a row with a
    key the first row lacks no longer aborts the write part-way through.
    Rows missing a column get an empty cell.

    Args:
        data: List of dicts, one per CSV row. When empty, nothing is written.
        filename: Path of the CSV file to create or overwrite.

    Returns:
        bool: True if the file was written. False if there was no data or an
        error occurred (errors are logged, not raised).
    """
    if not data:
        logging.warning("No data to save. CSV file will not be created.")
        return False

    # dict.fromkeys de-duplicates while keeping first-seen column order.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))
    try:
        with open(filename, 'w', newline='', encoding='utf-8') as output_file:
            dict_writer = csv.DictWriter(output_file, fieldnames)
            dict_writer.writeheader()
            dict_writer.writerows(data)
        logging.info(f"Data successfully saved to {filename}")
        return True
    except IOError as e:
        logging.error(f"IOError occurred while saving data: {str(e)}")
    except Exception as e:
        logging.error(f"Unexpected error occurred while saving data: {str(e)}")
    return False

# Entry point: verify Tor connectivity, scrape the configured categories and
# write the results to a CSV file.
if __name__ == "__main__":
    if not check_tor_connection():
        logging.error("Failed to connect through Tor. Exiting.")
        # Raise SystemExit directly: the bare exit() helper is meant for
        # interactive shells, not for programs.
        raise SystemExit(1)

    categories = [
        "All Departments", "Apps & Games"
    ]
    output_csv = 'amazon_products_TOR.csv'

    try:
        all_products = scrape_all_categories(categories)
        save_to_csv(all_products, output_csv)
        # save_to_csv logs its own success, failure or "no data" outcome, so
        # this message reports only the count and does not claim a file
        # was written.
        logging.info(f"Scraping completed with {len(all_products)} products.")
    except KeyboardInterrupt:
        logging.info("Scraping interrupted by user.")
    except Exception as e:
        logging.error(f"An unexpected error occurred: {str(e)}")
    finally:
        logging.info("Scraping process finished.")

2024-09-30 22:17:51,854 - INFO - Successfully connected through Tor.
2024-09-30 22:19:14,575 - INFO - Scraped product 1 for All Departments: PARKLEES Men's Casual Summer Combat Cotton Twill Cargo Shorts
2024-09-30 22:19:14,575 - INFO - Scraped product 2 for All Departments: Fire Bullets with K-CYTRO for Women and Men
2024-09-30 22:19:14,576 - INFO - Scraped product 3 for All Departments: TSOTMO Lyric Inspired Gift Poets Department Merch Concert Tour Merch Singer New Album Gift Singer Concert Gift Music Lover Gift
2024-09-30 22:19:14,576 - INFO - Scraped product 4 for All Departments: Fire Bullets Max Strength Black Edition for Women and Men
2024-09-30 22:19:14,577 - INFO - Scraped product 5 for All Departments: Children's Catholic Book for Girls: God Made You: Watercolor Illustrated Bible Verses Catholic Books for Kids in All Departments Catholic Books in ... for Baptism First Communion for Girls)
2024-09-30 22:19:14,577 - INFO - Scraped product 6 for All Departments: Department Femdom