# Utilities
> Useful function and utilities

In [None]:
# default_exp utils

In [None]:
# export
import logging
import requests
from pathlib import Path
from typing import Callable, List, Union

from bs4 import BeautifulSoup
from IPython.core import magic
from IPython.display import Javascript
from ipywidgets import Output, VBox, Button, HTML
from PIL import Image
from tqdm.notebook import tqdm
from uuid import uuid4

PathStr = Union[Path, str]

In [None]:
# hide
# To be able to run the tests in the Notebook
!pip install -q ipytest pytest
from pathlib import Path
import ipytest
import sys

ipytest.autoconfig()

root_dir = Path("..").resolve()
sys.path.append(str(root_dir / "test"))


You should consider upgrading via the 'c:\python37-32\python.exe -m pip install --upgrade pip' command.


In [None]:
# exportest
# For Test Cases (might have duplicate import because it will be in a dedicated file)
from pathlib import Path
from shutil import copy, rmtree

import pytest
from PIL import Image
from test_common.utils_4_tests import DATA_DIR, IMG_DIR, check_no_log, check_only_warning

## Image

### Cleaning up error image

In some cases, we have error image that will interrupt model trainging. We can use ```clean_error_img``` to clean the image folder

In [None]:
# export
def check_img(
    img: Path,
    formats: List[str] = [".jpg", ".jpeg", ".png", ".bmp"],
) -> None:
    """
    Check on a single image,
    If it's quality is troublesome
        we unlink/ditch the image
    """
    img = Path(img)
    # check if this path is an image
    if img.suffix.lower().split("?")[0] not in formats:
        return

    try:
        # try to open that image and then (to check truncated image)
        # We need to include in a with block to close the image before deleting
        with Image.open(img) as im:
            im.load()
    except Exception:
        if img.exists():
            img.unlink()
            logging.warning(f"Removed erroneous img: {img}")
            return


def clean_error_img(
    path: Path,
    progress: bool = True,
) -> None:
    """
    - path: an image directory
    - progress: do we print out progress bar or not
        default True
    """
    path = Path(path)

    # check directory existence
    if path.exists() == False:
        raise FileExistsError(
            f"""path does not exists on:{path}, 
    make sure there is a directory "{path.name}".
    under directory "{path.parent}"
    """)

    # create iterator, probably with progress bar
    iterator = tqdm(list(path.iterdir()), leave=False)\
        if progress else path.iterdir()

    for obj in iterator:
        if obj.is_dir():
            # use recursion to clean the sub folder
            clean_error_img(obj, progress=progress)
        else:
            # cheking on a single image
            check_img(obj)

In [None]:
# exportest

images_rob = list((IMG_DIR / "robustness").glob("*.*"))
IMG_RGB = Image.new("RGB", (60, 30), color=(73, 109, 137))
IMG_RGBA = Image.new("RGBA", (60, 30), color=(73, 109, 137, 100))

@pytest.mark.parametrize("img", images_rob, ids=[i.name for i in images_rob])
def test_check_img_error(img: Path, tmpdir, caplog):
    """Test check_img with incorrect images"""
    img_copy = Path(tmpdir) / img.name
    copy(img, img_copy)
    check_img(img_copy)
    check_only_warning(caplog, img.name)
    assert not img_copy.is_file(), f"File {img_copy} not be deleted"


def test_check_img_empty_wrong_suffix(tmpdir, caplog):
    """Test check_img with wrong image that does not have a correct extension"""
    img_path = Path(tmpdir) / "empty.txt"
    img_path.write_bytes(b"")
    check_img(img_path)
    check_no_log(caplog)
    assert img_path.is_file(), f"Image {img_path} not found"


@pytest.mark.parametrize("img", [IMG_RGB, IMG_RGBA])
@pytest.mark.parametrize("ext", ["png", "bmp", "jpg", "jpeg"])
def test_check_img_correct(img: Image, ext: str, tmpdir, caplog):
    """Correct that correct image is not removed"""
    alpha_suffix = "_alpha" if len(img.getcolors()[0][1]) == 4 else ""
    if alpha_suffix and ext.startswith("jp"):
        pytest.skip("JPG does not support RGBA")

    img_path = Path(tmpdir) / f"correct_image_blue{alpha_suffix}.{ext}"
    img.save(img_path)
    check_img(img_path)
    check_no_log(caplog)
    assert img_path.is_file(), f"Image {img_path} not found"


In [None]:
# exportest

