In [None]:
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from collections import deque
import logging
# from joblib import Memory
import time
from pathlib import Path
from pprint import pprint
import mimetypes
from datetime import datetime
import traceback

In [None]:
# Root handler prints "<timestamp> - <LEVEL>:<message>"; this notebook's logger runs at DEBUG.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s:%(message)s',
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# logger.setLevel(logging.INFO)

In [None]:
import requests_cache

# Install a cache named 'scraper_cache' on the sqlite backend.
# expire_after is 3600*24*14 seconds, i.e. 14 days.
requests_cache.install_cache('scraper_cache', backend='sqlite', expire_after=3600*24*14)

In [None]:
# requests_cache.clear() 

In [None]:
# cachedir = './cache' 
# memory = Memory(cachedir, verbose=0)

In [None]:
# Base URL of the CivicWeb portal being scraped; relative hrefs are appended to it.
root_url = "https://pleasantontx.civicweb.net"

# Portal name: whatever sits between "https://" and ".civicweb.net" in root_url.
subdomain = root_url.split("https://")[-1].partition(".civicweb.net")[0]

# Browser-like User-Agent header used as the default for requests.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36',
}

In [117]:
# @memory.cache
def fetch_webpage(url, headers=HEADERS, timeout=30):
    '''
    Fetch a URL with an HTTP GET request and return the response.

    Parameters
    ----------
    url : str
        The absolute URL to request.
    headers : dict, optional
        HTTP headers sent with the request. Defaults to HEADERS.
    timeout : float, optional
        Seconds to wait for the server before giving up. Defaults to 30.
        Without a timeout a stalled connection would block the crawl indefinitely.

    Returns
    -------
    response : requests.Response or None
        The response, or None if the request raised an exception (the error is
        logged with its traceback). The HTTP status code is NOT checked here.
    '''
    try:
        return requests.get(url, headers=headers, timeout=timeout)
    except Exception:
        # Deliberately best-effort: callers test for None instead of handling exceptions.
        logger.exception("Request to %s failed", url)
        return None
    
# def get_soup(url_append, root_url=root_url, headers=HEADERS):
#     url = root_url + url_append
#     # TODO: implement caching using joblib
#     # cached_result = cache.get('html:%s' % url)
#     # if cached_result:
#     #     return cached_result
#     print(f"Getting BeautifulSoup object from {url}")
#     try:
#         response = requests.get(url, headers=headers)
#         # cache.set('html:%s' % url, page)
#         bs = BeautifulSoup(response.content,'html.parser')
#         return bs
#     except Exception as e:
#         print("Error: ", e)
#         return None
    
# def get_item(bs:BeautifulSoup, items, parent_path="/", is_folder=True):
#     ''' 
#     parent_path (str):
#         The path to the current folder, relative to the root of the website. Must start with a slash. At least one slash is required to indicate the root of the website.
#     '''
#     if is_folder:
#         class_str = 'folder-link'
#     else:
#         class_str = 'file-link'

#     for item in bs.find_all("a", class_=class_str):
#         if item.get('href') not in [i["url"] for i in items]:
#             item_info  = {
#                 "name": item.text, 
#                 "url": item.get('href'), 
#                 "parent_path": parent_path
#             }
#             items.append(item_info)
#     return items
    
def get_folders(bs:BeautifulSoup, folders:deque, parent_url:str, parent):
    ''' 
    Append every folder link found in a page to the deque of folders to visit, skipping URLs that are already in the deque.

    Parameters
    ----------
    bs : BeautifulSoup
        The parsed page to search for <a class="folder-link"> elements.
    folders : deque
        Folders waiting to be visited; each entry is a dict with a "url" key. Mutated in place.
    parent_url : str
        The url of the folder whose page is being parsed. Stored on each new entry.
    parent : list(str)
        List of the folders on the path to the current folder, relative to the root of the website. parent[0] should be the first folder on the path, and parent[-1] is the current folder. parent=[] indicates that the folder belongs to the root.

    Returns
    -------
    folders : deque
        The same deque that was passed in.
    '''
    logger.debug("folder parents: %s", parent)

    # Build the lookup once; rebuilding a list of URLs for every link made this check quadratic.
    queued_urls = {queued["url"] for queued in folders}

    for folder in bs.find_all("a", class_='folder-link'):
        url = folder.get('href')
        folder_info = {
                "name": folder.text.strip(), 
                "url": url,
                "parent": parent,
                "parent_url": parent_url
            }
        logger.debug("%s", folder)

        if url not in queued_urls:
            logger.debug("Adding to folders deque to visit")
            queued_urls.add(url)
            folders.append(folder_info)
    return folders

