In [1]:
from __future__ import annotations
from typing import TYPE_CHECKING

import json
import os
import re
import time

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options

from util import noValueBuild, getSingleValueInt

if TYPE_CHECKING:
    from selenium.webdriver.remote.webelement import WebElement
    from selenium.webdriver.remote.webdriver import WebDriver
    from interface import Review, CrawlOptions, NamedDataset

In [2]:
# ---------------------------------------------------------------------------- #
#                                 Utils Driver                                 #
# ---------------------------------------------------------------------------- #

def driverUp () -> WebDriver:
    """Start a new Chrome session configured for crawling and return its driver.

    The browser is launched in incognito mode with an explicit UI language and
    with the geolocation content setting pinned to ``2``.

    Returns:
        The freshly created Chrome ``WebDriver``. The caller owns it and is
        responsible for quitting it.
    """
    # NOTE(review): "en_EN" is not a standard locale tag ("en" / "en-US" are the
    # usual values for --lang) — confirm Chrome falls back to English as intended,
    # since the crawler's selectors match English aria-labels.
    browser_locale = "en_EN"

    options = Options()
    prefs = {
        # presumably 2 means "block" in Chrome's content-setting values — TODO confirm
        "profile.default_content_setting_values.geolocation": 2
    }
    options.add_argument("--incognito")
    options.add_argument("--lang={}".format(browser_locale))
    options.add_experimental_option("prefs", prefs)

    # Pass options by keyword: the first positional parameter of
    # webdriver.Chrome has not always been `options` across Selenium releases.
    return webdriver.Chrome(options=options)

In [3]:
# Fallback options used by crawl() when the caller does not supply its own.
# "CRAWL_NAME" is the label shown in log lines; "TIME_TO_WAIT" is the pause
# (in seconds) between scroll rounds.
defaultOptions: CrawlOptions = {
    "CRAWL_NAME": "-",
    "TIME_TO_WAIT": 1.45,
}

# URL-matching pattern applied to the inline `style` attribute of review images.
_URL_PATTERN = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"


def _scrollToEnd (driver: WebDriver, container: WebElement, options: CrawlOptions) -> None:
    """Scroll `container` to its bottom repeatedly until the bottom stops moving."""
    while True:
        # Fixed pause before measuring; presumably gives the page time to
        # append more reviews after the previous scroll.
        time.sleep(options.get("TIME_TO_WAIT", 1))

        sT = driver.execute_script("""
            return document.querySelector('[role="main"]')?.children[1].scrollTop
        """)
        sH = driver.execute_script("""
            return document.querySelector('[role="main"]')?.children[1].scrollHeight
        """)
        oH = driver.execute_script("""
            return document.querySelector('[role="main"]')?.children[1].offsetHeight
        """)

        # Already within 1px of the bottom: nothing new appeared, so stop.
        if sT >= (sH - oH) - 1:
            return

        driver.execute_script("""
            return arguments[0].scrollTop = arguments[1]
        """, container, sH)
        print("[CRAWL.Scroll]: Please wait... [{},{}]".format(container.get_property(name="scrollTop"), sH))


def _extractImageUrls (driver: WebDriver, src: WebElement) -> list[str]:
    """Return the first URL found in the `style` of each `[data-photo-index]` element under `src`."""
    domImages: list[WebElement] | None = driver.execute_script("""
        return arguments[0].children[3]?.querySelectorAll("[data-photo-index]")
    """, src)

    imageUrls: list[str] = []
    for image in domImages or []:
        # The attribute may be absent, and a style may contain no URL at all;
        # both cases are skipped instead of raising.
        found = re.search(_URL_PATTERN, image.get_attribute("style") or "")
        if found:
            imageUrls.append(found.group(1))
    return imageUrls


