# Utils
> supporting functions

In [59]:
# | default_exp utils.utils

In [60]:
#| export
from PIL.Image import Image

In [61]:
# | exporti

import os
from typing import List, Tuple, Union

import re
import pathlib
import unicodedata
import json
import chardet
from urllib.parse import urljoin, urlparse

from bs4 import BeautifulSoup
from markdownify import MarkdownConverter
import PIL

import datetime as dt
import base64

import asyncio

import pptx2md

import zipfile
import io


import datetime as dt
from dateutil.parser import parse as dtu_parse

from dotenv import set_key, load_dotenv

from nbdev.showdoc import patch_to

In [62]:
# | hide
from nbdev.showdoc import show_doc
from dotenv import load_dotenv
import domolibrary_extensions.google.auth as ga

load_dotenv('../.env')



True

# Code Execution

In [63]:
#| export
async def gather_with_concurrency(
    *coros,  # list of coroutines to await
    n=60,  # number of open coroutines
):
    """Await every coroutine in `coros` while letting at most `n` run at once.

    Results are collected with `asyncio.gather`, so they come back as a list
    in the same order the coroutines were passed in.
    """

    limiter = asyncio.Semaphore(n)

    async def run_limited(pending):
        # Hold one semaphore slot for the whole lifetime of the awaited coroutine.
        async with limiter:
            outcome = await pending
        return outcome

    throttled = [run_limited(pending) for pending in coros]
    return await asyncio.gather(*throttled)
     

# File Management

In [64]:
#|exports

def rename_filepath_to_match_datatype(data, file_path):
    """Return `file_path` with its extension swapped to suit the type of `data`.

    - str, bytes and bytearray data -> '.txt'
    - dict data                     -> '.json'
    - any other type                -> the existing extension is dropped and
      nothing is appended

    Only the trailing extension of the final path component is touched;
    folder names and the rest of the file name are left exactly as given.

    Args:
        data: the object that will be written to the file.
        file_path: the candidate path (str or os.PathLike), with or without
            an extension.

    Returns:
        The adjusted path as a string.
    """
    file_path = os.fspath(file_path)

    new_suffix = ""
    if isinstance(data, (str, bytes, bytearray)):
        new_suffix = ".txt"
    elif isinstance(data, dict):
        new_suffix = ".json"

    # splitext only separates the final extension, so a suffix that also
    # appears in a directory name (or equals the new suffix) is never
    # stripped from the wrong place.
    root, _old_suffix = os.path.splitext(file_path)

    return root + new_suffix

In [65]:
# Demo: one sample value per data type the helper distinguishes
# (str, dict, bytes, bytearray).
content = [ "hello world" , {"a" : "b"}, b'\xC3\xA9', bytearray(b'\x02\x03\x05\x07')]

# Evaluate the helper against an extension-less path for every sample.
[rename_filepath_to_match_datatype(test, "/Users/pankaj/abc") for test in content]

<class 'str'>
<class 'dict'>
<class 'bytes'>
<class 'bytearray'>


['/Users/pankaj/abc.txt',
 '/Users/pankaj/abc.json',
 '/Users/pankaj/abc.txt',
 '/Users/pankaj/abc.txt']

In [66]:
# | exports
def detect_encoding(file_path, debug_prn: bool = False):
    """Guess the character encoding of the file at `file_path` using chardet.

    The file is opened in binary mode and fed to a `UniversalDetector` one
    line at a time; reading stops early once the detector reports it is done.

    Returns the detector's `result` (the notebook example shows a dict with
    'encoding', 'confidence' and 'language' keys).

    `debug_prn` is accepted but not used by this function.
    """
    sniffer = chardet.universaldetector.UniversalDetector()

    with open(file_path, "rb") as raw_file:
        for raw_line in raw_file:
            sniffer.feed(raw_line)
            if sniffer.done:
                break

    # close() finalises the detection so that `result` is populated.
    sniffer.close()

    return sniffer.result

In [67]:
detect_encoding("./utils.ipynb")

{'encoding': 'utf-8', 'confidence': 0.99, 'language': ''}

