In [None]:
import datetime
import os
import requests
import bs4
import numpy as np
import pandas as pd
from io import StringIO
from urllib.parse import urljoin, urlparse
import time

import matplotlib.pyplot as plt

In [2]:
def get_parent(tag, pfr_list=True):
    """
    Locate the ancestor of an HTML tag that acts as its text container.

    An italic ('i') ancestor short-circuits the search: it is returned as
    soon as it is met so that callers can exclude italicised links.

    Parameters
    ----------
    tag : bs4.element.Tag
        The HTML tag whose container is wanted.
    pfr_list : bool, optional
        If `True`, climb the ancestors until one of the primary tags
        (p, li, th, td) or an 'i' tag is met. If `False`, the immediate
        parent is returned without climbing. Defaults to True.

    Returns
    -------
    parent : bs4.element.Tag or None
        The ancestor that was found, or None when the climb runs past the
        top of the tree (or the tag has no parent at all).
    """

    containers = ('p', 'li', 'th', 'td')
    node = tag.parent
    if not pfr_list:
        return node  # immediate parent, whatever it is
    # climb until an italic tag, a primary container, or the top of the tree
    while node and node.name != 'i' and node.name not in containers:
        node = node.parent
    return node


def enclosed(tag, pfr_list=True):
    """
    Checks if an HTML tag is enclosed within parentheses (or italics).

    The tag counts as enclosed when, inside its container, at least one
    '(' is still open where the tag starts and at least one ')' is still
    unmatched after it ends. Links nested several parentheses deep are
    therefore treated as enclosed too.

    Parameters
    ----------
    tag : bs4.element.Tag
        The HTML tag to be checked.
    pfr_list : bool, optional
        If `True`, the container is searched among the tag's ancestors
        (see `get_parent`). If `False`, only the immediate parent is
        considered. Defaults to True.

    Returns
    -------
    bool
        True if the tag is enclosed within parentheses or sits inside an
        'i' tag, False otherwise (including when no container is found).
    """

    parent = get_parent(tag, pfr_list)
    if parent is None:
        return False  # not enclosed if no parent
    if parent.name == 'i':  # enclosed in italic tag
        return True
    parent_text = str(parent)  # includes the html tags
    tag_text = str(tag)  # includes the html tags
    # NOTE: index() locates the FIRST occurrence of the tag's markup, so an
    # identical anchor repeated in the same container is always judged at
    # its first position.
    idx = parent_text.index(tag_text)
    before = parent_text[:idx]
    after = parent_text[idx + len(tag_text):]
    # net number of '(' still open when the tag starts; must be positive
    if before.count('(') - before.count(')') < 1:
        return False
    # net number of ')' left unmatched after the tag; must be positive
    if after.count(')') - after.count('(') < 1:
        return False
    return True  # enclosed


def online(alink, url, timeout=10):
    """
    Checks if an anchor link is online and accessible.

    Parameters
    ----------
    alink : bs4.element.Tag
        The anchor link tag to be checked; its 'href' is resolved against
        `url`.
    url : str
        The base URL to which the anchor link is appended.
    timeout : float or tuple, optional
        Seconds to wait for the server, forwarded to `requests.get`, so a
        stalled server cannot block the crawl forever. Defaults to 10.

    Returns
    -------
    bool
        True if the request succeeds with HTTP status 200, False on any
        other status or on any error (missing href, bad URL, connection
        failure, timeout).
    """

    try:
        response = requests.get(urljoin(url, alink['href']), timeout=timeout)
    except Exception:  # best-effort probe: any failure means "not online"
        return False
    return response.status_code == 200


def wiki(alink, stop_words):
    """
    Decide whether an anchor points to an acceptable Wikipedia page.

    Parameters
    ----------
    alink : bs4.element.Tag
        The anchor link tag to be checked (only its 'href' is read).
    stop_words : list
        Substrings that disqualify a link when found in its href.

    Returns
    -------
    bool
        True if the href is relative or hosted on a wikipedia.org domain
        and contains none of the stop words, False otherwise.
    """

    href = alink['href']
    host = urlparse(href).netloc
    # absolute links must stay on a Wikipedia host; relative ones always do
    if host != '' and 'wikipedia.org' not in host:
        return False
    for word in stop_words:
        if word in href and word:
            return False
    return True


