# helper

> Auxiliary helper functions

In [None]:
#| default_exp helper

In [None]:
#| export 
from urllib.parse import urlparse, parse_qs
import pandas as pd
from io import StringIO
from playwright.async_api import  Page, Locator
from functools import partialmethod
from html2text import HTML2Text
from typing import Union
from IPython.display import Markdown, display

In [None]:
#| hide
#| export
# Shared HTML -> markdown converter; `h2md` calls `h2t.handle(...)` on it.
# NOTE(review): the very large `bodywidth` is presumably there to keep html2text
# from hard-wrapping long lines — confirm against the html2text docs.
h2t = HTML2Text(bodywidth=20000)
# Output options: no image markup, no link markup, and code blocks marked.
h2t.ignore_images = True
h2t.ignore_links = True
h2t.mark_code = True

In [None]:
#| hide
from pw.core import *

## View MD in `notebook`

In [None]:
#| export 

def print_md(s : str):
    """
    Render the string `s` as Markdown in the notebook output
    """
    rendered = Markdown(s)
    display(rendered)

## Extract table to dataframe

In [None]:
#| export

async def table2df(table:Locator):
    """Convert the html table behind the locator `table` into a pandas dataframe.

    Waits until the element is visible, reads its `outerHTML` and parses it with
    `pd.read_html`, returning the first parsed table.
    Any failure along the way is printed and `None` is returned instead of raising.
    """
    try:
        await table.wait_for(state='visible')
        markup = await table.evaluate('element => element.outerHTML')
        # read_html returns a list of frames; the element's own table is the first one
        frames = pd.read_html(StringIO(markup))
        return frames[0]
    except Exception as e:
        # Best effort: report the problem and signal failure with None
        print(f"Error converting table to DataFrame: {e}")
        return None

# Expose the helper as a method, i.e. `await locator.table2df()`
Locator.table2df = partialmethod(table2df)

In [None]:
# Demo / smoke test for `Locator.table2df`: open a Wikipedia page, locate a table
# by XPath and convert the first match to a dataframe.
# NOTE(review): `setup_browser`, `page.wait` and `page.find_ele` are not defined in
# this notebook — presumably they come from the `pw.core` star import; confirm.
async with setup_browser(n=1) as obj:
    if obj.is_valid:
        page = obj.pages[0]
        await page.goto("https://en.wikipedia.org/wiki/List_of_largest_companies_by_revenue")
        await page.wait()
        # The XPath compares the full `class` attribute string, so it only matches that exact value
        ele = await page.find_ele('//table[@class="wikitable sortable plainrowheaders jquery-tablesorter"]') 
        assert len(ele) != 0

        df = await ele[0].table2df()
        # NOTE(review): `table2df` returns None on failure, in which case `len(df)`
        # raises TypeError rather than failing the assertion
        assert len(df) != 0

# NOTE(review): `df` is only bound when `obj.is_valid` was truthy; otherwise this raises NameError
df.head()

Unnamed: 0,Rank,Country,Companies
0,1,United States of America,22
1,2,China,11
2,3,Germany,4
3,4,United Kingdom,2
4,4,Switzerland,2


## Extract html object to md

In [None]:
#| export

async def h2md(ele : Union[Page, Locator]):
    "Convert the HTML of `ele` to markdown using `HTML2Text`"
    # A Locator contributes its `inner_html()`; anything else is treated as a
    # Page and contributes its `content()`
    if isinstance(ele, Locator):
        html = await ele.inner_html()
    else:
        html = await ele.content()

    return h2t.handle(str(html))

# Expose the helper as a method on both classes, i.e. `await page.h2md()`
Locator.h2md = partialmethod(h2md)
Page.h2md = partialmethod(h2md)

In [None]:
# Demo of `Page.h2md`: load example.com and show the whole page as markdown.
# NOTE(review): `setup_browser` and `page.wait` are not defined in this notebook —
# presumably they come from the `pw.core` star import; confirm.
async with setup_browser(n=1) as obj:
    if obj.is_valid:
        page = obj.pages[0]
        await page.goto("https://example.com/")
        await page.wait()        
        print_md(await page.h2md())

# Example Domain  
  
This domain is for use in illustrative examples in documents. You may use this domain in literature without prior coordination or asking for permission.

More information...



In [None]:
# Demo of `Locator.h2md`: convert only the matched table element to markdown.
# NOTE(review): `setup_browser`, `page.wait` and `page.find_ele` are not defined in
# this notebook — presumably they come from the `pw.core` star import; confirm.
async with setup_browser(n=1) as obj:
    if obj.is_valid:
        page = obj.pages[0]
        await page.goto("https://en.wikipedia.org/wiki/List_of_largest_companies_by_revenue")
        await page.wait()
        # The XPath compares the full `class` attribute string, so it only matches that exact value
        ele = await page.find_ele('//table[@class="wikitable sortable plainrowheaders jquery-tablesorter"]') 
        # NOTE(review): no emptiness check here — `ele[0]` fails if nothing matched
        print_md(await ele[0].h2md())

