In [None]:
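# This notebook collects articles from https://clb.org.hk/zh-hans.
# It finds article links with site-restricted Google searches (driven by Selenium),
# downloads each linked page, parses the saved HTML with BeautifulSoup,
# and exports the results to Excel.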

In [None]:
# !pip install selenium

In [None]:
import requests
from requests.adapters import HTTPAdapter, Retry
import time
import json
from pathlib import Path
import os
import pandas as pd
from bs4 import BeautifulSoup
from tqdm.notebook import tqdm
import re 

In [None]:
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException

# Set up the WebDriver (for Chrome).
# This module-level `driver` is the browser session referenced directly by
# google_search, scrape_results and go_to_next_page in this cell.
driver = webdriver.Chrome()

# Perform a Google search 
def google_search(query):
    """Submit ``query`` to Google through the module-level ``driver``.

    Loads the Google home page, types the query into the input named ``q``,
    presses Return, then pauses for a fixed three seconds so the result page
    has time to load. Returns ``None``.
    """
    driver.get("https://www.google.com")
    query_input = driver.find_element("name", "q")
    # Type the query first, then press Return to submit it
    for keys in (query, Keys.RETURN):
        query_input.send_keys(keys)
    time.sleep(3)  # fixed delay, not an explicit wait on the result elements

# Scrape results from the current page
def scrape_results():
    """Collect ``(title, href)`` pairs from the result page currently displayed.

    Every ``div.g`` element found by the module-level ``driver`` is treated as
    one result block. The title is the text of the block's first ``h3`` and the
    link is the ``href`` of its first ``a``. Blocks missing either element are
    skipped.

    Returns:
        list[tuple]: ``(title, link)`` tuples in page order.
    """
    def _title_and_link(block):
        # Either lookup raises NoSuchElementException when the element is absent
        heading = block.find_element("tag name", "h3").text
        href = block.find_element("tag name", "a").get_attribute("href")
        return (heading, href)

    pairs = []
    for block in driver.find_elements("css selector", "div.g"):
        try:
            pairs.append(_title_and_link(block))
        except NoSuchElementException:
            # Incomplete block (e.g., ads or non-standard elements): ignore it
            continue
    return pairs

# Navigate to the next page
def go_to_next_page():
    """Click the "next page" control of the current result page, if there is one.

    The control is located by the element id ``pnnext`` on the module-level
    ``driver``.

    Returns:
        bool: ``True`` after clicking and pausing three seconds for the next
        page to load; ``False`` when no element with that id exists.
    """
    try:
        driver.find_element("id", "pnnext").click()
    except NoSuchElementException:
        # No next-page element: the last page has been reached
        return False

    time.sleep(3)  # give the next page time to load
    return True

# Full scraping process for multiple pages
def scrape_all_pages(query):
    """Run a Google search for ``query`` and gather results from every page.

    Scrapes the first result page, then keeps advancing with
    ``go_to_next_page`` and scraping until it reports there is no further page.

    Returns:
        list: All ``(title, link)`` tuples from ``scrape_results``, in the
        order the pages were visited.
    """
    google_search(query)

    collected = list(scrape_results())  # first page
    while go_to_next_page():
        collected.extend(scrape_results())
    return collected

# Save the results to Excel
def save_to_excel(data, filename="../CLB/search_results.xlsx"):
    """Concatenate result DataFrames and write them to a single Excel file.

    Args:
        data: Iterable of pandas DataFrames (one per keyword in this notebook).
            May be empty, in which case an empty sheet is written instead of
            ``pd.concat`` raising ``ValueError``.
        filename: Destination path of the workbook. Missing parent directories
            are created before writing.

    Returns:
        None. The only effect is the file written at ``filename``.
    """
    frames = list(data)

    # pd.concat refuses an empty sequence, so fall back to an empty frame
    combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # to_excel does not create directories; make sure the target folder exists
    Path(filename).parent.mkdir(parents=True, exist_ok=True)

    combined.to_excel(filename, index=False)

# Search terms: Chinese phrases for hiring/organising/colluding with gangs,
# plus words for thugs, hoodlums, idlers and local ruffians
keyword_root = ['雇佣黑社会', '带领黑社会', '组织黑社会', '指使黑社会', '勾结黑社会', '安排黑社会', '聘请黑社会', '打手', '小混混', '闲散人员', '地痞流氓']

# Prepend 'site:clb.org.hk ' to each keyword so Google only returns pages from that site
keywords = [f"site:clb.org.hk {keyword}" for keyword in keyword_root]

# One DataFrame (Keyword, Title, URL) per keyword is accumulated here
all_search_results = []

try:
    for keyword in keywords:
        print(f"Scraping results for: {keyword}")
        search_results = scrape_all_pages(keyword)

        # Add the keyword to each result and convert to a DataFrame
        search_results_with_keyword = [(keyword, title, link) for title, link in search_results]
        df = pd.DataFrame(search_results_with_keyword, columns=["Keyword", "Title", "URL"])

        all_search_results.append(df)
finally:
    # Always close the browser, even if a search raises or the run is
    # interrupted; otherwise the Chrome session is left running.
    driver.quit()

# Save the combined results for all keywords into an Excel file
save_to_excel(all_search_results, "../CLB/multi_keyword_search_results.xlsx")


In [None]:
# Download webpages with links

class Downloader_new:
    """Page downloader built on a retrying ``requests`` session.

    Every request is sent with the browser-like headers and cookies defined
    below. Responses with status 500/502/503/504 are retried up to five times
    with a short backoff.
    """

    # Browser-like request headers sent with every download.
    # "accept-encoding" is intentionally NOT set: requests then advertises only
    # the encodings it can actually decode. Forcing "br, zstd" (as a browser
    # does) lets the server reply with Brotli/Zstandard bodies that requests
    # can only decompress when optional packages are installed, leaving
    # `resp.content` as compressed bytes that are not valid UTF-8 HTML.
    HEADERS = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "cache-control": "max-age=0",
        "priority": "u=0, i",
        "sec-ch-ua": "\"Google Chrome\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"macOS\"",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    }

    # Cookies sent with every download (analytics ids and a GDPR consent flag)
    COOKIES = {
        "_ga": "GA1.1.260611918.1729007018",
        "gdpr_compliance": "agreed",
        "_ga_JDSZ292J02": "GS1.1.1729525109.3.1.1729526164.60.0.0",
        "_ga_LM1DL45R6M": "GS1.1.1729625385.8.1.1729625418.27.0.0",
    }

    # Seconds to wait for the server before giving up on a request
    DEFAULT_TIMEOUT = 30

    def __init__(self):
        """Create the session and mount retrying adapters for http and https."""
        self.s = requests.Session()
        retries = Retry(total=5,
                        backoff_factor=0.1,
                        status_forcelist=[500, 502, 503, 504])
        self.s.mount('http://', HTTPAdapter(max_retries=retries))
        self.s.mount('https://', HTTPAdapter(max_retries=retries))

    def get(self, url, timeout=DEFAULT_TIMEOUT):
        """Fetch ``url`` and return the raw response body.

        Args:
            url: Address of the page to download.
            timeout: Seconds to wait for the server. Without a timeout,
                requests can block indefinitely on an unresponsive host.

        Returns:
            bytes: ``resp.content`` of the response. The status code is not
            checked here, so error pages are returned like any other body.
        """
        resp = self.s.get(url,
                          headers=self.HEADERS,
                          cookies=self.COOKIES,
                          timeout=timeout)
        return resp.content

# Combine the per-keyword frames into one DataFrame. The isinstance guard keeps
# this cell re-runnable: after the first run `all_search_results` is already a
# DataFrame, and passing a DataFrame to pd.concat raises.
if not isinstance(all_search_results, pd.DataFrame):
    all_search_results = pd.concat(all_search_results)

# URLs to download, in first-seen order. Missing hrefs are dropped (they cannot
# be turned into a filename) and URLs found under several keywords are kept once.
all_url = list(all_search_results['URL'].dropna().drop_duplicates())

# Sanitize URL to create a safe filename
def sanitize_url(url):
    """Return ``url`` with characters that are unsafe in filenames removed.

    The characters stripped are: backslash, slash, ``*``, ``?``, ``:``, ``"``,
    ``<``, ``>`` and ``|``. Everything else is left untouched.
    """
    forbidden = re.compile(r'[\\/*?:"<>|]')
    cleaned = forbidden.sub("", url)
    return cleaned
    
def main():
    """Download every URL in the module-level ``all_url`` into ``../CLB/all/``.

    Each page is saved as ``<sanitized url>.html``. Pages whose file already
    exists are skipped, so the function can be re-run to resume. Download
    errors are printed and do not stop the loop.

    Filenames are capped at 100 characters. When the sanitized URL is longer,
    the name is the first 87 characters plus ``_`` and a 12-character SHA-1
    digest of the full URL. Plain truncation let two different long URLs share
    a filename, and the second was then silently skipped as "Already exists".
    Files saved for long URLs under the plain truncated name are not
    recognised and will be downloaded again.
    """
    import hashlib  # local import: only needed for the filename digest

    d = Downloader_new()

    # Ensure the output directory exists
    output_dir = Path('../CLB/all/')
    output_dir.mkdir(parents=True, exist_ok=True)

    for url in tqdm(all_url):
        # A result without an href arrives as None/NaN; sanitize_url would raise on it
        if not isinstance(url, str) or not url:
            print(f"Skipping invalid URL: {url!r}")
            continue

        safe_filename = sanitize_url(url)
        if len(safe_filename) > 100:
            # Keep the name short but unique per URL (87 + 1 + 12 = 100 chars)
            digest = hashlib.sha1(url.encode('utf-8')).hexdigest()[:12]
            safe_filename = f"{safe_filename[:87]}_{digest}"
        outf_new = output_dir / f'{safe_filename}.html'

        if not outf_new.exists():
            try:
                # Decoding also validates that the body is UTF-8 text before saving
                result = d.get(url).decode('utf-8')
                time.sleep(0.1)  # brief pause between requests

                # Write HTML content to file
                with open(outf_new, 'w', encoding='utf-8') as f:
                    f.write(result)
                print(f"Downloaded: {outf_new}")

            except Exception as e:
                print(f"Failed to download {url}: {e}")
        else:
            print(f"Already exists: {outf_new}")

main()

In [None]:
# parse information from html using BeautifulSoup


def parse_blog_info(data):
    """Extract the key fields of one saved article page.

    Args:
        data: HTML source of the page as a string.

    Returns:
        dict with four keys, each falling back to a "... not found"
        placeholder when the corresponding element is missing:
            "blogid":      ``href`` of the first ``<link rel="alternate">``
            "posted_date": stripped text of the first ``<div class="author">``
            "title":       text of the ``<title>`` tag
            "content":     text of the article body div, one line per text node
    """
    soup = BeautifulSoup(data, 'html.parser')

    # Page URL, taken from the alternate link in the document head
    alternate_link = soup.find('link', rel='alternate')
    if not (alternate_link and alternate_link.has_attr('href')):
        page_url = "URL not found"
    else:
        page_url = alternate_link['href']

    # Posting date: read from the div carrying the "author" class
    author_div = soup.find('div', class_='author')
    posted = author_div.get_text(strip=True) if author_div else "Date not found"

    # Page title
    title_node = soup.find('title')
    page_title = title_node.get_text() if title_node else "Title not found"

    # Article body: matched on the exact class string of the body field
    body_div = soup.find('div', class_="field field--name-body field--type-text-with-summary field--label-hidden field--item")
    body_text = body_div.get_text(separator="\n").strip() if body_div else "Content not found"

    # Assemble the record for this article
    return {
        "blogid": page_url,
        "posted_date": posted,
        "title": page_title,
        "content": body_text,
    }





In [None]:
# Define the directory containing the files
directory = "../CLB/all/"

# Initialize a list to hold all parsed data
parsed_data = []


# Loop through the saved pages in sorted order: os.listdir returns entries in
# arbitrary order, so sorting makes the row order of the result reproducible.
for filename in tqdm(sorted(os.listdir(directory))):
    file_path = os.path.join(directory, filename)

    # Only the downloaded ``*.html`` pages are parsed. This skips sub-directories
    # (e.g. .ipynb_checkpoints) and stray files such as .DS_Store, which would
    # otherwise fail to decode or yield a row of "not found" placeholders.
    if os.path.isdir(file_path) or not filename.endswith('.html'):
        continue

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    try:
        tweet_info = parse_blog_info(content)
    except Exception:
        # Name the offending file, then re-raise so the failure is not hidden
        print('failed', filename)
        raise

    # Append the parsed data to the list
    parsed_data.append(tweet_info)

# Convert the list of dictionaries into a DataFrame
df = pd.DataFrame(parsed_data)
df

In [None]:
# Export the parsed posts to an Excel workbook, leaving out the DataFrame index column
df.to_excel("../CLB/all_posts_clb.xlsx", index=False)