In [1]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self):
        """Load search results and scroll until at least 100 items or no new items are found."""
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

        num_results = 0
        last_height = driver.execute_script("return document.body.scrollHeight")
        
        while num_results < 100:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(20)  # Increased delay to allow more content to load
            
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                    next_button.click()
                    time.sleep(20)  # Wait for the next page to load
                    last_height = driver.execute_script("return document.body.scrollHeight")
                except Exception:
                    try:
                        more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                        more_results_button.click()
                        time.sleep(20)  # Wait for more results to load
                        last_height = driver.execute_script("return document.body.scrollHeight")
                    except Exception:
                        print("No more pages or unable to load more results.")
                        break  # No new items or next/more results button was not found
            else:
                last_height = new_height
            
            search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
            num_results = len(search_items)
        
        html_content = driver.page_source
        driver.quit()
        return html_content

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")
    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


HTML content saved to google_search_results_20240624_153001.html
Extracted 102 items.
Parsed search results saved to parsed_search_results_20240624_153002.json


In [1]:
import os
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

def timestamped_filename(base_filename):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

def get_page_title(link):
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(link)
        title = driver.title
        driver.quit()
        return title, None
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    title, error = get_page_title(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'title': title}

def main():
    with open('parsed_search_results_20240624_153002.json', 'r', encoding='utf-8') as f:
        search_results = json.load(f)

    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)

    output_filename = timestamped_filename('links_scrape_test.json')
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=4)

if __name__ == "__main__":
    main()


