In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import re
import time
import numpy as np
from bs4.element import Tag
from bs4 import BeautifulSoup
from news_summarizer.webdriver import WebDriverFactory, ShutilBrowserLocator
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from abc import abstractmethod
from pydantic import HttpUrl
from datetime import datetime
from typing import Optional

### Fake the Mongo database

In [3]:
class FakeMongoCollection:
    """In-memory stand-in for a MongoDB collection, keyed by each document's ``_id``.

    Only the operations this notebook needs are implemented: ``insert_one``,
    ``insert_many``, ``find_one`` and ``find``. Queries support exact equality
    and the ``$regex`` operator; every key in a query must match.
    """

    def __init__(self):
        # Maps ``_id`` -> stored document (the very dict object that was inserted).
        self.data = {}

    def insert_one(self, document):
        """Store a single document.

        Raises:
            ValueError: if the document has no ``_id`` or its ``_id`` is already stored.
        """
        if "_id" not in document:
            raise ValueError("Document must contain an '_id' field")
        if document["_id"] in self.data:
            raise ValueError("Duplicate _id found")
        self.data[document["_id"]] = document
        print(f"Inserted document: {document}")

    def insert_many(self, documents):
        """Store several documents, in order.

        Any iterable is accepted (including a generator). Documents are
        inserted one by one, so documents that precede an invalid one remain
        stored when the error is raised.

        Raises:
            ValueError: if ``documents`` is empty, or a document has no
                ``_id`` or a duplicate ``_id``.
        """
        # Materialise first: a generator is always truthy, so the emptiness
        # check below would otherwise never fire, and the log line would show
        # a generator object instead of the inserted documents.
        documents = list(documents) if documents is not None else []
        if not documents:
            raise ValueError("Documents list cannot be empty")
        for document in documents:
            if "_id" not in document:
                raise ValueError("Each document must contain an '_id' field")
            if document["_id"] in self.data:
                raise ValueError("Duplicate _id found")
            self.data[document["_id"]] = document
        print(f"Inserted documents: {documents}")

    def find_one(self, query):
        """Return the first stored document matching ``query``, or ``None``."""
        for document in self.data.values():
            if self._matches(document, query):
                print(f"Found document: {document}")
                return document
        print("No document found")
        return None

    def find(self, query):
        """Return a list of every stored document matching ``query``."""
        results = [
            document
            for document in self.data.values()
            if self._matches(document, query)
        ]
        print(f"Found documents: {results}")
        return results

    def _matches(self, document, query):
        """True when ``document`` satisfies every ``key: value`` pair of ``query``."""
        return all(
            self._match_query(document, key, value) for key, value in query.items()
        )

    def _match_query(self, document, key, value):
        """Check one query condition (equality or ``{"$regex": pattern}``)."""
        if isinstance(value, dict) and "$regex" in value:
            field = document.get(key, "")
            # A regex can only be applied to text; a field holding None, a
            # number, etc. is treated as "no match" instead of raising TypeError.
            if not isinstance(field, str):
                return False
            return re.search(value["$regex"], field) is not None
        return document.get(key) == value


class FakeDatabase:
    """Dictionary-like container of fake collections, addressed by name."""

    def __init__(self):
        # collection name -> FakeMongoCollection
        self.collections = {}

    def __getitem__(self, collection_name: str) -> FakeMongoCollection:
        """Return the named collection, creating an empty one on first access."""
        try:
            return self.collections[collection_name]
        except KeyError:
            created = FakeMongoCollection()
            self.collections[collection_name] = created
            return created

    def __setitem__(self, collection_name: str, collection: FakeMongoCollection):
        """Bind ``collection`` to ``collection_name``, replacing any existing one."""
        self.collections[collection_name] = collection


class FakeMongoClient:
    """Dictionary-like container of fake databases, addressed by name."""

    def __init__(self):
        # database name -> FakeDatabase
        self.databases = {}

    def __getitem__(self, db_name: str) -> FakeDatabase:
        """Return the named database, creating an empty one on first access."""
        try:
            return self.databases[db_name]
        except KeyError:
            created = FakeDatabase()
            self.databases[db_name] = created
            return created

    def __setitem__(self, db_name: str, database: FakeDatabase):
        """Bind ``database`` to ``db_name``, replacing any existing one."""
        self.databases[db_name] = database