def get_link(body, stop_words, url):
    """
    Retrieves the first valid link within the primary tags (p, li, th, td)
    of the main body.

    A link is valid when it is a Wikipedia link (`wiki`), is not enclosed
    in parentheses or italics (`enclosed`) and answers an HTTP request
    (`online`). The two local checks run first so that the network probe
    is only spent on links that can actually be selected.

    Parameters
    ----------
    body : bs4.element.Tag
        The main content of the web page.
    stop_words : list
        List of stop words to exclude invalid links.
    url : str
        The base URL of the web page.

    Returns
    -------
    link : str
        The href of the first valid link found, or '' if there is none.
    """

    link = ''
    a_str = ('a[href]:not([href^="#cite_note"]):not([href^="#"])'
             ':not(.mw-disambig)')
    p_str = ['p', 'li', 'th', 'td']
    alinks = body.select(', '.join([f'{p} {a_str} ' for p in p_str]))
    for alink in alinks:
        # cheap, local checks first; the HTTP round-trip comes last
        if (wiki(alink, stop_words) and not enclosed(alink)
                and online(alink, url)):
            link = alink['href']
            break
    return link


def get_other_link(body, extags, stop_words, url):
    """
    Retrieves a valid link from the main body if no valid links are found
    in get_link().

    The previously extracted tags are appended back to `body` (which is
    modified in place) and every anchor of the body is considered, not
    only those inside p, li, th or td tags. The local checks (`wiki`,
    `enclosed`) run before the network probe (`online`).

    Parameters
    ----------
    body : bs4.element.Tag
        The main content of the web page.
    extags : list
        List of extracted tags from the main content to be added back.
    stop_words : list
        List of stop words to exclude invalid links.
    url : str
        The base URL of the web page.

    Returns
    -------
    link : str
        The href of the first valid link found.

    Raises
    ------
    IndexError
        If no valid links are found.
    """

    link = ''
    a_str = ('a[href]:not([href^="#cite_note"]):not([href^="#"]):'
             'not(.mw-disambig)')
    for extag in extags:  # add back extracted tags for full search
        body.append(extag)
    alinks = body.select(f'{a_str}')
    for alink in alinks:
        # cheap, local checks first; the HTTP round-trip comes last
        if (wiki(alink, stop_words) and not enclosed(alink, False)
                and online(alink, url)):
            link = alink['href']
            break
    if link == '':  # no valid links in the body
        raise IndexError('no valid links found in the page body')
    return link

