# In [None]:
import atexit
import functools
import hashlib
import json
import os
import pickle
import re
import time
import traceback
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By


def atexit_handler():
    """Hold the console open at interpreter exit until Enter is pressed.

    Meant to be registered with ``atexit``. When standard input is already
    at end-of-file (for example when it is redirected from an empty source)
    ``input`` raises ``EOFError``; that is suppressed so the program does
    not finish with a spurious traceback.
    """
    try:
        input("\n\nPress Enter to exit...")
    except EOFError:
        # Nobody can answer the prompt, so there is nothing to wait for.
        pass


# Have atexit_handler run when the interpreter terminates normally.
atexit.register(atexit_handler)


@dataclass
class Post:
    """One scraped post; only ``url`` is required at construction time.

    Every other field defaults to ``None``, meaning it has not been crawled
    yet.
    """

    # Link to the post's own page.
    url: str
    # Value of the page's "creation_time" field (presumably a Unix timestamp).
    created_at: Optional[int] = None
    # Visible text of the post body.
    content: Optional[str] = None
    # Total number of comments reported for the post.
    comment_count: Optional[int] = None
    # One {"user": ..., "content": ...} dict per scraped comment.
    comments: Optional[List[Dict[str, Any]]] = None
    # One {"name": ..., "count": ...} dict per reaction type.
    reactions: Optional[List[Dict[str, Any]]] = None
    # Share count, kept as the digit string captured from the page.
    share_count: Optional[str] = None


class Driver(webdriver.Chrome):
    """Chrome WebDriver with a few page-interaction helpers for the crawler."""

    def close_popup(self):
        """Dismiss the modal dialog currently shown on the page, if any.

        Waits two seconds so the dialog has time to render, then clicks the
        element labelled "Close" inside it. A page without a dialog is not
        treated as an error.

        Returns:
            True if a dialog was found and its close button clicked,
            False if no dialog was present.
        """
        time.sleep(2)
        try:
            btn_close = self.find_element(
                By.CSS_SELECTOR, "[role='dialog'] [aria-label='Close']")
        except NoSuchElementException:
            # Nothing to dismiss; let the caller carry on with the page.
            return False
        btn_close.click()
        return True

    def scroll_to_bottom(self):
        """Scroll the window to the bottom of the current document."""
        self.execute_script("window.scrollTo(0, document.body.scrollHeight);")


