# Web Scraping
This script searches otodom.pl for studio apartment ("kawalerka") sale listings, downloads the details of each offer, and saves them to a CSV file.

In [3]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import StaleElementReferenceException, ElementClickInterceptedException, NoSuchElementException, TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import random
import csv

In [None]:
# Selenium configuration: Chrome options passed to webdriver.Chrome() when the driver is created
options = webdriver.ChromeOptions()
options.add_argument("--start-maximized")  # Maximize the browser window
options.add_argument("--disable-blink-features=AutomationControlled")  # Turn off the "AutomationControlled" Blink feature (intended to make automation harder to detect)
options.add_experimental_option("excludeSwitches", ["enable-automation"])  # Drop the "enable-automation" switch Chrome is normally started with
options.add_experimental_option("useAutomationExtension", False)  # Disable automation extension


# Output file written by save_to_csv(); a relative path, so it resolves against the current working directory
CSV_PATH = "otodom_studio_offers.csv" # Define the path for the CSV file

# Search-results page where link collection starts (the path reads: sale / studio apartment / all of Poland)
BASE_URL = "https://www.otodom.pl/pl/wyniki/sprzedaz/kawalerka/cala-polska"  # Define the base URL for scraping

# Close cookies pop-up
def close_cookies_popup(driver):
    """Dismiss the cookie-consent banner if it is shown.

    Waits up to 5 seconds for the accept button
    (``button#onetrust-accept-btn-handler``) to become clickable, clicks it
    and pauses 2 seconds so the banner can disappear. If the wait times out,
    a message is printed and nothing else happens.

    Args:
        driver: Selenium WebDriver with the target page already loaded.
    """
    accept_locator = (By.CSS_SELECTOR, "button#onetrust-accept-btn-handler")
    consent_wait = WebDriverWait(driver, 5)
    try:
        accept_button = consent_wait.until(EC.element_to_be_clickable(accept_locator))
        accept_button.click()
        print("Cookies pop-up closed.")
        time.sleep(2)  # Give the banner time to disappear
    except TimeoutException:
        print("No cookies pop-up to close.")

# Expand building details section
def expand_building_details(driver):
    """Click the "Budynek i materiały" header to expand its details section.

    Waits up to 5 seconds for the header to be clickable, scrolls it to the
    centre of the viewport, clicks it, and then reports whether a non-hidden
    details container can be found. Any exception raised along the way is
    printed and swallowed, so this function never raises to its caller.

    Args:
        driver: Selenium WebDriver positioned on an offer page.
    """
    header_xpath = '//header[p[contains(text(), "Budynek i materiały")]]'
    visible_details_xpath = '//div[contains(@class, "css-1ftuvmu") and not(@hidden)]'

    try:
        section_header = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.XPATH, header_xpath))
        )

        # Bring the header into view before clicking it
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", section_header)
        time.sleep(1)
        section_header.click()
        time.sleep(2)

        # A non-empty result means an un-hidden details container is present
        if driver.find_elements(By.XPATH, visible_details_xpath):
            outcome = "Building and materials section expanded!"
        else:
            outcome = "Selenium still can't see the section!"
        print(outcome)
    except Exception as e:
        print(f"Error: {e}")

# Get value after a given label
def get_value_after_label(label, driver):
    """Return the text of the ``<p>`` that follows the ``<p>`` equal to *label*.

    Args:
        label: Exact text of the label paragraph to look for.
        driver: Selenium WebDriver positioned on an offer page.

    Returns:
        The stripped text of the first following-sibling paragraph, or
        "Brak danych" when no such element exists.
    """
    value_xpath = f'//p[text()="{label}"]/following-sibling::p'
    try:
        value_node = driver.find_element(By.XPATH, value_xpath)
    except NoSuchElementException:
        return "Brak danych"
    return value_node.text.strip()

# Get value from details section (single lookup after a fixed pause)
def get_value_from_details(label, driver):
    """Return the value shown next to a label in the expanded details section.

    Sleeps 2 seconds, then looks for ``<p>`` elements that follow a ``<p>``
    whose text contains *label* and returns the first match. The lookup is
    performed once; it is not retried.

    Args:
        label: Text the label paragraph must contain.
        driver: Selenium WebDriver positioned on an offer page.

    Returns:
        The stripped text of the first match, or "Brak danych" when nothing
        matches or any exception is raised (a message is printed in both cases).
    """
    fallback = "Brak danych"
    try:
        time.sleep(2)

        matches = driver.find_elements(
            By.XPATH, f'//p[contains(text(), "{label}")]/following-sibling::p'
        )
        if not matches:
            print(f"Selenium don't found '{label}'")
            return fallback
        return matches[0].text.strip()
    except Exception as e:
        print(f"Error during downloading '{label}': {e}")
        return fallback


