In [1]:
import time
import random
import string
import re
import os
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import chromedriver_autoinstaller
from tqdm import tqdm


def generate_session_id(length=10):
    """Return a random token of `length` characters drawn from a-z and 0-9."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return ''.join(picked)


def clean_subdistrict(subdistrict):
    """
    Turn a subdistrict name into a lowercase, hyphen-separated URL slug.

    The name is split on every run of characters other than ASCII letters and
    digits; the non-empty pieces are joined with single hyphens, so the slug
    never starts or ends with a hyphen.
    """
    pieces = re.split(r'[^A-Za-z0-9]+', subdistrict)
    # Only the first and last piece can be empty (leading/trailing separators).
    slug = '-'.join(piece for piece in pieces if piece)
    return slug.lower()


def initialize_driver():
    """
    Build and return a headless Chrome WebDriver configured for scraping.

    chromedriver_autoinstaller.install() is called first, before the Chrome
    options are assembled and the browser is launched.
    """
    chromedriver_autoinstaller.install()

    # Command-line switches passed to Chrome, applied in this order.
    chrome_args = (
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.6943.127 Safari/537.36",
        "--ignore-certificate-errors",
        "--disable-extensions",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--headless",  # Run without a visible browser window
    )

    chrome_options = webdriver.ChromeOptions()
    for arg in chrome_args:
        chrome_options.add_argument(arg)
    return webdriver.Chrome(options=chrome_options)


def random_sleep(min_delay=1, max_delay=3):
    """Sleep for a uniformly random number of seconds between min_delay and max_delay."""
    delay = random.uniform(min_delay, max_delay)
    time.sleep(delay)


def scroll_down(driver):
    """Jump to the bottom of the page, then pause so lazy-loaded content can appear."""
    jump_to_bottom = "window.scrollTo(0, document.body.scrollHeight);"
    driver.execute_script(jump_to_bottom)
    random_sleep()


def extract_estate_data(driver):
    """
    Collect one row of listing data for every estate card on the current page.

    Waits up to 20 seconds for the estate anchors
    (<a class="property-text flex def-property-box">) to be present, then reads
    from each card:
      - its href (the estate link);
      - the name (div.main-text) and the address (div.address.f-middle);
      - the value in the <div> following each of these labels: No. of Block(s),
        No. of Units, Unit Rate of Saleable Area, MoM, Trans. Record, For Sale,
        For Rent.

    Returns a list of rows shaped
    [name, address, blocks, units, unit_rate, mom, trans_record, for_sale,
    for_rent, estate_link]. A card with any unreadable field is skipped, and an
    empty list is returned when the wait for the cards fails.
    """
    card_selector = "a.property-text.flex.def-property-box"
    # Labels in output-column order; each value sits in the sibling <div> after its label.
    detail_labels = (
        "No. of Block(s)",
        "No. of Units",
        "Unit Rate of Saleable Area",
        "MoM",
        "Trans. Record",
        "For Sale",
        "For Rent",
    )

    rows = []
    try:
        cards = WebDriverWait(driver, 20).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, card_selector))
        )
        for card in cards:
            try:
                link = card.get_attribute("href")
                name = card.find_element(By.CSS_SELECTOR, "div.main-text").text.strip()
                address = card.find_element(By.CSS_SELECTOR, "div.address.f-middle").text.strip()
                details = [
                    card.find_element(
                        By.XPATH,
                        f".//div[contains(text(), '{label}')]/following-sibling::div",
                    ).text.strip()
                    for label in detail_labels
                ]
            except Exception:
                continue  # Drop the whole card if any single field cannot be read
            rows.append([name, address, *details, link])
    except Exception:
        pass  # No cards on this page: fall through and return what we have
    return rows


def main():
    """
    Scrape the Centanet estate list for every area and save it to a dated CSV.

    Reads Centanet_Res_Area_Code.xlsx (columns used: Region, District,
    Subdistrict, Code), opens the listing page of each area, walks through its
    result pages via the "next" button, and appends the collected rows to
    <YYYY-MM-DD>_centanet_estates.csv in the current directory. A file with
    that name left over from an earlier run is removed first.

    Returns None. If the Excel file cannot be read, the error is printed and
    nothing else is done. The browser is always closed, whether scraping
    finishes, fails, or is interrupted.
    """
    # Base URL for the estate listings.
    base_url = "https://hk.centanet.com/findproperty/en/list/estate"

    # Column order of the output CSV: the fields returned by
    # extract_estate_data() followed by the area fields appended below.
    output_columns = ["Name", "Address", "Blocks", "Units", "Unit Rate", "MoM", "Trans Record",
                      "For Sale", "For Rent", "Estate Link", "Region", "District", "Subdistrict",
                      "Code"]

    # Read area codes from the Excel file.
    try:
        area_df = pd.read_excel("Centanet_Res_Area_Code.xlsx", engine="openpyxl")
    except Exception as e:
        print("Error reading Centanet_Res_Area_Code.xlsx:", e)
        return

    file_path = f"{datetime.today().strftime('%Y-%m-%d')}_centanet_estates.csv"

    driver = initialize_driver()
    try:
        # Remove an existing CSV. This happens inside the try block so the
        # browser is still shut down if the file cannot be deleted.
        if os.path.exists(file_path):
            os.remove(file_path)

        # Iterate over each area with a progress bar.
        for _, row in tqdm(area_df.iterrows(), total=area_df.shape[0], desc="Processing areas"):
            region = row["Region"]
            district = row["District"]
            subdistrict = row["Subdistrict"]
            code = row["Code"]
            subdistrict_part = clean_subdistrict(subdistrict)
            session_id = generate_session_id()
            area_url = f"{base_url}/{subdistrict_part}_19-{code}?q={session_id}"
            driver.get(area_url)

            area_rows = []
            while True:
                scroll_down(driver)
                page_data = extract_estate_data(driver)
                if not page_data:
                    break  # Exit loop if no data found on this page
                area_rows.extend(
                    row_data + [region, district, subdistrict, code] for row_data in page_data
                )

                try:
                    next_button = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, "button.btn-next:not([disabled])"))
                    )
                    driver.execute_script("arguments[0].scrollIntoView(true);", next_button)
                    # Click through JavaScript rather than WebElement.click().
                    driver.execute_script("arguments[0].click();", next_button)
                    # NOTE(review): this pause is the only synchronisation with the
                    # page change; if the next page renders later than this, the
                    # same cards may be read twice - confirm against the site.
                    random_sleep()
                except Exception:
                    break  # Exit loop if no enabled next-page button is found

            if area_rows:
                df = pd.DataFrame(area_rows, columns=output_columns)
                # Append per area; the header is written only when the file is created.
                df.to_csv(file_path, mode="a", index=False, header=not os.path.exists(file_path), encoding="utf-8-sig")
            driver.delete_all_cookies()
            random_sleep()
    finally:
        driver.quit()


# Run the scraper only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()
    
# Input:  Centanet_Res_Area_Code.xlsx (columns read: Region, District, Subdistrict, Code)
# Output: <YYYY-MM-DD>_centanet_estates.csv, i.e. file_path = f"{datetime.today().strftime('%Y-%m-%d')}_centanet_estates.csv"


Processing areas:   0%|          | 0/178 [00:02<?, ?it/s]


KeyboardInterrupt: 

In [19]:
import random
import string
import re
import time
import glob
import pandas as pd
from tqdm import tqdm
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.common.by import By

def generate_session_id(length=10):
    """Build a random identifier made of `length` lowercase letters and digits."""
    charset = string.ascii_lowercase + string.digits
    return ''.join(random.choices(charset, k=length))

def clean_subdistrict(subdistrict):
    """
    Convert a subdistrict name into the slug used in listing URLs.

    Each run of characters that are not ASCII letters or digits becomes one
    hyphen; the result is lowercased and leading/trailing hyphens are removed.
    """
    hyphenated = re.sub(r'[^A-Za-z0-9]+', '-', subdistrict)
    # After the substitution only ASCII letters, digits and hyphens remain,
    # so lowercasing before or after trimming gives the same slug.
    return hyphenated.lower().strip('-')

def initialize_driver():
    """
    Create the headless Chrome WebDriver used for scraping.

    chromedriver_autoinstaller.install() runs first; the Chrome switches below
    are then added to a ChromeOptions object in the listed order.
    """
    chromedriver_autoinstaller.install()  # Make sure a chromedriver is available

    switches = [
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.6943.127 Safari/537.36",
        "--ignore-certificate-errors",
        "--disable-extensions",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--headless",  # No visible browser window
    ]

    opts = webdriver.ChromeOptions()
    for switch in switches:
        opts.add_argument(switch)
    return webdriver.Chrome(options=opts)

def random_sleep(min_delay=1, max_delay=3):
    """Block for a random duration, in seconds, drawn uniformly between min_delay and max_delay."""
    pause_seconds = random.uniform(min_delay, max_delay)
    time.sleep(pause_seconds)

def scroll_down(driver):
    """Scroll the window to the end of the document and wait briefly for lazy-loaded content."""
    scroll_script = "window.scrollTo(0, document.body.scrollHeight);"
    driver.execute_script(scroll_script)
    random_sleep()

# Enrich the most recent <date>_centanet_estates.csv with details scraped from
# each estate's own page and write the result to <date>_centanet_estates_scraped.csv.

# Columns this script adds to the estate CSV.
SCRAPED_COLUMNS = [
    "Scraped Estate Name", "Occupation Permit", "Scraped Blocks",
    "Scraped Units", "School Net Info", "Estate Detailed Address", "Developer",
]

# Number of processed rows between two intermediate saves of the output CSV.
# Rewriting the full table after every row makes total I/O grow quadratically
# with the row count; a final save is guaranteed by the `finally` block below.
CHECKPOINT_EVERY = 25


def _text_by_class(driver, class_name):
    """Return the stripped text of the first element with `class_name`, or None if it cannot be read."""
    try:
        return driver.find_element(By.CLASS_NAME, class_name).text.strip()
    except Exception:
        return None


def _scrape_table_items(driver):
    """Return (occupation permit, blocks, units) from the page's "table-item" elements; missing values are None."""
    occupation, blocks_text, units_text = None, None, None
    try:
        table_items = driver.find_elements(By.CLASS_NAME, "table-item")
    except Exception:
        return occupation, blocks_text, units_text

    for item in table_items:
        try:
            # NOTE(review): the label is matched against "table-item-text" and the
            # value is taken from "table-item-title"; presumably this mirrors the
            # page markup - confirm against the live site.
            title_elem = item.find_element(By.CLASS_NAME, "table-item-title")
            text_elem = item.find_element(By.CLASS_NAME, "table-item-text")
            text_content = text_elem.text.strip()
            if "Date of Occupation Permit" in text_content:
                occupation = title_elem.text.strip()
            elif "No. of Blocks" in text_content:
                # Keep only the first whitespace-separated token of the value.
                blocks_text = title_elem.text.strip().split()[0]
            elif "No. of Units" in text_content:
                units_text = title_elem.text.strip()
        except Exception:
            continue
    return occupation, blocks_text, units_text