In [4]:
import uuid
from abc import ABC
from pydantic import UUID4, BaseModel, Field
from typing import Generic, Type, TypeVar, Dict, List
from news_summarizer.database.mongo import fake_connection

# Database handle used by every persistence helper on NoSQLBaseLink below.
# `fake_connection` is imported from the project package.
# NOTE(review): the FakeMongoClient defined earlier in this notebook is not
# referenced here -- confirm which fake is meant to back the models.
_database = fake_connection["null_database"]

# Type variable bound to NoSQLBaseLink, used to annotate the model helpers.
T = TypeVar("T", bound="NoSQLBaseLink")

class NoSQLBaseLink(BaseModel, Generic[T], ABC):
    """Base pydantic model with small MongoDB-style persistence helpers.

    Subclasses must declare an inner ``Settings`` class with a ``name``
    attribute; it is used as the collection name looked up in ``_database``.
    Equality and hashing are defined by ``id`` alone.
    """

    id: UUID4 = Field(default_factory=uuid.uuid4)

    def __eq__(self, value: object) -> bool:
        """Two instances are equal when they share a class and an ``id``."""
        if not isinstance(value, self.__class__):
            return False
        return self.id == value.id

    def __hash__(self) -> int:
        """Hash on ``id`` so that it stays consistent with ``__eq__``."""
        return hash(self.id)

    @classmethod
    def from_mongo(cls: Type[T], data: Dict) -> T:
        """Build an instance from a stored document, mapping ``_id`` to ``id``.

        Raises:
            ValueError: if ``data`` is empty.
            KeyError: if ``data`` has no ``_id`` key.
        """
        if not data:
            raise ValueError("Data is empty.")

        # Work on a shallow copy. Popping from ``data`` itself would strip
        # "_id" from the caller's dict; when the store hands back the very
        # dict it keeps internally, that would corrupt the stored document.
        fields = dict(data)
        fields["id"] = fields.pop("_id")

        return cls(**fields)

    def to_mongo(self: T, **kwargs) -> Dict:
        """Serialise the model to a document, renaming ``id`` to ``_id``.

        ``exclude_unset`` (default ``False``) and ``by_alias`` (default
        ``True``) may be overridden through ``kwargs``; remaining keyword
        arguments are forwarded to ``model_dump``.
        """
        exclude_unset = kwargs.pop("exclude_unset", False)
        by_alias = kwargs.pop("by_alias", True)

        parsed = self.model_dump(exclude_unset=exclude_unset, by_alias=by_alias, **kwargs)

        if "_id" not in parsed and "id" in parsed:
            parsed["_id"] = str(parsed.pop("id"))

        # Only values are replaced (no keys added or removed), so mutating
        # the dict while iterating over it is safe.
        for key, value in parsed.items():
            if isinstance(value, uuid.UUID):
                parsed[key] = str(value)

        return parsed

    def model_dump(self: T, **kwargs) -> Dict:
        """Like ``BaseModel.model_dump`` but with top-level UUID values as strings."""
        dict_ = super().model_dump(**kwargs)

        for key, value in dict_.items():
            if isinstance(value, uuid.UUID):
                dict_[key] = str(value)

        return dict_

    def save(self: T, **kwargs) -> T | None:
        """Insert this instance into its collection.

        Returns:
            ``self`` on success, or ``None`` if serialisation or the insert
            raised (the failure is deliberately swallowed: best-effort save).
        """
        collection = _database[self.get_collection_name()]
        try:
            collection.insert_one(self.to_mongo(**kwargs))
            return self
        except Exception:
            return None

    @classmethod
    def get_or_create(cls: Type[T], **filter_options) -> T | None:
        """Return the first document matching ``filter_options``, creating it if absent.

        When nothing matches, a new instance is built from ``filter_options``
        and saved. Because ``save`` reports failure by returning ``None``,
        this method can return ``None`` as well. Errors from the lookup or
        from model construction propagate to the caller.
        """
        collection = _database[cls.get_collection_name()]

        instance = collection.find_one(filter_options)
        if instance:
            return cls.from_mongo(instance)

        return cls(**filter_options).save()

    @classmethod
    def bulk_insert(cls: Type[T], links: List[T], **kwargs) -> bool:
        """Insert many instances at once.

        Returns:
            ``True`` when the insert succeeded; ``False`` when ``links`` is
            empty or when serialisation or the insert raised.
        """
        collection = _database[cls.get_collection_name()]
        try:
            # Build a real list: a generator hides an empty batch from the
            # driver's emptiness check and can only be consumed once.
            documents = [link.to_mongo(**kwargs) for link in links]
            if not documents:
                return False
            collection.insert_many(documents)
            return True
        except Exception:
            return False

    @classmethod
    def find(cls: Type[T], **filter_options) -> T | None:
        """Return the first instance matching ``filter_options``, or ``None``.

        Any error raised while querying or parsing is swallowed and reported
        as ``None``.
        """
        collection = _database[cls.get_collection_name()]
        try:
            instance = collection.find_one(filter_options)
            if instance:
                return cls.from_mongo(instance)
            return None
        except Exception:
            return None

    @classmethod
    def bulk_find(cls: Type[T], **filter_options) -> List[T]:
        """Return every instance matching ``filter_options``.

        Any error raised while querying or parsing yields an empty list.
        """
        collection = _database[cls.get_collection_name()]
        try:
            instances = collection.find(filter_options)
            # from_mongo either returns an instance or raises; it never
            # returns None, so no filtering is needed.
            return [cls.from_mongo(instance) for instance in instances]
        except Exception:
            return []

    @classmethod
    def get_collection_name(cls: Type[T]) -> str:
        """Return ``cls.Settings.name``.

        Raises:
            NotImplementedError: if the class has no ``Settings.name``.
        """
        if not hasattr(cls, "Settings") or not hasattr(cls.Settings, "name"):
            raise NotImplementedError

        return cls.Settings.name