# Get value from button elements
def get_value_from_buttons(index, driver):
    """Return the text inside the *index*-th summary button of an offer page.

    Args:
        index: Position of the button among all elements matching
            ``button.eezlw8k1.css-1nk40gi``.
        driver: Selenium WebDriver positioned on an offer page.

    Returns:
        The stripped text of the button's ``div.css-1ftqasz`` child, or
        "Brak danych" when there are not enough buttons or the child element
        is missing.
    """
    try:
        candidates = driver.find_elements(By.CSS_SELECTOR, 'button.eezlw8k1.css-1nk40gi')
        if index >= len(candidates):
            return "Brak danych"
        value_box = candidates[index].find_element(By.CSS_SELECTOR, 'div.css-1ftqasz')
        return value_box.text.strip()
    except NoSuchElementException:
        return "Brak danych"

# Retrieve offer links
def get_offer_links(driver, max_links=1000, max_stale_retries=5):
    """Collect offer URLs from the paginated search results at BASE_URL.

    Opens BASE_URL, dismisses the cookie banner, then repeatedly reads every
    ``a[href^="/pl/oferta/"]`` link on the current page and clicks the
    "next page" button until one of the following happens:

    * ``max_links`` unique links have been collected,
    * no offer links or no enabled "next page" button is found in time,
    * the page fails to change ``max_stale_retries`` times in a row.

    Args:
        driver: Selenium WebDriver used for navigation.
        max_links: Maximum number of unique links to return.
        max_stale_retries: How many consecutive times the same page content
            (or a stale-element error) is tolerated before giving up. This
            bounds what was previously an unbounded retry loop.

    Returns:
        A list of at most ``max_links`` unique URLs, in the order they were
        first seen.
    """
    driver.get(BASE_URL)  # Open the base URL
    wait = WebDriverWait(driver, 15)  # Explicit wait shared by all lookups below

    close_cookies_popup(driver)  # Close cookies pop-up

    all_links = {}  # dict used as an insertion-ordered set of URLs
    previous_links = set()  # links seen on the previously processed page
    stale_retries = 0  # consecutive iterations without progress

    while len(all_links) < max_links:
        try:
            offers = wait.until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'a[href^="/pl/oferta/"]'))
            )
            hrefs = (offer.get_attribute('href') for offer in offers)
            page_links = list(dict.fromkeys(href for href in hrefs if href))
        except TimeoutException:
            # Keep what was gathered so far instead of losing it to a crash
            print("No offers found on the page. Ending link collection.")
            break
        except StaleElementReferenceException:
            stale_retries += 1
            if stale_retries > max_stale_retries:
                print("Page kept changing while reading offers. Ending link collection.")
                break
            print("Page updated, retrying...")
            time.sleep(1)
            continue

        # Checking that the site has actually changed
        if set(page_links) == previous_links:
            stale_retries += 1
            if stale_retries > max_stale_retries:
                print("Page did not update after several retries. Ending link collection.")
                break
            print("Page did not update offers. Retrying...")
            time.sleep(3)
            continue  # Repeat the download attempt

        stale_retries = 0
        previous_links = set(page_links)  # Updating previous links
        all_links.update(dict.fromkeys(page_links))
        print(f"Retrieved {len(page_links)} new offers, total: {len(all_links)}")

        # Checking the offer limit
        if len(all_links) >= max_links:
            print("Offer limit reached, stopping link collection.")
            break

        # Locate the 'Next page' button; its absence means the last page was reached
        try:
            next_button = wait.until(
                EC.element_to_be_clickable((By.XPATH, '//li[@title="Go to next Page" and @aria-disabled="false"]'))
            )
        except (NoSuchElementException, TimeoutException):
            print("Pagination button not found. Ending link collection.")
            break

        try:
            driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", next_button)  # Scroll to button
            time.sleep(random.uniform(1, 2))
            try:
                next_button.click()
            except ElementClickInterceptedException:
                # Click exactly once more via JS; clicking again afterwards
                # would advance a second time and skip a page
                print("Click failed, retrying...")
                driver.execute_script("arguments[0].click();", next_button)  # Force click via JS
        except StaleElementReferenceException:
            # The unchanged-page check above bounds how often this can repeat
            print("Page updated, retrying...")
            continue

        time.sleep(random.uniform(4, 6))  # Longer waiting time for loading

    return list(all_links)[:max_links]