def _scrape_labelled_items(driver):
    """Return (school net info, developer) from the page's "item" elements in one pass; missing values are None."""
    school_net_val, developer_val = None, None
    school_done, developer_done = False, False
    try:
        item_divs = driver.find_elements(By.CLASS_NAME, "item")
    except Exception:
        return school_net_val, developer_val

    for div in item_divs:
        if school_done and developer_done:
            break
        try:
            label = div.find_element(By.CLASS_NAME, "label-item-left").text.strip()
        except Exception:
            continue  # Not a labelled item

        if not school_done and "School Net" in label:
            try:
                links_elems = div.find_elements(By.TAG_NAME, "a")
                # Two links are expected: the primary net, then the secondary net.
                if len(links_elems) >= 2:
                    primary_net = links_elems[0].text.strip()
                    secondary_net = links_elems[1].text.strip()
                    school_net_val = f"{primary_net} | {secondary_net}"
                school_done = True  # Stop looking once the School Net item was read
            except Exception:
                pass  # Keep looking in later items

        if not developer_done and "Developer" in label:
            try:
                developer_val = div.find_element(By.CLASS_NAME, "label-item-right").text.strip()
                developer_done = True
            except Exception:
                pass  # Keep looking in later items
    return school_net_val, developer_val


# Find all CSV files that follow the naming pattern *_centanet_estates.csv
csv_files = glob.glob("*_centanet_estates.csv")
date_pattern = re.compile(r"(\d{4}-\d{2}-\d{2})_centanet_estates\.csv")
dated_files = [
    (pd.to_datetime(match.group(1)), file)
    for file in csv_files if (match := date_pattern.search(file))
]

