In [1]:
from datetime import datetime
import logging
import os
from pathlib import Path
import pprint
import random
import time
from typing import Any, Tuple

import psycopg2
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.remote.webdriver import WebDriver

from carGPT.scraper.scraper.translations import TRANSLATIONS

In [2]:
# Logging configuration.
# Every execution of this cell writes to its own timestamped file,
# logs/<YYYY-MM-DD_HH-MM-SS>/log.txt, in addition to whatever handler the
# root logger has.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
logs_location = Path("logs") / timestamp
logs_location.mkdir(parents=True, exist_ok=True)
log_filename = logs_location / "log.txt"

# basicConfig is a no-op when the root logger already has handlers, so the
# level is also set on our own logger to make sure INFO records are emitted.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Re-running this cell must not stack file handlers: drop (and close) the
# ones left over from a previous execution before attaching the new one.
for stale_handler in [
    h for h in logger.handlers if isinstance(h, logging.FileHandler)
]:
    logger.removeHandler(stale_handler)
    stale_handler.close()

file_handler = logging.FileHandler(log_filename, encoding="utf-8")
# Without an explicit formatter the file would contain bare messages only,
# with no timestamp or level.
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(file_handler)


In [3]:
class ChromeDriveConnection:
    """Context manager that owns a Chrome WebDriver session.

    Entering the ``with`` block starts Chrome and yields the driver;
    leaving the block quits the driver.
    """

    def __enter__(self):
        """Start Chrome, remember the driver on the instance and return it."""
        chrome = webdriver.Chrome()
        self.driver = chrome
        return chrome

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Quit the driver. Returns None, so exceptions are not suppressed."""
        self.driver.quit()

In [4]:
class DatabaseConnection:
    """Context manager that opens a PostgreSQL connection and closes it on exit.

    Connection settings are read from the standard libpq environment
    variables (PGDATABASE, PGUSER, PGPASSWORD, PGHOST, PGPORT). A variable
    that is not set falls back to the local development value below, so the
    notebook still runs against a default local setup without any exports.
    """

    # psycopg2.connect keyword -> (environment variable, fallback value)
    _SETTINGS = {
        "dbname": ("PGDATABASE", "ads_db"),
        "user": ("PGUSER", "adsuser"),
        # Development-only fallback; export PGPASSWORD instead of editing
        # the notebook when connecting to anything that is not a local DB.
        "password": ("PGPASSWORD", "pass"),
        "host": ("PGHOST", "localhost"),
        "port": ("PGPORT", "5432"),  # Default PostgreSQL port
    }

    @classmethod
    def _connection_params(cls) -> dict[str, str]:
        """Resolve every connection setting from the environment.

        Returns:
            Keyword arguments for ``psycopg2.connect``.
        """
        return {
            keyword: os.environ.get(env_var, fallback)
            for keyword, (env_var, fallback) in cls._SETTINGS.items()
        }

    def __enter__(self):
        """Open the connection, keep it on the instance and return it."""
        self.connection = psycopg2.connect(**self._connection_params())
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection. Returns None, so exceptions propagate."""
        self.connection.close()

In [5]:
def transform_data(data):
    """Convert raw scraped ad values (strings) into typed Python values.

    Keys that have a converter in the table below are converted; all other
    keys are copied unchanged. If a converter raises, the error is logged
    and the original value is kept, so a single malformed field never
    discards the rest of the ad.

    Args:
        data: Mapping of field name to the raw string scraped from the page.

    Returns:
        A new dict with the same keys; ``data`` itself is not modified.
    """

    def year_transform(year: str):
        # "2015." and "2015" both become 2015: keep what precedes the
        # first dot (the whole string when there is no dot).
        return int(year.split(".")[0])

    def boolean_transform(value: str):
        # Only "da" (case-insensitive) counts as True.
        return value.lower() == "da"

    def price_transform(price: str):
        # "12.500,50 €" -> 12500.5: drop "." separators, turn the decimal
        # comma into a point, then strip the currency sign and whitespace.
        normalized = price.replace(".", "").replace(",", ".")
        return float(normalized.replace("€", "").strip())

    def decimal_transform(value: str):
        # "5,6 l/100km" -> 5.6: first token, decimal comma to point.
        return float(value.split()[0].replace(",", "."))

    def owner_transform(value: str):
        # A leading number becomes an int; anything else stays as text.
        first_token = value.split()[0]
        return int(first_token) if first_token.isdigit() else value

    transformations = {
        "manufacture_year": year_transform,
        "model_year": year_transform,
        "mileage": lambda x: int(x.split()[0].replace(".", "")),
        "power": lambda x: int(x.split()[0]),
        "service_book": boolean_transform,
        "fuel_consumption": decimal_transform,
        "average_CO2_emission": decimal_transform,
        "owner": owner_transform,
        "displacement": lambda x: int(x.replace(".", "").replace(" cm3", "")),
        "in_traffic_since": year_transform,
        "first_registration_in_croatia": year_transform,
        "garaged": boolean_transform,
        "video_call_viewing": boolean_transform,
        "gas": boolean_transform,
        "price": price_transform,
    }

    transformed_data = {}
    for key, value in data.items():
        convert = transformations.get(key)
        if convert is None:
            transformed_data[key] = value  # no transformation needed
            continue
        try:
            transformed_data[key] = convert(value)
        except Exception as e:
            # Deliberate best effort: keep the raw value rather than lose it.
            logger.error(f"Error transforming {key}: {e}")
            transformed_data[key] = value

    return transformed_data