In [5]:
class Link(NoSQLBaseLink):
    """A hyperlink record persisted in the ``link`` collection."""

    # Required fields.
    title: str = Field(
        ...,
        description="The title of the link",
    )
    url: HttpUrl = Field(
        ...,
        description="The URL of the link",
    )

    # Optional metadata; both default to None.
    source: Optional[str] = Field(
        default=None,
        description="The source of the link",
    )
    published_at: Optional[datetime] = Field(
        default=None,
        description="The publication date of the link",
    )

    # Filled in with the local time at which the instance is created.
    extracted_at: datetime = Field(
        default_factory=datetime.now,
        description="The timestamp when the link was extracted",
    )

    class Settings:
        # Collection name read by NoSQLBaseLink.get_collection_name().
        name = "link"

In [None]:
class BaseCrawler(ABC):
    """Abstract interface that every crawler must implement."""

    @abstractmethod
    def search(self, link: str, **kwargs) -> None:
        """Crawl ``link``; concrete subclasses must override this method."""
        raise NotImplementedError()


class BaseSeleniumCrawler(BaseCrawler, ABC):
    """Crawler base that obtains a Selenium webdriver when it is constructed."""

    def __init__(self, scroll_limit: int = 5) -> None:
        """Create the webdriver and remember the scroll limit.

        Args:
            scroll_limit: stored on the instance for subclasses to use when
                scrolling a page.
        """
        browser_locator = ShutilBrowserLocator()
        driver_factory = WebDriverFactory(browser_locator)
        self.driver = driver_factory.get_webdriver()
        self.scroll_limit = scroll_limit
        # Placeholder attribute; nothing in this class assigns a parsed page to it.
        self.soup = None

def extract_date_from_url(url: str) -> Optional[datetime]:
    """Return the first valid ``YYYY/MM/DD`` date embedded in ``url``.

    Args:
        url: the URL (or any text) to scan; ``None`` or an empty string is
            accepted and yields ``None``.

    Returns:
        A ``datetime`` at midnight of the first date-shaped segment that is
        a real calendar date, or ``None`` when there is none.
    """
    if not url:
        return None

    # A segment can look like a date without being one (e.g. "2024/13/40").
    # Such a segment is skipped rather than letting strptime's ValueError
    # abort the caller, and the scan continues with the next candidate.
    for match in re.finditer(r"(\d{4}/\d{2}/\d{2})", url):
        try:
            return datetime.strptime(match.group(1), "%Y/%m/%d")
        except ValueError:
            continue

    return None