In [68]:
# | export
def read_html_file(
    file_path, is_convert_to_soup: bool = True
) -> Union[str, BeautifulSoup]:
    """Read an HTML file using the encoding reported by `detect_encoding`.

    Returns a `BeautifulSoup` object (built with the "lxml" parser) when
    `is_convert_to_soup` is True, otherwise the raw file text.

    Raises FileNotFoundError when `file_path` does not exist.
    """
    file_is_missing = not os.path.exists(file_path)
    if file_is_missing:
        raise FileNotFoundError(file_path)

    detected = detect_encoding(file_path)

    with open(file_path, encoding=detected["encoding"]) as html_file:
        if not is_convert_to_soup:
            return html_file.read()

        return BeautifulSoup(html_file, "lxml")

# Handle URLs

In [69]:
# | exports
def remove_query_params_from_url(url):
    """Return `url` reduced to scheme, host and path.

    The query string, the fragment and any ';params' on the last path
    segment are dropped. The path itself is returned as written (it is not
    normalised), and a URL without a path keeps an empty path.

    Example:
        'https://example.com/s/article?id=1#top' -> 'https://example.com/s/article'
    """
    parts = urlparse(url)

    # Rebuild from the parsed parts instead of urljoin(url, path): urljoin
    # returns the base untouched when the path is empty (leaving the query
    # string in place) and doubles the path of relative URLs.
    return parts._replace(params="", query="", fragment="").geturl()

In [70]:
# Sample Domo support URLs used to exercise remove_query_params_from_url.
test_urls = [
    "https://domo-support.domo.com/s/article/36004740075",
    "https://domo-support.domo.com/s/topic/0TO5w000000ZlOmGAK/20202023",  # list of articles
    "https://domo-support.domo.com/s/topic/0TO5w000000Zan7GAC/archived-feature-release-notes",  # list of topics
    "https://domo-support.domo.com/s/knowledge-base",
]

# None of the samples carries a query string, so each should come back unchanged.
[remove_query_params_from_url(url) for url in test_urls]

['https://domo-support.domo.com/s/article/36004740075',
 'https://domo-support.domo.com/s/topic/0TO5w000000ZlOmGAK/20202023',
 'https://domo-support.domo.com/s/topic/0TO5w000000Zan7GAC/archived-feature-release-notes',
 'https://domo-support.domo.com/s/knowledge-base']

In [71]:
# | exports
def update_env(env_path: str, key: str, value: str, debug_prn: bool = False) -> dict:
    """
    writes a key / value pair into a .env file (creating the file when it is
    missing), stamps an `env_last_modified` entry, then reloads the file into
    the process environment.

    dict values are serialised with json.dumps and written unquoted; every
    other value is written with quote mode "always".

    returns {key: <value read back from the environment>}
    """

    env_file_missing = not os.path.exists(env_path)
    if env_file_missing:
        # create an empty file so there is something to update
        with open(env_path, "w", encoding="utf-8") as new_env_file:
            new_env_file.write("")

    value_is_dict = isinstance(value, dict)
    quote_mode = "never" if value_is_dict else "always"
    if value_is_dict:
        value = json.dumps(value)

    if debug_prn:
        from pprint import pprint

        debug_info = {
            "env_path": env_path,
            "key": key,
            "value": value,
            "type": type(value),
            "quote_mode": quote_mode,
        }
        pprint(debug_info)

    set_key(env_path, key, value, quote_mode=quote_mode)

    last_modified = f"updated - {dt.date.today()}"
    set_key(env_path, "env_last_modified", last_modified)

    # override=True so the freshly written value replaces any stale one
    load_dotenv(env_path, override=True)

    return {key: os.getenv(key)}

In [72]:
# | exports
def upsert_folder(folder_path: str, debug_prn: bool = False):
    """Make sure the parent folder of `folder_path` exists.

    `folder_path` is treated as a path to a file (or to a folder written
    with a trailing separator): only its `os.path.dirname` is created, along
    with any missing intermediate folders. Nothing is created for the final
    path component itself.

    Args:
        folder_path: path whose containing folder should exist.
        debug_prn: when True, print the resolved folder and whether it
            already exists.

    Returns:
        None
    """
    folder_path = os.path.dirname(folder_path)

    if debug_prn:
        print(
            {
                "upsert_folder": os.path.abspath(folder_path),
                "is_exist": os.path.exists(folder_path),
            }
        )

    # A bare file name has no folder component; the current directory is
    # used as-is and os.makedirs("") would raise.
    if not folder_path:
        return

    if not os.path.exists(folder_path):
        # exist_ok guards against another process creating the folder
        # between the check above and this call.
        os.makedirs(folder_path, exist_ok=True)