def get_items(bs:BeautifulSoup, parent_url:str, parent=None, is_folder=True)->list[dict]:
    ''' 
    Return a list of dictionaries containing information for each item in the current folder. Items are either documents or folders.
    
    Parameters
    ----------
    bs: BeautifulSoup
        The BeautifulSoup object to parse. Should be the documents site for a website under the civicweb.net domain.
    parent_url : str
        The url of the current folder. This is the url appended onto the root of the website, and should start with "/".
    parent : list(str), optional
        List of the folders on the path to the current item, relative to the root of the website. parent[0] should be the first folder on the path, and parent[-1] is the current folder. None or [] indicates that the folder belongs to the root.
    is_folder  : bool
        Which kind of link to collect: folder links (True, class "folder-link") or document links (False, class "document-link").
    
    Returns
    -------
    items : list[dict]
        A list of dictionaries containing information for each item in the current folder. Each dictionary contains the following keys:
        - name : The name of the item.
        - url : The url of the item (None if the link has no href).
        - parent_url : The url of the parent folder.
        - parent : A list of the folders on the path to the current item. Each item gets its own copy.
    '''
    # A None default avoids sharing one mutable list object between calls.
    if parent is None:
        parent = []
    logger.debug("folder parent: %s", parent)

    item_class = "folder-link" if is_folder else "document-link"

    child_items = []
    for item in bs.find_all("a", class_=item_class):
        item_info = {
                "name": item.text.strip(), 
                "url": item.get('href'),
                # copy so that editing one item's path cannot affect its siblings or the caller's list
                "parent": list(parent),
                "parent_url": parent_url
            }
        logger.debug(
            "Found %s with name %s (url: %s) in directory at %s (url: %s).",
            item_class, item_info['name'], item_info['url'], '/'.join(item_info['parent']), parent_url,
        )
        child_items.append(item_info)
    return child_items

def get_documents(bs:BeautifulSoup, documents:list, parent_url:str, parent:list):
    '''
    Append every document link found in a page to the list of documents to download, skipping URLs that are already in the list.

    Parameters
    ----------
    bs : BeautifulSoup
        The parsed page to search for <a class="document-link"> elements.
    documents : list
        Documents collected so far; each entry is a dict with a "url" key. Mutated in place.
    parent_url : str
        The url of the folder whose page is being parsed. Stored on each new entry.
    parent : list(str)
        List of the folders on the path to the documents, relative to the root of the website.

    Returns
    -------
    documents : list
        The same list that was passed in.
    '''
    # Build the lookup once; rebuilding a list of URLs for every link made this check quadratic.
    known_urls = {known["url"] for known in documents}

    for doc in bs.find_all("a", class_='document-link'):
        url = doc.get('href')
        if url is None:
            # An anchor without an href cannot be downloaded; skip it instead of raising KeyError.
            logger.warning("Skipping document link without href: %s", doc)
            continue

        document_info = {
                "name": doc.text.strip(),
                "url": url,
                "parent": parent,
                "parent_url": parent_url
            }
        logger.debug(
            "Found document with name %s in %s, fetched from %s",
            document_info['name'], '/'.join(document_info['parent']), document_info['url'],
        )
        if url not in known_urls:
            logger.debug("Adding %s to documents list to download later", document_info['name'])
            known_urls.add(url)
            documents.append(document_info)
    return documents