In [7]:
class NjuskaloScraper:
    """Scrapes car ads from njuskalo.hr listing pages into the ``ads`` table.

    For each listing page the scraper collects the ad links, opens every ad,
    extracts and converts its details, inserts one row per ad and pauses for
    a random interval between requests.
    """

    page_template = "https://www.njuskalo.hr/auti?page={page_num}"

    def __init__(self):
        # Number of the listing page that start() requests next.
        self.page_num = 1

    def get_ads(self, driver: WebDriver):
        """Return the list-item elements of the regular-ad list.

        Args:
            driver: Driver currently showing a listing page.

        Returns:
            The ``EntityList-item`` elements found inside the regular-ad
            list; this may include non-ad items such as banners.
        """
        regular_list = driver.find_element(
            By.CSS_SELECTOR, ".EntityList--ListItemRegularAd"
        )
        items_container = regular_list.find_element(
            By.CLASS_NAME, "EntityList-items"
        )
        ads = items_container.find_elements(By.CLASS_NAME, "EntityList-item")
        logger.info(f"Found {len(ads)} ads on the page")
        return ads

    @staticmethod
    def get_ad_columns(
        driver: WebDriver,
    ) -> Tuple[list[WebElement], list[WebElement]]:
        """Return the (term, definition) element lists of the details table.

        Args:
            driver: Driver currently showing a single ad page.
        """
        ad_info = driver.find_element(
            By.CLASS_NAME, "ClassifiedDetailBasicDetails-list"
        )
        ad_left_column = ad_info.find_elements(
            By.CLASS_NAME, "ClassifiedDetailBasicDetails-listTerm"
        )
        ad_right_column = ad_info.find_elements(
            By.CLASS_NAME, "ClassifiedDetailBasicDetails-listDefinition"
        )
        return ad_left_column, ad_right_column

    @staticmethod
    def get_ad_details(
        left_column: list[WebElement], right_column: list[WebElement]
    ) -> dict[str, str]:
        """Pair up term/definition elements into a ``{field: raw text}`` dict.

        Labels are mapped to field names through ``TRANSLATIONS``; a label
        with no translation is logged and left out of the result.
        """
        text_class = "ClassifiedDetailBasicDetails-textWrapContainer"
        ad_details = {}
        for term_elem, definition_elem in zip(left_column, right_column):
            prop_name = term_elem.find_element(By.CLASS_NAME, text_class).text
            prop_value = definition_elem.find_element(
                By.CLASS_NAME, text_class
            ).text
            try:
                ad_details[TRANSLATIONS[prop_name]] = prop_value
            except KeyError:
                logger.error(f"No key for: {prop_name} - value: {prop_value}")

        return ad_details

    def extract_article_info(self, driver: WebDriver) -> dict[str, Any]:
        """Build the converted field dict for the ad the driver is showing.

        Adds ``date_created`` (ISO 8601 string) and ``price`` to the details
        table fields, then runs everything through ``transform_data``.

        Raises:
            ValueError: If the published date does not match the expected
                ``%d.%m.%Y. u %H:%M`` format.
        """
        left_column, right_column = self.get_ad_columns(driver)
        ad_details = self.get_ad_details(left_column, right_column)

        published_elem = driver.find_element(
            By.CLASS_NAME, "ClassifiedDetailSystemDetails-listData"
        )
        date_time_format = "%d.%m.%Y. u %H:%M"
        # strptime rejects surrounding whitespace, so strip it first.
        date_time_obj = datetime.strptime(
            published_elem.text.strip(), date_time_format
        )
        ad_details["date_created"] = date_time_obj.isoformat()

        price_elem = driver.find_element(
            By.CLASS_NAME, "ClassifiedDetailSummary-priceDomestic"
        )
        ad_details["price"] = price_elem.text.strip()

        ad_details = transform_data(ad_details)
        logger.info(f"Constructed article info: {pprint.pformat(ad_details)}")
        return ad_details

    @staticmethod
    def get_ad_links(page_ads: list[WebElement]) -> list[str]:
        """Collect the detail-page URL of every real ad in ``page_ads``.

        Banner containers are skipped. An item that cannot be parsed is
        logged and skipped, so one odd item does not lose the whole page.
        """
        ad_links = []
        for ad in page_ads:
            try:
                # get_attribute returns None when the attribute is missing.
                ad_class = ad.get_attribute("class") or ""
                if "EntityList-bannerContainer" in ad_class:
                    logger.info("Skipping something that is not an add")
                    continue
                article = ad.find_element(By.TAG_NAME, "article")
                article_title = article.find_element(
                    By.CLASS_NAME, "entity-title"
                )
                article_link = article_title.find_element(By.TAG_NAME, "a")
                article_link_url = article_link.get_attribute("href")
                if not article_link_url:
                    # Never hand a None/empty URL to driver.get().
                    logger.error("Error happened: ad link has no href")
                    continue
                ad_links.append(article_link_url)
            except Exception as e:
                logger.error(f"Error happened: {e}")
        return ad_links

    def save_article(self, article_info: dict[str, Any]) -> None:
        """Insert one ad as a row of the ``ads`` table and commit.

        Args:
            article_info: Column name -> value. An empty dict is skipped
                because it would produce an invalid INSERT statement.
        """
        if not article_info:
            logger.warning("Skipping save: article has no fields")
            return

        # Column names cannot be passed as query parameters, so they are
        # interpolated into the statement; values go through placeholders.
        # On the handle_link path the keys are TRANSLATIONS values plus the
        # fixed "date_created" and "price" keys.
        # NOTE(review): confirm TRANSLATIONS values are plain identifiers.
        columns = ", ".join(article_info)
        values = ", ".join(f"%({key})s" for key in article_info)
        insert_query = f"""
            INSERT INTO ads ({columns})
            VALUES ({values});
            """
        with DatabaseConnection() as db_conn:
            # The cursor context manager closes the cursor on exit.
            with db_conn.cursor() as cursor:
                cursor.execute(insert_query, article_info)
            db_conn.commit()

    @staticmethod
    def _sleep_between_requests() -> None:
        """Pause for a random 1-10 seconds to throttle requests."""
        sleep_time = random.randint(1, 10)
        logger.info(f"Sleeping for {sleep_time}s")
        time.sleep(sleep_time)

    def handle_link(self, link: str, driver: WebDriver) -> None:
        """Open one ad, extract it, store it, then pause.

        Errors from extraction or saving propagate to the caller.
        """
        driver.get(link)
        logger.info(f"Went to page {link}")
        article_info = self.extract_article_info(driver)
        self.save_article(article_info)
        self._sleep_between_requests()

    def handle_page(self, driver: WebDriver) -> None:
        """Process every ad linked from the listing page the driver shows.

        A failure on a single ad is logged with its traceback and the crawl
        moves on to the next ad instead of aborting the whole run.
        """
        page_ads = self.get_ads(driver)
        ad_links = self.get_ad_links(page_ads)
        for link in ad_links:
            try:
                self.handle_link(link, driver)
            except Exception:
                logger.exception(f"Failed to process ad {link}; skipping it")
                # handle_link did not reach its own pause; still throttle.
                self._sleep_between_requests()

    def start(self, pages: int) -> None:
        """Scrape listing pages from ``self.page_num`` up to page ``pages``.

        On return ``self.page_num`` is the number of the last page scraped.
        If ``self.page_num`` is already past ``pages``, that one page is
        still scraped once.

        Args:
            pages: Number of the last listing page to scrape. Values below
                1 are a no-op: no browser is started.
        """
        if pages < 1:
            logger.warning(f"Nothing to scrape: pages={pages}")
            return

        with ChromeDriveConnection() as driver:
            while True:
                page_url = self.page_template.format(page_num=self.page_num)
                driver.get(page_url)
                self.handle_page(driver)
                if self.page_num >= pages:
                    break
                self.page_num += 1

In [None]:
# Entry point of the notebook: scrape listing pages 1..PAGES_TO_SCRAPE.
# Running this cell launches Chrome and inserts rows into the database.
PAGES_TO_SCRAPE = 5

njws = NjuskaloScraper()
njws.start(pages=PAGES_TO_SCRAPE)