In [None]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self):
        """Load search results and scroll until at least 100 items or no new items are found."""
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

        num_results = 0
        last_height = driver.execute_script("return document.body.scrollHeight")

        while num_results < 100:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(20)  # Increased delay to allow more content to load

            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                    next_button.click()
                    time.sleep(20)  # Wait for the next page to load
                except Exception:
                    break  # No more pages to load
            last_height = new_height

            soup = BeautifulSoup(driver.page_source, 'html.parser')
            items = soup.select('.tF2Cxc')
            num_results = len(items)

        html_content = driver.page_source
        driver.quit()
        return html_content

    def parse_items(self, html_content):
        """Parse items from the HTML content."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []

        for item in soup.select('.tF2Cxc'):
            title_element = item.select_one('.DKV0Md')
            link_element = item.select_one('.yuRUbf > a')
            snippet_element = item.select_one('.IsZvec')

            title = title_element.text if title_element else None
            link = link_element['href'] if link_element else None
            snippet = snippet_element.text if snippet_element else None

            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_title(link):
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(link)
        title = driver.title
        driver.quit()
        return title, None
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    title, error = get_page_title(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'title': title}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)

        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


In [1]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self):
        """Load search results and scroll until at least 100 items or no new items are found."""
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

        num_results = 0
        last_height = driver.execute_script("return document.body.scrollHeight")
        
        while num_results < 100:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(20)  # Increased delay to allow more content to load
            
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                    next_button.click()
                    time.sleep(20)  # Wait for the next page to load
                    last_height = driver.execute_script("return document.body.scrollHeight")
                except Exception:
                    try:
                        more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                        more_results_button.click()
                        time.sleep(20)  # Wait for more results to load
                        last_height = driver.execute_script("return document.body.scrollHeight")
                    except Exception:
                        print("No more pages or unable to load more results.")
                        break  # No new items or next/more results button was not found
            else:
                last_height = new_height
            
            search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
            num_results = len(search_items)
        
        html_content = driver.page_source
        driver.quit()
        return html_content

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_title(link):
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(link)
        title = driver.title
        driver.quit()
        return title, None
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    title, error = get_page_title(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'title': title}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


HTML content saved to google_search_results_20240624_171355.html
Extracted 101 items.
Parsed search results saved to parsed_search_results_20240624_171356.json
Test results saved to links_scrape_test_20240624_171835.json


In [1]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self):
        """Load search results and scroll until at least 100 items or no new items are found."""
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

        num_results = 0
        last_height = driver.execute_script("return document.body.scrollHeight")
        
        while num_results < 100:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(20)  # Increased delay to allow more content to load
            
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                    next_button.click()
                    time.sleep(20)  # Wait for the next page to load
                    last_height = driver.execute_script("return document.body.scrollHeight")
                except Exception:
                    try:
                        more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                        more_results_button.click()
                        time.sleep(20)  # Wait for more results to load
                        last_height = driver.execute_script("return document.body.scrollHeight")
                    except Exception:
                        print("No more pages or unable to load more results.")
                        break  # No new items or next/more results button was not found
            else:
                last_height = new_height
            
            search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
            num_results = len(search_items)
        
        html_content = driver.page_source
        driver.quit()
        return html_content

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_title(link):
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(link)
        title = driver.title
        driver.quit()
        return title, None
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    title, error = get_page_title(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'title': title}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def generate_report(test_results):
    success_count = sum(1 for result in test_results if result['status'] == 'success')
    error_count = len(test_results) - success_count

    report_lines = [
        "Link Scrape Test Report",
        f"Total Links Tested: {len(test_results)}",
        f"Total Successes: {success_count}",
        f"Total Errors: {error_count}",
        "",
        "Details:"
    ]

    for result in test_results:
        if result['status'] == 'success':
            report_lines.append(f"SUCCESS: {result['link']} - Title: {result['title']}")
        else:
            report_lines.append(f"ERROR: {result['link']} - Error: {result['error']}")

    return "\n".join(report_lines)

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

        # Generate the report
        report = generate_report(test_results)
        report_filename = timestamped_filename('links_scrape_report.txt')
        with open(report_filename, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to {report_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


HTML content saved to google_search_results_20240624_173846.html
Extracted 102 items.
Parsed search results saved to parsed_search_results_20240624_173847.json
Test results saved to links_scrape_test_20240624_174440.json
Report saved to links_scrape_report_20240624_174440.txt


In [1]:
import requests
from bs4 import BeautifulSoup

def can_be_scraped(url, expected_element, element_attribute=None, attribute_value=None):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            if element_attribute and attribute_value:
                elements = soup.find_all(expected_element, {element_attribute: attribute_value})
            else:
                elements = soup.find_all(expected_element)
            return len(elements) > 0
        else:
            print(f"Failed to retrieve webpage. Status code: {response.status_code}")
            return False
    except Exception as e:
        print(f"An error occurred: {e}")
        return False

In [5]:
import requests
from bs4 import BeautifulSoup

def can_be_scraped(url, expected_element, element_attribute=None, attribute_value=None):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Check for the expected element
            if element_attribute and attribute_value:
                elements = soup.find_all(expected_element, {element_attribute: attribute_value})
            else:
                elements = soup.find_all(expected_element)
            
            if len(elements) > 0:
                # Extract metadata if the scrape test passes
                metadata = {
                    'title': soup.find('title').get_text() if soup.find('title') else None,
                    'description': None,
                    'keywords': None
                }

                description_tag = soup.find('meta', attrs={'name': 'description'})
                if description_tag and 'content' in description_tag.attrs:
                    metadata['description'] = description_tag['content']

                keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
                if keywords_tag and 'content' in keywords_tag.attrs:
                    metadata['keywords'] = keywords_tag['content']

                return True, metadata
            else:
                return False, {}
        else:
            print(f"Failed to retrieve webpage. Status code: {response.status_code}")
            return False, {}
    except Exception as e:
        print(f"An error occurred: {e}")
        return False, {}

# Example usage
url = 'https://https://finance.yahoo.com'
expected_element = 'h1'
element_attribute = 'class'
attribute_value = 'header'

can_scrape, metadata = can_be_scraped(url, expected_element, element_attribute, attribute_value)
if can_scrape:
    print("Webpage can be scraped. Metadata:")
    print(metadata)
else:
    print("Webpage cannot be scraped.")


An error occurred: HTTPSConnectionPool(host='https', port=443): Max retries exceeded with url: /finance.yahoo.com (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x0000016433F68F50>: Failed to resolve 'https' ([Errno 11001] getaddrinfo failed)"))
Webpage cannot be scraped.


In [6]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self):
        """Load search results and scroll until at least 100 items or no new items are found."""
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

        num_results = 0
        last_height = driver.execute_script("return document.body.scrollHeight")
        
        while num_results < 100:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(20)  # Increased delay to allow more content to load
            
            new_height = driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                try:
                    next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                    next_button.click()
                    time.sleep(20)  # Wait for the next page to load
                    last_height = driver.execute_script("return document.body.scrollHeight")
                except Exception:
                    try:
                        more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                        more_results_button.click()
                        time.sleep(20)  # Wait for more results to load
                        last_height = driver.execute_script("return document.body.scrollHeight")
                    except Exception:
                        print("No more pages or unable to load more results.")
                        break  # No new items or next/more results button was not found
            else:
                last_height = new_height
            
            search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
            num_results = len(search_items)
        
        html_content = driver.page_source
        driver.quit()
        return html_content

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_metadata(link):
    try:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service)
        driver.get(link)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        title = driver.title

        metadata = {
            "title": title,
            "description": "",
            "keywords": ""
        }

        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag:
            metadata["description"] = description_tag.get('content', '')

        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        if keywords_tag:
            metadata["keywords"] = keywords_tag.get('content', '')

        driver.quit()
        return metadata, None
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    metadata, error = get_page_metadata(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'metadata': metadata}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def generate_report(test_results):
    success_count = sum(1 for result in test_results if result['status'] == 'success')
    error_count = len(test_results) - success_count

    report_lines = [
        "Link Scrape Test Report",
        f"Total Links Tested: {len(test_results)}",
        f"Total Successes: {success_count}",
        f"Total Errors: {error_count}",
        "",
        "Details:"
    ]

    for result in test_results:
        if result['status'] == 'success':
            report_lines.append(f"SUCCESS: {result['link']} - Metadata: {result['metadata']}")
        else:
            report_lines.append(f"ERROR: {result['link']} - Error: {result['error']}")

    return "\n".join(report_lines)

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

        # Generate the report
        report = generate_report(test_results)
        report_filename = timestamped_filename('links_scrape_report.txt')
        with open(report_filename, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to {report_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


No more pages or unable to load more results.


WebDriverException: Message: disconnected: not connected to DevTools
  (failed to check if window was closed: disconnected: not connected to DevTools)
  (Session info: chrome=126.0.6478.127)
Stacktrace:
	GetHandleVerifier [0x0083C1C3+27395]
	(No symbol) [0x007D3DC4]
	(No symbol) [0x006D1B7F]
	(No symbol) [0x006BC8E8]
	(No symbol) [0x006BC809]
	(No symbol) [0x006D3DB0]
	(No symbol) [0x0074C3F6]
	(No symbol) [0x00733736]
	(No symbol) [0x00707541]
	(No symbol) [0x007080BD]
	GetHandleVerifier [0x00AF3A93+2876371]
	GetHandleVerifier [0x00B47F5D+3221661]
	GetHandleVerifier [0x008BD634+556916]
	GetHandleVerifier [0x008C474C+585868]
	(No symbol) [0x007DCE04]
	(No symbol) [0x007D9818]
	(No symbol) [0x007D99B7]
	(No symbol) [0x007CBF0E]
	BaseThreadInitThunk [0x76857BA9+25]
	RtlInitializeExceptionChain [0x7788C10B+107]
	RtlClearBits [0x7788C08F+191]


In [1]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self, retries=3):
        """Load search results and scroll until at least 100 items or no new items are found."""
        attempt = 0
        while attempt < retries:
            try:
                options = Options()
                options.add_argument("--headless")  # Run Chrome in headless mode
                service = Service(ChromeDriverManager().install())
                driver = webdriver.Chrome(service=service, options=options)
                driver.set_page_load_timeout(60)  # Set a timeout for loading pages
                driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

                num_results = 0
                last_height = driver.execute_script("return document.body.scrollHeight")
                
                while num_results < 100:
                    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                    time.sleep(20)  # Increased delay to allow more content to load
                    
                    new_height = driver.execute_script("return document.body.scrollHeight")
                    if new_height == last_height:
                        try:
                            next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                            next_button.click()
                            time.sleep(20)  # Wait for the next page to load
                            last_height = driver.execute_script("return document.body.scrollHeight")
                        except Exception:
                            try:
                                more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                                more_results_button.click()
                                time.sleep(20)  # Wait for more results to load
                                last_height = driver.execute_script("return document.body.scrollHeight")
                            except Exception:
                                print("No more pages or unable to load more results.")
                                break  # No new items or next/more results button was not found
                    else:
                        last_height = new_height
                    
                    search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
                    num_results = len(search_items)
                
                html_content = driver.page_source
                driver.quit()
                return html_content
            except WebDriverException as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                attempt += 1
                time.sleep(10)  # Wait before retrying

        raise WebDriverException("Failed to load and scroll Google search results after multiple attempts.")

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_metadata(link):
    try:
        options = Options()
        options.add_argument("--headless")  # Run Chrome in headless mode
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=options)
        driver.set_page_load_timeout(60)  # Set a timeout for loading pages
        driver.get(link)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        title = driver.title

        metadata = {
            "title": title,
            "description": "",
            "keywords": ""
        }

        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag:
            metadata["description"] = description_tag.get('content', '')

        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        if keywords_tag:
            metadata["keywords"] = keywords_tag.get('content', '')

        driver.quit()
        return metadata, None
    except WebDriverException as e:
        return None, str(e)
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    metadata, error = get_page_metadata(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'metadata': metadata}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def generate_report(test_results):
    success_count = sum(1 for result in test_results if result['status'] == 'success')
    error_count = len(test_results) - success_count

    report_lines = [
        "Link Scrape Test Report",
        f"Total Links Tested: {len(test_results)}",
        f"Total Successes: {success_count}",
        f"Total Errors: {error_count}",
        "",
        "Details:"
    ]

    for result in test_results:
        if result['status'] == 'success':
            report_lines.append(f"SUCCESS: {result['link']} - Metadata: {result['metadata']}")
        else:
            report_lines.append(f"ERROR: {result['link']} - Error: {result['error']}")

    return "\n".join(report_lines)

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

        # Generate the report
        report = generate_report(test_results)
        report_filename = timestamped_filename('links_scrape_report.txt')
        with open(report_filename, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to {report_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


No more pages or unable to load more results.
HTML content saved to google_search_results_20240701_171326.html
Extracted 9 items.
Parsed search results saved to parsed_search_results_20240701_171326.json
Test results saved to links_scrape_test_20240701_171342.json
Report saved to links_scrape_report_20240701_171342.txt


In [1]:
import os
import json
import time
from datetime import datetime
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
from concurrent.futures import ThreadPoolExecutor, as_completed
import tkinter as tk
from tkinter import simpledialog

def timestamped_filename(base_filename):
    """Generate a filename with a timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename, ext = os.path.splitext(base_filename)
    return f"{filename}_{timestamp}{ext}"