def get_filetype(response:"requests.Response") -> tuple[str, str]:
    ''' 
    Returns the extension and file type (MIME notation) of a response object based on its Content-Type header.

    Parameters
    ----------
    response : requests.Response
        The response to inspect. Only its headers are read.

    Returns
    -------
    extension : str
        The guessed file extension, including the leading dot (e.g. ".pdf").
    header_mimetype : str
        The Content-Type value with any parameters (e.g. "; charset=utf-8") removed.

    Raises
    ------
    TypeError
        If the Content-Type header is missing or no extension is known for it.
    '''
    content_type = response.headers.get('Content-Type', '')

    # remove mimetype parameters if they exist; strip so "type/subtype ; param" still matches
    header_mimetype = content_type.split(";")[0].strip()
    extension = mimetypes.guess_extension(header_mimetype)

    # raise error if file extension cannot be guessed from response
    if not extension:
        logger.warning("Could not determine file type for response")
        raise TypeError(f"Could not determine file type (Content-Type: {content_type!r})")
    
    return extension, header_mimetype

# def format_output_path(parents:list, subdomain="", out_folder=Path.cwd() / "out"):
#     '''
#     Formats the output path of the file to be downloaded. The output path will separate the file by subdomain and preserve the file structure from the scraped website.

#     parents : list(str)
#         The list of directories that the file is in from the root folder of the main website.
#     subdomain : str, optional
#         The subdomain of the website to be scraped from. Defaults to "". Used to separate scraped content into specific folders within the out_path.
#     '''
#     return out_folder.joinpath(subdomain, *parents)

def download_file(response:"requests.Response", filename:str,  out_path=Path.cwd() / "out"):
    ''' 
    Writes the body of the given response to a file on disk.
    
    Parameters
    ----------
    response : requests.Response
        The response object. Its raw bytes (response.content) are written unchanged.
    filename : str
        The name of the file being downloaded. Should include the extension. Path separators in the name are replaced with "_" so the file always lands directly inside out_path.
    
    out_path : Path, optional
        The folder where the file should be saved on the disk, as a pathlib Path object. Defaults to a folder in the current working directory named "out". Does not need to exist.

    Returns
    -------
    target : Path
        The path of the file that was written.
    '''
    # File names come from scraped link text; a "/" or "\" in one would otherwise be
    # treated as a sub-directory (which may not exist) instead of part of the name.
    safe_name = filename.replace("/", "_").replace("\\", "_")

    out_path = Path(out_path)
    out_path.mkdir(parents=True, exist_ok=True) # ensure directories leading up to the output file path exist

    target = out_path / safe_name
    target.write_bytes(response.content)
    return target

In [None]:
# Queue of folders still to be crawled, and a record of the folders already crawled.
folders = deque()
done_folders = deque()

In [None]:
# testpath = Path("out")
# testpath = testpath.joinpath("", "test")
# testpath.mkdir(parents=True, exist_ok=True)

In [None]:
# add the folders at the root to be processed
response = fetch_webpage(url=root_url+"/filepro/documents/")
if response is None:
    # fetch_webpage returns None when the request itself failed; fail with a clear message
    # instead of an AttributeError on response.content
    raise RuntimeError("Could not fetch the root documents page; see the log for details")
# an HTTP error page would otherwise be parsed and silently yield zero folders
response.raise_for_status()
bs = BeautifulSoup(response.content,'html.parser')
folders.extend(get_items(bs, parent_url="/filepro/documents/",parent=[], is_folder=True))

In [None]:
folders

In [None]:
# Collects one dict per document link found while crawling the folders.
documents = []

In [None]:
documents

In [None]:
# memory.clear()

In [None]:
# breadth-first search to find all subfolders and documents

# URLs that are already queued or visited. Without this, a folder linked from more than
# one page (or a link back to an ancestor) would be crawled again, possibly forever.
seen_folder_urls = {folder["url"] for folder in folders}
seen_folder_urls.update(folder["url"] for folder in done_folders)