def extract_title(url: str) -> str:
    """Derive a human-readable title from the last path segment of ``url``.

    The query string and fragment are discarded, a trailing ``.htm``,
    ``.html`` or ``.ghtml`` extension is removed, ``-`` and ``_`` become
    spaces, and runs of whitespace are collapsed. Letter case is preserved.

    Returns:
        The derived title; an empty string when the URL ends with ``/``.
    """
    # Drop "?query" and "#fragment" so they do not leak into the title.
    path = re.split(r"[?#]", url, maxsplit=1)[0]
    last_segment = path.rsplit("/", 1)[-1]

    # Remove an HTML-like extension, but only at the very end of the segment
    # (the whole alternation is anchored, not just its last branch).
    last_segment = re.sub(r"\.(?:html?|ghtml)$", "", last_segment)

    # Replace separators (- and _) with spaces.
    title = re.sub(r"[-_]", " ", last_segment)

    # Collapse multiple spaces into one and trim the ends.
    title = re.sub(r"\s+", " ", title).strip()

    return title

def extract_links(elements: List[Tag]) -> List[dict]:
    """Turn anchor elements into plain link records.

    Args:
        elements: elements exposing ``get("href")`` and ``.text``.

    Returns:
        One dict per usable element with the keys ``title``, ``url`` and
        ``published_at``. Elements without an ``href`` value are skipped.
    """
    data = []
    for element in elements:
        url = element.get("href")
        if not url:
            # Nothing to link to; the URL helpers below require a string.
            continue

        # Anchor text is often padded with newlines and indentation; strip it
        # so whitespace-only text does not pass the length check below.
        title = element.text.strip()
        if len(title) < 5:
            # Too short to be a headline: fall back to the URL's last segment.
            title = extract_title(url)

        published_at = extract_date_from_url(url)

        data.append(
            {
                "title": title,
                "url": url,
                "published_at": published_at,
            }
        )

    return data


class G1Crawler(BaseSeleniumCrawler):
    """Crawler for G1 pages that paginate through a "load more" link.

    ``scroll_limit`` is accepted for interface parity with the other
    crawlers, but pagination here is governed by ``max_pages``.
    """

    model = Link

    # Number of new result pages to reach before scrolling stops.
    max_pages = 6
    # Consecutive attempts without progress (link missing, page number not
    # advancing, click failing) tolerated before giving up. Without this
    # bound the loop never terminates on a page that has no such link.
    max_stalled_attempts = 5

    def __init__(self, scroll_limit: int = 5) -> None:
        super().__init__(scroll_limit=scroll_limit)

    def scroll_page(self) -> None:
        """Scroll and click the "load more" link until enough pages are loaded.

        Stops when ``max_pages`` new page numbers have been seen, or after
        ``max_stalled_attempts`` consecutive attempts made no progress.
        """
        pages_loaded = 0
        last_page_number = 0
        stalled_attempts = 0

        while stalled_attempts < self.max_stalled_attempts:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            # Random 2-4 second pause between scrolls.
            time.sleep(np.random.randint(2, 5))
            try:
                # Wait for the "Veja mais" link, whose href carries the next page number.
                load_more_link = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, "div.load-more a"))
                )

                url = load_more_link.get_dom_attribute("href")
                page_number = self._extract_page_number(url)

                # Guard against None explicitly: comparing None with an int
                # raises TypeError, which would be misreported below as the
                # link not being found.
                if page_number is not None and page_number > last_page_number:
                    pages_loaded += 1
                    last_page_number = page_number
                    stalled_attempts = 0

                    if pages_loaded >= self.max_pages:
                        break
                else:
                    stalled_attempts += 1

                load_more_link.click()
            except Exception:
                stalled_attempts += 1
                print("see more link not found yet, scrolling more...")

    def _extract_page_number(self, url):
        """Return the integer after ``pagina-`` in ``url``, or ``None`` if absent."""
        if not url:
            return None
        match = re.search(r"pagina-(\d+)", url)
        if match:
            return int(match.group(1))
        return None

    def search(self, link: str, **kwargs) -> None:
        """Load ``link``, expand it, and bulk-insert every valid hyperlink found.

        The browser is shut down once the page source has been captured,
        including when loading or scrolling fails, so the crawler instance
        cannot be reused afterwards.
        """
        try:
            self.driver.get(link)
            time.sleep(5)
            self.scroll_page()
            page_source = self.driver.page_source
        finally:
            # quit() ends the whole browser session, not just the current window.
            self.driver.quit()

        soup = BeautifulSoup(page_source, "html.parser")
        elements = soup.find_all("a", href=True)
        hyperlinks = extract_links(elements)

        hyperlink_list = []
        for hyperlink in hyperlinks:
            try:
                hyperlink_list.append(
                    self.model(
                        title=hyperlink["title"],
                        url=hyperlink["url"],
                        source=link,
                        published_at=hyperlink["published_at"],
                    )
                )
            except ValueError:
                # Records that fail model validation are skipped.
                continue

        self.model.bulk_insert(hyperlink_list)