In [73]:
# |exports


def get_all_files_and_folders(
    directory, file_type=None  # to only retrieve a specific file type
) -> Union[Tuple, List]:
    """recursively list the contents of `directory`

    without `file_type`: returns a tuple (file_ls, dir_ls) of joined paths
    with `file_type`: returns only file_ls, limited to files whose name ends
    with `file_type` (compared case-insensitively)

    raises FileNotFoundError when `directory` does not exist
    """
    directory_is_missing = not os.path.exists(directory)
    if directory_is_missing:
        raise FileNotFoundError(directory)

    found_files = []
    found_folders = []

    for current_root, subfolders, filenames in os.walk(directory):
        found_files.extend(
            os.path.join(current_root, filename)
            for filename in filenames
            if not file_type or filename.lower().endswith(file_type.lower())
        )

        # folders are only collected when no file type filter is active
        if not file_type:
            found_folders.extend(
                os.path.join(current_root, subfolder) for subfolder in subfolders
            )

    return found_files if file_type else (found_files, found_folders)

In [74]:
# Use the function
get_all_files_and_folders("../jira", ".json")

['../jira/CACHE/onyxreporting_atlassian_net/rest/agile/1_0/board.json',
 '../jira/CACHE/onyxreporting_atlassian_net/rest/agile/1_0/epic/10002.json',
 '../jira/CACHE/onyxreporting_atlassian_net/rest/agile/1_0/board/1.json',
 '../jira/CACHE/onyxreporting_atlassian_net/rest/agile/1_0/board/3/epic.json',
 '../jira/CACHE/onyxreporting_atlassian_net/rest/agile/1_0/board/3/issue.json',
 '../jira/CACHE/onyxreporting_atlassian_net/rest/api/2/myself.json']

## Handle converting files to Markdown

In [75]:
# | exports


class ImageBlockConverter(MarkdownConverter):
    """
    Custom MarkdownConverter that resizes local image files to the pixel size
    declared in each <img> tag's inline style before emitting the image markdown.

    Options read from `self.options` (passed as keyword arguments, e.g. via `md`):
    - file_path: path of the HTML file being converted; each image `src` is
      resolved relative to the folder containing it.
    - is_resize: set to False to leave image files untouched (defaults to True).
    """

    @staticmethod
    def _parse_inline_style(style):
        """turn an inline `style` attribute into a {property: value} dict"""
        style_obj = {}

        for declaration in (style or "").split(";"):
            # partition keeps values that themselves contain ':' (e.g. urls) intact
            name, separator, value = declaration.partition(":")
            if separator:
                style_obj[name.strip()] = value.strip()

        return style_obj

    @staticmethod
    def _parse_px(value):
        """convert a pixel length such as '624.00px' to a positive int, else None"""
        if not value:
            return None

        try:
            pixels = int(float(value.replace("px", "")))
        except ValueError:
            # other units ('50%', 'auto', ...) cannot be used as a pixel size
            return None

        return pixels if pixels > 0 else None

    def _resize_image_file(self, el):
        """resize the image file behind `el` in place; skip when size or file is unavailable"""
        html_path = self.options.get("file_path")
        src = el.get("src")
        if not html_path or not src:
            return

        style_obj = self._parse_inline_style(el.get("style"))
        width = self._parse_px(style_obj.get("width"))
        height = self._parse_px(style_obj.get("height"))
        if width is None or height is None:
            return

        file_path = os.path.join(os.path.dirname(html_path), src)
        if not os.path.isfile(file_path):
            # e.g. a remote image url - nothing on disk to resize
            return

        # close the source handle before overwriting the same file
        with PIL.Image.open(file_path) as image:
            new_image = image.resize((width, height))

        new_image.save(file_path)

    def convert_img(self, el, text, convert_as_inline, is_resize: bool = True):
        """
        custom image handler for ImageBlockConverter

        when resizing is enabled (both the `is_resize` argument and the
        `is_resize` option), the referenced image file is resized to the
        width / height found in the tag's inline style, then the standard
        markdown for the image is returned.
        """

        if is_resize and self.options.get("is_resize", True):
            self._resize_image_file(el)

        return super().convert_img(el, text, convert_as_inline)


