In [1]:
import time, re, os
from contextlib import contextmanager

from dotenv import load_dotenv
from rapidfuzz import process
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, InvalidSessionIdException, WebDriverException, StaleElementReferenceException, TimeoutException 
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker, declarative_base

In [2]:
# Load the variables DB_USER, DB_PASS, DB_NAME (and optionally
# GECKO_DRIVER_PATH) from .env into the process environment.
load_dotenv()
# Database settings. DB_USER falls back to the previously hard-coded
# 'postgres' so existing .env files keep working unchanged.
db_user = os.getenv("DB_USER", "postgres")
db_pass = os.getenv("DB_PASS")
db_name = os.getenv("DB_NAME")
# Path to your GeckoDriver. Set GECKO_DRIVER_PATH to override the default
# location below, which is specific to the original author's machine.
gecko_driver_path = os.getenv(
    "GECKO_DRIVER_PATH",
    '/Users/sternsemasuka/Desktop/ML/Project/Car-price-prediction-regression/mozilladriver/geckodriver',
)

In [3]:
# Configure Firefox to run without a visible window.
options = Options()
# NOTE(review): the `headless` setter and the `executable_path` argument are
# deprecated in recent Selenium 4 releases — confirm against the installed
# Selenium version before upgrading.
options.headless = True
# initialize the firefox driver using the GeckoDriver binary at gecko_driver_path
driver = webdriver.Firefox(executable_path=gecko_driver_path, options=options)

In [4]:
# Shared explicit wait: every wait.until(...) call in this notebook gives up
# after 5 seconds.
wait = WebDriverWait(driver, 5)
# Open the AutoTrader home page, where the search form lives
driver.get("https://www.autotrader.ca/")

In [5]:
# Implicit wait: element lookups keep polling for up to 5 seconds before
# failing when the element is not in the DOM yet.
driver.implicitly_wait(5)


# postgresql engine. The URL is assembled with URL.create so that special
# characters in the credentials (e.g. '@', ':' or '/') are escaped correctly
# instead of corrupting a hand-formatted connection string.
engine = create_engine(
    URL.create(
        "postgresql",
        username=db_user,
        password=db_pass,
        host="localhost",
        database=db_name,
    )
)

# Create a session factory for the db connection
Session = sessionmaker(bind=engine)


In [6]:
@contextmanager
def session_scope():
    """Provide a transactional scope around a series of database operations.

    Yields a fresh session built from ``Session``. The session is committed
    when the ``with`` body finishes normally, rolled back (and the error
    re-raised) when anything is raised, and closed in every case.
    """
    db_session = Session()
    try:
        yield db_session
        db_session.commit()
    except BaseException:
        # Catch everything, including KeyboardInterrupt, so the transaction
        # is always rolled back before the error propagates.
        db_session.rollback()
        raise
    finally:
        db_session.close()

In [7]:
# Initialize Dictionary that will store the make as key and the list of its
# models as value
car_data = {}

# Locate the 'select' element that hosts all the car brand names
makes_drop_down_element = wait.until(EC.presence_of_element_located((By.ID, "rfMakes")))

# Find the 'optgroup' with label "All Makes"
all_makes_optgroup = makes_drop_down_element.find_element(By.XPATH, "./optgroup[@label='All Makes']")

# Find all 'option' elements under the 'optgroup'
all_makes_options = all_makes_optgroup.find_elements(By.TAG_NAME, "option")

# Loop through each 'option' and click it so the model dropdown is refilled
for option in all_makes_options:
    car_make = option.text
    option.click()
    # presumably gives the page time to repopulate the model dropdown
    time.sleep(2)

    # Locate the 'select' element for models
    model_drop_down_element = wait.until(EC.presence_of_element_located((By.ID, "rfModel")))

    # Find all 'option' elements for models
    model_options = model_drop_down_element.find_elements(By.TAG_NAME, "option")

    # Collect the model names, skipping the first 'option'
    all_models_options = [model_option.text for model_option in model_options[1:]]

    # Store in dictionary
    car_data[car_make] = all_models_options