def test_clean_error_img(tmpdir, monkeypatch) -> None:
    """Test clean_error_img"""
    for img in images_rob:
        copy(img, Path(tmpdir) / img.name)

    monkeypatch.chdir(tmpdir)
    root = Path(".")

    sub1 = root / "sub1"
    sub2 = root / "sub2"
    sub1.mkdir()
    sub2.mkdir()
    sub3 = sub2 / "sub3"
    sub3.mkdir()


    (sub1 / "file11.BMP").write_text("fake image")
    (sub1/"😱file12 haha.jpg").write_text("fake image")
    (sub1 / "file13 haha.txt").write_text("not image")
    IMG_RGB.save(sub1 / "img14_good.jpg")
    IMG_RGBA.save(sub1 / "img15_good.png")

    (sub2 / "file21.jpg").write_text("fake image")
    (sub2 / "file22.jpeg").write_text("fake image")

    (sub3 / "file31.jpeg").write_text("fake image")
    IMG_RGB.save(sub3 / "img32_good.jpeg")
    IMG_RGBA.save(sub3 / "img33_good.bmp")


    good: List[Path] = list()
    bad: List[Path] = list()

    print("Existing files:")
    for file in root.rglob("*.*"):
        print(f" * {file}")
        if file.suffix.lower() in [".jpg", ".jpeg", ".png", ".bmp"]:
            (good if "good" in file.name else bad).append(file)

    print(" => CLEANING")
    clean_error_img(root, progress=False)

    good_removed = [f for f in good if not f.is_file()]
    assert not good_removed, f"Good pictures deleted: {good_removed}"

    bad_still_here = [f for f in bad if f.is_file()]
    assert not bad_still_here, f"Bad pictures not deleted: {bad_still_here}"


## Magic function ```hush```
> Run things quietly...

This is a magical cell function, so remember to use the ```%%```

Now we have 2 versions of hush, the 1st on is the backup one, that using ipywidget as event trigger

In [None]:
# hide
# @register_cell_magic
def hush(line, cell):
    """
    A magic cell function to collapse the print out
    %%hush
    how_loud = 100
    how_verbose = "very"
    do_some_python_thing(how_loud, how_verbose)
    """
    # the current output widget
    out = Output()
    output_box = VBox([out])
    # default setting is to hide the print out
    output_box.layout.display = "none"
    # the toggling button
    show_btn = Button(description="Show output")

    def toggle_output(o):
        # show output, change the button to hide
        if output_box.layout.display == "none":
            output_box.layout.display = "block"
            show_btn.description = "Hide output"
        # hide output, change the button to show
        else:
            output_box.layout.display = "none"
            show_btn.description = "Show output"

    # assign toggle to event
    show_btn.on_click(toggle_output)

    # A control panel containing a button
    # and the output box
    total_control = VBox([show_btn, output_box])
    display(total_control)

    with out:
        ishell = get_ipython()
        # excute the code in cell
        result = ishell.run_cell(
            cell, silent=False)

    # we still want the error to be proclaimed loudly
    if result.error_in_exec:
        logging.error(f"'{result.error_in_exec}' happened, breaking silence now")
        result.raise_error()

But this version of ```toggle action``` will be stuck by the ongoing interactive

If the process is working on some thing, you can't toggle until the end of the execution.

The further improvement will be move the toggle entirely to JavaScript, hence the second version

### Hush event in JS
> As not going through any amount of python backend after run

In [None]:
# export

@magic.register_cell_magic
def hush(line, cell):
    """
    A magic cell function to collapse the print out
    %%hush
    how_loud = 100
    how_verbose = "very"
    do_some_python_thing(how_loud, how_verbose)
    """
    # the current output widget
    out = Output()
    output_box = VBox([out])
    
    # create uuid for DOM identifying
    uuid = str(uuid4())

    # default setting is to hide the print out
    output_box.layout.display = "none"
    # the toggling button
    show_btn = Button(description="toggle output")
    show_btn.add_class(f"hush_toggle_{uuid}")
    output_box.add_class(f"hush_output_{uuid}")
    
    
    # A control panel containing a button
    # and the output box
    total_control = VBox([show_btn, output_box])
    display(total_control)
    
    # assign JS listener
    display(Javascript(f"""
    console.info("loading toggle event: {uuid}")
    const toggle_hush = (e) =>\u007b
        var op = document.querySelector(".hush_output_{uuid}");
        if(op.style.display=="none")\u007b
             op.style.display="block"

        \u007d else \u007b
            op.style.display="none"
        \u007d

    \u007d
    document.querySelector('.hush_toggle_{uuid}').onclick=toggle_hush
    """))

    with out:
        ishell = get_ipython()
        # excute the code in cell
        result = ishell.run_cell(
            cell, silent=False)

    # we still want the error to be proclaimed loudly
    if result.error_in_exec:
        logging.error(f"'{result.error_in_exec}' happened, breaking silence now")
        result.raise_error()