class BandCrawler(BaseSeleniumCrawler):
    """Crawler for pages that load more content as the window is scrolled down."""

    model = Link

    def __init__(self, scroll_limit: int = 2) -> None:
        super().__init__(scroll_limit=scroll_limit)
        self.links = None

    def scroll_page(self) -> None:
        """Scroll to the bottom repeatedly until the page stops growing.

        Scrolling also stops once ``current_scroll`` reaches ``scroll_limit``;
        the check runs after each scroll, so at most ``scroll_limit + 1``
        scrolls are performed. A falsy ``scroll_limit`` disables that cap.
        """
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        current_scroll = 0
        while True:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            # Fixed pause before the page height is measured again.
            time.sleep(5)
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height or (
                self.scroll_limit and current_scroll >= self.scroll_limit
            ):
                break
            last_height = new_height
            current_scroll += 1

    def search(self, link: str, **kwargs) -> None:
        """Load ``link``, scroll it, and bulk-insert every valid hyperlink found.

        The browser is shut down once the page source has been captured,
        including when loading or scrolling fails, so the crawler instance
        cannot be reused afterwards.
        """
        try:
            self.driver.get(link)
            time.sleep(5)
            self.scroll_page()
            page_source = self.driver.page_source
        finally:
            # quit() ends the whole browser session, not just the current window.
            self.driver.quit()

        soup = BeautifulSoup(page_source, "html.parser")
        elements = soup.find_all("a", href=True)
        hyperlinks = extract_links(elements)

        hyperlink_list = []
        for hyperlink in hyperlinks:
            try:
                hyperlink_list.append(
                    self.model(
                        title=hyperlink["title"],
                        url=hyperlink["url"],
                        source=link,
                        published_at=hyperlink["published_at"],
                    )
                )
            except ValueError:
                # Records that fail model validation are skipped.
                continue

        self.model.bulk_insert(hyperlink_list)

class R7Crawler(BaseSeleniumCrawler):
    """Crawler for pages that load more content as the window is scrolled down."""

    model = Link

    def __init__(self, scroll_limit: int = 2) -> None:
        super().__init__(scroll_limit=scroll_limit)
        self.links = None

    def scroll_page(self) -> None:
        """Scroll to the bottom repeatedly until the page stops growing.

        Scrolling also stops once ``current_scroll`` reaches ``scroll_limit``;
        the check runs after each scroll, so at most ``scroll_limit + 1``
        scrolls are performed. A falsy ``scroll_limit`` disables that cap.
        """
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        current_scroll = 0
        while True:
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            # Fixed pause before the page height is measured again.
            time.sleep(5)
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height or (
                self.scroll_limit and current_scroll >= self.scroll_limit
            ):
                break
            last_height = new_height
            current_scroll += 1

    def search(self, link: str, **kwargs) -> None:
        """Load ``link``, scroll it, and bulk-insert every valid hyperlink found.

        The browser is shut down once the page source has been captured,
        including when loading or scrolling fails, so the crawler instance
        cannot be reused afterwards.
        """
        try:
            self.driver.get(link)
            time.sleep(5)
            self.scroll_page()
            page_source = self.driver.page_source
        finally:
            # quit() ends the whole browser session, not just the current window.
            self.driver.quit()

        soup = BeautifulSoup(page_source, "html.parser")
        elements = soup.find_all("a", href=True)
        hyperlinks = extract_links(elements)

        hyperlink_list = []
        for hyperlink in hyperlinks:
            try:
                hyperlink_list.append(
                    self.model(
                        title=hyperlink["title"],
                        url=hyperlink["url"],
                        source=link,
                        published_at=hyperlink["published_at"],
                    )
                )
            except ValueError:
                # Records that fail model validation are skipped.
                continue

        self.model.bulk_insert(hyperlink_list)