# Add a catch-all make/model "Other". The value must be a list like every
# other entry: a bare string would be iterated character by character when the
# models of a make are fuzzy-matched.
car_data['Other'] = ['Other']

In [8]:
# Template of every specification recorded per listing; each entry starts as
# None until the scraper fills it in.
car_specs = dict.fromkeys(
    [
        'make',
        'model',
        'year of manufacturing',
        'kilometres',
        'kilometres condition',
        'status',
        'trim',
        'body type',
        'cylinder',
        'transmission',
        'drivetrain',
        'exterior colour',
        'interior colour',
        'passengers',
        'doors',
        'fuel type',
        'fuel consumption',
        'price',
    ]
)

In [9]:
# Put the make dropdown back on "Any Make" so the search is not restricted to
# a single make (the dropdown element located earlier is reused here).
selector = Select(makes_drop_down_element)
selector.select_by_visible_text('Any Make')


In [10]:

# Wait for the postal-code input to be present in the DOM
postal_code_input_element_present = wait.until(EC.presence_of_element_located((By.ID, "locationAddressV2")))
# Then wait for the same input to be clickable before typing into it
postal_code_input_element = wait.until(EC.element_to_be_clickable((By.ID, "locationAddressV2")))

# Type the postal code used as the search location
postal_code_input_element.send_keys("M5V 3L9")
# Wait for the search button to be present in the DOM
wait.until(EC.presence_of_element_located((By.ID, "SearchButton")))
# Now wait for the button to be clickable, then submit the search
show_me_cars_btn = wait.until(EC.element_to_be_clickable((By.ID, "SearchButton")))
show_me_cars_btn.click()

In [11]:
def find_closest_words(input_word, list_of_words):
    """Return the entry of ``list_of_words`` closest to ``input_word``.

    The lookup is delegated to RapidFuzz's ``process.extractOne``; only the
    first item of its result (the matched word) is returned.
    """
    best_word, *_ = process.extractOne(input_word, list_of_words)
    return best_word

In [12]:
### Transformation of the dictionary
def modify_dict(car_dict):
    modified_dict = {}
    for key, value in car_dict.items():
        # Handle None values
        if value is None and key is not None:
            modified_dict[key] = None
            continue

        # Modify specific keys
        if key == "year of manufacturing" or key == "cylinder" or key == "door" or key == "passengers":
            modified_dict[key] = int(value)
        elif key == "price":
            modified_dict[key] = int(value.replace(",", ""))
        elif key == "kilometres":
            modified_dict[key] = int(value.replace(",", "").replace(" km", ""))
        elif key == "trim":
            modified_dict[key] = re.split('[!\"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~]', value)[0].strip()
        elif key == "fuel consumption" and not isinstance(car_specs['fuel consumption'], float):
            modified_dict[key] = float(re.search(r"\d+\.\d+", value).group())
        elif key == "kilometres condition":
            modified_dict[key] = value.replace(" KM", "").lower()
        else:
            modified_dict[key] = value
    return modified_dict