### Try hushing various kinds of info

* html display
* print
* logging

In [None]:
%%hush
import pandas as pd
import logging
logging.getLogger().setLevel('DEBUG')
a = 1
for i in range(1000):
    print(i, end="\t")
    
logging.info("a lots of text, VERBOSITY!!"*100)

# a big dataframe
display(pd.DataFrame({"a":[1,2]*50}))

# a big output
pd.DataFrame({"b":range(100)})

### Capable of open/close verbosity during the main process

In [None]:
# Testing hush
%%hush
from time import sleep
for i in tqdm(range(10)):
    sleep(1)

### But we still want the error to be loud
> As such information should interrupt the user

In [None]:
# Testing hush when exception is raised
%%hush
def _test_hush():
    for i in range(20):
        print(f"some logging:\t{i}!")
    raise ValueError("but some error, because life!")



## Fetching static files

In [None]:
# export
def static_root() -> Path:
    """
    Return static path
    """
    import unpackai
    unpackai_path = Path(unpackai.__path__[0])
    if unpackai_path.exists() == False:
        with open(f"{unpackai_path}.egg-link","r") as f:
            unpackai_path = Path(f.read())
    return unpackai_path/"static"

STATIC = static_root()

Test the validation of such static path

In [None]:
# exportest
def test_find_static():
    error_report_html = STATIC/"html"/"bug"/"error_report.html"
    assert (error_report_html).is_file(), f"'{STATIC}' is not a valid static path"

## Download and unzip utilities

In [None]:
# export


def get_url_size(url: str) -> int:
    """Returns the size of URL, or -1 if it cannot get it"""
    with requests.request("HEAD", url) as resp:
        if resp.status_code != 200:
            raise ConnectionError(f"Error when retrieving size from {url}")

        return int(resp.headers.get("Content-Length", -1))


In [None]:
# export


def download(url: str, dest: PathStr = None) -> Path:
    """Download file and return the Path of the downloaded file

    If the destination is not specified, download in the current directory
    with the name specified in the URL
    """
    # We can allow empty destination, in which case we use file name in URL
    # We need to ensure that destination is a Path
    dest = Path(dest or url.rpartition("/")[-1])

    # For big files, we will split into chunks
    # We might also consider adding a progress bar
    size = get_url_size(url)
    if size < 1024 * 1024:
        with requests.get(url) as resp:
            resp.raise_for_status()
            dest.write_bytes(resp.content)

    else:
        with requests.get(url, stream=True) as resp:
            resp.raise_for_status()
            with dest.open("wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)

    print(f"Downloaded {url} to {dest}")
    return dest


def url_2_text(url: str) -> str:
    """Extract text content from an URL (textual content for an HTML)"""
    resp = requests.get(url)
    if resp.status_code != 200:
        raise ConnectionError(f"Error when retrieving text content from {url}")

    resp.encoding = "utf-8"
    content_type = resp.headers["Content-Type"]
    if "html" in content_type:
        text = BeautifulSoup(resp.text).text
    else:
        text = resp.text
    return text


In [None]:
# exportest

url_raw_txt = "https://raw.githubusercontent.com/unpackAI/unpackai/main/test/test_data/to_download.txt"
test_data_txt = (DATA_DIR / "to_download.txt").read_text()

def test_get_url_size():
    assert get_url_size(url_raw_txt) == 264, f"Wrong size for {url_raw_txt}"

def test_download_dest(tmpdir):
    """Test download of file to a destination"""
    dest = Path(tmpdir / "to_download.txt")
    download(url_raw_txt, dest)
    assert dest.is_file()
    assert dest.read_text() == test_data_txt

def test_download_empty(tmpdir):
    """Test download of file without destination"""
    dest = Path("to_download.txt")
    download(url_raw_txt)
    try:
        assert dest.is_file()
        assert dest.read_text() == test_data_txt
    finally:
        dest.unlink()


def test_url_2_text():
    """Test extraction of text from URL"""
    assert url_2_text(url_raw_txt) == test_data_txt


264

<!-- Hide -->
# Running all Test Cases

In [None]:
# export
try:
    ishell = get_ipython()
except NameError as e:
    from IPython.testing.globalipapp import get_ipython, start_ipython
    ishell = start_ipython()
    print(get_ipython())

In [None]:
# hide
ipytest.run()

##vso[results.publish type=JUnit;runTitle='Pytest results';]e:\AnsysDev\_perso_repo\unpackai\nbs\test-output.xml

-- generated xml file: e:\AnsysDev\_perso_repo\unpackai\nbs\test-output.xml ---
no tests ran in 0.01s