if not dated_files:
    print("No CSV files matching the specified pattern found.")
    # Stop with a success status, as the bare exit() call used to; the
    # site-provided exit() helper is meant for the interactive shell only.
    raise SystemExit(0)

# Select the CSV with the latest date in its filename
latest_date, latest_file = max(dated_files, key=lambda x: x[0])
print("Latest file found:", latest_file)

# Read the original CSV without modifying it directly.
df = pd.read_csv(latest_file)

# Create new columns for scraped data if they don't already exist.
for col in SCRAPED_COLUMNS:
    if col not in df.columns:
        df[col] = None

# Define the output file path (adding a _scraped suffix)
new_file_path = latest_file.replace("_centanet_estates.csv", "_centanet_estates_scraped.csv")

# Start the browser only after the input has been loaded, and make sure it is
# closed again whatever happens during the (long) scraping loop.
driver = initialize_driver()
try:
    # Iterate over each row using tqdm for progress indication
    for processed, (idx, row) in enumerate(
            tqdm(df.iterrows(), total=len(df), desc="Processing URLs"), start=1):
        url = row["Estate Link"]
        try:
            driver.get(url)
            random_sleep(2, 3)  # Allow the page to load
            scroll_down(driver)  # Scroll to load lazy-loaded content if needed

            estate_name = _text_by_class(driver, "estate-detail-banner-title")
            occupation, blocks_text, units_text = _scrape_table_items(driver)
            school_net_val, developer_val = _scrape_labelled_items(driver)
            estate_address = _text_by_class(driver, "estate-detail-banner-position")

            # Save the scraped data into the DataFrame (for the current row only)
            df.at[idx, "Scraped Estate Name"] = estate_name
            df.at[idx, "Occupation Permit"] = occupation
            df.at[idx, "Scraped Blocks"] = blocks_text
            df.at[idx, "Scraped Units"] = units_text
            df.at[idx, "School Net Info"] = school_net_val
            df.at[idx, "Estate Detailed Address"] = estate_address
            df.at[idx, "Developer"] = developer_val

        except Exception as e:
            print(f"Error processing URL {url}: {e}")

        # Periodic checkpoint to limit data loss if the process is killed.
        # utf-8-sig matches the encoding of the *_centanet_estates.csv input.
        if processed % CHECKPOINT_EVERY == 0:
            df.to_csv(new_file_path, index=False, encoding="utf-8-sig")

        # Pause briefly before processing the next URL
        random_sleep(2, 3)
finally:
    try:
        # Always write what has been collected, including after an error or Ctrl-C.
        df.to_csv(new_file_path, index=False, encoding="utf-8-sig")
        print(f"Scraped data saved to: {new_file_path}")
    finally:
        driver.quit()


Latest file found: 2025-03-07_centanet_estates.csv


Processing URLs: 100%|██████████| 19685/19685 [50:07:05<00:00,  9.17s/it]   


Scraped data saved to: 2025-03-07_centanet_estates_scraped.csv