In [13]:
def set_page_filter():
    """Apply the filters used for scraping on the search-results page.

    In order: open the location facet and switch the radius to "National",
    toggle the "damaged" checkbox and apply the condition facet, open the
    "Other Options" facet, untick "With photos" when it is ticked, and apply.
    """

    def clickable_by_id(element_id):
        # Two-step wait: the element must be present, then clickable.
        locator = (By.ID, element_id)
        wait.until(EC.presence_of_element_located(locator))
        return wait.until(EC.element_to_be_clickable(locator))

    js_click = "arguments[0].click();"

    # click on the postal code box on the left side and change the radius to
    # national to get all the cars listing
    clickable_by_id("faceted-Location").click()
    Select(clickable_by_id("proximity")).select_by_visible_text("National")

    # save by clicking on the apply location button
    clickable_by_id("applyLocation").click()

    # The damaged checkbox is clicked through JavaScript
    driver.execute_script(js_click, driver.find_element(By.ID, "rfDamaged"))

    try:
        clickable_by_id("applyCondition").click()
    except TimeoutException:
        # The button never became clickable in time: click it from the page
        driver.execute_script("document.getElementById('applyCondition').click();")

    # click on "Other Options" menu
    driver.execute_script(js_click, clickable_by_id('faceted-parent-Other'))

    # Untick "With photos" only when it is currently ticked
    photos_checkbox = driver.find_element(By.ID, "rfPhoto")
    if photos_checkbox.is_selected():
        driver.execute_script(js_click, photos_checkbox)

    # Apply the "Other Options" facet
    driver.execute_script(js_click, driver.find_element(By.ID, "applyOthers"))
    


In [14]:
# The declarative base and the ORM model are created once, when this cell is
# run, instead of being rebuilt and re-mapped for every scraped listing.
Base = declarative_base()


class Car(Base):
    """ORM mapping of one scraped listing to a row of the ``cars`` table."""
    __tablename__ = 'cars'

    id = Column(Integer, primary_key=True, autoincrement=True)
    make = Column(String)
    model = Column(String)
    year_of_manufacturing = Column(Integer)
    kilometres = Column(Integer)
    kilometres_condition = Column(String)
    status = Column(String)
    trim = Column(String)
    body_type = Column(String)
    cylinder = Column(Integer)
    transmission = Column(String)
    drivetrain = Column(String)
    exterior_colour = Column(String)
    interior_colour = Column(String)
    passengers = Column(Integer)
    doors = Column(String)
    fuel_type = Column(String)
    fuel_consumption = Column(Float)
    price = Column(Integer)


def _read_listing_title():
    """Return the first three words of the hero title, or None if unavailable."""
    try:
        wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'hero-title')))
        car_header_info = wait.until(EC.element_to_be_clickable((By.CLASS_NAME, 'hero-title'))).text
    except WebDriverException:
        # Covers TimeoutException as well (it is a WebDriverException).
        return None

    # Split the string by spaces and tabs
    split_string = car_header_info.split()
    if len(split_string) < 3:
        return None
    return split_string[0], split_string[1], split_string[2]


def _read_specifications():
    """Expand the specification list and copy the known entries into car_specs.

    Returns a ``(city_fuel, hwy_fuel)`` tuple; each item is a float when the
    matching "city fuel economy" / "hwy fuel economy" entry was read, else None.
    """
    city_fuel = hwy_fuel = None

    # Expand to see all the specs
    wait.until(EC.presence_of_element_located((By.ID, "btn-vdp-specs-toggle")))
    all_spec_toggle = wait.until(EC.element_to_be_clickable((By.ID, "btn-vdp-specs-toggle")))
    driver.execute_script("arguments[0].click();", all_spec_toggle)
    # wait for the element to load
    time.sleep(1)

    # The number of list items tells how many spec-key-N / spec-value-N pairs
    # to look up
    car_specs_items = driver.find_elements(By.CSS_SELECTOR, "#sl-card-body li")
    for i in range(len(car_specs_items)):
        try:
            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, f"#spec-key-{i}")))
            key_element = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, f"#spec-key-{i}")))

            wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, f"#spec-value-{i}")))
            value_element = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, f"#spec-value-{i}")))

            key = key_element.text
            if key in car_specs:
                # only keep the specs that have a slot in the dictionary
                car_specs[key] = value_element.text
            elif key == "city fuel economy":
                city_fuel = float(re.search(r"\d+\.\d+", value_element.text).group())
            elif key == "hwy fuel economy":
                hwy_fuel = float(re.search(r"\d+\.\d+", value_element.text).group())
        except (WebDriverException, AttributeError):
            # Missing pair, or a fuel value without a decimal number: skip it
            pass

    return city_fuel, hwy_fuel