In [3]:
def web_crawler(a, dir_, url=None, lang='en', linkhist=None):
    """
    This function starts from a random Wikipedia page and follows the first
    page link until it reaches the Philosophy page, a page with no links, or
    loops back to a previously visited link.
    This function accepts a link:DoS (linkhist) dictionary which shorts the
    path to Philosophy, removing redundant crawls from previous runs.

    Parameters
    ----------
    a : int
        Index for the web crawl (used in the output file name).
    dir_ : str
        Directory path where the output scrape CSV file will be saved. It
        is used as a plain string prefix, so it should end with a path
        separator.
    url : str, optional
        The starting URL for the web crawl. If not provided, a random Wikipedia
        page URL will be chosen based on the specified language.
    lang : str, optional
        The language code for the Wikipedia pages. Defaults to 'en' (English).
        Another option is 'de' (German).
    linkhist : dict, optional
        Dictionary mapping links to a known DoS (-1 for links known not to
        reach the target) - used for the short path, reducing crawl
        iterations. It is only read here. Defaults to an empty history.

    Returns
    -------
    flinks : list
        List of visited page URLs during the crawl.
    dosl : numpy.ndarray
        Degrees of separation (DoS) for each visited page; all -1 when the
        target was not reached.
    msg : str
        Message indicating the status of the crawl (e.g., 'OK - normal path.'
        or 'NOK - no links at ...').

    Notes
    -----
    - The function excludes certain types of links (e.g., language links,
      disambiguation pages, external links) during the crawl.
    - It prioritizes links within specific HTML tags and considers the main
      text of the Wikipedia page.
    - The DoS represents the number of steps from the starting page to the
      Philosophy page.
    - It outputs the single scrape result into a CSV file.
    - A page without a 'bodyContent' container is handled like a page with
      no valid links.

    Example
    -------
    flinks, dosl, msg = web_crawler(1, 'output/', lang='en')
    """

    if linkhist is None:  # never share one mutable default between calls
        linkhist = {}
    if not bool(url):
        url = f'https://{lang}.wikipedia.org/wiki/Special:Random'
    philo = {
        'en': 'Philosophy',
        'de': 'Philosophie'
    }
    target = f'https://{lang}.wikipedia.org/wiki/{philo[lang]}'
    stop_words = {
        'en': ['Wikipedia:', 'Wikipedia talk:', 'User_talk:', 'Talk:',
               'Help:', 'Help talk:', 'Project:', 'Project talk:',
               'Portal:', 'Portal talk:', 'Template:', 'Template talk:',
               'File:', 'File talk:', 'Special:', 'index.php',
               'Category:', 'Category talk:', 'Template_talk:', 'MOS:'],
        'de': ['Wikipedia:', 'Wikipedia Diskussion:', 'Benutzer Diskussion:',
               'Diskussion:',
               'Hilfe:', 'Hilfe Diskussion:', 'Projekt:',
               'Projekt Diskussion:',
               'Portal:', 'Portal Diskussion:', 'Vorlage:',
               'Vorlage Diskussion:',
               'Datei:', 'Datei Diskussion:', 'Spezial:', 'index.php',
               'Kategorie:', 'Kategorie Diskussion:', 'Vorlage Diskussion:',
               'MOS:']
    }
    flinks = []
    ldos = 0  # DoS of the link where a short path was taken (0 = none)
    dos = -1  # DoS of the start page; -1 until the target is reached
    msg = ''
    with requests.Session() as s:  # releases pooled connections on exit
        # resolve the start URL (Special:Random redirects to an article)
        response = s.get(url)
        link = urlparse(response.url).path
        flink = urljoin(url, link)
        flinks.append(flink)
        print(link)
        while True:
            page = link  # link of the page being parsed, for the NOK message
            response = s.get(flink)
            # explicit parser: same tree on every machine, no bs4 warning
            soup = bs4.BeautifulSoup(response.text, 'html.parser')
            body = soup.find('div', id='bodyContent')
            extags = []
            if body is not None:
                # boxes and hatnotes are dropped for good
                for table in body.find_all('table',
                                           class_=['infobox', 'sidebar',
                                                   'metadata']):
                    table.extract()
                for div in body.find_all('div', role='note'):
                    div.extract()
                # captions and standings are set aside; get_other_link()
                # puts them back if the main text yields nothing
                for extag in body.find_all('div', class_='thumbcaption'):
                    extags.append(extag.extract())
                for extag in body.find_all('figcaption'):
                    extags.append(extag.extract())
                for extag in body.find_all('table',
                                           class_=['infobox',
                                                   'standings-box']):
                    extags.append(extag.extract())
            try:
                if body is None:  # not a regular article page
                    raise IndexError('page has no bodyContent container')
                link = get_link(body, stop_words[lang], url)
                if link == '':  # no valid link within p, li, th, or td tags
                    link = get_other_link(body, extags, stop_words[lang],
                                          url)
                flink = urljoin(url, link)
            except IndexError:
                print('No valid links found in current page.')
                print(f'Failed to reach {philo[lang]}; DoS = -1.')
                # `link` is '' at this point, so report the page itself
                msg = f'NOK - no links at {page} (last idx: {len(flinks)-1})'
                break
            if flink == target:
                flinks.append(flink)
                print(link)
                print(f'Successfully reached {philo[lang]} w/ DoS '
                      f'= {len(flinks)-1}.')
                msg = 'OK - normal path.'
                break
            elif flink in flinks:
                print(f'Looped back to {link}.')
                print(f'Failed to reach {philo[lang]}; DoS = -1.')
                msg = (f'NOK - looped at {link} '
                       f'(link idx: {flinks.index(flink)}; '
                       f'last idx: {len(flinks)-1})')
                break
            else:
                flinks.append(flink)
                print(link)
                if link in linkhist:
                    # the rest of the path is already known from history
                    ldos = linkhist[link]
                    print(f'Short path found ({link}; DoS = {ldos}).')
                    dos = -1 if ldos == -1 else len(flinks) - 1 + ldos
                    if dos == -1:
                        print(f'Failed to reach {philo[lang]}; DoS = -1.')
                        msg = (f'NOK - short path at {link} '
                               f'(last idx: {len(flinks)-1}).')
                    else:
                        print(f'Successfully reached {philo[lang]} '
                              f'w/ DoS = {dos}.')
                        msg = (f'OK - short path at {link} '
                               f'(last idx: {len(flinks)-1}).')
                    break
            time.sleep(1.25)  # pause between page fetches

    print(flinks)
    if flinks[-1] == urljoin(url, target):
        dos = len(flinks) - 1
    if dos == -1:
        dosl = -1*np.ones(len(flinks))
    else:
        # counts down from the start page's DoS to the DoS of the last link
        dosl = np.arange(dos, ldos-1, -1)
    df = pd.DataFrame({'Links': flinks, 'DoS': dosl})
    os.makedirs(f'{dir_}', exist_ok=True)
    df.to_csv(f'{dir_}links{a:03d}.csv', index=False)
    print(f'Crawl result saved to {dir_}links{a:03d}.csv')
    return flinks, dosl, msg