def md(html, **options):
    """Shorthand: build an ImageBlockConverter from `options` and convert `html` with it"""
    converter = ImageBlockConverter(**options)
    return converter.convert(html)

In [76]:
# |exports
def convert_html_to_markdown(file_path):
    """converts an html file to markdown, written next to it with a '.md' extension

    The html file is read as utf-8 and converted with `md` (inline images are
    kept inside `td` and `span` tags, and image resizing is requested).
    The source html file is left in place.

    Args:
        file_path: path of the html file to convert.

    Returns:
        None
    """

    with open(file_path, encoding="utf-8") as f:
        html = f.read()

    markdown_content = md(
        html,
        keep_inline_images_in=["td", "span"],
        file_path=file_path,
        is_resize=True,
    )

    # Swap only the trailing extension: str.replace would also rewrite
    # '.html' inside folder names, and would leave md_path equal to
    # file_path (overwriting the source) for files not ending in '.html'.
    md_path = os.path.splitext(file_path)[0] + ".md"

    with open(md_path, "w", encoding="utf-8") as f:
        f.write(markdown_content)

    return

In [77]:
# | exports


def download_zip(zip_bytes_content, output_folder, is_convert_to_markdown: bool = True):
    """extract zip bytes into `output_folder`, rename the html file to index.html,
    then optionally convert it to markdown

    Args:
        zip_bytes_content: raw bytes of a zip archive.
        output_folder: folder the archive is extracted into.
        is_convert_to_markdown: when True, run `convert_html_to_markdown`
            on the renamed index.html.

    Returns:
        a confirmation message naming `output_folder`.

    Every top-level '*.html' file found in `output_folder` after extraction
    is moved onto index.html, so with several html files the last one
    listed wins.
    """

    # context manager releases the archive handle even if extraction fails
    with zipfile.ZipFile(io.BytesIO(zip_bytes_content), "r") as archive:
        archive.extractall(output_folder)

    output_index = os.path.join(output_folder, "index.html")

    # rename the html file to index.html
    for file_name in os.listdir(output_folder):
        if not file_name.endswith(".html"):
            continue

        os.replace(os.path.join(output_folder, file_name), output_index)

        if is_convert_to_markdown:
            convert_html_to_markdown(output_index)

    return f"successfully downloaded zip to {output_folder}"

#### sample implementation of downloading a zip from google docs and converting it to markdown

In [78]:
# import domolibrary_extensions.google.auth as ga

# Id of the Google document exported in this sample.
DOCUMENT_ID = "1j7XsbvFy0xUgGL6i-3LSChKvzSmTZSOyimEt6tQS-Kk"

# generates Credentials object
try:
    # Credentials and token are looked up under the GDOC_KEY / GDOC_TOKEN env keys.
    google_auth = ga.GoogleAuth.get_creds_from_env(
        credentials_env_key="GDOC_KEY",
        token_env_key="GDOC_TOKEN",
    )

        
    # Request the document as a zip export; the result is handed to
    # download_zip as the archive's bytes.
    content = (
        google_auth.service.files()
        .export(fileId=DOCUMENT_ID, mimeType="application/zip")
        .execute()
    )

    # Extract the export and convert its html to markdown (default behaviour).
    download_zip(content, "../TEST/utils/drive_converter-download_zip")

except Exception as e:
    # Sample cell: print the failure (e.g. no network) rather than aborting the notebook run.
    print(e)

using saved token
refreshing creds using saved token


HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7f829807b4c0>: Failed to resolve 'oauth2.googleapis.com' ([Errno -3] Temporary failure in name resolution)"))


In [79]:
# |exports