In [7]:
#g1_crawler = G1Crawler()
#g1_crawler.search(link='https://g1.globo.com')

In [8]:
#band_crawler = BandCrawler()
#band_crawler.search(link='https://bandnewstv.uol.com.br')

In [9]:
#r7_crawler = R7Crawler()
#r7_crawler.search(link='https://www.r7.com')

In [10]:
from urllib.parse import urlparse


class CrawlerRegistry:
    """Maps site base URLs to crawler classes and instantiates them on lookup."""

    def __init__(self):
        # base URL as registered (e.g. "https://www.r7.com/") -> crawler class
        self._crawlers = {}

    def register(self, name, crawler: "Type[BaseCrawler]"):
        """Register ``crawler`` (a class or zero-argument factory) under ``name``.

        Raises:
            ValueError: if ``name`` is already registered.
        """
        if name in self._crawlers:
            raise ValueError(f"Component '{name}' is already registered.")
        self._crawlers[name] = crawler

    def get(self, name):
        """Return a new crawler instance for the site that ``name`` belongs to.

        ``name`` may be any URL on the site. It is reduced to
        ``scheme://host/`` and looked up exactly; if that misses, the host
        alone is compared, so an ``https://`` URL still resolves a crawler
        registered under ``http://`` (and vice versa).

        Raises:
            KeyError: if no registered crawler matches.
        """
        parsed_domain = urlparse(name)
        key = self._extract_netloc(parsed_domain)

        if key in self._crawlers:
            return self._crawlers[key]()

        fallback_key = self._find_key_by_host(parsed_domain.netloc)
        if fallback_key is None:
            raise KeyError(f"Component '{key}' not found.")
        return self._crawlers[fallback_key]()

    def _extract_netloc(self, domain):
        """Rebuild ``scheme://host/`` from a parsed URL."""
        return f"{domain.scheme}://{domain.netloc}/"

    def _find_key_by_host(self, host):
        """Return the registered key whose host equals ``host`` (case-insensitive), if any."""
        if not host:
            return None
        wanted = host.lower()
        for registered in self._crawlers:
            if isinstance(registered, str) and urlparse(registered).netloc.lower() == wanted:
                return registered
        return None

    def list_crawlers(self):
        """Return the registered names, in registration order."""
        return list(self._crawlers.keys())

    
# Shared registry instance; keys are site base URLs in the "scheme://host/"
# form that CrawlerRegistry.get() rebuilds from the URL it is given.
# NOTE(review): G1 is registered under http:// while the other two sites use
# https:// -- confirm this is intended, since the lookup key includes the scheme.
registry = CrawlerRegistry()
registry.register("http://g1.globo.com/", G1Crawler)
registry.register("https://www.r7.com/", R7Crawler)
registry.register('https://bandnewstv.uol.com.br/', BandCrawler)

In [12]:
# Look up the crawler by an article URL; get() reduces it to "scheme://host/"
# and instantiates the registered class, which obtains a webdriver in __init__.
g1_crawler = registry.get("http://g1.globo.com/noticia-dia-2")

In [13]:
# Run the crawl: loads the page in the browser, scrolls it, and bulk-inserts
# the links it finds through the Link model.
g1_crawler.search("http://g1.globo.com/")

see more link not found yet, scrolling more...
