In [1]:
import requests
from requests.sessions import Session
from bs4 import BeautifulSoup
from queue import PriorityQueue
from urllib.parse import urlparse, urljoin, urlunparse
from urllib.robotparser import RobotFileParser
import re
import os.path
import time
import math
import logging
# import threading
# from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from urllib.error import URLError
from requests.exceptions import ConnectTimeout, ReadTimeout, RequestException
import socket

### 1. Configuration & Variable Declarations

In [2]:
# Seed pages covering epidemics/pandemics in general.
COMMON_URLS = ['https://en.wikipedia.org/wiki/List_of_epidemics',
               'https://www.livescience.com/worst-epidemics-and-pandemics-in-history.html',
               'https://en.wikipedia.org/wiki/Pandemic']

# Seed pages specific to Ebola.
UNIQUE_URLS = ['https://www.cdc.gov/vhf/ebola/history/2014-2016-outbreak/index.html',
               'https://www.idsociety.org/public-health/ebola/ebola-resources/ebola-facts/',
               'https://preventepidemics.org/epidemics-that-didnt-happen/ebola/']

# Complete seed list passed to crawl_main().
SEEDS_URLS = COMMON_URLS + UNIQUE_URLS

# Every output path is derived from the working directory at the time this cell runs.
cwd = os.getcwd() # get the current working directory                 
PATH_SCRIPT = os.path.abspath(cwd) 
# Results live in a sibling "Results" directory (one level above the working directory).
# NOTE(review): nothing in this cell creates these directories -- confirm they exist before crawling.
PATH_DIR_RESULTS = os.path.join(PATH_SCRIPT, '..', 'Results', )
PATH_DIR_CRAWLED_DATA = os.path.join(PATH_DIR_RESULTS, "data")  # batch files written by write_batch_to_file()
PATH_INLINKS = os.path.join(PATH_DIR_RESULTS, 'links', 'inlinks.json')
PATH_OUTLINKS = os.path.join(PATH_DIR_RESULTS, 'links', 'outlinks.json')
PATH_INLINKS_COUNTS = os.path.join(PATH_DIR_RESULTS, 'links', 'inlinks_counts.json')

# Lower-case terms whose (substring) occurrences are counted by the relevance scoring functions.
KEYWORDS = ["pandemic", "pandemics", "epidemic", "epidemics", "evd", "ebola", "africa", "african", "congo", "west", "disease", "death", "infection", "illness", "transmission", "dead", "virus", "outbreak"]
LIMIT = 30000       # crawl_main() stops admitting new URLs once this many were added to the frontier
BATCH_SIZE = 1000   # number of processed documents written per output file

# config logging
logging.basicConfig(
    filename='main.log',  
    filemode='a',       # append mode    
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Default timeout (seconds) for sockets created without an explicit timeout.
# NOTE(review): presumably meant to bound the robots.txt fetches made through urllib -- confirm.
socket.setdefaulttimeout(5)

In [4]:
# Shared crawl state. These module-level containers are mutated in place by the helper functions below.
frontier = PriorityQueue() # Frontier: priority queue => stores tuples (-priority_score, url, wave_number, raw_content); see add_url()
inlinks_counts = {}  # Inlink Counts: dictionary => key: url, value: the inlink counts (to be easily updated)
inlinks_dict = {} # Inlinks: dictionary => key: url, value: set of source urls that link to the key url
outlinks_dict = {} # Outlinks: dictionary => key: url, value: set of canonicalized outlinks of that url

# count = 1 # Count the # of pages crawled 

frontier_urls = set()  # every url ever added to the frontier (entries are not removed when popped)
visited_urls = set()  # urls whose document data was processed successfully in crawl_main()
bad_urls = set()      # mark links that have been examined as invalid

domain_delays = {} # key: domain (netloc), value: {'crawl_delay': ..., 'last_request_time': ...}; maintained by enforce_delay()
last_request_time = {} # NOTE(review): not read by the visible code; enforce_delay() keeps timestamps in domain_delays


BLACK_LIST_EXTENSIONS = (".jpg", ".svg", ".png", ".pdf", ".gif")  # url suffixes rejected by is_blacklisted()
BLACK_LIST_DOMAINS = ("youtube", "amazon", "google", "yahoo", "bing", "facebook", "twitter")  # domain names rejected by is_blacklisted()
MIN_ABS_SCORE = 3  # minimum score (priority score minus the wave component) an outlink needs to enter the frontier



## Helper Methods

### 1. URL: Canonicalization

In [5]:
"""
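Canonicalizes a given URL: lower-cases the scheme and host, drops the default port, ensures the path starts with '/', removes the fragment, and removes duplicate slashes from the path.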
"""
def canonicalize_url(url):
    """Return a canonical form of ``url`` so that equivalent URLs compare equal.

    Rules applied, in order:
      1. Lower-case the scheme and the network location (host).
      2. Drop the default port (80 for http, 443 for https).
      3. Make sure the path starts with '/'.
      4. Remove the fragment.
      5. Collapse every run of consecutive slashes in the path into one '/'.

    Params and query string are kept unchanged.

    Args:
        url: the URL string to canonicalize.

    Returns:
        The canonicalized URL. If the URL cannot be parsed, the problem is
        logged and ``url`` is returned unchanged.
    """
    try:
        parts = urlparse(url)

        # 1. Convert scheme and host to lower case
        scheme = parts.scheme.lower()
        netloc = parts.netloc.lower()

        # 2. Remove default port 80 for HTTP and 443 for HTTPS.
        #    Only the trailing ":<port>" is cut off, so IPv6 brackets and any
        #    userinfo in the netloc survive (replacing netloc by `hostname`
        #    would turn "[::1]:80" into the invalid host "::1").
        default_port = {'http': 80, 'https': 443}.get(scheme)
        if default_port is not None and parts.port == default_port:
            netloc = netloc.rsplit(':', 1)[0]

        # 3. Make the path absolute
        path = parts.path
        if not path.startswith('/'):
            path = '/' + path

        # 5. Remove duplicate slashes. A regex is needed here: a single
        #    str.replace('//', '/') pass leaves '///' as '//'.
        path = re.sub(r'/{2,}', '/', path)

        # 4. The fragment is dropped by passing an empty string.
        return urlunparse((scheme, netloc, path, parts.params, parts.query, ''))
    except Exception as e:
        logging.warning(f"An error occurred during URL canonicalization: {e}")
        return url

### 2. Politeness Policy: Crawl Delay

In [6]:
"""Helper method to get the crawl delay from robots.txt
"""
def get_crawl_delay(url):
    """Look up the Crawl-Delay that the site's robots.txt gives user-agent '*'.

    Args:
        url: any URL on the site; only its scheme and netloc are used.

    Returns:
        The crawl delay reported by RobotFileParser, or 1 (one request per
        second) when robots.txt cannot be read or declares no delay.
    """
    default_delay = 1
    parts = urlparse(url)

    parser = RobotFileParser()
    parser.set_url(f"{parts.scheme}://{parts.netloc}/robots.txt")

    try:
        parser.read()
    except Exception:
        # Decode errors, URLError and anything else all fall back to the default.
        return default_delay

    delay = parser.crawl_delay("*")
    return default_delay if delay is None else delay


In [7]:
""" Helper method to enforce delay between requests to the same domain.
It ensures that there is a minimum delay between subsequent requests to the same domain to avoid overloading the server 
and to comply with the crawl delay specified by the website.
"""

def enforce_delay(url):
    """Sleep as needed so requests to one domain respect its crawl delay.

    The first request to a domain looks up the crawl delay via
    get_crawl_delay(), records it in ``domain_delays`` and returns without
    sleeping. Later requests sleep for whatever part of the crawl delay has
    not yet elapsed since the recorded last request time.

    Args:
        url: the URL about to be requested; its netloc identifies the domain.
    """
    host = urlparse(url).netloc
    now = time.time()

    record = domain_delays.get(host)
    if record is None:
        # First contact with this domain: remember its delay, no waiting needed.
        domain_delays[host] = {'crawl_delay': get_crawl_delay(url), 'last_request_time': now}
        return

    elapsed = now - record['last_request_time']
    wait = record['crawl_delay'] - elapsed if elapsed < record['crawl_delay'] else 0

    # Store the moment the request will actually go out (after the sleep).
    record['last_request_time'] = now + wait

    if wait > 0:
        try:
            time.sleep(wait)
        except KeyboardInterrupt:
            # An interrupt during the sleep is reported and then swallowed.
            print(f"Interrupted during sleep: {wait}s delay was enforced for domain {host}.")



### 3. URL: Qualification Check 

In [8]:
""" Check if the URL uses HTTP or HTTPS protocol.
"""
def is_http_or_https(url):
    """Return True when the URL's scheme is 'http' or 'https', else False."""
    return urlparse(url).scheme in ('http', 'https')

"""Determine if the given URL is blacklisted based on its domain and file extension
"""
def is_blacklisted(url):
    """Return True when the URL points at a blacklisted file type or domain.

    A URL is blacklisted when any of the following holds:
      * the URL, or its path component (so a query string or fragment cannot
        hide the extension), ends with one of BLACK_LIST_EXTENSIONS;
      * the URL contains "http(s)://" optionally followed by "www." and then
        one of BLACK_LIST_DOMAINS;
      * any dot-separated label of the host name equals one of
        BLACK_LIST_DOMAINS (catches subdomains such as m.youtube.com).

    Args:
        url: the URL string to test; matching is case-insensitive.

    Returns:
        True if the URL is blacklisted, otherwise False.
    """
    lowered = url.lower()

    try:
        parts = urlparse(lowered)
        path = parts.path
        host = parts.hostname or ''
    except ValueError:
        # Unparseable URL: fall back to checks on the raw string only.
        path, host = lowered, ''

    if lowered.endswith(BLACK_LIST_EXTENSIONS) or path.endswith(BLACK_LIST_EXTENSIONS):
        return True

    domain_pattern = r"https?://(www\.)?({})".format("|".join(BLACK_LIST_DOMAINS))
    if re.search(domain_pattern, lowered):
        return True

    return any(label in BLACK_LIST_DOMAINS for label in host.split('.'))


"""Check the Content-Type and language of the URL's response header.
"""
def is_ideal_type(header_response):
    """Check that the response headers describe an English (or unlabeled) HTML page.

    Args:
        header_response: mapping of response headers supporting
            ``.get(name, default)``; the 'Content-Type' and 'Content-Language'
            entries are read.

    Returns:
        True when Content-Type contains 'text/html' (case-insensitive) and
        Content-Language is either absent/empty or lists at least one language
        tag that is 'en' or starts with 'en-'. False otherwise.
    """
    content_type = (header_response.get('Content-Type', '') or '').lower()
    content_language = (header_response.get('Content-Language', '') or '').lower()

    # Check if the response Content-Type is 'text/html'
    is_html = 'text/html' in content_type

    # Content-Language is a comma-separated list of language tags. Compare
    # whole tags: a plain substring test would accept e.g. 'ca-valencia'.
    tags = [tag.strip() for tag in content_language.split(',') if tag.strip()]
    is_english = not tags or any(tag == 'en' or tag.startswith('en-') for tag in tags)

    return is_html and is_english

    
"""Use RobotFileParser to check if crawling the URL is allowed.
"""
# Parsed robots.txt per site, keyed by "scheme://netloc". A value of None means
# robots.txt could not be fetched or parsed; such sites are treated as allowed.
_robots_parsers = {}


def is_allowed_by_robots(url):
    """Tell whether robots.txt permits user-agent '*' to fetch ``url``.

    robots.txt is downloaded at most once per site and the parser is kept in
    ``_robots_parsers``, so checking many URLs of one site costs one request
    instead of one request per URL.

    Args:
        url: the URL about to be crawled.

    Returns:
        The result of RobotFileParser.can_fetch('*', url), or True when
        robots.txt could not be read or evaluated.
    """
    parsed_url = urlparse(url)
    site = f"{parsed_url.scheme}://{parsed_url.netloc}"

    if site not in _robots_parsers:
        rp = RobotFileParser()
        try:
            rp.set_url(f"{site}/robots.txt")
            rp.read()
        except Exception:
            rp = None  # remember the failure so it is not retried for every URL
        _robots_parsers[site] = rp

    rp = _robots_parsers[site]
    if rp is None:
        return True

    try:
        return rp.can_fetch('*', url)
    except Exception:
        return True


--------------------------------------------------------
STAGE 2: PRIORITY SCORE CALCULATION

### 1. Inlinks and Outlinks Handling


In [9]:
def update_inlink_count(url):
    """Record one more inlink for ``url`` in ``inlinks_counts``.

    The first recorded link sets the count to 1 and every later call adds 1,
    so the count matches the number of links registered for the URL. A URL
    that has no referrer (for example a seed) should not be registered through
    this function; set ``inlinks_counts[url] = 0`` directly instead.

    Args:
        url: the link target whose inlink count is incremented.
    """
    inlinks_counts[url] = inlinks_counts.get(url, 0) + 1


def update_inlink_dict(url, source_url):
    """Add ``source_url`` to the set of pages known to link to ``url``.

    Args:
        url: the link target (key in ``inlinks_dict``).
        source_url: the page on which the link was found.
    """
    inlinks_dict.setdefault(url, set()).add(source_url)


def dump_links(links_dict, filename):
    """Write a link dictionary to ``filename`` as indented JSON.

    Set values are converted to lists (sorted when the elements are
    comparable, so repeated dumps of the same data are identical); all other
    values are written unchanged. The parent directory of ``filename`` is
    created if it does not exist yet.

    Args:
        links_dict: dictionary to dump; values may be sets or JSON-ready values.
        filename: path of the JSON file to (over)write.
    """
    json_ready_dict = {}

    for key, value in links_dict.items():
        if isinstance(value, set):
            try:
                json_ready_dict[key] = sorted(value)
            except TypeError:
                # Elements that cannot be ordered: keep them in arbitrary order.
                json_ready_dict[key] = list(value)
        else:
            json_ready_dict[key] = value

    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(json_ready_dict, file, indent=4)


### 2. Extract Content
- title, anchor text => priority score calculation
- out-links => analyzing outlinks

In [10]:
# Request headers sent by get_response(): a desktop-browser User-Agent string.
# NOTE(review): robots.txt rules are evaluated for user-agent '*', not for this string.
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}

In [11]:
def get_response(url):
    """Fetch ``url`` politely and return its body and headers.

    The per-domain delay is enforced first. A URL that robots.txt disallows is
    added to ``bad_urls``. Any failure (disallowed URL, HTTP error status,
    non-200 status, timeout or other exception) is reported as (None, None).

    Args:
        url: the URL to download.

    Returns:
        (response text, response headers) on a 200 response, otherwise
        (None, None).
    """
    enforce_delay(url)
    nothing = (None, None)

    try:
        if not is_allowed_by_robots(url):
            bad_urls.add(url)
            return nothing

        response = requests.get(url, headers=HEADERS, timeout=5)
        response.raise_for_status()

        if response.status_code != 200:
            logging.info(f"Skipped: status code not 200.")
            return nothing

        return response.text, response.headers
    except socket.timeout:
        logging.error(f"Socket operation timed out for URL: {url}")
    except requests.RequestException as e:
        logging.error(f"Request failed: fetch_raw_html_content -- {e}")
    except Exception as e:
        logging.error(f"Skipped. An error occurred while fetching content from {url}: {e}")

    # Reached only after one of the handlers above has logged the failure.
    return nothing

In [12]:
def extract_title_and_anchor_texts(raw_html_content):
    """Parse HTML and return the page title plus all non-empty anchor texts.

    Args:
        raw_html_content: the HTML source of a page.

    Returns:
        (title, anchor_texts): the stripped text of the <title> tag, or
        "No title found" when there is none, and a list with the stripped text
        of every <a href=...> element that has text. If parsing raises, the
        result is ("No title found", []).
    """
    try:
        soup = BeautifulSoup(raw_html_content, 'html.parser')

        title_tag = soup.find('title')
        if title_tag:
            page_title = title_tag.get_text(strip=True)
        else:
            page_title = "No title found"

        # Only anchors that carry an href are considered; blank texts are dropped.
        stripped_texts = (anchor.get_text(strip=True) for anchor in soup.find_all('a', href=True))
        anchor_texts = [text for text in stripped_texts if text]

        return page_title, anchor_texts
    except Exception:
        return "No title found", []

In [13]:
def extract_outlinks(url, raw_html_content):
    """Return the absolute URLs of all links found in a page.

    Args:
        url: URL of the page; relative hrefs are resolved against it.
        raw_html_content: HTML source of the page. None or an empty string
            yields an empty list.

    Returns:
        A list with one absolute URL per <a href=...> element, in document
        order (duplicates are kept). Hrefs that cannot be resolved are skipped.
    """
    if not raw_html_content:
        # Nothing was downloaded for this page, so there is nothing to parse.
        return []

    soup = BeautifulSoup(raw_html_content, 'html.parser')

    outlinks = []
    for link in soup.find_all('a', href=True):
        try:
            outlinks.append(urljoin(url, link['href']))
        except ValueError as e:
            # urljoin raises ValueError for malformed hrefs (e.g. a broken IPv6
            # literal); one bad link must not abort the whole page.
            logging.info(f"Skipped malformed href on {url}: {e}")

    return outlinks

### 3. URL Priority Score Calculation & Adding

#### Quality: wave number, domain, in-links count

In [14]:
def cal_domain_score(url, weight):
    """Score a URL by its top-level domain.

    '.gov' scores 5, '.edu' 4, '.org' 3 and every other extension 1. The
    extension is read from the host name only, so a port number, a trailing
    dot or upper-case letters in the netloc do not affect the lookup.

    Args:
        url: the URL to score.
        weight: factor the raw score is multiplied with.

    Returns:
        raw domain score * weight.
    """
    domain_scores = {'.gov': 5, '.edu': 4, '.org': 3}     # assign scores based on domain extension
    default_score = 1                                     # default score for unlisted domain extensions

    # `hostname` is lower-cased and has no port or userinfo; it is None when the URL has no netloc.
    host = (urlparse(url).hostname or '').rstrip('.')
    domain_extension = host.rsplit('.', 1)[-1]

    return domain_scores.get('.' + domain_extension, default_score) * weight



def cal_wave_score(wave_number, base_score=1000, decay_rate=0.9):
    """Return the wave component of the priority score.

    The score is ``base_score * decay_rate ** wave_number`` with negative wave
    numbers treated as 0 and the result never dropping below 1.

    Args:
        wave_number: crawl depth of the URL (0 for seeds).
        base_score: score given to wave 0.
        decay_rate: multiplicative decay applied per wave.
    """
    depth = 0 if 0 > wave_number else wave_number
    decayed = base_score * math.pow(decay_rate, depth)
    return 1 if 1 > decayed else decayed


def cal_inlink_score(inlink_count, weight):
    """Return ``log(inlink_count + 1) * weight``, or ``0 * weight`` when the count is not positive."""
    if inlink_count > 0:
        magnitude = math.log(inlink_count + 1)
    else:
        magnitude = 0
    return magnitude * weight


def cal_quality_score(url, inlink_count, qual_weight):
    """Combine the domain score and the inlink score into a quality score.

    Both components are weighted with 0.5, added, and the sum is multiplied
    by ``qual_weight``.

    Args:
        url: the URL being scored (used for the domain score).
        inlink_count: number of known inlinks of the URL.
        qual_weight: weight of the quality part in the overall priority.
    """
    component_weight = 0.5  # same weight for the domain and the inlink component
    combined = (cal_domain_score(url, component_weight)
                + cal_inlink_score(inlink_count, component_weight))
    return combined * qual_weight
    

#### Relevance: url, title, anchor text

In [15]:
# ----------------------------------------------------------------------------------
# Relevance: url keyword, title keyword, anchor text keyword
# ----------------------------------------------------------------------------------

def cal_keyword_score(text, weight):
    """Count keyword occurrences in ``text`` and scale the count by ``weight``.

    Matching is case-insensitive and substring based (str.count), summed over
    every entry of KEYWORDS. An empty or None text scores 0.
    """
    if not text:
        return 0

    lowered = text.lower()
    hits = 0
    for keyword in KEYWORDS:
        hits += lowered.count(keyword)

    return hits * weight


def cal_anchor_text_score(anchor_texts, weight):
    """Score a page by the average keyword density of its anchor texts.

    For every non-blank anchor text the density is (number of keyword
    occurrences, case-insensitive substring count over KEYWORDS) divided by
    (number of whitespace-separated words). The densities are summed and
    divided by the total number of anchor texts, blank ones included.

    Args:
        anchor_texts: list of anchor text strings.
        weight: factor the average density is multiplied with.

    Returns:
        average keyword density * weight (0 * weight for an empty list).
    """
    density_sum = 0

    for anchor_text in anchor_texts:
        words = anchor_text.split()
        if not words:
            # Blank anchor text: adds no density but still counts in the average.
            continue
        lowered = anchor_text.lower()
        keyword_count = sum(lowered.count(keyword) for keyword in KEYWORDS)
        density_sum += keyword_count / len(words)

    average_density_score = (density_sum / len(anchor_texts)) if anchor_texts else 0
    return average_density_score * weight

def cal_relevance_score(url, title, anchor_texts, rele_weight):
    """Combine URL, title and anchor-text keyword scores into a relevance score.

    URL and title keywords are weighted 0.4 each and the anchor-text density
    0.3. A missing title (falsy, or the placeholder "No title found")
    contributes 0. The sum is multiplied by ``rele_weight``.
    """
    weights = {'url': 0.4, 'title': 0.4, 'anchors': 0.3}

    from_url = cal_keyword_score(url, weights['url'])

    has_title = bool(title) and title != "No title found"
    from_title = cal_keyword_score(title, weights['title']) if has_title else 0

    from_anchors = cal_anchor_text_score(anchor_texts, weights['anchors'])

    return (from_url + from_title + from_anchors) * rele_weight

#### Main Method

In [16]:
def priority_score(url, wave_number, inlink_count,
                   title, anchor_texts):
    """Compute the priority of a URL for the frontier.

    priority = wave score + 10 * (quality score + relevance score), where
    quality is weighted 0.2 and relevance 0.8.

    Args:
        url: the URL being scored.
        wave_number: crawl depth of the URL.
        inlink_count: number of known inlinks.
        title: page title.
        anchor_texts: list of anchor texts found on the page.
    """
    wave_part = cal_wave_score(wave_number)
    quality_part = cal_quality_score(url, inlink_count, 0.2)
    relevance_part = cal_relevance_score(url, title, anchor_texts, 0.8)

    return wave_part + (quality_part + relevance_part) * 10

In [17]:
def get_absolute_score(prior_score, wave_number):
    """Return the priority score with the wave component of ``wave_number`` subtracted."""
    return prior_score - cal_wave_score(wave_number)

--------------------------------------------------------
STAGE 3: ADD TO FRONTIER

### Adding method


In [18]:
def add_url(url, priority_score, wave_number, text_response):
    """Put a URL on the frontier and remember it in ``frontier_urls``.

    The score is stored negated because PriorityQueue returns the smallest
    entry first, so the highest score is popped first.

    Args:
        url: canonicalized URL to enqueue.
        priority_score: the URL's priority (higher is crawled earlier).
        wave_number: crawl depth of the URL.
        text_response: the already downloaded page content.
    """
    entry = (-priority_score, url, wave_number, text_response)
    frontier.put(entry)
    frontier_urls.add(url)
    logging.info(f"Frontier Progress: {len(frontier_urls)}/{LIMIT} - score: {priority_score}, wave: {wave_number}")

--------------------------------------------------------
STAGE 4: DOCUMENT PROCESSING
### Write Document Data

In [19]:
def remove_punctuation(text):
    """Return ``text`` without any character that is neither a word character nor whitespace."""
    return re.sub(r'[^\w\s]', '', text, flags=re.UNICODE)

def get_title_and_text(raw_html_content):
    """Extract the punctuation-free title and the paragraph text of a page.

    Args:
        raw_html_content: HTML source of the page, or None.

    Returns:
        (title, text): the <title> text with punctuation removed ("" when the
        page has no title) and the stripped text of all <p> elements joined by
        single spaces. ("", "") is returned for None input or when extraction
        raises.
    """
    empty = ("", "")
    if raw_html_content is None:
        return empty

    try:
        soup = BeautifulSoup(raw_html_content, 'html.parser')

        title_tag = soup.find('title')
        raw_title = title_tag.get_text(strip=True) if title_tag else ""
        page_title = remove_punctuation(raw_title)

        # Paragraph text keeps its punctuation; only the title is cleaned.
        text = ' '.join(paragraph.get_text(strip=True) for paragraph in soup.find_all('p'))

        return page_title, text
    except Exception:
        return empty
    

def formatter(canon_url, title, text):
    """Render one document as a <DOC> record.

    Args:
        canon_url: canonical URL, written as the DOCNO.
        title: page title for the HEAD element; falsy values become "".
        text: page text for the TEXT element; falsy values become "".

    Returns:
        The record as a string ending with a newline.
    """
    head = title or ""
    body = text or ""
    return (
        f'<DOC>\n<DOCNO>{canon_url}</DOCNO>\n'
        f'<HEAD>{head}</HEAD>\n'
        f'<TEXT>{body}</TEXT>\n</DOC>\n'
    )

def write_batch_to_file(batch_content, batch_num):
    """Write one batch of formatted documents to 'wb-data-<batch_num>.txt'.

    The file is placed in PATH_DIR_CRAWLED_DATA, which is created first if it
    does not exist, so a missing output directory no longer causes the batch
    to be dropped. I/O errors are logged, not raised.

    Args:
        batch_content: the concatenated <DOC> records of the batch.
        batch_num: sequence number used in the file name.
    """
    batch_filename = f'wb-data-{batch_num}.txt'
    file_path = os.path.join(PATH_DIR_CRAWLED_DATA, batch_filename)

    try:
        os.makedirs(PATH_DIR_CRAWLED_DATA, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8', errors='replace') as file:
            file.write(batch_content)
        logging.info(f"Batch {batch_num} written to {file_path}")
    except IOError as e:
        logging.error(f"IOError in write_batch_to_file for batch {batch_num}: {e}", exc_info=True)




### COMBINED HELPER METHODS

In [20]:
def is_qualified(url, header_response):
    """Decide whether a fetched URL may enter the frontier.

    A URL already in ``bad_urls`` is rejected immediately. Otherwise it must
    use http/https, must not be blacklisted, and its headers must describe an
    English (or unlabeled) HTML page. A URL failing one of these checks is
    logged and added to ``bad_urls``.

    Args:
        url: the canonicalized URL.
        header_response: the response headers obtained for the URL.

    Returns:
        True if the URL passed every check, otherwise False.
    """
    if url in bad_urls:
        logging.info(f"SKIPPED -- examined as BAD URL: {url}")
        return False

    # The checks run in this order and stop at the first failure.
    rejection = None
    if not is_http_or_https(url):
        rejection = f"SKIPPED -- not an HTTP or HTTPS URL: {url}"
    elif is_blacklisted(url):
        rejection = f"SKIPPED -- URL is blacklisted: {url}"
    elif not is_ideal_type(header_response):
        rejection = f"SKIPPED -- not ideal type for URL: {url}"

    if rejection is None:
        return True

    logging.info(rejection)
    bad_urls.add(url)
    return False

####  Seeds URL Initialization

In [21]:
def seeds_initialization(seeds):
    """Fetch, score and enqueue the seed URLs as wave 0.

    Each seed is canonicalized and downloaded exactly once. A seed that is a
    duplicate of one already in the frontier, or whose content could not be
    fetched, is skipped: enqueueing a seed without content would break the
    outlink extraction when it is popped. Seeds start with an empty inlink
    set and an inlink count of 0.

    Args:
        seeds: iterable of seed URL strings.
    """
    wave_num = 0
    logging.info("======================= SEEDS INITIALIZATION STARTED ========================= ")

    for seed in seeds:
        canon_seed = canonicalize_url(seed)
        if canon_seed in frontier_urls:
            continue  # duplicate seed

        text_response, _ = get_response(canon_seed)
        if text_response is None:
            logging.warning(f"Seed skipped -- no content fetched: {canon_seed}")
            continue

        # Seeds have no referrer, so they are registered with no inlinks.
        inlinks_dict.setdefault(canon_seed, set())
        inlink_count = inlinks_counts.setdefault(canon_seed, 0)

        title, anchor_texts = extract_title_and_anchor_texts(text_response)
        # ----- priority score calculation
        score = priority_score(canon_seed, wave_num, inlink_count, title, anchor_texts)
        add_url(canon_seed, score, wave_num, text_response)

    logging.info("========================= SEEDS INITIALIZATION DONE =========================\n ")


####  Crawl Main

In [22]:
def process_document_data(url, batch_content, text_response):
    """Append the formatted document for ``url`` to the current batch.

    Args:
        url: canonical URL of the page (becomes the DOCNO).
        batch_content: the batch text collected so far.
        text_response: raw HTML of the page.

    Returns:
        (batch_content, success): the batch text, extended by the new record
        when processing succeeded, and a flag telling whether it did.
    """
    try:
        title, text = get_title_and_text(text_response)
        batch_content = batch_content + formatter(url, title, text)
        logging.info(f"Processed document data for: {url}")
    except Exception as e:
        logging.error(f"Exception while processing document data for URL {url}: {e}", exc_info=True)
        return batch_content, False

    return batch_content, True

In [23]:
def crawl_main(seed_urls):
    """Run the crawl: expand the frontier from the seeds and store every popped page.

    For each URL popped from the frontier (highest priority first):
      1. Its outlinks are extracted, canonicalized and saved in ``outlinks_dict``.
         Until LIMIT URLs have been added to the frontier, each distinct
         outlink is examined: known frontier URLs get their inlink data
         updated; new URLs are filtered, fetched, scored and added to the
         frontier when their absolute score reaches MIN_ABS_SCORE.
      2. The page is formatted and appended to the current batch; a batch is
         written to disk every BATCH_SIZE documents.
    When the frontier is empty, any remaining partial batch is written too.

    Args:
        seed_urls: list of seed URL strings.
    """
    # seeds initialization: add all seeds to frontier
    seeds_initialization(seed_urls)

    batch_num = 1
    batch_content = ""
    count = 0  # number of documents processed successfully
    reach_limit = False

    while not frontier.empty():

        _, popped_url, wave_number, raw_html_content = frontier.get()
        logging.info(f"POPPED: {popped_url}")

        if raw_html_content is None:
            # Nothing was downloaded for this entry, so there is nothing to parse or store.
            logging.warning(f"SKIPPED -- no content stored for: {popped_url}")
            continue

        # 1. save outlinks
        outlinks = extract_outlinks(popped_url, raw_html_content)
        canon_outlinks = [canonicalize_url(outlink) for outlink in outlinks]
        outlinks_dict[popped_url] = set(canon_outlinks)

        if not reach_limit:
            # Examine each distinct outlink once (document order kept), so a link
            # repeated on the same page is not counted as several inlinks.
            unique_outlinks = list(dict.fromkeys(canon_outlinks))
            total_outlinks = len(unique_outlinks)
            ol_wave_num = wave_number + 1

            for index, canon_outlink in enumerate(unique_outlinks, start=1):
                logging.info(f"------------------------------------------------------------------------------")
                logging.info(f"Analyzing outlink {index}/{total_outlinks} {canon_outlink}")

                # already examined as invalid
                if canon_outlink in bad_urls:
                    continue

                # already in the frontier: only the link graph needs updating
                if canon_outlink in frontier_urls:
                    update_inlink_count(canon_outlink)
                    update_inlink_dict(canon_outlink, popped_url)
                    continue

                # Cheap URL-only checks first, so rejected links are never downloaded.
                try:
                    rejected = not is_http_or_https(canon_outlink) or is_blacklisted(canon_outlink)
                except ValueError:
                    rejected = True  # URL cannot even be parsed
                if rejected:
                    logging.info(f"SKIPPED -- rejected before fetch: {canon_outlink}")
                    bad_urls.add(canon_outlink)
                    continue

                text_response, header_response = get_response(canon_outlink)
                # no usable response from the server
                if text_response is None or header_response is None:
                    continue

                # not qualified
                if not is_qualified(canon_outlink, header_response):
                    continue

                logging.info(f"qualified")

                # ----- priority score calculation preparation
                update_inlink_count(canon_outlink)
                update_inlink_dict(canon_outlink, popped_url)

                inlink_count = inlinks_counts.get(canon_outlink)
                title, anchor_texts = extract_title_and_anchor_texts(text_response)
                # ----- priority score calculation
                score = priority_score(canon_outlink, ol_wave_num, inlink_count, title, anchor_texts)
                abs_score = get_absolute_score(score, ol_wave_num)

                # ----- priority score check
                if abs_score < MIN_ABS_SCORE:
                    logging.info(f"Score Low skipped - abs score({abs_score}) < threshold")
                    bad_urls.add(canon_outlink)
                    continue

                add_url(canon_outlink, score, ol_wave_num, text_response)
                if len(frontier_urls) >= LIMIT:
                    logging.info(f"Frontier length {len(frontier_urls)} limit reached {LIMIT}. Stopping outlink processing.")
                    reach_limit = True
                    break

        # 2. save to doc
        logging.info(f"================================================================================================")
        logging.info(f"2. DOCUMENT DATA PROCESSING STARTED {popped_url}")
        batch_content, success = process_document_data(popped_url, batch_content, raw_html_content)
        if success:
            count += 1
            visited_urls.add(popped_url)
            logging.info(f"===== FINISH ANALYSIS OF No.{count} in the frontier: {popped_url}")

            # Write to file if the batch is complete
            if count % BATCH_SIZE == 0:
                logging.info(f"Writing batch content to file....")
                write_batch_to_file(batch_content, batch_num)
                batch_num += 1
                batch_content = ""

    # Flush the documents collected since the last full batch; without this the
    # final partial batch would be lost.
    if batch_content:
        logging.info(f"Writing batch content to file....")
        write_batch_to_file(batch_content, batch_num)


        


In [None]:
# Run the crawl from the seed list; this call returns once the frontier queue is empty.
crawl_main(SEEDS_URLS)

In [25]:
# Persist the link graph collected during the crawl as JSON files (paths defined in the configuration cell).
dump_links(outlinks_dict, PATH_OUTLINKS)
dump_links(inlinks_dict, PATH_INLINKS)
dump_links(inlinks_counts, PATH_INLINKS_COUNTS)