def download_pptx(
    pptx_bytes_content, output_folder, is_convert_to_markdown: bool = True
):
    """save bytes content to `output_folder`/index.pptx then optionally convert it to markdown

    Args:
        pptx_bytes_content: raw bytes of a pptx file.
        output_folder: folder that receives index.pptx (created when missing).
        is_convert_to_markdown: when True, `pptx2md.convert` is called with
            `output_folder`/index.md as the output and `output_folder`/images
            as the image folder.

    Returns:
        a confirmation message naming `output_folder`.
    """

    # Create output_folder itself. upsert_folder(output_folder) only creates
    # its parent (it takes the dirname of what it is given), which left the
    # open() below failing for a folder that did not exist yet.
    os.makedirs(output_folder, exist_ok=True)

    output_ppt_index = os.path.join(output_folder, "index.pptx")

    with open(output_ppt_index, "wb") as binary_file:
        # Write bytes to file
        binary_file.write(pptx_bytes_content)

    if is_convert_to_markdown:
        pptx2md.convert(
            output_ppt_index,
            output=os.path.join(output_folder, "index.md"),
            image_dir=os.path.join(output_folder, "images"),
        )

    return f"successfully downloaded content to {output_folder}"

#### sample implementation of downloading a pptx from google drive and converting it to markdown

In [80]:
# | import domolibrary_extensions.google.auth as ga

# Id of the Google Slides deck exported in this sample.
SLIDE_ID = "1_k4NRraKI1TmHNlpQCuqJrWr6dP7DNracdMCtfN8XlM"

try:
    # generates Credentials object
    google_auth = ga.GoogleAuth.get_creds_from_env(
        credentials_env_key="GDOC_KEY", token_env_key="GDOC_TOKEN"
    )

    # Request the deck with the pptx mime type; the result is handed to
    # download_pptx as the file's bytes.
    content = (
        google_auth.service.files()
        .export(
            fileId=SLIDE_ID,
            mimeType="application/vnd.openxmlformats-officedocument.presentationml.presentation",
        )
        .execute()
    )
    # Save the deck and convert it to markdown (default behaviour).
    download_pptx(content, "../TEST/utils/drive_converter-download_pptx")

except Exception as e:
    # Sample cell: print the failure (e.g. no network) rather than aborting the notebook run.
    print(e)

using saved token
refreshing creds using saved token
HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: /token (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7f829807bc10>: Failed to resolve 'oauth2.googleapis.com' ([Errno -3] Temporary failure in name resolution)"))


# Conversion

In [81]:
# | exports
def convert_str_to_snake_case(text_str):
    """lower-cases the text and swaps every space for an underscore
    e.g. 'Beast Modes' becomes 'beast_modes'
    """

    words = text_str.split(" ")
    joined = "_".join(words)

    return joined.lower()

In [82]:
# | exports
def convert_str_remove_accents(text_str: str) -> str:
    """strips accents by decomposing the text (NFD) and dropping every
    character whose unicode category is 'Mn'
    """
    decomposed = unicodedata.normalize("NFD", text_str)

    kept_chars = [
        char for char in decomposed if unicodedata.category(char) != "Mn"
    ]

    return "".join(kept_chars)

In [83]:
convert_str_remove_accents("est être"), convert_str_remove_accents("kožušček")

('est etre', 'kozuscek')

In [84]:
# | exports
def convert_str_keep_alphanumeric(text_str) -> str:
    """removes every character that is not an ASCII letter, a digit, an
    underscore or whitespace
    """
    # raw string: "\s" in a plain literal is an invalid escape sequence
    pattern = r"[^0-9a-zA-Z_\s]+"

    return re.sub(pattern, "", text_str)

In [85]:
# | exports
def convert_str_file_name(text_str: str) -> str:
    """builds a clean file name / url slug from free text:
    accents are removed, the text is snake-cased, then any remaining
    character rejected by `convert_str_keep_alphanumeric` is dropped
    """

    without_accents = convert_str_remove_accents(text_str)
    snake_cased = convert_str_to_snake_case(without_accents)

    return convert_str_keep_alphanumeric(snake_cased)

In [86]:
convert_str_file_name("Register Snowflake with Cloud Amplifier"), convert_str_file_name(
    "Kožušček and Beast Modes"
)

('register_snowflake_with_cloud_amplifier', 'kozuscek_and_beast_modes')

In [87]:
# | exports
def convert_str_to_date(datefield: str) -> dt.datetime:
    """parses a date string with dateutil; an empty or None `datefield` gives None"""
    if not datefield:
        return None

    return dtu_parse(datefield)

In [88]:
convert_str_to_date("2023-10-01")

datetime.datetime(2023, 10, 1, 0, 0)

In [89]:
# | hide
import nbdev

nbdev.nbdev_export("./utils.ipynb")