def _extractReview (driver: WebDriver, review: WebElement) -> Review:
    """Read one `[data-review-id]` element and build its review record."""
    authorName = authorContrib = authorReview = ariaStars = authorTimestamp = None
    reviewImages: list[str] = []

    src = driver.execute_script("""
        return arguments[0].querySelector("[data-review-id]")?.firstElementChild
    """, review)
    if src:
        # get author's name
        authorName = driver.execute_script("""
            return arguments[0].children[1]?.querySelector("[data-review-id]")?.children[0]?.textContent
        """, src)

        # get author's contrib
        authorContrib = driver.execute_script("""
            return arguments[0].children[1]?.querySelector("[data-review-id]")?.children[1]?.textContent
        """, src)

        # get stars aria-label
        ariaStars = driver.execute_script("""
            return arguments[0].children[3]?.firstElementChild?.children[0]?.getAttribute("aria-label")
        """, src)

        # get humanized_timestamp value
        authorTimestamp = driver.execute_script("""
            return arguments[0].children[3]?.firstElementChild?.children[1]?.textContent
        """, src)

        # get review
        com_reviews = driver.execute_script("""
            return arguments[0].children[3]?.children[1]?.querySelector("[id]")?.children
        """, src)
        if com_reviews:
            if len(com_reviews) > 1:
                # A second child is clicked before reading the text; presumably
                # it holds the control that expands a truncated review.
                driver.execute_script("""
                    return arguments[0].firstElementChild?.click()
                """, com_reviews[1])
            authorReview = driver.execute_script("""
                return arguments[0].textContent
            """, com_reviews[0])

        # get review images
        reviewImages = _extractImageUrls(driver, src)

    return {
        "name": noValueBuild(authorName, "author"),
        "review": noValueBuild(authorReview, "review"),
        "contrib": noValueBuild(authorContrib, "contrib"),
        "humanized_timestamp": noValueBuild(authorTimestamp, "humanized_timestamp"),
        "stars": {
            "label": noValueBuild(ariaStars, "stars"),
            "value": getSingleValueInt(ariaStars, 0),
        },
        "minires_images": noValueBuild(reviewImages, "minires_images")
    }


def crawl (driver: WebDriver, url: str, show_result: bool = False, options: CrawlOptions = defaultOptions) -> list[Review]:
    """Open `url`, scroll its review list to the end and collect every review.

    Args:
        driver: Browser session to use. It is always quit before this function
            returns or raises, so it must not be reused afterwards.
        url: Page to load.
        show_result: When true, print the collected reviews as indented JSON.
        options: Mapping read for "CRAWL_NAME" (label used in log lines) and
            "TIME_TO_WAIT" (seconds to pause between scroll rounds).

    Returns:
        One record per review found, each with the keys "name", "review",
        "contrib", "humanized_timestamp", "stars" ("label" and "value") and
        "minires_images". Empty when no review list could be located.
    """
    dataReviews: list[Review] = []

    try:
        driver.get(url)
        driver.implicitly_wait(1.1)

        # execution time for the whole process
        start = time.time()

        # Result is unused; the lookup is kept because, with the implicit wait
        # set above, it presumably gives the review pane time to appear.
        driver.find_elements(by=By.CSS_SELECTOR, value="[aria-label='Refine reviews']")

        # container scrollable
        dom_review_container: WebElement | None = driver.execute_script("""
            return document.querySelector('[role="main"]')?.children[1]
        """)

        if dom_review_container:
            print('[CRAWL.Initialize: {}]: Container initialized. {}'.format(options.get("CRAWL_NAME", "-"), dom_review_container.get_property(name="scrollHeight")))
            _scrollToEnd(driver, dom_review_container, options)

        print("[CRAWL.Scroll]: Done.")

        # The review list is the next sibling of either the "Refine reviews"
        # pane or the grandparent of the "Sort reviews" control.
        dom_review_pane_by_rr = driver.execute_script("""
            return document.querySelector('[aria-label="Refine reviews"]')
        """)
        dom_review_pane_by_cta_sort = driver.execute_script("""
            return document.querySelector('[aria-label="Sort reviews"]')?.parentElement?.parentElement
        """)

        dom_review_pane_sibling = None
        dom_review_pane_anchor = dom_review_pane_by_rr or dom_review_pane_by_cta_sort
        if dom_review_pane_anchor:
            dom_review_pane_sibling = driver.execute_script("""
                return arguments[0].nextElementSibling
            """, dom_review_pane_anchor)

        if dom_review_container and dom_review_pane_sibling:
            dom_reviews = driver.execute_script("""
                return arguments[0].querySelectorAll(":scope > [data-review-id]")
            """, dom_review_pane_sibling)

            # An empty list simply falls through to the summary below.
            for review in dom_reviews or []:
                dataReviews.append(_extractReview(driver, review))

        print("[CRAWL.Finished] Process finished. Thank you.")

        if show_result:
            print(json.dumps(dataReviews, indent=2, sort_keys=False))

        # evaluate the time taken
        print("[CRAWL.Time]: Execution time: {:.2f} seconds".format(time.time() - start))
        print("Total reviews: {}".format(len(dataReviews)))
        print("=====================================================================================================")
    finally:
        # Always release the browser, including on errors and empty results.
        driver.quit()

    return dataReviews