# Downloading offer details
def get_offer_details(offer_url, driver):
    """Open a single offer page and scrape its fields into a dictionary.

    Every field falls back to "Brak danych" when it cannot be read, so the
    returned dictionary always has the same keys.

    Args:
        offer_url: Absolute URL of the offer page.
        driver: Selenium WebDriver used for navigation and lookups.

    Returns:
        dict with the keys "URL", "Cena", "Cena za m²", "Adres",
        "Powierzchnia (m²)", "Liczba pokoi", "Ogrzewanie", "Rynek",
        "Typ ogłoszeniodawcy", "Rok budowy", "Rodzaj zabudowy", "Okna" and
        "Materiał budynku".
    """
    driver.get(offer_url)  # Open the offer URL
    time.sleep(random.uniform(2, 4))  # Wait for the page to load

    def get_text(selector):
        """Return the stripped text of the first element matching *selector*."""
        try:
            return driver.find_element(By.CSS_SELECTOR, selector).text.strip()
        except Exception:
            # Best-effort lookup. `Exception` (not a bare `except`) so that
            # KeyboardInterrupt / SystemExit still stop the scraper.
            return "Brak danych"

    data = {"URL": offer_url}  # Initialize data dictionary

    # Fields visible without expanding anything
    data["Cena"] = get_text('strong[data-cy="adPageHeaderPrice"]')
    data["Cena za m²"] = get_text('div[aria-label="Cena za metr kwadratowy"]')
    data["Adres"] = get_text('a.css-1jjm9oe')
    data["Powierzchnia (m²)"] = get_value_from_buttons(0, driver)
    data["Liczba pokoi"] = get_value_from_buttons(1, driver)
    data["Ogrzewanie"] = get_value_after_label("Ogrzewanie", driver)
    data["Rynek"] = get_value_after_label("Rynek", driver)
    data["Typ ogłoszeniodawcy"] = get_value_after_label("Typ ogłoszeniodawcy", driver)

    # Fields read after the "building and materials" section is expanded.
    # NOTE(review): the fixed 15 s / 5 s pauses are kept from the original;
    # confirm whether they are needed before shortening them.
    expand_building_details(driver)  # Expand additional building details
    time.sleep(15)  # Wait for details to load
    data["Rok budowy"] = get_value_from_details("Rok budowy", driver)
    time.sleep(5)
    data["Rodzaj zabudowy"] = get_value_from_details("Rodzaj zabudowy", driver)
    time.sleep(5)
    data["Okna"] = get_value_from_details("Okna", driver)
    time.sleep(5)
    data["Materiał budynku"] = get_value_from_details("Materiał budynku", driver)

    print("\n Oferta pobrana:")
    for key, value in data.items():
        print(f"   {key}: {value}")

    return data

# Scrape Otodom webpage
def scrape_otodom(offer_links, driver):
    """Scrape every offer in *offer_links* and save the results to CSV.

    A failure on a single offer is printed and that offer is skipped. If the
    run is interrupted (an unexpected error or Ctrl+C), whatever has been
    collected so far is still written, so a long run is not lost.

    Args:
        offer_links: Iterable of offer page URLs.
        driver: Selenium WebDriver used to open each offer.
    """
    all_data = []  # Details of every successfully scraped offer
    completed = False

    try:
        # enumerate(..., 1) gives a human-friendly 1-based counter for the log
        for idx, link in enumerate(offer_links, 1):
            print(f"\n Downloading offer number {idx}: {link}")
            try:
                details = get_offer_details(link, driver)
            except Exception as e:
                print(f"Failed to download offer {link}: {e}")
            else:
                all_data.append(details)
            time.sleep(random.uniform(2, 5))  # Random pause between offers
        completed = True
    finally:
        # On interruption, save only if there is something to save, so an
        # existing CSV is not overwritten with an empty one.
        if completed or all_data:
            save_to_csv(all_data)  # Save the collected data to a CSV file

# Save data to CSV
def save_to_csv(data, path=None):
    """Write a list of offer dictionaries to a UTF-8 CSV file.

    The header is the union of the keys of all rows, in first-seen order, so
    a row carrying a key that the first row lacks no longer raises
    ``ValueError``. Rows missing a column get an empty cell.

    Args:
        data: List of dictionaries, one per offer. May be empty.
        path: Destination file. Defaults to the module-level CSV_PATH.
    """
    target = CSV_PATH if path is None else path
    rows = data or []

    # dict.fromkeys de-duplicates while preserving first-seen column order
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))

    # newline="" lets the csv module control line endings itself
    with open(target, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()  # Write the header (column names) to the CSV file
        writer.writerows(rows)
    print(f"\n Data saved to file: {target}")


In [None]:
# Initialize WebDriver instance
# `service` was never defined before being used; build it from the Service and
# ChromeDriverManager classes imported at the top of the file.
service = Service(ChromeDriverManager().install())  # Path to a chromedriver binary provided by webdriver_manager
driver = webdriver.Chrome(service=service, options=options)  # Start the Chrome browser with specified options
offer_links = get_offer_links(driver)  # Retrieve offer links from the target website

In [None]:
# Visit every collected offer link and write the scraped details to CSV_PATH
scrape_otodom(offer_links, driver)  # Start scraping offers

In [None]:
# Run this once scraping is finished; `driver` cannot be used after quit()
driver.quit()  # Close the browser and end the WebDriver session