class GoogleSearchLoader:
    def __init__(self, query):
        self.query = query

    def load_and_scroll(self, retries=3):
        """Load search results and scroll until at least 100 items or no new items are found."""
        attempt = 0
        while attempt < retries:
            try:
                options = Options()
                options.add_argument("--headless")  # Run Chrome in headless mode
                service = Service(ChromeDriverManager().install())
                driver = webdriver.Chrome(service=service, options=options)
                driver.set_page_load_timeout(60)  # Set a timeout for loading pages
                driver.get(f"https://www.google.com/search?q={quote_plus(self.query)}")

                num_results = 0
                last_height = driver.execute_script("return document.body.scrollHeight")
                
                while num_results < 100:
                    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                    time.sleep(20)  # Increased delay to allow more content to load
                    
                    new_height = driver.execute_script("return document.body.scrollHeight")
                    if new_height == last_height:
                        try:
                            next_button = driver.find_element(By.CSS_SELECTOR, 'a#pnnext')
                            next_button.click()
                            time.sleep(20)  # Wait for the next page to load
                            last_height = driver.execute_script("return document.body.scrollHeight")
                        except Exception:
                            try:
                                more_results_button = driver.find_element(By.XPATH, '//a[contains(., "More results")]')
                                more_results_button.click()
                                time.sleep(20)  # Wait for more results to load
                                last_height = driver.execute_script("return document.body.scrollHeight")
                            except Exception:
                                print("No more pages or unable to load more results.")
                                break  # No new items or next/more results button was not found
                    else:
                        last_height = new_height
                    
                    search_items = driver.find_elements(By.CSS_SELECTOR, 'div.g')
                    num_results = len(search_items)
                
                html_content = driver.page_source
                driver.quit()
                return html_content
            except WebDriverException as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                attempt += 1
                time.sleep(10)  # Wait before retrying

        raise WebDriverException("Failed to load and scroll Google search results after multiple attempts.")

    def parse_items(self, html_content):
        """Parse search items from HTML content using BeautifulSoup."""
        soup = BeautifulSoup(html_content, 'html.parser')
        results = []
        search_items = soup.select('div.g')  # Adjust the selector as needed
        for item in search_items:
            title_div = item.select_one('h3')
            title = title_div.text if title_div else "No title"
            link_tag = item.select_one('a')
            link = link_tag['href'] if link_tag else None
            snippet_div = item.select_one('div.kb0PBd')
            snippet = snippet_div.text if snippet_div else "No snippet available"
            if link:
                results.append({
                    "title": title,
                    "link": link,
                    "snippet": snippet
                })
        return results

    def save_html(self, html_content, base_filename="google_search_results.html"):
        """Save the HTML content to a timestamped file."""
        timestamped_file = timestamped_filename(base_filename)
        with open(timestamped_file, 'w', encoding='utf-8') as file:
            file.write(html_content)
        return timestamped_file

    def save_results_to_json(self, results, base_filename="parsed_search_results.json"):
        """Save the parsed results to a timestamped JSON file."""
        json_filename = timestamped_filename(base_filename)
        with open(json_filename, "w", encoding='utf-8') as json_file:
            json.dump(results, json_file, ensure_ascii=False, indent=4)
        return json_filename