def get_and_store_car_data():
    """Scrape the currently open listing page and store it in the database.

    The shared ``car_specs`` dictionary is reset, filled from the page (title,
    price, specification list, fuel consumption, kilometres condition),
    normalised with ``modify_dict`` and inserted into ``cars`` unless a row
    with the same year, kilometres and price already exists. Listings whose
    title or price cannot be read are skipped. The browser is navigated back
    once before returning, including when an error is raised.
    """
    # Start from a clean slate so values of the previous listing cannot leak
    # into this one when a field is missing on the page.
    for spec_name in car_specs:
        car_specs[spec_name] = None

    try:
        title_words = _read_listing_title()
        if title_words is None:
            print("Could not read the listing title, skipping...")
            return
        year_of_manufacturing, make_estimate, model_estimate = title_words
        car_specs["year of manufacturing"] = year_of_manufacturing

        city_fuel = hwy_fuel = None
        try:
            # Match the words taken from the title against the make/model
            # names collected from the search form
            make = find_closest_words(make_estimate, car_data.keys())
            all_models = car_data[make]
            model = find_closest_words(model_estimate, all_models)

            price_elements = driver.find_elements(By.XPATH, '//p[@class="hero-price"]')
            price = price_elements[0].text

            car_specs["make"] = make
            car_specs["model"] = model
            car_specs["price"] = price

            city_fuel, hwy_fuel = _read_specifications()
        except (WebDriverException, IndexError):
            # IndexError: no price element was found on the page
            pass

        if car_specs["price"] is None:
            print("Could not read the make, model or price, skipping...")
            return

        try:
            # Find the combined Fuel Consumption
            cbn_fuel_eco = driver.find_element(By.ID, "vdp-fv-combined").text
            car_specs['fuel consumption'] = cbn_fuel_eco + 'L/100km'
        except WebDriverException:
            # If not found, average whichever of the city / highway figures
            # are available (a single figure is used as-is)
            readings = [reading for reading in (city_fuel, hwy_fuel) if reading is not None]
            car_specs['fuel consumption'] = sum(readings) / len(readings) if readings else None

        try:
            # Text of the element with class "ca-indicator-active"
            Kilometres_condition = wait.until(EC.presence_of_element_located(((By.CSS_SELECTOR, "p.ca-indicator-active")))).text
            car_specs['kilometres condition'] = Kilometres_condition
        except WebDriverException:
            # If not found, do nothing
            pass

        car_specs_mod = modify_dict(car_specs)
        car_specs_mod_no_space = {key.replace(" ", "_"): value for key, value in car_specs_mod.items()}

        # Create the table if it does not exist yet
        Base.metadata.create_all(engine)

        # Attributes used to decide whether the listing is already stored
        unique_attrs = {
            'year_of_manufacturing': car_specs_mod_no_space['year_of_manufacturing'],
            'kilometres': car_specs_mod_no_space['kilometres'],
            'price': car_specs_mod_no_space['price']
        }

        with session_scope() as session:
            existing_record = session.query(Car).filter_by(**unique_attrs).first()
            if existing_record is None:
                # session_scope commits on exit, no explicit commit needed
                session.add(Car(**car_specs_mod_no_space))
            else:
                print("Car already in the database, skipping...")
    finally:
        try:
            # Go back to the main car listing, outside the DB transaction and
            # even when scraping or saving failed
            driver.back()
        except WebDriverException:
            pass