In [5]:
# main function to generate data (can be random, or can be from a list of urls)

def crawl_iter(basedir, urls=None, lang='en', a=0, b=100, test=False):
    """
    This function iteratively calls the web_crawler function.
    It also loads and saves a link:DoS (linkhist) dictionary which shorts
    the path to Philosophy in web_crawler, removing redundant crawls from
    previous runs.

    Parameters
    ----------
    basedir : str
        Base directory where output files will be saved. The language code
        and an underscore are prepended to it, and it is used as a plain
        string prefix, so it should end with a path separator.
    urls : list, optional
        List of specific URLs to crawl. If not provided (None or empty),
        random Wikipedia pages will be crawled.
    lang : str, optional
        Language code for Wikipedia pages. Defaults to 'en' (English).
        Another option is 'de' (German).
    a : int, optional
        Starting index for iteration. Defaults to 0.
    b : int, optional
        Ending index (exclusive) for iteration. Defaults to 100.
        This index is ignored if 'urls' is passed.
    test : bool, optional
        If `True`, performs a test run without saving Link:DoS history.
        Defaults to False.

    Returns
    -------
    df : pandas.DataFrame
        DataFrame containing consolidated results of the crawl, one row per
        crawl with the columns 'Start', 'DoS', 'Path' and 'Msg'.

    Notes
    -----
    - The function saves Link:DoS history in a CSV file for future iterations.
    - It consolidates crawl results into a CSV file based on the specified
      iteration range.
    - Specify `test=True` for a trial run without saving Link:DoS history.
    - The output directory is created when missing, so an empty iteration
      range still produces an (empty) consolidated file.

    Example
    -------
    df = crawl_iter('output/', lang='en', a=0, b=50)
    """

    dir_ = f'{lang}_{basedir}'
    print(f'Crawling {"English" if lang == "en" else "German"} Wikipedia...')
    try:
        dflhist = pd.read_csv(f'{dir_}linkhist.csv', header=0)
        linkhist = dict(zip(dflhist['Link'], dflhist['DoS']))
        print(f'Link:DoS history loaded from {dir_}linkhist.csv - short paths '
              'from history will be used.')
        print(f'History will{"" if not test else " NOT"} be updated after this '
              'run.')
    except Exception:  # missing or unreadable history: start from scratch
        print('Link:DoS history not loaded - short paths will occur more '
              'frequency with more iterations.')
        linkhist = dict()
    if bool(urls):
        b = a + len(urls)
    data = []
    for i in range(a, b):
        url = urls[i - a] if bool(urls) else ''
        print(f'\n--- Iter {i} ---')
        flinks, dosl, msg = web_crawler(i, dir_, url, lang, linkhist)
        data.append([urlparse(flinks[0]).path, dosl[0], flinks, msg])
        # remember the DoS of every page on this path for later crawls
        links = [urlparse(flink).path for flink in flinks]
        linkhist.update({link: dos.astype(int)
                         for link, dos in zip(links, dosl)})
    df = pd.DataFrame(data, columns=['Start', 'DoS', 'Path', 'Msg'])
    conso_path = f'{dir_}conso{a:03d}.{b-1:03d}.csv'
    # web_crawler creates the directory on its first call only; make sure
    # it exists even when no crawl ran
    out_dir = os.path.dirname(conso_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_csv(conso_path, index=False)
    print(f'\nConsolidated results saved to {conso_path}')
    if not test:
        dflhist = pd.DataFrame(list(linkhist.items()), columns=['Link', 'DoS'])
        dflhist.to_csv(f'{dir_}linkhist.csv', index=False)
        print(f'Link:DoS history saved to {dir_}linkhist.csv')
    return df


# Seed lists kept for targeted crawls; assign one to `urls` and pass it
# below instead of the empty list:
#   urls = ['https://en.wikipedia.org/wiki/Kevin_Bacon',
#           'https://en.wikipedia.org/wiki/Doom_(2016_video_game)']
#   urls = ['https://en.wikipedia.org/wiki/wiki/Baarbach',
#           'https://en.wikipedia.org/wiki/wiki/Data']
# NOTE(review): the doubled 'wiki/wiki/' segment in the second list looks
# unintended - confirm before reusing it.

# An empty `urls` means random start pages; indices 0..99 of the German wiki.
crawl_iter(
    basedir='res0513f/',
    urls=[],
    lang='de',
    a=0,
    b=100,
)

Crawling English Wikipedia...
Link:DoS history not loaded - short paths will occur more frequency with more iterations.

--- Iter 0 ---
/wiki/Doni_River
/wiki/Sangli
/wiki/Sangli_District
/wiki/Districts_of_Maharashtra
/wiki/Maharashtra
/wiki/States_and_union_territories_of_India
/wiki/India
/wiki/South_Asia
/wiki/Subregion#Asia
/wiki/Region
/wiki/Geography
/wiki/Earth
/wiki/Planet
/wiki/Hydrostatic_equilibrium
/wiki/Fluid_mechanics
/wiki/Physics
/wiki/Natural_science
/wiki/Branches_of_science
/wiki/Sciences
/wiki/Scientific_method
/wiki/Empirical_evidence
/wiki/Proposition
/wiki/Philosophy_of_language
/wiki/Analytic_philosophy
/wiki/Contemporary_philosophy
/wiki/Western_philosophy
/wiki/Philosophy
Successfully reached Philosophy w/ DoS = 26.
['https://en.wikipedia.org/wiki/Doni_River', 'https://en.wikipedia.org/wiki/Sangli', 'https://en.wikipedia.org/wiki/Sangli_District', 'https://en.wikipedia.org/wiki/Districts_of_Maharashtra', 'https://en.wikipedia.org/wiki/Maharashtra', 'https://e

Unnamed: 0,Start,DoS,Path,Msg
0,/wiki/Doni_River,26.0,"[https://en.wikipedia.org/wiki/Doni_River, htt...",OK - normal path.
1,/wiki/MU%C5%BBA,21.0,"[https://en.wikipedia.org/wiki/MU%C5%BBA, http...",OK - short path at /wiki/Physics (last idx: 10).
2,/wiki/Arabian_carpet,21.0,"[https://en.wikipedia.org/wiki/Arabian_carpet,...",OK - short path at /wiki/Geography (last idx: 5).
3,/wiki/Fly_line,21.0,"[https://en.wikipedia.org/wiki/Fly_line, https...",OK - short path at /wiki/Organism (last idx: 6).
4,/wiki/Western_Gunfighters,-1.0,[https://en.wikipedia.org/wiki/Western_Gunfigh...,NOK - looped at /wiki/United_States (link idx:...
...,...,...,...,...
195,/wiki/Directed_by_Ken_G._Hall,15.0,[https://en.wikipedia.org/wiki/Directed_by_Ken...,OK - short path at /wiki/Officer_of_the_Order_...
196,/wiki/Steve_Condos,10.0,"[https://en.wikipedia.org/wiki/Steve_Condos, h...",OK - short path at /wiki/The_arts (last idx: 3).
197,/wiki/Ilya_Pervukhin,22.0,"[https://en.wikipedia.org/wiki/Ilya_Pervukhin,...",OK - short path at /wiki/Water_sport (last idx...
198,"/wiki/Cedar_Township,_Lee_County,_Iowa",-1.0,"[https://en.wikipedia.org/wiki/Cedar_Township,...",NOK - short path at /wiki/United_States (last ...


In [None]:
# diff data
# Re-crawl the articles listed in a reference CSV and store the new DoS and
# path next to the reference values, together with their differences.

url = 'https://en.wikipedia.org/'
basedir = 'res0513_val/'
in_path = 'wiki_scraper_results_val.csv'
data = []  # NOTE(review): not used in this cell - confirm it can be dropped
with open(in_path, 'r') as f:
    df = pd.read_csv(f)
a, b = 11, 100  # start and end index to retrieve
df = df.iloc[a: b].reset_index(drop=True)
display(df)
# presumably 'article' holds links relative to `url` - verify against the CSV
urls = df['article'].apply(lambda x: urljoin(url, x)).values.tolist()
# crawl_iter derives its own end index from len(urls); the `b` passed here
# is ignored, but it is still used for the diff file name below
ndf = crawl_iter(basedir=basedir, urls=urls, a=a, b=b)
display(ndf)
# df was re-indexed from 0 and ndf is freshly built, so rows align by position
df['nDoS'] = ndf['DoS']
df['nPath'] = ndf['Path']
df['diffDoS'] = df['nDoS'] - df['DoS']
# NOTE(review): nPath holds lists of URLs; assumes the CSV 'path' column is
# comparable with them after read_csv - confirm
df['diffPath'] = df['nPath'] == df['path']
# the 'en_' prefix mirrors the directory crawl_iter uses for its default lang
df.to_csv(f'en_{basedir}diff{a:03d}.{b-1:03d}.csv', index=False)
print(f'Diff result saved to en_{basedir}diff{a:03d}.{b-1:03d}.csv')


In [78]:
# data via csv
# Merge the per-crawl files links000.csv .. links005.csv (read from the
# working directory) into one summary CSV with one row per crawl.

url = 'https://en.wikipedia.org/wiki/'
data = []
for i in range(0, 6):
    with open(f'links{i:03d}.csv') as f:
        df = pd.read_csv(f)
    # presumably column 0 is the link and column 1 its DoS, as written by
    # web_crawler - verify against the files
    start = urlparse(df.iloc[0, 0]).path  # first row: start page of the crawl
    dos = df.iloc[0, 1]  # value stored next to the start page
    # files whose links carry no host are resolved against `url`
    if not bool(urlparse(df.iloc[0, 0]).netloc):
        path = df.iloc[:, 0].apply(lambda x: urljoin(url, x)).values.tolist()
    else:
        path = df.iloc[:, 0].values.tolist()
    data.append([start, dos, path])
n = 2  # number used in the output file name
df = pd.DataFrame(data, columns=['Start', 'DoS', 'Path'])
df.to_csv(f'conso{n:02d}.csv', index=False)