def get_page_metadata(link):
    try:
        options = Options()
        options.add_argument("--headless")  # Run Chrome in headless mode
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=options)
        driver.set_page_load_timeout(60)  # Set a timeout for loading pages
        driver.get(link)
        soup = BeautifulSoup(driver.page_source, 'html.parser')
        title = driver.title

        metadata = {
            "title": title,
            "description": "",
            "keywords": ""
        }

        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag:
            metadata["description"] = description_tag.get('content', '')

        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        if keywords_tag:
            metadata["keywords"] = keywords_tag.get('content', '')

        driver.quit()
        return metadata, None
    except WebDriverException as e:
        return None, str(e)
    except Exception as e:
        return None, str(e)

def test_link(link_info):
    link = link_info['link']
    metadata, error = get_page_metadata(link)
    if error:
        return {'link': link, 'status': 'error', 'error': error}
    else:
        return {'link': link, 'status': 'success', 'metadata': metadata}

def test_links_concurrently(search_results):
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(test_link, item): item for item in search_results}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
    return results

def generate_report(test_results, query):
    success_count = sum(1 for result in test_results if result['status'] == 'success')
    error_count = len(test_results) - success_count

    report_lines = [
        "Link Scrape Test Report",
        f"Search Query: {query}",
        f"Total Links Tested: {len(test_results)}",
        f"Total Successes: {success_count}",
        f"Total Errors: {error_count}",
        "",
        "Details:"
    ]

    for result in test_results:
        if result['status'] == 'success':
            report_lines.append(f"SUCCESS: {result['link']} - Metadata: {result['metadata']}")
        else:
            report_lines.append(f"ERROR: {result['link']} - Error: {result['error']}")

    return "\n".join(report_lines)