while folders:
    time.sleep(1)  # wait one second before each folder request
    if logger.isEnabledFor(logging.DEBUG):
        # only build these (potentially very long) strings when they will actually be logged
        logger.debug("folders to visit: %s", "\n".join([str(folder) for folder in folders]))
        logger.debug("completed folders: %s", "\n".join([str(folder) for folder in done_folders]))
    
    curr_folder = folders.popleft()
    logger.info(f"\nSearching folder {curr_folder['name']} at location /{'/'.join(curr_folder['parent'])}...") 
    try:
        response = fetch_webpage(url=root_url+curr_folder["url"])
        if response is None:
            # fetch_webpage already logged the underlying error
            raise RuntimeError(f"No response for folder at {curr_folder['url']}")
        bs = BeautifulSoup(response.content,'html.parser')

        # path of the current folder = path of its parent + its own name
        curr_path = [*curr_folder["parent"], curr_folder["name"]]
        logger.debug("currrent folder's parents:%s", curr_folder["parent"])
        logger.debug("current folder's name:%s", curr_folder["name"])
        logger.debug("parent path to add to children folders/documents: %s", curr_path)
        
        # add subfolders to visit from this folder to the folders deque (new URLs only)
        children_folders = []
        for child in get_items(bs,parent_url=curr_folder["url"], parent=curr_path, is_folder=True):
            if child["url"] in seen_folder_urls:
                continue
            seen_folder_urls.add(child["url"])
            children_folders.append(child)
        folders.extend(children_folders)

        # add documents to download from this folder to the documents list
        children_documents = get_items(bs,parent_url=curr_folder["url"], parent=curr_path, is_folder=False)
        documents.extend(children_documents)
        
        logger.info(f"Found {len(children_folders)} folders and {len(children_documents)} documents at this location.")
    except Exception:
        # best-effort crawl: log the failure with its traceback and move on to the next folder
        logger.exception("Failed to process folder %s", curr_folder["url"])
        continue

    done_folders.append(curr_folder)
    
    logger.info(f"Completed folder {curr_folder['name']}\n")

In [None]:
print(folders)
print(done_folders)

In [None]:
documents

## downloading documents

In [None]:
# # https://pleasantontx.civicweb.net/filepro/documents/5688/ #has 4 documents
# bs = get_soup(url_append='/filepro/documents/5688/', root_url=root_url)

In [None]:
# documents = []
# documents = get_documents(bs, documents)

In [None]:
# documents

In [None]:
# documents = []
# for doc in bs.find_all("a", class_='document-link'):
#     document_info = {
#         "name": doc.text.strip(),
#         "url": doc["href"],
#         "subdomain": subdomain,
#         }
#     documents.append(document_info)
# print(documents)

In [None]:
len(documents)

In [None]:
# def get_parent_url(document:dict, folders:deque=done_folders):
#     return [folder for folder in folders 
#     if "".join(folder["parents"])+folder["name"] == "".join(document["parents"])
#     ]

In [None]:
# Download each document into OUT_FOLDER/<subdomain>/<folder path>/ and record one
# row per attempt (successful or not) in `rows`.
OUT_FOLDER = Path.cwd() / "out"
rows = []
for document in documents[:10]:  # NOTE: only the first 10 documents are processed
    # reset per-document state so a failure never reports values left over from the previous document
    error = ""
    file_extension = ""
    file_type = ""
    try:
        t1 = time.time()
        response = fetch_webpage(url=root_url+document["url"], headers=HEADERS)
        t2 = time.time()
        logger.info((f"Took {round((t2-t1),3)} seconds to get page."))
        if response is None:
            # fetch_webpage already logged the underlying error
            raise RuntimeError(f"No response for document at {document['url']}")
        # do not save an HTTP error page as if it were the document
        response.raise_for_status()
        out_path = OUT_FOLDER.joinpath(subdomain, *document["parent"])

        file_extension, file_type = get_filetype(response)
        logger.info((f"{document['name']}{file_extension}"))
        download_file(
            response, 
            filename=document["name"]+file_extension,
            out_path=out_path)
    except Exception as e:
        tb_str = ''.join(traceback.format_exception(e))
        logger.error(tb_str)
        # store the message as text; `error += e` raised TypeError (str + Exception) and aborted the loop
        error = str(e)

    rows.append({
        "name": document["name"]+file_extension,
        "file_type": file_type,
        "subdomain": subdomain,
        "parent_path": "/".join(document["parent"]),
        "root_url": root_url, 
        "url": document["url"], 
        "parent_url": document["parent_url"],
        "time_scraped": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        # key was "'error'" (with literal quote characters), which produced a column named 'error' including the quotes
        "error": error
    })