class Helper:
    """Generic utilities: retrying a call and caching results on disk."""

    @staticmethod
    def try_func(func, *args, max_try=1, delay=1, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying when it raises.

        Args:
            func: Callable to invoke.
            *args: Positional arguments forwarded to ``func``.
            max_try: Number of retries allowed after the first failed call,
                so ``func`` runs at most ``max_try + 1`` times.
            delay: Seconds to sleep between attempts.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever the first successful call returns.

        Raises:
            Exception: The error raised by the last attempt once the retry
                budget is used up.
        """
        try_count = 0

        while True:
            try:
                return func(*args, **kwargs)
            except Exception:
                if try_count >= max_try:
                    raise

            try_count += 1
            time.sleep(delay)

    @staticmethod
    def cache(key=None):
        """Build a decorator that stores a function's result as a pickle file.

        Files live in a ``.cache`` directory under the working directory
        that is current when the decorator is created.

        Args:
            key: Selects the cache file name. A callable is invoked with the
                decorated function's arguments and must return the name;
                any other non-``None`` value is used as a fixed name;
                ``None`` falls back to the decorated function's
                ``__name__``.

        Returns:
            A decorator. The wrapped function returns the stored value when
            a readable cache file exists; otherwise it calls the original
            function, stores the result and returns it.
        """
        cache_dir = os.path.join(os.getcwd(), ".cache")
        os.makedirs(cache_dir, exist_ok=True)

        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                if callable(key):
                    _key = key(*args, **kwargs)
                elif key is not None:
                    # A fixed key names the cache file directly.
                    _key = str(key)
                else:
                    _key = func.__name__

                fp = os.path.join(cache_dir, _key)
                if os.path.exists(fp):
                    try:
                        # NOTE: unpickling can execute arbitrary code; the
                        # cache directory must only hold files written by
                        # this program.
                        with open(fp, "rb") as f:
                            return pickle.load(f)
                    except Exception:
                        # Best effort: an unreadable cache file counts as a
                        # miss and is overwritten below.
                        pass

                result = func(*args, **kwargs)

                with open(fp, "wb") as f:
                    pickle.dump(result, f)

                return result

            return wrapper

        return decorator


class CrawlPostHelper:
    """Extract the individual fields of a post from its loaded page.

    Every method takes a driver whose current page is the post. The creation
    time, counters and reactions are read from JSON fragments embedded in
    ``driver.page_source``; the content and comments are read from the
    rendered DOM.
    """

    # Patterns for the JSON fragments embedded in the page source.
    _CREATION_TIME_RE = re.compile(r'{"creation_time":(\d+),')
    _REACTIONS_RE = re.compile(
        r'"reaction_count":{.*?}(?=,"reaction_display_config")')
    _SHARE_COUNT_RE = re.compile(r'"share_count":\{"count":(\d+),')
    _COMMENT_COUNT_RE = re.compile(r'"total_comment_count":(\d+),')

    @staticmethod
    def _search_source(driver: "Driver", pattern, what):
        """Return the first match of ``pattern`` in the page source.

        Args:
            driver: Driver whose ``page_source`` is searched.
            pattern: Compiled regular expression to look for.
            what: Field name used in the error message.

        Raises:
            ValueError: If the pattern does not occur in the page source.
        """
        match = pattern.search(driver.page_source)
        if match is None:
            raise ValueError(f"{what} not found in page source")
        return match

    @staticmethod
    def _get_root_elm(driver: "Driver"):
        """Return the first article element that carries ``aria-posinset``."""
        return driver.find_element(By.CSS_SELECTOR, "[role='article'][aria-posinset]")

    @staticmethod
    def get_creation_time(driver: "Driver"):
        """Return the page's ``creation_time`` value as a string of digits.

        Raises:
            ValueError: If the page source has no ``creation_time`` field.
        """
        match = CrawlPostHelper._search_source(
            driver, CrawlPostHelper._CREATION_TIME_RE, "creation_time")
        return match.group(1)

    @staticmethod
    def get_content(driver: "Driver"):
        """Return the visible text of the post body."""
        root_elm = CrawlPostHelper._get_root_elm(driver)
        content_elm = root_elm.find_element(
            By.CSS_SELECTOR, "[dir='auto']:has(div[id])")
        return content_elm.text

    @staticmethod
    def get_reactions(driver: "Driver"):
        """Return the post's top reactions.

        Returns:
            A list of ``{"name": ..., "count": ...}`` dicts, one per edge of
            the embedded ``top_reactions`` object.

        Raises:
            ValueError: If the reaction data is not in the page source.
        """
        match = CrawlPostHelper._search_source(
            driver, CrawlPostHelper._REACTIONS_RE, "reaction_count")
        # The match is a run of '"key": value' pairs without the enclosing
        # object; adding the braces back makes it parseable JSON.
        data = json.loads("{" + match.group(0) + "}")

        return [
            {
                "name": edge["node"]["localized_name"],
                "count": edge["reaction_count"],
            }
            for edge in data["top_reactions"]["edges"]
        ]

    @staticmethod
    def get_share_count(driver: "Driver"):
        """Return the post's share count as a string of digits.

        Raises:
            ValueError: If the page source has no ``share_count`` field.
        """
        match = CrawlPostHelper._search_source(
            driver, CrawlPostHelper._SHARE_COUNT_RE, "share_count")
        return match.group(1)

    @staticmethod
    def get_comment_count(driver: "Driver"):
        """Return the post's total comment count as a string of digits.

        Raises:
            ValueError: If the page source has no ``total_comment_count``
                field.
        """
        match = CrawlPostHelper._search_source(
            driver, CrawlPostHelper._COMMENT_COUNT_RE, "total_comment_count")
        return match.group(1)

    @staticmethod
    def get_comments(driver: "Driver"):
        """Return the comments currently rendered under the post.

        Returns:
            A list of ``{"user": ..., "content": ...}`` dicts; ``content``
            is an empty string for a comment without a text element.
        """
        comments = []
        root_elm = CrawlPostHelper._get_root_elm(driver)
        li_elms = root_elm.find_elements(By.CSS_SELECTOR, "h3 ~ ul > li")
        for elm in li_elms:
            # NOTE(review): these class names look auto-generated and may
            # change between site releases - confirm before relying on them.
            user_elm = elm.find_element(
                By.CSS_SELECTOR, ".x1y1aw1k.xn6708d.xwib8y2.x1ye3gou > span")

            try:
                content = elm.find_element(
                    By.CSS_SELECTOR, "[style='text-align: start;']").text
            except NoSuchElementException:
                content = ""

            comments.append({
                "user": user_elm.text,
                "content": content,
            })
        return comments


class Worker:
    """Collect post links from the target page and crawl each post."""

    # Consecutive scrolls that may load no new article before the feed is
    # treated as exhausted.
    MAX_IDLE_SCROLLS = 5

    driver: Driver

    def __init__(self, post_limit: int = 100) -> None:
        """Create a worker.

        Args:
            post_limit: Maximum number of posts to collect from the feed.
        """
        self.post_limit = post_limit

    def run(self):
        """Open the page, collect post links and crawl every post.

        A post whose crawl fails is counted and skipped; the failed URLs and
        their errors are listed after the progress line.

        Returns:
            The successfully crawled ``Post`` objects, in feed order.

        Raises:
            Exception: Any error outside the per-post crawl. Its traceback
                is printed and the browser stays open until Enter is
                pressed, then the error is re-raised.
        """
        with Driver() as self.driver:
            try:
                self.driver.get("https://www.facebook.com/xalo.zodiac")
                self.driver.close_popup()

                posts = self.get_posts()
                result = []
                failures = []

                for i, post in enumerate(posts, start=1):
                    try:
                        # Keep the returned object: on a cache hit the
                        # argument is left untouched and the filled-in post
                        # is the one loaded from disk.
                        result.append(self.crawl_post(post))
                    except Exception as exc:
                        failures.append((post.url, exc))
                    print(
                        f"\r- Post {i}/{len(posts)} - success {len(result)} - fail {len(failures)}", end="")

                print()
                for url, exc in failures:
                    # Only the first line: driver errors can carry a long
                    # multi-line message.
                    reason = str(exc).strip().splitlines()
                    print(
                        f"  failed: {url}: {type(exc).__name__}: {reason[0] if reason else ''}")
                return result
            except Exception:
                print(traceback.format_exc())
                input("Press Enter to close chrome...")
                raise

    # The limit is part of the key so a run with a different post_limit does
    # not reuse a list collected for another limit.
    @Helper.cache(key=lambda self: f"get_posts_{self.post_limit}")
    def get_posts(self):
        """Scroll the feed and return a ``Post`` for each article found.

        Scrolling stops once ``post_limit`` articles are present, or when
        ``MAX_IDLE_SCROLLS`` consecutive scrolls add no article (the feed
        has run out), in which case fewer posts are returned.

        Returns:
            A list of at most ``post_limit`` ``Post`` objects with only
            ``url`` set. The list is cached on disk.
        """
        most_seen = 0
        idle_scrolls = 0
        while True:
            articles_elm = self.driver.find_elements(
                By.CSS_SELECTOR, "[role='article'][aria-posinset]")

            if len(articles_elm) >= self.post_limit:
                break

            if len(articles_elm) > most_seen:
                most_seen = len(articles_elm)
                idle_scrolls = 0
            else:
                idle_scrolls += 1
                if idle_scrolls >= self.MAX_IDLE_SCROLLS:
                    break

            self.driver.scroll_to_bottom()
            time.sleep(2)

        posts = []
        for elm in articles_elm[: self.post_limit]:
            delta_date_elm = elm.find_element(
                By.CSS_SELECTOR, "a[aria-label][href]:has(span)")
            posts.append(Post(url=delta_date_elm.get_attribute("href")))

        return posts

    # The "post_" prefix keeps these entries apart from files written under
    # the bare hash, which held no post data. UTF-8 is used so a URL with
    # non-ASCII characters can still be hashed.
    @Helper.cache(
        key=lambda self, post: "post_" + hashlib.md5(post.url.encode("utf-8")).hexdigest())
    def crawl_post(self, post: Post):
        """Load the post's page and fill in every field of ``post``.

        Args:
            post: Post whose ``url`` is set; it is updated in place.

        Returns:
            The filled-in post. The result is cached on disk per URL, so on
            a cache hit the returned object is the stored copy and ``post``
            itself is not modified.
        """
        self.driver.get(post.url)
        # The helpers yield digit strings; convert the fields that Post
        # declares as int.
        post.created_at = int(CrawlPostHelper.get_creation_time(self.driver))
        post.content = CrawlPostHelper.get_content(self.driver)
        post.reactions = CrawlPostHelper.get_reactions(self.driver)
        post.share_count = CrawlPostHelper.get_share_count(self.driver)
        post.comment_count = int(
            CrawlPostHelper.get_comment_count(self.driver))
        post.comments = CrawlPostHelper.get_comments(self.driver)
        return post


# Crawl up to 100 posts; run() returns the posts that were crawled
# successfully.
worker = Worker(post_limit=100)
posts = worker.run()

# Turn each Post dataclass into a plain dict so it can be serialized as JSON.
data = []
for post in posts:
    data.append(asdict(post))

# ensure_ascii=False writes non-ASCII text as-is instead of \u escapes, hence
# the explicit UTF-8 encoding on the file.
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=4)