def main():
    # Create a Tkinter root window and hide it
    root = tk.Tk()
    root.withdraw()

    # Prompt the user for the search query using Tkinter
    query = simpledialog.askstring("Input", "Please enter the search query topic:")

    # Check if the user provided a query
    if query:
        loader = GoogleSearchLoader(query=query)
        
        # Load and scroll through the search results, then save the HTML content
        html_content = loader.load_and_scroll()
        html_filename = loader.save_html(html_content)
        print(f"HTML content saved to {html_filename}")

        # Parse the HTML content to extract search results
        search_results = loader.parse_items(html_content)
        print(f"Extracted {len(search_results)} items.")

        # Save the parsed search results to a JSON file
        json_filename = loader.save_results_to_json(search_results)
        print(f"Parsed search results saved to {json_filename}")

        # Load the parsed search results from the JSON file
        with open(json_filename, 'r', encoding='utf-8') as f:
            search_results = json.load(f)

        # Test the links concurrently and save the results
        test_results = test_links_concurrently(search_results)
        output_filename = timestamped_filename('links_scrape_test.json')
        with open(output_filename, 'w', encoding='utf-8') as f:
            json.dump(test_results, f, ensure_ascii=False, indent=4)
        print(f"Test results saved to {output_filename}")

        # Generate the report
        report = generate_report(test_results, query)
        report_filename = timestamped_filename('links_scrape_report.txt')
        with open(report_filename, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"Report saved to {report_filename}")

    else:
        print("No query provided. Exiting.")

if __name__ == "__main__":
    main()


No more pages or unable to load more results.
HTML content saved to google_search_results_20240702_185202.html
Extracted 10 items.
Parsed search results saved to parsed_search_results_20240702_185202.json
Test results saved to links_scrape_test_20240702_185217.json
Report saved to links_scrape_report_20240702_185217.txt