In [None]:
rows

In [None]:
# Tabulate the per-document download records collected in `rows`.
out_df = pd.DataFrame(rows)
out_df

In [None]:
# OUT_FOLDER is otherwise only created as a side effect of a successful download,
# so make sure it exists before writing the CSV.
OUT_FOLDER.mkdir(parents=True, exist_ok=True)
out_df.to_csv(OUT_FOLDER / "scraped_files.csv")

In [None]:
# Read the CSV back in, using its first column as the DataFrame index.
in_df = pd.read_csv(OUT_FOLDER / "scraped_files.csv", index_col=[0])

In [None]:
in_df

In [None]:
# t1 = time.time()
# response = requests.get(root_url+documents[0]["url"], headers=HEADERS)
# t2 = time.time()
# print(f"Took {round((t2-t1),3)} seconds to get page.")

# t3 = time.time()
# with open("./out/"+documents[0]["name"]+".pdf", 'wb') as pdf:
#     pdf.write(response.content)
#     pdf.close()
# t4 = time.time()
# print(f"Took {round((t4-t3),3)} seconds to save pdf.")

In [None]:
# documents[0] and [1] are in folder /filepro/documents/5869 
# fetch_webpage takes a single absolute `url`; it has no url_append/root_url parameters
response = fetch_webpage(url=root_url + '/filepro/documents/5869', headers=HEADERS)
bs = BeautifulSoup(response.content,'html.parser')

In [None]:
# fetch_webpage takes a single absolute `url`; it has no url_append/root_url parameters
response = fetch_webpage(url=root_url + documents[0]["url"], headers=HEADERS)
# name the parser explicitly, as the other cells do
bs = BeautifulSoup(response.content, 'html.parser')

In [None]:
# download_file only accepts (response, filename, out_path): the sub-folder is built here,
# and the document dicts store their folder path under "parent" (not "parents")
download_file(
    response,
    filename=documents[0]["name"]+".html",
    out_path=(Path.cwd() / "out").joinpath(subdomain, *documents[0]["parent"]),
)

In [None]:
response.headers['Content-Type']

In [None]:
response.headers['Content-Type'].split(";")[0]

In [None]:
# print(mimetypes.guess_extension(response.headers['Content-Type'])) # results in None
print(mimetypes.guess_extension(response.headers['Content-Type'].split(";")[0]))

In [None]:
print(mimetypes.guess_extension('text/html'))

In [None]:
# NOTE(review): response1 is only assigned in a later cell of this notebook; that cell must be run first.
print(mimetypes.guess_extension(response1.headers['Content-Type']))

In [None]:
mimetypes.types_map['.html']

In [None]:
print(response.headers)

In [None]:
# fetch_webpage takes a single absolute `url`; it has no url_append/root_url parameters
response1 = fetch_webpage(url=root_url + documents[1]["url"], headers=HEADERS)
# name the parser explicitly, as the other cells do
bs1 = BeautifulSoup(response1.content, 'html.parser')

In [None]:
print(response1.headers)

In [None]:
print(response1.headers["Content-Disposition"])

In [None]:
# call keys(): printing `headers.keys` without parentheses only shows the bound method object
print(list(response1.headers.keys()))

In [None]:
print(response.headers["Content-Type"])

In [None]:
# fetch_webpage takes a single absolute `url`; it has no url_append/root_url parameters
response = fetch_webpage(url=root_url + documents[1]["url"], headers=HEADERS)
# name the parser explicitly, as the other cells do
bs = BeautifulSoup(response.content, 'html.parser')

In [None]:
print(response.headers["Content-Type"])

In [None]:
mimetypes.guess_extension('image/jpeg')