In [15]:
def car_listing_loop():
    """Apply the page filters, then scrape the cars of the current page in order.

    Cars are addressed by their ``data-list-numerical-position`` attribute,
    starting at 1. The loop ends on the first exception, which includes the
    timeout raised when no car exists at the next position. The page is
    refreshed before returning.
    """
    # Reset the page filter
    set_page_filter()

    index = 1  # Start with 1 as the data-list-numerical-position is likely 1-based
    while True:
        try:
            # Find the car element by its "data-list-numerical-position"
            car_locator = (By.CSS_SELECTOR, f"span[data-list-numerical-position='{index}']")
            wait.until(EC.presence_of_element_located(car_locator))
            car = wait.until(EC.element_to_be_clickable(car_locator))

            # Use JavaScript to click the car element
            driver.execute_script("arguments[0].click();", car)

            # Scrape the data. get_and_store_car_data() navigates back to the
            # listing page itself, so calling driver.back() here as well would
            # leave the results page entirely.
            get_and_store_car_data()

            # Increment index for the next iteration
            index += 1
        except Exception as e:
            print(f"An exception occurred: {e}")
            break  # Exit loop on any exception
    # Refresh the current page
    driver.refresh()


In [16]:
def refresh_browser():
    """Reload the page currently open in the shared ``driver``.

    Used by the scraping loop between retries and periodically between pages;
    the original author's stated purpose is to free up memory.
    """
    driver.refresh()

def find_and_click_page_element(current_page):
    """Click the pagination link of ``current_page``.

    Up to five attempts are made. A missing, stale or timed-out element
    triggers a browser refresh and a 3 second pause before the next attempt;
    any other exception triggers a refresh and aborts immediately.

    Returns:
        True when the link was clicked, False otherwise.
    """
    max_retries = 5
    for _attempt in range(max_retries):
        try:
            # Selector of the text node inside the pagination link
            css_selector = f"li[data-page='{current_page}'] a.page-link-{current_page} p.page-link-text"
            page_element = wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, css_selector))
            )
            driver.execute_script("arguments[0].click();", page_element)
        except (NoSuchElementException, StaleElementReferenceException, TimeoutException):
            print(f"Retrying page {current_page}...")
            refresh_browser()
            time.sleep(3)
        except Exception as e:
            print(f"Exception occurred: {e}")
            refresh_browser()
            break
        else:
            return True
    return False

# Main scraping logic
current_page = 1
max_pages = 6667  # Set a limit to the number of pages to scrape

try:
    while current_page <= max_pages:
        # Stop the whole run when the pagination link cannot be clicked
        if not find_and_click_page_element(current_page):
            break

        car_no_in_listing = 1
        while True:
            try:
                car = driver.find_element(By.CSS_SELECTOR, f"span[data-list-numerical-position='{car_no_in_listing}']")
                driver.execute_script("arguments[0].click();", car)
                # Call to scrape and store data
                get_and_store_car_data()
                print(f"We are on page: {current_page} car number {car_no_in_listing}")
                car_no_in_listing += 1
            except NoSuchElementException:
                # No car at this position: the page has been fully scraped
                print(f"Moving to the next page...")
                break
            except Exception as e:
                print(f"Exception occurred - car element: {e}")
                refresh_browser()
                break

        current_page += 1
        # Refresh after every 10 pages to manage memory
        if current_page % 10 == 0:
            refresh_browser()
finally:
    # Clean up: close the browser even when the run is interrupted or an
    # unexpected error escapes the loop, so no Firefox process is left behind.
    driver.quit()

We are on page: 1 car number 1
Car already in the database, skipping...
We are on page: 1 car number 2
Car already in the database, skipping...
We are on page: 1 car number 3
Car already in the database, skipping...
We are on page: 1 car number 4
We are on page: 1 car number 5
Car already in the database, skipping...
We are on page: 1 car number 6
Car already in the database, skipping...
We are on page: 1 car number 7
Car already in the database, skipping...
We are on page: 1 car number 8
Car already in the database, skipping...
We are on page: 1 car number 9
Car already in the database, skipping...
We are on page: 1 car number 10
Car already in the database, skipping...
We are on page: 1 car number 11
Car already in the database, skipping...
We are on page: 1 car number 12
Car already in the database, skipping...
We are on page: 1 car number 13
Car already in the database, skipping...
We are on page: 1 car number 14
Car already in the database, skipping...
We are on page: 1 car number