In [4]:
from endpoint import urls

In [5]:
# Crawl every endpoint with its own browser session and keep each result
# together with the endpoint's filename as its label.
big_datas: list[NamedDataset] = []

for url in urls():
    driver = driverUp()

    label = url.get("filename", "")
    crawl_options = {
        "CRAWL_NAME": label,
        "TIME_TO_WAIT": 2.35
    }
    result = crawl(driver=driver, url=url.get("url", ""), options=crawl_options)

    big_datas.append({"label": label, "dataset": result})

    # short pause before starting the next browser session
    time.sleep(2)

[CRAWL.Initialize: dataset_jembatan_penyebrangan_pulau_kumala]: Container initialized. 5055
[CRAWL.Scroll]: Please wait... [4380,5055]
[CRAWL.Scroll]: Please wait... [9005.599609375,9681]
[CRAWL.Scroll]: Please wait... [12596.7998046875,13272]
[CRAWL.Scroll]: Please wait... [15999.2001953125,16674]
[CRAWL.Scroll]: Please wait... [19972,20647]
[CRAWL.Scroll]: Please wait... [23564.80078125,24240]
[CRAWL.Scroll]: Please wait... [26820,27495]
[CRAWL.Scroll]: Please wait... [29780,30455]
[CRAWL.Scroll]: Please wait... [31768,32443]
[CRAWL.Scroll]: Please wait... [33756,34431]
[CRAWL.Scroll]: Please wait... [36060.80078125,36736]
[CRAWL.Scroll]: Please wait... [38048.80078125,38724]
[CRAWL.Scroll]: Please wait... [40036.80078125,40712]
[CRAWL.Scroll]: Please wait... [42341.6015625,43016]
[CRAWL.Scroll]: Please wait... [44329.6015625,45004]
[CRAWL.Scroll]: Please wait... [50511.19921875,51186]
[CRAWL.Scroll]: Please wait... [56502.3984375,57178]
[CRAWL.Scroll]: Please wait... [62810.3984375,

In [6]:
import pandas as pd

In [8]:
# Write each crawled dataset to ./datasets/<label>.csv.
# The target directory is created first so a fresh checkout does not fail.
os.makedirs("./datasets", exist_ok=True)

for dataset in big_datas:
    reviews = dataset.get("dataset") or []
    if not reviews:
        # Skip datasets with no rows rather than writing an empty CSV.
        print("[CSV.Skip]: no reviews for '{}'".format(dataset.get("label", "_")))
        continue

    df = pd.json_normalize(reviews)
    df.to_csv(path_or_buf="./datasets/{}.csv".format(dataset.get("label", "_")), encoding="utf-8", index=False, header=True)
    # NOTE(review): this pause does not look necessary for local file writes —
    # confirm before removing.
    time.sleep(2)

In [15]:
from glob import glob
import os

In [17]:
# Load every CSV under <cwd>/datasets and stack them into a single frame.
# os.path.join keeps the path valid on any OS instead of hard-coding "\\".
path_to_datasets = os.path.join(os.getcwd(), "datasets")
# Files are read in sorted order so the combined row order is the same on every run.
all_dataset = (pd.read_csv(f) for f in sorted(glob(os.path.join(path_to_datasets, "*.csv"))))
dfs = pd.concat(all_dataset, ignore_index=True)
dfs

d:\suarasiy\skripsi\datasets