Breakdown by country  Rank  | Country  | Companies   
1  |  United States of America | 22   
2  |  China | 11   
3  |  Germany | 4   
4  |  United Kingdom | 2   
4  |   Switzerland | 2   
6  |  Japan | 1   
6  |  France | 1   
6  |  Italy | 1   
6  |  India | 1   
6  |  Netherlands | 1   
6  |  South Korea | 1   
6  |  Saudi Arabia | 1   
6  |  Singapore | 1   
6  |  Taiwan | 1 



## Domain helpers

In [None]:
#| export
def domain(url:str): 
    """
    Return the domain, i.e. the netloc component, of `url`
    """
    parsed = urlparse(url)
    return parsed.netloc

In [None]:
# Sample urls reused by the checks in later cells: a plain page, the same page
# with a fragment, and the same page with parameters packed into the fragment
urls = [
    'https://fast.ai/getting_started.html',
    'https://fast.ai/getting_started.html#copyright',
    'https://fast.ai/getting_started.html#year=2008-09&quarter=quarter1?a=3',
]
# An empty url has an empty netloc
assert domain("") == ""
assert domain(urls[0]) == 'fast.ai'

In [None]:
#| export 

def is_same_resource(url1: str, url2: str) -> bool:
    """
    Decide whether two urls refer to the same resource.

    Returns True only when scheme, netloc and path of both urls are equal and
    neither url carries a `quarter` parameter, looked up both in the query
    string and in the fragment. As a consequence any url with a `quarter`
    parameter is never reported as the same resource, not even against itself.
    """
    first, second = urlparse(url1), urlparse(url2)

    # A different scheme, host or path always means a different resource
    if first.scheme != second.scheme:
        return False
    if first.netloc != second.netloc:
        return False
    if first.path != second.path:
        return False

    # `quarter` may be a regular query param or be encoded inside the fragment
    for parts in (first, second):
        if 'quarter' in parse_qs(parts.query) or 'quarter' in parse_qs(parts.fragment):
            return False

    return True

In [None]:
# With the three sample urls, `urls[:-1]` is the plain page and its `#copyright`
# variant: same scheme, netloc and path, and no `quarter` parameter
assert is_same_resource(*urls[:-1])
# `urls[1:]` includes the url whose fragment carries `quarter`, so the pair is
# not reported as the same resource
assert not is_same_resource(*urls[1:])

In [None]:
#| export
import re

def url2fn(url: str) -> str:
    """Take in a url and return a filename-safe string for it.

    A leading scheme (e.g. `https://`) is dropped, every run of characters
    other than ASCII letters and digits becomes a single `_`, and leading or
    trailing `_` are removed. An empty url yields an empty string.

    Only a scheme at the very start of `url` is stripped. A `://` occurring
    later, for instance inside a redirect query parameter, is kept as ordinary
    text, so the host and path in front of it are not thrown away.
    """
    # Scheme syntax per RFC 3986: a letter followed by letters, digits, `+`, `-` or `.`
    url = re.sub(r'^[a-zA-Z][a-zA-Z0-9+.\-]*://', '', url)

    # Replace each run of non-alphanumerics with one underscore, then trim the ends
    return re.sub(r'[^a-zA-Z0-9]+', '_', url).strip('_')

In [None]:
# Show the filename generated for each sample url
list(map(url2fn, urls))

['fast_ai_getting_started_html',
 'fast_ai_getting_started_html_copyright',
 'fast_ai_getting_started_html_year_2008_09_quarter_quarter1_a_3']

In [None]:
#| export
# NOTE(review): presumably url endings that should be skipped — confirm against the code that consumes this list.
# NOTE(review): 'api' is the only entry without a leading dot; confirm it is intentional.
IGNORE_EXT = [
    '.css',
    '.jpg',
    '.jpeg',
    '.png',
    '.gif',
    '.ico',
    '.js',
    '.svg',
    'api',
]

In [None]:
#| hide 
#| export
from graphviz import Digraph

def _f():
    """Build the crawler flow diagram and render it as a png under the name `flow`.

    The flow is Start -> before_visit callback -> one_visit -> after_visit
    callback -> End, with an extra edge from the after callback back to the
    before callback.
    """
    graph = Digraph(comment='Crawler Flow')
    graph.attr(rankdir='LR')  # Lay the graph out horizontally

    # The three step nodes live in one subgraph so they share the same rank (level)
    step_nodes = (
        ('before', 'before_visit\ncallback'),
        ('visit', 'one_visit'),
        ('after', 'after_visit\ncallback'),
    )
    with graph.subgraph() as same_rank:
        same_rank.attr(rank='same')
        for name, label in step_nodes:
            same_rank.node(name, label)

    # Start/end markers
    for name, label in (('start', 'Start'), ('end', 'End')):
        graph.node(name, label, shape='doublecircle')

    # Forward path through the flow
    forward_edges = (
        ('start', 'before'),
        ('before', 'visit'),
        ('visit', 'after'),
        ('after', 'end'),
    )
    for tail, head in forward_edges:
        graph.edge(tail, head)

    # Loop back for the next visit; constraint='false' is passed so this edge does not affect ranking
    graph.edge('after', 'before', constraint='false',  curve_style='curved')
    graph.render("flow", format='png')
#_f()

In [None]:
#| hide
# Run nbdev's export so the cells marked `#| export` are written out to the library module
import nbdev; nbdev.nbdev_export()